hadoop生态系统学习之路(二)如何编写MR以及运行测试

释放双眼,带上耳机,听听看~!

最近一直太忙,都没时间写博客了。首先是平时需要带我的一个哥们,他底子比我稍弱,于是我便从mybatis、spring、springMVC、html、css、js、jquery一个一个的教他,在教的过程中笔者也发现了很多之前自己没有弄明白的问题,所以说想把一样东西学好并不容易。另外笔者也参与了公司的大数据项目,学会怎么写一个MR,以及hdfs、hbase、hive、impala、zookeeper的基本使用,今天就与大家分享一下MR的编写,之后的博文中再与大家一一进行分享。当然,大数据相关的东西实在太多了,也不可能都会使用,并且用得很深,所以笔者也会再接再厉。同时,由于周末笔者还要学驾照,所以真是身心疲惫,但是也是对自己的锻炼。
好了,不说废话了,直入正题。
首先,笔者给大家介绍一下这个MR的大致业务:其实,就是一个etl过程,对数据进行抽取、转换以及加载到目的端,这里目的端,既可以是hdfs,然后交给下一个MR进行处理,也可以是hbase数据仓库,还可以是hive或者imapla的数据库,这里面hive和impala的数据还可以进行同步。这个MR是从ftp上拉取文件,直接存到hdfs,然后经过MR将数据存到hdfs中,提供给另一个MR进行处理。为了介绍简单,这里笔者将从ftp上拉取数据的过程改为直接从hdfs上读取。关于如果从ftp上拉取文件直接存到hdfs,后面的博文笔者再进行介绍。
好了,笔者将分以下几步进行讲解:

一、文件以及maven环境准备

