hadoop生态系统学习之路(九)MR将结果输出到数据库(DB)

释放双眼,带上耳机,听听看~!

最开始讲MapReduce的时候,我们是指定输出目录,然后把结果直接输出到hdfs上。然后,在介绍hive的简单使用时,我们直接将结果输出到了hive表中。另外,MR还可以将结果输出到数据库以及hbase。
今天,笔者就给大家介绍MR将结果输出到db。
首先,笔者要提及一下之前MR将结果输出到hive表,这里需要注意,只能向某张表中入一次数据,再次执行MR报错:
org.apache.hive.hcatalog.common.HCatException : 2003 : Non-partitioned table already contains data : qyk_test.user_info。因为hive中的表数据实际也是放在hdfs中的。我们都知道hdfs是一次写入,多次读取的。那么,怎么解决这个问题呢?我们可以每次创建一个临时表,然后将MR的数据入到临时表,入完后再把临时表的数据使用insert into table 表名 select * from 临时表名 入到实际表中,入完后再删除临时表。
好了,接下来,笔者分以下几个步骤进行介绍:

一、pom依赖

这里,需要添加一个mysql驱动包依赖:


1
2
3
4
5
6
7
1<!-- mysql驱动包 -->
2<dependency>
3   <groupId>mysql</groupId>
4   <artifactId>mysql-connector-java</artifactId>
5   <version>5.1.36</version>
6 </dependency>
7

二、数据以及数据库表准备

我们还是使用之前博文中入到hive的输入文件user_info.txt,放在hdfs中的/qiyongkang/input目录下:


1
2
3
4
5
6
7
8
9
111  1200.0  qyk1    21
222  1301    qyk2    22
333  1400.0  qyk3    23
444  1500.0  qyk4    24
555  1210.0  qyk5    25
666  124 qyk6    26
777  1233    qyk7    27
888  15011   qyk8    28
9

然后,我们这里使用的是mysql数据库,在test数据库建表:


1
2
3
4
5
6
7
1CREATE TABLE `user_info` (
2  `id` bigint(20) DEFAULT NULL,
3  `account` varchar(50) DEFAULT NULL,
4  `name` varchar(50) DEFAULT NULL,
5  `age` int(11) DEFAULT NULL
6) ENGINE=InnoDB DEFAULT CHARSET=utf8
7

三、MR编写

首先,我们来看主类LoadDataToDbMR:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
1/**
2 * Project Name:mr-demo
3 * File Name:LoadDataToDbMR.java
4 * Package Name:org.qiyongkang.mr.dbstore
5 * Date:2016年4月10日下午3:16:05
6 * Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
7 *
8*/
9
10package org.qiyongkang.mr.dbstore;
11
12import java.io.IOException;
13
14import org.apache.hadoop.conf.Configuration;
15import org.apache.hadoop.fs.Path;
16import org.apache.hadoop.mapreduce.Job;
17import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
18import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
19import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
20
21/**
22 * ClassName:LoadDataToDbMR <br/>
23 * Function: TODO ADD FUNCTION. <br/>
24 * Reason:   TODO ADD REASON. <br/>
25 * Date:     2016年4月10日 下午3:16:05 <br/>
26 * @author   qiyongkang
27 * @version  
28 * @since    JDK 1.6
29 * @see      
30 */
31public class LoadDataToDbMR {
32    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
33        Configuration conf = new Configuration();
34
35        //数据库配置
36        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver","jdbc:mysql://192.168.52.31:3306/test","root", "root");
37
38        Job job = Job.getInstance(conf, "db store");
39        job.setJarByClass(LoadDataToDbMR.class);
40
41        // 设置Mapper
42        job.setMapperClass(DbStoreMapper.class);
43
44        // 由于没有reducer,这里设置为0
45        job.setNumReduceTasks(0);
46
47        // 设置输入文件路径
48        FileInputFormat.addInputPath(job, new Path("/qiyongkang/input"));
49
50        DBOutputFormat.setOutput(job, "user_info", "id", "account", "name", "age");
51        job.setOutputFormatClass(DBOutputFormat.class);
52
53        System.exit(job.waitForCompletion(true) ? 0 : 1);
54    }
55}
56
57
58

