hadoop项目实战–ETL–(三)实现mysql表到HIVE表的全量导入与增量导入

释放双眼,带上耳机,听听看~!

一 在HIVE中创建ETL数据库

->create database etl;

 

二 在工程目录下新建MysqlToHive.py 和conf文件夹

在conf文件夹下新建如下文件,最后的工程目录如下图

hadoop项目实战--ETL--(三)实现mysql表到HIVE表的全量导入与增量导入

 

三 源码

Import.xml


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1<?xml version="1.0" encoding="UTF-8"?>
2<root>
3   <importtype>
4       <value>add</value>    <!-- 增量导入或者全导入 -->
5   </importtype>
6
7   <task type="all">
8       <table>user_all</table> <!-- 数据库中需要增量导入的第一张表名 -->
9       <table>oder_all</table> <!-- 数据库中需要增量导入的第一张表名 -->
10  </task>
11 
12  <task type="add">
13      <table>user_add</table> <!-- 数据库中需要增量导入的第一张表名 -->
14      <table>oder_add</table> <!-- 数据库中需要增量导入的第一张表名 -->
15  </task>
16 
17</root>
18

oder_add.xml


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1<?xml version="1.0" encoding="UTF-8"?>
2
3<root>
4   <sqoop-shell type="import">
5       <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
6       <param key="username">root</param> <!-- 数据库用户名 -->
7       <param key="password">123456</param> <!-- 数据库密码 -->
8       <param key="table">oderinfo</param><!-- 数据库中待导出的表名 -->
9       <param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
10      <param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
11      <param key="hive-partition-value">$dt</param>
12      <param key="hive-import"></param>
13      <param key="check-column">crt_time</param> <!-- 增量导入检查的列 -->
14      <param key="incremental">lastmodified</param> <!-- 按照时间簇来进行增量导入 -->
15      <param key="last-value">23:59:59</param> <!-- 增量导入时间划分点 -->
16      <param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
17      <param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
18  </sqoop-shell>
19</root>
20

oder_all.xml


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1<?xml version="1.0" encoding="UTF-8"?>
2
3<root>
4   <sqoop-shell type="import">
5       <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param>   <!-- 数据库连接地址 -->
6       <param key="username">root</param><!-- 数据库用户名 -->
7       <param key="password">123456</param><!-- 数据库密码 -->
8       <param key="table">oderinfo</param><!-- 数据库中待导出的表名 -->
9       <param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
10      <param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
11      <param key="hive-partition-value">$dt</param>
12      <param key="hive-import"></param>                  
13      <param key="create-hive-table"></param>   <!-- 在hive中新建一张同名同结构的表 -->
14      <param key="hive-overwrite"></param> <!-- 覆盖原来以存在的表 -->
15      <param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
16      <param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
17  </sqoop-shell>
18</root>
19

user_add.xml


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1<?xml version="1.0" encoding="UTF-8"?>
2
3<root>
4   <sqoop-shell type="import">
5       <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
6       <param key="username">root</param> <!-- 数据库用户名 -->
7       <param key="password">123456</param> <!-- 数据库密码 -->
8       <param key="table">userinfo</param><!-- 数据库中待导出的表名 -->
9       <param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
10      <param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
11      <param key="hive-partition-value">$dt</param>
12      <param key="hive-import"></param>
13      <param key="check-column">crt_time</param> <!-- 增量导入检查的列 -->
14      <param key="incremental">lastmodified</param> <!-- 按照时间簇来进行增量导入 -->
15      <param key="last-value">23:59:59</param> <!-- 增量导入时间划分点 -->
16      <param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
17      <param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
18  </sqoop-shell>
19</root>
20

user_all.xml


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1<?xml version="1.0" encoding="UTF-8"?>
2
3<root>
4   <sqoop-shell type="import">
5       <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param>   <!-- 数据库连接地址 -->
6       <param key="username">root</param><!-- 数据库用户名 -->
7       <param key="password">123456</param><!-- 数据库密码 -->
8       <param key="table">userinfo</param><!-- 数据库中待导出的表名 -->
9       <param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
10      <param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
11      <param key="hive-partition-value">$dt</param>
12      <param key="hive-import"></param>                  
13      <param key="create-hive-table"></param>   <!-- 在hive中新建一张同名同结构的表 -->
14      <param key="hive-overwrite"></param> <!-- 覆盖原来以存在的表 -->
15      <param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
16      <param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
17  </sqoop-shell>
18</root>
19