这里,笔者使用的maven依赖,所有hadoop相关的包通过dependency依赖,pom.xml如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
1<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
2  <modelVersion>4.0.0</modelVersion>
3  <groupId>org.qiyongkang</groupId>
4  <artifactId>mr-demo</artifactId>
5  <version>0.0.1-SNAPSHOT</version>
6  <name>mr-demo</name>
7  <description>mr-demo</description>
8  <packaging>jar</packaging>
9
10  <repositories>
11      <!-- 注意,这里使用cloudera公司的maven仓库 -->
12      <repository>
13        <id>cloudera</id>
14        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
15      </repository>  
16  </repositories>
17
18  <properties>
19    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
20    <!-- hadoop版本 -->
21    <hadoop.version>2.3.0-cdh5.0.0</hadoop.version>
22    <!-- hbase版本 -->
23    <hbase.version>0.96.1.1-cdh5.0.0</hbase.version>
24    <!-- hive版本 -->
25    <hive.version>0.12.0-cdh5.0.0</hive.version>
26    <!-- junit版本 -->
27    <junit.version>4.8.1</junit.version>
28  </properties>
29
30  <dependencies>
31      <!-- hadoop相关依赖 -->
32      <dependency>
33          <groupId>org.apache.hadoop</groupId>
34          <artifactId>hadoop-mapreduce-client-core</artifactId>
35          <version>${hadoop.version}</version>
36          <exclusions>
37              <exclusion>
38                  <artifactId>jdk.tools</artifactId>
39                  <groupId>jdk.tools</groupId>
40              </exclusion>
41          </exclusions>
42      </dependency>
43
44      <dependency>
45          <groupId>org.apache.hadoop</groupId>
46          <artifactId>hadoop-common</artifactId>
47          <version>${hadoop.version}</version>
48      </dependency>
49
50      <dependency>
51          <groupId>org.apache.hadoop</groupId>
52          <artifactId>hadoop-hdfs</artifactId>
53          <version>${hadoop.version}</version>
54      </dependency>
55
56      <dependency>
57          <groupId>org.apache.hadoop</groupId>
58          <artifactId>hadoop-client</artifactId>
59          <version>${hadoop.version}</version>
60          <exclusions>
61              <exclusion>
62                  <artifactId>mockito-all</artifactId>
63                  <groupId>org.mockito</groupId>
64              </exclusion>
65          </exclusions>
66      </dependency>
67
68      <!-- MRUnit相关依赖 -->
69      <dependency>
70    <groupId>org.apache.mrunit</groupId>
71    <artifactId>mrunit</artifactId>
72    <version>0.9.0-incubating</version>
73    <classifier>hadoop2</classifier>
74</dependency>
75
76<!-- junit依赖 -->
77   <dependency>
78     <groupId>junit</groupId>
79     <artifactId>junit</artifactId>
80     <version>${junit.version}</version>
81     <scope>test</scope>
82   </dependency>
83  </dependencies>
84
85  <build>
86    <!-- 这是一个打可执行jar的插件,没有将依赖打进去,执行package命令即可 -->
87    <plugins>
88      <plugin>
89       <groupId>org.apache.maven.plugins</groupId>
90       <artifactId>maven-jar-plugin</artifactId>
91       <version>2.4</version>
92       <configuration>
93         <archive>
94            <manifest>
95              <addClasspath>false</addClasspath>
96              <classpathPrefix>lib/</classpathPrefix>
97              <mainClass>org.qiyongkang.mr.parsetofivele.ParseDataToFileElementMR</mainClass>
98            </manifest>
99          </archive>
100       </configuration>
101      </plugin>
102
103      <!-- 此插件用于将依赖jar全部打到一个jar包里面去,以免在hadoop运行环境添加依赖包 -->
104      <plugin>
105          <groupId>org.apache.maven.plugins</groupId>
106          <artifactId>maven-assembly-plugin</artifactId>
107          <version>2.3</version>
108          <configuration>
109              <descriptorRefs>
110                  <descriptorRef>jar-with-dependencies</descriptorRef>
111              </descriptorRefs>
112              <archive>
113                <manifest>
114                    <addClasspath>false</addClasspath>
115                    <mainClass>org.qiyongkang.mr.parsetofivele.ParseDataToFileElementMR</mainClass>
116                </manifest>
117              </archive>
118          </configuration>
119          <executions>
120              <execution>
121                  <id>make-assembly</id>
122                  <phase>package</phase>
123                  <goals>
124                      <goal>assembly</goal>
125                  </goals>
126              </execution>
127          </executions>
128      </plugin>
129
130      <!-- 拷贝依赖包 -->
131      <plugin>
132          <groupId>org.apache.maven.plugins</groupId>
133          <artifactId>maven-dependency-plugin</artifactId>
134          <executions>
135              <execution>
136                  <id>copy-dependencies</id>
137                  <phase>package</phase>
138                  <goals>
139                      <goal>copy-dependencies</goal>
140                  </goals>
141                  <configuration>
142                      <outputDirectory>${project.build.directory}/lib</outputDirectory>
143                      <overWriteReleases>false</overWriteReleases>
144                      <overWriteSnapshots>false</overWriteSnapshots>
145                      <overWriteIfNewer>true</overWriteIfNewer>
146                  </configuration>
147              </execution>
148          </executions>
149      </plugin>
150    </plugins>
151  </build>
152</project>
153

然后,我们准备一份文件,格式如下:


1
2
3
4
5
6
7
8
1202.102.224.68|53|61.158.148.103|17872|22640|p.tencentmind.com|A|A_125.39.213.86|20160308100839.993|0|r
2202.102.224.68|53|61.158.152.97|20366|27048|api.k.sohu.com|A|A_123.126.104.116;A_123.126.104.119;A_123.126.104.114;A_123.126.104.117;A_123.126.104.118;A_123.126.104.120;A_123.126.104.115;A_123.126.104.122|20160308100839.993|0|r
3115.60.53.151|7582|202.102.224.68|53|33946|cip4.e1977.com|A||20160308100839.993|0|q
4182.119.224.59|14731|202.102.224.68|53|31185|s.jpush.cn|A||20160308100839.993|0|q
5202.102.224.68|53|182.118.77.145|22420|19278|file32.mafengwo.net|A|A_182.118.77.145|20160308100839.993|0|r
6202.102.224.68|53|115.60.14.138|22929|31604|mmbiz.qpic.cn|A|A_42.236.95.35;A_42.236.95.36;A_42.236.95.34;A_182.118.63.200;A_182.118.63.196;A_42.236.95.33;A_42.236.95.37|20160308100839.993|0|r
7115.60.109.162|3760|202.102.224.68|53|8920|a.root-servers.net|A||20160308100839.993|0|q
8