然后,我们再来看Mapper:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
1/**
2 * Project Name:mr-demo
3 * File Name:DbStoreMapper.java
4 * Package Name:org.qiyongkang.mr.dbstore
5 * Date:2016年4月10日下午3:15:46
6 * Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
7 *
8*/
9
10package org.qiyongkang.mr.dbstore;
11
12import java.io.IOException;
13
14import org.apache.hadoop.io.LongWritable;
15import org.apache.hadoop.io.Text;
16import org.apache.hadoop.mapreduce.Mapper;
17
18/**
19 * ClassName:DbStoreMapper <br/>
20 * Function: TODO ADD FUNCTION. <br/>
21 * Reason: TODO ADD REASON. <br/>
22 * Date: 2016年4月10日 下午3:15:46 <br/>
23 *
24 * @author qiyongkang
25 * @version
26 * @since JDK 1.6
27 * @see
28 */
29public class DbStoreMapper extends Mapper<LongWritable, Text, UserInfoDBWritable, UserInfoDBWritable> {
30    private UserInfo userInfo = new UserInfo();
31
32    private UserInfoDBWritable userInfoDBWritable = null;
33
34    @Override
35    protected void map(LongWritable key, Text value,
36            Mapper<LongWritable, Text, UserInfoDBWritable, UserInfoDBWritable>.Context context)
37                    throws IOException, InterruptedException {
38
39        // 每行以制表符分隔 id, account, name, age
40        String[] strs = value.toString().split("\t");
41
42        // id,
43        userInfo.setId(Long.valueOf(strs[0]));
44
45        // account
46        userInfo.setAccount(strs[1]);
47
48        // name
49        userInfo.setName(strs[2]);
50
51        // age
52        userInfo.setAge(Integer.valueOf(strs[3]));
53
54        // 写入到db,放在key
55        userInfoDBWritable = new UserInfoDBWritable(userInfo);
56        context.write(userInfoDBWritable , null);
57    }
58
59}
60
61

这里,我们准备了一个Model,UserInfo:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
1/**
2 * Project Name:mr-demo
3 * File Name:UserInfo.java
4 * Package Name:org.qiyongkang.mr.dbstore
5 * Date:2016年4月10日下午3:30:01
6 * Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
7 *
8*/
9
10package org.qiyongkang.mr.dbstore;
11/**
12 * ClassName:UserInfo <br/>
13 * Function: TODO ADD FUNCTION. <br/>
14 * Reason:   TODO ADD REASON. <br/>
15 * Date:     2016年4月10日 下午3:30:01 <br/>
16 * @author   qiyongkang
17 * @version  
18 * @since    JDK 1.6
19 * @see      
20 */
21public class UserInfo {
22    private long id;
23
24    private String account;
25
26    private String name;
27
28    private int age;
29
30    public long getId() {
31        return id;
32    }
33
34    public void setId(long id) {
35        this.id = id;
36    }
37
38    public String getAccount() {
39        return account;
40    }
41
42    public void setAccount(String account) {
43        this.account = account;
44    }
45
46    public String getName() {
47        return name;
48    }
49
50    public void setName(String name) {
51        this.name = name;
52    }
53
54    public int getAge() {
55        return age;
56    }
57
58    public void setAge(int age) {
59        this.age = age;
60    }
61
62}
63
64
65

