压缩原则
MR支持的压缩编码
压缩方式选择
1. Gzip压缩
2. Bzip2压缩
3. Lzo压缩
4. Snappy压缩
压缩位置选择
压缩参数配置
io.compression.codecs(在core-site.xml中配置)
org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec
输入压缩
Hadoop使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compress(在mapred-site.xml中配置)
false
mapper输出
这个参数设为true启用压缩
mapreduce.map.output.compress.codec(在mapred-site.xml中配置)
org.apache.hadoop.io.compress.DefaultCodec
mapper输出
企业多使用LZO或Snappy编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置)
false
reducer输出
这个参数设为true启用压缩
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置)
org.apache.hadoop.io.compress. DefaultCodec
reducer输出
使用标准工具或者编解码器,如gzip和bzip2
mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置)
RECORD
reducer输出
SequenceFile输出使用的压缩类型:NONE和BLOCK
数据流的压缩和解压缩
JavaAPI操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 1 public class TestCompress {
2
3 public static void main(String[] args) throws Exception {
4 compress("e:/hello.txt","org.apache.hadoop.io.compress.BZip2Codec");
5// decompress("e:/hello.txt.bz2");
6 }
7
8 // 1、压缩
9 private static void compress(String filename, String method) throws Exception {
10
11 // (1)获取输入流
12 FileInputStream fis = new FileInputStream(new File(filename));
13
14 Class codecClass = Class.forName(method);
15
16 CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());
17
18 // (2)获取输出流
19 FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
20 CompressionOutputStream cos = codec.createOutputStream(fos);
21
22 // (3)流的对拷
23 IOUtils.copyBytes(fis, cos, 1024*1024*5, false);
24
25// (4)关闭资源
26 cos.close();
27 fos.close();
28fis.close();
29 }
30
31 // 2、解压缩
32 private static void decompress(String filename) throws FileNotFoundException, IOException {
33
34 // (0)校验是否能解压缩
35 CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
36
37 CompressionCodec codec = factory.getCodec(new Path(filename));
38
39 if (codec == null) {
40 System.out.println("cannot find codec for file " + filename);
41 return;
42 }
43
44 // (1)获取输入流
45 CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));
46
47 // (2)获取输出流
48 FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded"));
49
50 // (3)流的对拷
51 IOUtils.copyBytes(cis, fos, 1024*1024*5, false);
52
53 // (4)关闭资源
54 cis.close();
55 fos.close();
56 }
57}
58
59
60
Map输出端采用压缩
job设置
1
2
3
4
5
6
7 1 // 开启map端输出压缩
2 configuration.setBoolean("mapreduce.map.output.compress", true);
3 // 设置map端输出压缩方式
4 configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
5
6
7
Reduce输出端采用压缩
job设置
1
2
3
4
5
6
7
8 1 // 设置reduce端输出压缩开启
2 FileOutputFormat.setCompressOutput(job, true);
3
4 // 设置压缩的方式
5 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
6
7
8