每一行以|分隔,然后r或者q结尾,这里我们的MR只会取r结尾的数据,并且只会取此行的某几列数据,然后以其中三行为key进行计数,作为reducer的输入,最后将结果写入到hdfs,这样便可极大的祛除无效数据,减小文件大小。
这里,笔者准备了一个1.9大小.txt文件,如:
上面的jar就是后面我们要在yarn上执行的包。
然后,执行:


1
2
1su hdfs
2

使用hdfs用户。因为这里笔者使用的生态系统环境就是上一篇博文中使用cm搭建的环境。cm会为hdfs创建一个hdfs用户,所以我们必须使用此用户进行hdfs的相关操作。
执行以下命令,将文件上传到hdfs的/test/input目录:


1
2
1hadoop fs -put testData.txt /test/input
2

执行hadoop fs -ls /test/input可看到上传到hdfs成功:

二、Mapper类编写

Mapper类ParseDataToFileElementMapper:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1public static class ParseDataToFileElementMapper extends Mapper<Object, Text, Text, IntWritable> {
2
3        private static final IntWritable one = new IntWritable(1);
4        private Text mapKey = new Text();
5
6        @Override
7        protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
8                throws IOException, InterruptedException {
9            String[] values = value.toString().split("\\|");
10
11            if ("r".equals(values[10])) {
12
13                mapKey.set(values[5] + "\t" + values[0] + "\t" + values[2]);
14                System.out.println(mapKey.toString());
15                context.write(mapKey, one);
16            }
17        }
18
19    }
20

这里,由于代码不多,笔者将Mapper和Reducer作为内部类,大家可以抽离出来。

三、Reducer类编写

Reducer类ParseDataToFileElementReducer:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1public static class ParseDataToFileElementReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
2        private Text reduceKey = new Text();
3        private IntWritable result = new IntWritable();
4
5        @Override
6        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
7                throws IOException, InterruptedException {
8            //把key相同的统计一下次数
9            //cname + topDomain + cip + dip
10            int sum = 0;
11            for (IntWritable val : values) {
12              sum += val.get();
13            }
14            this.result.set(sum);
15            this.reduceKey.set("1.1-1.1" + "\t" + key.toString());
16
17            context.write(this.reduceKey, this.result);
18        }
19
20    }
21

这里,mapper会将txt数据一行行读取解析,经过shuffle后,会对key进行哈希,然后将相同的key交给一个Reducer,然后reducer对相同key进行计数,写入hdfs。

四、main函数调用MR

主类ParseDataToFileElementMR:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
2        Configuration conf = new Configuration();
3        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
4        if (otherArgs.length != 2) {
5          System.err.println("Usage: ParseDataToFileElementMR <in> <out>");
6          System.exit(2);
7        }
8        Job job = Job.getInstance(conf, "ParseDataToFileElementMR");
9        job.setJarByClass(ParseDataToFileElementMR.class);
10        //Mapper
11        job.setMapperClass(ParseDataToFileElementMapper.class);
12
13        //Combiner
14//        job.setCombinerClass(ParseDataToFileElementReducer.class);
15
16        //Reducer
17        job.setReducerClass(ParseDataToFileElementReducer.class);
18        job.setNumReduceTasks(10);
19
20        job.setOutputKeyClass(Text.class);
21        job.setOutputValueClass(IntWritable.class);
22
23        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
24        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
25
26        //将reduce输出文件压缩.gz
27        FileOutputFormat.setCompressOutput(job, true);  //job使用压缩  
28        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); //设置压缩格式
29
30        System.exit(job.waitForCompletion(true) ? 0 : 1);
31    }
32

这里我们指定reducer个数为1个,并指定输出格式为.gz。

五、编写MRUnit测试

