一 在HIVE中创建ETL数据库
->create database etl;
二 在工程目录下新建MysqlToHive.py 和conf文件夹
在conf文件夹下新建如下文件,最后的工程目录如下图
三 源码
Import.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1<?xml version="1.0" encoding="UTF-8"?>
2<root>
3 <importtype>
4 <value>add</value> <!-- 增量导入或者全导入 -->
5 </importtype>
6
7 <task type="all">
8 <table>user_all</table> <!-- 数据库中需要增量导入的第一张表名 -->
9 <table>oder_all</table> <!-- 数据库中需要增量导入的第一张表名 -->
10 </task>
11
12 <task type="add">
13 <table>user_add</table> <!-- 数据库中需要增量导入的第一张表名 -->
14 <table>oder_add</table> <!-- 数据库中需要增量导入的第一张表名 -->
15 </task>
16
17</root>
18
oder_add.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1<?xml version="1.0" encoding="UTF-8"?>
2
3<root>
4 <sqoop-shell type="import">
5 <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
6 <param key="username">root</param> <!-- 数据库用户名 -->
7 <param key="password">123456</param> <!-- 数据库密码 -->
8 <param key="table">oderinfo</param><!-- 数据库中待导出的表名 -->
9 <param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
10 <param key="hive-partition-key">dt</param> <!-- 通过时间分区 -->
11 <param key="hive-partition-value">$dt</param>
12 <param key="hive-import"></param>
13 <param key="check-column">crt_time</param> <!-- 增量导入检查的列 -->
14 <param key="incremental">lastmodified</param> <!-- 按照时间簇来进行增量导入 -->
15 <param key="last-value">23:59:59</param> <!-- 增量导入时间划分点 -->
16 <param key="num-mappers">1</param> <!-- 使用map任务个数 -->
17 <param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理 -->
18 </sqoop-shell>
19</root>
20
oder_all.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1<?xml version="1.0" encoding="UTF-8"?>
2
3<root>
4 <sqoop-shell type="import">
5 <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
6 <param key="username">root</param><!-- 数据库用户名 -->
7 <param key="password">123456</param><!-- 数据库密码 -->
8 <param key="table">oderinfo</param><!-- 数据库中待导出的表名 -->
9 <param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
10 <param key="hive-partition-key">dt</param> <!-- 通过时间分区 -->
11 <param key="hive-partition-value">$dt</param>
12 <param key="hive-import"></param>
13 <param key="create-hive-table"></param> <!-- 在hive中新建一张同名同结构的表 -->
14 <param key="hive-overwrite"></param> <!-- 覆盖原来以存在的表 -->
15 <param key="num-mappers">1</param> <!-- 使用map任务个数 -->
16 <param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理 -->
17 </sqoop-shell>
18</root>
19
user_add.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1<?xml version="1.0" encoding="UTF-8"?>
2
3<root>
4 <sqoop-shell type="import">
5 <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
6 <param key="username">root</param> <!-- 数据库用户名 -->
7 <param key="password">123456</param> <!-- 数据库密码 -->
8 <param key="table">userinfo</param><!-- 数据库中待导出的表名 -->
9 <param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
10 <param key="hive-partition-key">dt</param> <!-- 通过时间分区 -->
11 <param key="hive-partition-value">$dt</param>
12 <param key="hive-import"></param>
13 <param key="check-column">crt_time</param> <!-- 增量导入检查的列 -->
14 <param key="incremental">lastmodified</param> <!-- 按照时间簇来进行增量导入 -->
15 <param key="last-value">23:59:59</param> <!-- 增量导入时间划分点 -->
16 <param key="num-mappers">1</param> <!-- 使用map任务个数 -->
17 <param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理 -->
18 </sqoop-shell>
19</root>
20
user_all.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1<?xml version="1.0" encoding="UTF-8"?>
2
3<root>
4 <sqoop-shell type="import">
5 <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
6 <param key="username">root</param><!-- 数据库用户名 -->
7 <param key="password">123456</param><!-- 数据库密码 -->
8 <param key="table">userinfo</param><!-- 数据库中待导出的表名 -->
9 <param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
10 <param key="hive-partition-key">dt</param> <!-- 通过时间分区 -->
11 <param key="hive-partition-value">$dt</param>
12 <param key="hive-import"></param>
13 <param key="create-hive-table"></param> <!-- 在hive中新建一张同名同结构的表 -->
14 <param key="hive-overwrite"></param> <!-- 覆盖原来以存在的表 -->
15 <param key="num-mappers">1</param> <!-- 使用map任务个数 -->
16 <param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理 -->
17 </sqoop-shell>
18</root>
19
MysqlToHive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123 1# _*_ coding:UTF-8 _*_
2'''
3Created on 2016��12��1��
4
5@author: duking
6'''
7import datetime
8import os
9import xml.etree.ElementTree as ET
10import collections
11
12#获取昨天时间
13def getYesterday():
14 today=datetime.date.today()
15 oneday=datetime.timedelta(days=1)
16 yesterday=today-oneday
17 return yesterday
18
19
20def Resolve_Conf(dt):
21
22 #获取当前工程目录
23 PROJECT_DIR = os.getcwd()
24 #获得配置文件名
25 conf_file = PROJECT_DIR + "\conf\Import.xml"
26 #解析配置文件
27 xml_tree = ET.parse(conf_file)
28
29 #提取出本次导入的类型 全导入或者增量导入 通过配置import.xml中的plan标签的value值设定
30 import_types = xml_tree.findall('./importtype')
31 for import_type in import_types:
32 aim_types = import_type.findall('./value')
33 for i in range(len(aim_types)):
34 aim_type = aim_types[i].text
35
36 #获得task元素
37 tasks = xml_tree.findall('./task')
38
39 #用来保存待执行的sqoop命令的集合
40 cmds = []
41
42 for task in tasks:
43 #获得导入类型,增量导入或者全量导入
44 import_type = task.attrib["type"]
45
46 #如果task的标签导入类型与设定类型不同,结束本次循环
47 if(import_type != aim_type):
48 continue
49
50 #获得表名集合
51 tables = task.findall('./table')
52
53 #迭代表名集合,解析表配置文件
54 for i in range(len(tables)):
55 #表名
56 table_name = tables[i].text
57 #表配置文件名
58 table_conf_file = PROJECT_DIR + "\conf\\" + table_name + ".xml"
59
60 #解析表配置文件
61 xmlTree = ET.parse(table_conf_file)
62
63 #获取sqoop-shell 节点
64 sqoopNodes = xmlTree.findall("./sqoop-shell")
65 #获取sqoop 命令类型
66 sqoop_cmd_type = sqoopNodes[0].attrib["type"]
67
68 #首先组装成sqoop命令头
69 command = "sqoop " + sqoop_cmd_type
70
71 #获取
72 praNodes = sqoopNodes[0].findall("./param")
73
74 #用来保存param的信息的有序字典
75 cmap = collections.OrderedDict()
76 #将所有param中的key-value存入字典中
77 for i in range(len(praNodes)):
78 #获取key的属性值
79 key = praNodes[i].attrib["key"]
80 #获取param标签中的值
81 value = praNodes[i].text
82 #保存到字典中
83 cmap[key] = value
84
85 #迭代字典将param的信息拼装成字符串
86 for key in cmap:
87
88 value = cmap[key]
89
90 #如果不是键值对形式的命令 或者值为空,跳出此次循环
91 if(value == None or value == "" or value == " "):
92 value = ""
93
94 if(key == "hive-partition-value"):
95 value = value.replace('$dt',str(dt))
96 #合成前一天的时间
97 if(key == "last-value"):
98 value = '"' + str(dt) + " " + value + '"'
99
100 #拼装为命令
101 command += " --" + key + " " + value + " "
102
103 #将命令加入至待执行的命令集合
104 cmds.append(command)
105
106 return cmds
107
108
109#python 模块的入口:main函数
110if __name__ == '__main__':
111
112 dt = getYesterday();
113
114 #解析配置文件,生成相应的HQL语句
115 cmds = Resolve_Conf(dt)
116
117 #迭代集合,执行命令
118 for i in range(len(cmds)):
119 cmd = cmds[i]
120 print cmd
121 #执行导入过秤
122 os.system(cmd)
123