然后,我们要想MR输出到Db,那么此类必须实现DBWritable,如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
1/**
2 * Project Name:mr-demo
3 * File Name:UserInfoDBWritable.java
4 * Package Name:org.qiyongkang.mr.dbstore
5 * Date:2016年4月10日下午3:27:32
6 * Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
7 *
8*/
9
10package org.qiyongkang.mr.dbstore;
11
12import java.sql.PreparedStatement;
13import java.sql.ResultSet;
14import java.sql.SQLException;
15
16import org.apache.hadoop.mapreduce.lib.db.DBWritable;
17
18/**
19 * ClassName:UserInfoDBWritable <br/>
20 * Function: TODO ADD FUNCTION. <br/>
21 * Reason:   TODO ADD REASON. <br/>
22 * Date:     2016年4月10日 下午3:27:32 <br/>
23 * @author   qiyongkang
24 * @version  
25 * @since    JDK 1.6
26 * @see      
27 */
28public class UserInfoDBWritable implements DBWritable {
29    private UserInfo userInfo;
30
31    public UserInfoDBWritable() {}
32
33    public UserInfoDBWritable(UserInfo userInfo) {
34        this.userInfo = userInfo;
35    }
36
37    @Override
38    public void write(PreparedStatement statement) throws SQLException {
39        statement.setLong(1, userInfo.getId());
40        statement.setString(2, userInfo.getAccount());
41        statement.setString(3, userInfo.getName());
42        statement.setInt(4, userInfo.getAge());
43    }
44
45    @Override
46    public void readFields(ResultSet resultSet) throws SQLException {
47
48    }
49
50}
51
52
53

这里面的参数设置顺序与主类中设置DBOutputFormat时的字段顺序一致。

四、执行并查看结果

下面,还是同样的打包方式,只需修改下main函数所在的类即可。然后,上传到主节点,使用hdfs用户执行,注意此jar的权限设置。
接下来,执行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,日志如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
1bash-4.1$ yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
216/04/10 16:14:11 INFO client.RMProxy: Connecting to ResourceManager at massdata8/172.31.25.8:8032
316/04/10 16:14:12 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
416/04/10 16:14:13 INFO input.FileInputFormat: Total input paths to process : 1
516/04/10 16:14:13 INFO mapreduce.JobSubmitter: number of splits:1
616/04/10 16:14:14 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1458262657013_0982
716/04/10 16:14:14 INFO impl.YarnClientImpl: Submitted application application_1458262657013_0982
816/04/10 16:14:14 INFO mapreduce.Job: The url to track the job: http://massdata8:8088/proxy/application_1458262657013_0982/
916/04/10 16:14:14 INFO mapreduce.Job: Running job: job_1458262657013_0982
1016/04/10 16:14:21 INFO mapreduce.Job: Job job_1458262657013_0982 running in uber mode : false
1116/04/10 16:14:21 INFO mapreduce.Job:  map 0% reduce 0%
1216/04/10 16:14:29 INFO mapreduce.Job:  map 100% reduce 0%
1316/04/10 16:14:29 INFO mapreduce.Job: Job job_1458262657013_0982 completed successfully
1416/04/10 16:14:29 INFO mapreduce.Job: Counters: 30
15    File System Counters
16        FILE: Number of bytes read=0
17        FILE: Number of bytes written=91506
18        FILE: Number of read operations=0
19        FILE: Number of large read operations=0
20        FILE: Number of write operations=0
21        HDFS: Number of bytes read=259
22        HDFS: Number of bytes written=0
23        HDFS: Number of read operations=2
24        HDFS: Number of large read operations=0
25        HDFS: Number of write operations=0
26    Job Counters
27        Launched map tasks=1
28        Data-local map tasks=1
29        Total time spent by all maps in occupied slots (ms)=4459
30        Total time spent by all reduces in occupied slots (ms)=0
31        Total time spent by all map tasks (ms)=4459
32        Total vcore-seconds taken by all map tasks=4459
33        Total megabyte-seconds taken by all map tasks=4566016
34    Map-Reduce Framework
35        Map input records=8
36        Map output records=8
37        Input split bytes=117
38        Spilled Records=0
39        Failed Shuffles=0
40        Merged Map outputs=0
41        GC time elapsed (ms)=21
42        CPU time spent (ms)=1530
43        Physical memory (bytes) snapshot=321511424
44        Virtual memory (bytes) snapshot=1579036672
45        Total committed heap usage (bytes)=792199168
46    File Input Format Counters
47        Bytes Read=142
48    File Output Format Counters
49        Bytes Written=0
50

然后,我们在数据库执行查询SELECT * FROM user_info;可以看到:
说明入库成功!
好了,就介绍到这儿了。

给TA打赏
共{{data.count}}人
人已打赏
安全运维

MongoDB数据建模小案例:物联网时序数据库建模

2021-12-11 11:36:11

安全运维

Ubuntu上NFS的安装配置

2021-12-19 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索