接下来,我们使用MRUnit对MR进行测试,相关的jar依赖在第一步pom文件已给出,直接贴出测试代码,和junit一样执行:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
1package org.qiyongkang.mr.parsetofivele;
2
3import java.util.ArrayList;
4import java.util.List;
5
6import org.apache.hadoop.io.IntWritable;
7import org.apache.hadoop.io.Text;
8import org.apache.hadoop.mrunit.mapreduce.MapDriver;
9import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
10import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
11import org.junit.Before;
12import org.junit.Test;
13import org.qiyongkang.mr.parsetofivele.ParseDataToFileElementMR.ParseDataToFileElementMapper;
14import org.qiyongkang.mr.parsetofivele.ParseDataToFileElementMR.ParseDataToFileElementReducer;
15
16/**
17 * ClassName:ParseDataToFileElementMRTest <br/>
18 * Function: TODO ADD FUNCTION. <br/>
19 * Reason: TODO ADD REASON. <br/>
20 * Date: 2016年3月15日 下午12:04:55 <br/>
21 *
22 * @author qiyongkang
23 * @version
24 * @since JDK 1.6
25 * @see
26 */
27public class ParseDataToFileElementMRTest {
28
29    MapDriver<Object, Text, Text, IntWritable> mapDriver;
30    ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
31    MapReduceDriver<Object, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;
32
33    @Before
34    public void setUp() throws Exception {
35        ParseDataToFileElementMapper mapper = new ParseDataToFileElementMapper();
36        ParseDataToFileElementReducer reducer = new ParseDataToFileElementReducer();
37        mapDriver = MapDriver.newMapDriver(mapper);
38        reduceDriver = ReduceDriver.newReduceDriver(reducer);
39        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
40    }
41
42    @Test
43    public void testMapper() {
44        mapDriver.withInput(new Object(), new Text(
45                "202.102.224.68|53|115.60.109.162|3760|8920|a.root-servers.net|A|A_198.41.0.4|20160308100839.993|0|r"));
46        mapDriver.withOutput(new Text("a.root-servers.net\t202.102.224.68\t115.60.109.162"), new IntWritable(1));
47        mapDriver.runTest();
48    }
49
50    @Test
51    public void testReducer() {
52        List<IntWritable> values = new ArrayList<IntWritable>();
53        values.add(new IntWritable(1));
54        values.add(new IntWritable(1));
55        reduceDriver.withInput(new Text("a.root-servers.net\t202.102.224.68\t115.60.109.162"), values);
56        reduceDriver.withOutput(new Text("1.1-1.1\ta.root-servers.net\t202.102.224.68\t115.60.109.162"),
57                new IntWritable(2));
58        reduceDriver.runTest();
59    }
60
61    @Test
62    public void testMapReducer() {
63        mapReduceDriver.withInput(new Object(), new Text(
64                "202.102.224.68|53|115.60.109.162|3760|8920|a.root-servers.net|A|A_198.41.0.4|20160308100839.993|0|r"));
65        List<IntWritable> values = new ArrayList<IntWritable>();
66        values.add(new IntWritable(1));
67        mapReduceDriver.withOutput(new Text("1.1-1.1\ta.root-servers.net\t202.102.224.68\t115.60.109.162"), new IntWritable(1));
68        mapReduceDriver.runTest();
69    }
70
71}
72
73

这里我们可以对文件的单行进行测试,因为mapper本来就类似bufferedReader对文件一行行的读取。

六、打包

这里,笔者使用maven提供的插件进行打包,已在pom文件写出。然后,为了不将依赖包拷到hadoop环境,我们采用jar-with-dependencies这种打包方式,笔者对mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar反编译如下:
同时也指定了main函数所在类,大家可以看下pom文件。

七、在yarn上执行(MR2)

MR已写完,下面我们便可以在yarn上执行了。由于hadoop1.x使用的是MR1,而yarn上已经包括了MR2了,关于MR1与MR2的区别,笔者在后面的博文中会进行介绍。
下面开始执行:


1
2
1yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar /test/input /test/output
2

这里,我们的输入文件格式是使用的.txt,其实hdfs还支持压缩格式以及其它的格式,后面再进行介绍。
然后,我们在hdfs上查看下输出目录:
这里由于reducer只指定了一个,所以只有一个输出文件。
我们把此文件get到本地,解压看看:

八、查看运行结果以及日志

这里,我们访问http://massdata8:19888/jobhistory,JobHistory Server的默认端口便可查看MR运行日志:
同时,也可以运行yarn application -list,查看正在运行的job。

好了,关于MR的编写就讲到这儿了,希望给刚学hadoop的童鞋提供点帮助,另外,大家也可以看看hadoop提供的mr example,学会如何写一个基本的mr。

给TA打赏
共{{data.count}}人
人已打赏
安全运维

MongoDB数据建模小案例:多列数据结构

2021-12-11 11:36:11

安全运维

Ubuntu上NFS的安装配置

2021-12-19 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索