MysqlToHive.py


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
1# _*_ coding:UTF-8 _*_
2'''
3Created on 2016��12��1��
4
5@author: duking
6'''
7import datetime
8import os
9import xml.etree.ElementTree as ET
10import collections
11
12#获取昨天时间
13def getYesterday():
14    today=datetime.date.today()
15    oneday=datetime.timedelta(days=1)
16    yesterday=today-oneday  
17    return yesterday
18
19
20def Resolve_Conf(dt):
21    
22    #获取当前工程目录
23    PROJECT_DIR = os.getcwd()
24    #获得配置文件名
25    conf_file = PROJECT_DIR + "\conf\Import.xml"
26    #解析配置文件
27    xml_tree = ET.parse(conf_file)
28    
29    #提取出本次导入的类型  全导入或者增量导入  通过配置import.xml中的plan标签的value值设定
30    import_types = xml_tree.findall('./importtype')
31    for import_type in import_types:
32        aim_types = import_type.findall('./value')
33        for i in range(len(aim_types)):
34            aim_type = aim_types[i].text
35            
36    #获得task元素
37    tasks = xml_tree.findall('./task')
38    
39    #用来保存待执行的sqoop命令的集合
40    cmds = []
41    
42    for task in tasks:
43        #获得导入类型,增量导入或者全量导入
44        import_type = task.attrib["type"]
45        
46        #如果task的标签导入类型与设定类型不同,结束本次循环
47        if(import_type != aim_type):
48            continue
49
50        #获得表名集合
51        tables = task.findall('./table')
52        
53        #迭代表名集合,解析表配置文件
54        for i in range(len(tables)):
55            #表名
56            table_name = tables[i].text
57            #表配置文件名
58            table_conf_file = PROJECT_DIR + "\conf\\" + table_name + ".xml"
59            
60            #解析表配置文件
61            xmlTree = ET.parse(table_conf_file)
62            
63            #获取sqoop-shell 节点
64            sqoopNodes = xmlTree.findall("./sqoop-shell")
65            #获取sqoop 命令类型
66            sqoop_cmd_type = sqoopNodes[0].attrib["type"]
67            
68            #首先组装成sqoop命令头
69            command = "sqoop " + sqoop_cmd_type
70                
71            #获取
72            praNodes = sqoopNodes[0].findall("./param")
73            
74            #用来保存param的信息的有序字典
75            cmap = collections.OrderedDict()            
76            #将所有param中的key-value存入字典中
77            for i in range(len(praNodes)):
78                #获取key的属性值
79                key = praNodes[i].attrib["key"]
80                #获取param标签中的值
81                value = praNodes[i].text
82                #保存到字典中
83                cmap[key] = value
84            
85            #迭代字典将param的信息拼装成字符串
86            for key in cmap:
87                  
88                value = cmap[key]
89                
90                #如果不是键值对形式的命令 或者值为空,跳出此次循环
91                if(value == None or value == "" or value == " "):
92                    value = ""
93                    
94                if(key == "hive-partition-value"):
95                    value = value.replace('$dt',str(dt))  
96                #合成前一天的时间
97                if(key == "last-value"):
98                    value = '"' + str(dt) + " " + value + '"'
99                        
100                #拼装为命令
101                command += " --" + key + " " + value + " "
102                    
103            #将命令加入至待执行的命令集合
104            cmds.append(command)
105        
106    return cmds  
107        
108
109#python 模块的入口:main函数
110if __name__ == '__main__':
111    
112    dt = getYesterday();
113    
114    #解析配置文件,生成相应的HQL语句
115    cmds = Resolve_Conf(dt)
116    
117    #迭代集合,执行命令
118    for i in range(len(cmds)):
119        cmd = cmds[i]
120        print cmd
121        #执行导入过秤
122        os.system(cmd)
123

给TA打赏
共{{data.count}}人
人已打赏
安全运维

基于spring boot和mongodb打造一套完整的权限架构(二)【MAVEN依赖以及相应配置】

2021-12-11 11:36:11

安全运维

Ubuntu上NFS的安装配置

2021-12-19 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索