基于Hadoop生态圈的数据仓库实践 —— 进阶技术(五)

释放双眼,带上耳机,听听看~!

五、快照
前面实验说明了处理维度的扩展。本节讨论两种事实表的扩展技术。
有些用户,尤其是管理者,经常要看某个特定时间点的数据。也就是说,他们需要数据的快照。周期快照和累积快照是两种常用的事实表扩展技术。
周期快照是在一个给定的时间对事实表进行一段时期的总计。例如,一个月销售订单周期快照汇总每个月底时总的销售订单金额。
累积快照用于跟踪事实表的变化。例如,数据仓库可能需要累积(存储)销售订单从下订单的时间开始,到订单中的商品被打包、运输和到达的各阶段的时间点数据来跟踪订单生命周期的进展情况。用户可能要取得在某个给定时间点,销售订单处理状态的累积快照。
下面说明周期快照和累积快照的细节问题。
1. 周期快照
下面以销售订单的月底汇总为例说明如何实现一个周期快照。
首先需要添加一个新的事实表。下图中的模式显示了一个名为month_end_sales_order_fact的新事实表。
该表中有两个度量值,month_order_amount和month_order_quantity。这两个值是不能加到sales_order_fact表中的,原因是,sales_order_fact表和新的度量值有不同的时间属性(数据的粒度不同)。sales_order_fact表包含的是每天一条记录。新的度量值要的是每月的数据。使用下面的脚本建立month_end_sales_order_fact表。


1
2
3
4
5
6
7
8
9
10
11
1USE dw;  
2  
3CREATE TABLE month_end_sales_order_fact (  
4    order_month_sk INT COMMENT 'order month surrogate key',
5    product_sk INT COMMENT 'product surrogate key',
6    month_order_amount DECIMAL(10,2) COMMENT 'month order amount',
7    month_order_quantity INT COMMENT 'month order quantity'
8)
9CLUSTERED BY (order_month_sk) INTO 8 BUCKETS      
10STORED AS ORC TBLPROPERTIES ('transactional'='true');
11

1
2
1        建立了month_end_sales_order_fact表后,现在需要向表中装载数据。月底销售订单事实表的数据源是已有的销售订单事实表。month_sum.sql文件用于装载月底销售订单事实表,该文件内容如下。  
2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1-- 设置变量以支持事务    
2set hive.support.concurrency=true;    
3set hive.exec.dynamic.partition.mode=nonstrict;    
4set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;    
5set hive.compactor.initiator.on=true;    
6set hive.compactor.worker.threads=1;  
7
8USE dw;  
9
10SET hivevar:pre_month_date = add_months(current_date,-1);  
11
12delete from month_end_sales_order_fact
13 where month_end_sales_order_fact.order_month_sk in
14 (select month_sk
15    from month_dim
16   where month = month(${hivevar:pre_month_date})
17     and year = year(${hivevar:pre_month_date}));
18
19insert into month_end_sales_order_fact
20select b.month_sk, a.product_sk, sum(order_amount), sum(order_quantity)
21  from sales_order_fact a,
22       month_dim b,
23       order_date_dim d  -- 视图
24 where a.order_date_sk = d.order_date_sk
25   and b.month = d.month
26   and b.year = d.year
27   and b.month = month(${hivevar:pre_month_date})
28   and b.year = year(${hivevar:pre_month_date})
29 group by b.month_sk, a.product_sk ;
30

1
2
1        每个月第一天,在每天销售订单定期装载执行完后,执行此脚本,装载上个月的销售订单数据。为此需要修改Oozie的工作流定义。  
2

(1)修改工作流作业配置文件
修改后的workflow.xml文件内容如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
1<?xml version="1.0" encoding="UTF-8"?>
2<workflow-app xmlns="uri:oozie:workflow:0.1" name="regular_etl">
3    <start to="fork-node"/>
4    <fork name="fork-node">
5        <path start="sqoop-customer" />
6        <path start="sqoop-product" />
7        <path start="sqoop-sales_order" />
8    </fork>
9
10    <action name="sqoop-customer">
11        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
12            <job-tracker>${jobTracker}</job-tracker>
13            <name-node>${nameNode}</name-node>
14            <arg>import</arg>
15            <arg>--connect</arg>
16            <arg>jdbc:mysql://cdh1:3306/source?useSSL=false</arg>
17            <arg>--username</arg>
18            <arg>root</arg>
19            <arg>--password</arg>
20            <arg>mypassword</arg>
21            <arg>--table</arg>
22            <arg>customer</arg>
23            <arg>--hive-import</arg>
24            <arg>--hive-table</arg>
25            <arg>rds.customer</arg>
26            <arg>--hive-overwrite</arg>            
27            <file>/tmp/hive-site.xml#hive-site.xml</file>
28            <archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
29        </sqoop>
30        <ok to="joining"/>
31        <error to="fail"/>
32    </action>
33  <action name="sqoop-product">
34        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
35            <job-tracker>${jobTracker}</job-tracker>
36            <name-node>${nameNode}</name-node>
37            <arg>import</arg>
38            <arg>--connect</arg>
39            <arg>jdbc:mysql://cdh1:3306/source?useSSL=false</arg>
40            <arg>--username</arg>
41            <arg>root</arg>
42            <arg>--password</arg>
43            <arg>mypassword</arg>
44            <arg>--table</arg>
45            <arg>product</arg>
46            <arg>--hive-import</arg>
47            <arg>--hive-table</arg>
48            <arg>rds.product</arg>
49            <arg>--hive-overwrite</arg>            
50            <file>/tmp/hive-site.xml#hive-site.xml</file>
51            <archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
52        </sqoop>
53        <ok to="joining"/>
54        <error to="fail"/>
55    </action>
56    <action name="sqoop-sales_order">
57        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
58            <job-tracker>${jobTracker}</job-tracker>
59            <name-node>${nameNode}</name-node>
60            <command>job --exec myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop</command>
61            <file>/tmp/hive-site.xml#hive-site.xml</file>
62            <archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
63        </sqoop>
64        <ok to="joining"/>
65        <error to="fail"/>
66    </action>
67
68    <join name="joining" to="hive-node"/>
69
70    <action name="hive-node">
71        <hive xmlns="uri:oozie:hive-action:0.2">
72            <job-tracker>${jobTracker}</job-tracker>
73            <name-node>${nameNode}</name-node>
74            <job-xml>/tmp/hive-site.xml</job-xml>
75            <script>/tmp/regular_etl.sql</script>
76        </hive>
77        <ok to="decision-node"/>
78        <error to="fail"/>
79    </action>
80
81    <decision name="decision-node">
82       <switch>
83         <case to="month-sum">
84             ${date eq 20}
85         </case>
86         <default to="end"/>
87       </switch>
88    </decision>
89
90    <action name="month-sum">
91        <hive xmlns="uri:oozie:hive-action:0.2">
92            <job-tracker>${jobTracker}</job-tracker>
93            <name-node>${nameNode}</name-node>
94            <job-xml>/tmp/hive-site.xml</job-xml>
95            <script>/tmp/month_sum.sql</script>
96        </hive>
97        <ok to="end"/>
98        <error to="fail"/>
99    </action>
100
101    <kill name="fail">
102        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
103    </kill>
104    <end name="end"/>
105</workflow-app>
106

1
2
1        在该文件中增加了一个decision-node,当date参数的值等于20时,执行month_sum.sql文件后结束,否则直接结束。之所以这里是20是为了测试。month_sum.sql文件中用的是add_months(current_date,-1)取上个月的年月,因此不必要非得1号执行,任何一天都可以。这个工作流保证了每月汇总只有在每天汇总执行完后才执行,并且每月只执行一次。工作流DAG如下图所示。  
2

(2)修改协调作业配置文件
修改后的coordinator.xml文件内容如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1<coordinator-app name="regular_etl-coord" frequency="${coord:days(1)}" start="${start}" end="${end}" timezone="${timezone}" xmlns="uri
2:oozie:coordinator:0.1">
3    <action>
4        <workflow>
5            <app-path>${workflowAppUri}</app-path>
6            <configuration>
7                <property>
8                    <name>jobTracker</name>
9                    <value>${jobTracker}</value>
10                </property>
11                <property>
12                    <name>nameNode</name>
13                    <value>${nameNode}</value>
14                </property>
15                <property>
16                    <name>queueName</name>
17                    <value>${queueName}</value>
18                </property>
19                <property>
20                    <name>date</name>
21                    <value>${date}</value>
22                </property>
23            </configuration>
24        </workflow>
25    </action>
26</coordinator-app>
27

1
2
1        在该文件中增加了一个date属性,用于向workflow.xml文件传递date参数的值。  
2

(3)修改协调作业属性文件
修改后的job-coord.properties文件内容如下:


1
2
3
4
5
6
7
8
9
10
1nameNode=hdfs://cdh2:8020
2jobTracker=cdh2:8032
3queueName=default
4oozie.use.system.libpath=true
5oozie.coord.application.path=${nameNode}/user/${user.name}
6timezone=UTC
7start=2016-07-20T01:30Z
8end=2020-12-31T01:30Z
9workflowAppUri=${nameNode}/user/${user.name}
10

1
2
1        该文件中只修改了start和end属性的值以用于测试。  
2

(4)部署工作流和协调作业


1
2
3
4
5
6
7
1hdfs dfs -put -f coordinator.xml /user/root/  
2hdfs dfs -put -f /root/workflow.xml /user/root/  
3hdfs dfs -put -f /etc/hive/conf.cloudera.hive/hive-site.xml /tmp/  
4hdfs dfs -put -f /root/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar /tmp/  
5hdfs dfs -put -f /root/regular_etl.sql /tmp/
6hdfs dfs -put -f /root/month_sum.sql /tmp/
7

1
2
1(5)运行协调作业进行测试  
2

1
2
1oozie job -oozie http://cdh2:11000/oozie -config /root/job-coord.properties -run -D date=`date +"%d"`
2

1
2
1        通过命令行的-D参数传递date属性的值,date为当前日期数,执行时是20号。  
2

        到了9点半工作流开始运行,等执行完全成功后,month_end_sales_order_fact表中有了2016年6月销售订单汇总的两条数据,如下图所示。
order_month_sk的值为198,使用下面的查询可以确认对应的年月是2016年6月。


1
2
1select year,month from month_dim where month_sk=198;
2

1
2
1**2. 累积快照**  
2

        本小节说明如何在销售订单上实现累积快照。
该累加快照跟踪五个销售订单的里程碑:下订单、分配库房、打包、配送和收货。这五个里程碑的日期及其各自的数量来自源数据库的销售订单表。一个订单完整的生命周期由五行描述:下订单的时间一行,订单商品被分配到库房的时间一行,产品打包的时间一行,订单配送的时间一行,订单客户收货的时间一行。每个里程碑各有一个状态:N为新订单,A为已分配库房,P为已打包,S为已配送,R为已收货。源数据库的sales_order表结构必须做相应的改变,以处理五种不同的状态。
(1)修改数据库模式
执行下面的脚本修改数据库模式。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
1-- MySQL
2USE source;  
3-- 修改销售订单事务表  
4ALTER TABLE sales_order  
5  CHANGE order_date status_date datetime  
6, ADD order_status VARCHAR(1) AFTER status_date  
7, CHANGE order_quantity quantity INT;  
8
9-- 删除sales_order表的主键  
10alter table sales_order change order_number order_number int not null;  
11alter table sales_order drop primary key;  
12
13-- 建立新的主键
14alter table sales_order add id int unsigned not null auto_increment primary key comment '主键' first;
15
16-- Hive
17-- rds.sales_order并没有增加id列,原因有两个:
18-- 1. 该列只作为增量检查列,不用存储
19-- 2. 不用再重新导入所有数据
20use rds;
21alter table sales_order
22change order_date status_date timestamp comment 'status date';
23alter table sales_order
24change order_quantity quantity int comment 'quantity';
25alter table sales_order
26add columns (order_status varchar(1) comment 'order status');
27
28USE dw;  
29-- 事实表增加八列
30alter table sales_order_fact rename to sales_order_fact_old;
31create table sales_order_fact
32(
33  order_sk int comment 'order surrogate key',
34  customer_sk int comment 'customer surrogate key',
35  product_sk int comment 'product surrogate key',
36  order_date_sk int comment 'order date surrogate key',
37  allocate_date_sk int comment 'allocate date surrogate key',
38  allocate_quantity int comment 'allocate quantity',
39  packing_date_sk int comment 'packing date surrogate key',
40  packing_quantity int comment 'packing quantity',
41  ship_date_sk int comment 'ship date surrogate key',
42  ship_quantity int comment 'ship quantity',
43  receive_date_sk int comment 'receive date surrogate key',
44  receive_quantity int comment 'receive quantity',
45  request_delivery_date_sk int comment 'request delivery date surrogate key',
46  order_amount decimal(10,2) comment 'order amount',
47  order_quantity int comment 'order quantity'
48)
49CLUSTERED BY (order_sk) INTO 8 BUCKETS  
50STORED AS ORC TBLPROPERTIES ('transactional'='true');
51insert into sales_order_fact
52select order_sk,
53       customer_sk,
54       product_sk,
55       order_date_sk,
56       null,
57       null,
58       null,
59       null,
60       null,
61       null,
62       null,
63       null,
64       request_delivery_date_sk,
65       order_amount,
66       order_quantity
67  from sales_order_fact_old;
68drop table sales_order_fact_old;
69  
70-- 建立四个日期维度视图  
71CREATE VIEW allocate_date_dim (allocate_date_sk, allocate_date, month, month_name, quarter, year, promo_ind)
72AS
73SELECT date_sk,
74       date,
75       month,
76       month_name,
77       quarter,
78       year,
79       promo_ind
80  FROM date_dim ;
81  
82CREATE VIEW packing_date_dim (packing_date_sk, packing_date, month, month_name, quarter, year, promo_ind)
83AS  
84SELECT date_sk,
85       date,
86       month,
87       month_name,
88       quarter,
89       year,
90       promo_ind
91  FROM date_dim ;
92  
93CREATE VIEW ship_date_dim (ship_date_sk, ship_date, month, month_name, quarter, year, promo_ind)
94AS  
95SELECT date_sk,
96       date,
97       month,
98       month_name,
99       quarter,
100       year,
101       promo_ind
102  FROM date_dim ;
103  
104CREATE VIEW receive_date_dim (receive_date_sk, receive_date, month, month_name, quarter, year, promo_ind)
105AS  
106SELECT date_sk,
107       date,
108       month,
109       month_name,
110       quarter,
111       year,
112       promo_ind
113  FROM date_dim ;
114

1
2
1        修改后的数据仓库模式如下图所示。  
2

         对源数据库的修改如下:把order_date列改名为status_date,添加了名为order_status的列,并把order_quantity列改名为quantity。正如名字所表示的,order_status列用于存储N,A,P,S或R之一。它描述了status_date列对应的状态值。如果一条记录的状态为N,则status_date列是下订单的日期。如果状态是R,status_date列是收货日期。对数据仓库的修改如下:给现有的sales_order_fact表添加四个数量和四个日期代理键,要加的新列是allocate_date_sk、allocate_quantity、packing_date_sk、packing_quantity、ship_date_sk、ship_quantity、receive_date_sk、receive_quantity。还要在日期维度上使用数据库视图角色扮演生成四个新的日期代理键。
(2)重建Sqoop作业
使用下面的脚本重建Sqoop作业,因为源表会有多个相同的order_number,所以不能再用它作为检查字段,将检查字段改为id


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1last_value=`sqoop job --show myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop | grep incremental.last.value | awk '{print $3}'`
2sqoop job --delete myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop  
3sqoop job \
4--meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop \
5--create myjob_incremental_import \
6-- \
7import \
8--connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" \
9--table sales_order \
10--columns "order_number, customer_number, product_code, status_date, entry_date, order_amount, quantity, request_delivery_date, order_status" \
11--hive-import \
12--hive-table rds.sales_order \
13--incremental append \
14--check-column id \
15--last-value $last_value
16

1
2
1(3)修改定期装载regular_etl.sql文件  
2

需要依据数据库模式修改定期装载的HiveQL脚本,修改后的脚本如下所示。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
1-- 设置变量以支持事务    
2set hive.support.concurrency=true;    
3set hive.exec.dynamic.partition.mode=nonstrict;    
4set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;    
5set hive.compactor.initiator.on=true;    
6set hive.compactor.worker.threads=1;    
7    
8USE dw;    
9      
10-- 设置SCD的生效时间和过期时间    
11SET hivevar:cur_date = CURRENT_DATE();    
12SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);    
13SET hivevar:max_date = CAST('2200-01-01' AS DATE);    
14      
15-- 设置CDC的上限时间    
16INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;    
17    
18-- 装载customer维度    
19-- 设置已删除记录和地址相关列上SCD2的过期,用<=>运算符处理NULL值。    
20UPDATE customer_dim    
21   SET expiry_date = ${hivevar:pre_date}      
22 WHERE customer_dim.customer_sk IN      
23(SELECT a.customer_sk    
24   FROM (SELECT customer_sk,    
25                customer_number,    
26                customer_street_address,    
27                customer_zip_code,    
28                customer_city,    
29                customer_state,    
30                shipping_address,    
31                shipping_zip_code,    
32                shipping_city,    
33                shipping_state    
34           FROM customer_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN    
35                rds.customer b ON a.customer_number = b.customer_number    
36          WHERE b.customer_number IS NULL OR    
37          (  !(a.customer_street_address <=> b.customer_street_address)    
38          OR !(a.customer_zip_code <=> b.customer_zip_code)    
39          OR !(a.customer_city <=> b.customer_city)    
40          OR !(a.customer_state <=> b.customer_state)    
41          OR !(a.shipping_address <=> b.shipping_address)    
42          OR !(a.shipping_zip_code <=> b.shipping_zip_code)    
43          OR !(a.shipping_city <=> b.shipping_city)    
44          OR !(a.shipping_state <=> b.shipping_state)    
45          ));    
46    
47-- 处理customer_street_addresses列上SCD2的新增行      
48INSERT INTO customer_dim    
49SELECT    
50    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,    
51    t1.customer_number,    
52    t1.customer_name,    
53    t1.customer_street_address,    
54    t1.customer_zip_code,    
55    t1.customer_city,    
56    t1.customer_state,    
57    t1.shipping_address,    
58    t1.shipping_zip_code,    
59    t1.shipping_city,    
60    t1.shipping_state,    
61    t1.version,    
62    t1.effective_date,    
63    t1.expiry_date    
64FROM      
65(      
66SELECT      
67    t2.customer_number customer_number,    
68    t2.customer_name customer_name,    
69    t2.customer_street_address customer_street_address,    
70    t2.customer_zip_code customer_zip_code,    
71    t2.customer_city customer_city,    
72    t2.customer_state customer_state,    
73    t2.shipping_address shipping_address,    
74    t2.shipping_zip_code shipping_zip_code,    
75    t2.shipping_city shipping_city,    
76    t2.shipping_state shipping_state,    
77    t1.version + 1 version,    
78    ${hivevar:pre_date} effective_date,      
79    ${hivevar:max_date} expiry_date      
80 FROM customer_dim t1    
81INNER JOIN rds.customer t2      
82   ON t1.customer_number = t2.customer_number      
83  AND t1.expiry_date = ${hivevar:pre_date}      
84 LEFT JOIN customer_dim t3    
85   ON t1.customer_number = t3.customer_number    
86  AND t3.expiry_date = ${hivevar:max_date}      
87WHERE (!(t1.customer_street_address <=> t2.customer_street_address)    
88   OR  !(t1.customer_zip_code <=> t2.customer_zip_code)    
89   OR  !(t1.customer_city <=> t2.customer_city)    
90   OR  !(t1.customer_state <=> t2.customer_state)    
91   OR  !(t1.shipping_address <=> t2.shipping_address)    
92   OR  !(t1.shipping_zip_code <=> t2.shipping_zip_code)    
93   OR  !(t1.shipping_city <=> t2.shipping_city)    
94   OR  !(t1.shipping_state <=> t2.shipping_state)    
95   )    
96  AND t3.customer_sk IS NULL) t1      
97CROSS JOIN      
98(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;    
99    
100-- 处理customer_name列上的SCD1    
101-- 因为hive的update的set子句还不支持子查询,所以这里使用了一个临时表存储需要更新的记录,用先delete再insert代替update    
102-- 因为SCD1本身就不保存历史数据,所以这里更新维度表里的所有customer_name改变的记录,而不是仅仅更新当前版本的记录    
103DROP TABLE IF EXISTS tmp;    
104CREATE TABLE tmp AS    
105SELECT    
106    a.customer_sk,    
107    a.customer_number,    
108    b.customer_name,    
109    a.customer_street_address,    
110    a.customer_zip_code,    
111    a.customer_city,    
112    a.customer_state,    
113    a.shipping_address,    
114    a.shipping_zip_code,    
115    a.shipping_city,    
116    a.shipping_state,    
117    a.version,    
118    a.effective_date,    
119    a.expiry_date    
120  FROM customer_dim a, rds.customer b      
121 WHERE a.customer_number = b.customer_number AND !(a.customer_name <=> b.customer_name);      
122DELETE FROM customer_dim WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);      
123INSERT INTO customer_dim SELECT * FROM tmp;    
124    
125-- 处理新增的customer记录    
126INSERT INTO customer_dim    
127SELECT    
128    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,    
129    t1.customer_number,    
130    t1.customer_name,    
131    t1.customer_street_address,    
132    t1.customer_zip_code,    
133    t1.customer_city,    
134    t1.customer_state,    
135    t1.shipping_address,    
136    t1.shipping_zip_code,    
137    t1.shipping_city,    
138    t1.shipping_state,    
139    1,    
140    ${hivevar:pre_date},    
141    ${hivevar:max_date}    
142FROM      
143(      
144SELECT t1.* FROM rds.customer t1 LEFT JOIN customer_dim t2 ON t1.customer_number = t2.customer_number      
145 WHERE t2.customer_sk IS NULL) t1      
146CROSS JOIN      
147(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;    
148    
149-- 重载PA客户维度    
150TRUNCATE TABLE pa_customer_dim;      
151INSERT INTO pa_customer_dim      
152SELECT      
153  customer_sk      
154, customer_number      
155, customer_name      
156, customer_street_address      
157, customer_zip_code      
158, customer_city      
159, customer_state      
160, shipping_address      
161, shipping_zip_code      
162, shipping_city      
163, shipping_state      
164, version      
165, effective_date      
166, expiry_date      
167FROM customer_dim      
168WHERE customer_state = 'PA' ;    
169    
170-- 装载product维度    
171-- 设置已删除记录和product_name、product_category列上SCD2的过期    
172UPDATE product_dim    
173   SET expiry_date = ${hivevar:pre_date}      
174 WHERE product_dim.product_sk IN      
175(SELECT a.product_sk    
176   FROM (SELECT product_sk,product_code,product_name,product_category    
177           FROM product_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN    
178                rds.product b ON a.product_code = b.product_code    
179          WHERE b.product_code IS NULL OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));    
180    
181-- 处理product_name、product_category列上SCD2的新增行      
182INSERT INTO product_dim    
183SELECT    
184    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,    
185    t1.product_code,    
186    t1.product_name,    
187    t1.product_category,    
188    t1.version,    
189    t1.effective_date,    
190    t1.expiry_date    
191FROM      
192(      
193SELECT      
194    t2.product_code product_code,    
195    t2.product_name product_name,    
196    t2.product_category product_category,        
197    t1.version + 1 version,    
198    ${hivevar:pre_date} effective_date,      
199    ${hivevar:max_date} expiry_date      
200 FROM product_dim t1    
201INNER JOIN rds.product t2      
202   ON t1.product_code = t2.product_code      
203  AND t1.expiry_date = ${hivevar:pre_date}      
204 LEFT JOIN product_dim t3    
205   ON t1.product_code = t3.product_code    
206  AND t3.expiry_date = ${hivevar:max_date}      
207WHERE (t1.product_name <> t2.product_name OR t1.product_category <> t2.product_category) AND t3.product_sk IS NULL) t1      
208CROSS JOIN      
209(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;    
210    
211-- 处理新增的product记录    
212INSERT INTO product_dim    
213SELECT    
214    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,    
215    t1.product_code,    
216    t1.product_name,    
217    t1.product_category,    
218    1,    
219    ${hivevar:pre_date},    
220    ${hivevar:max_date}    
221FROM      
222(      
223SELECT t1.* FROM rds.product t1 LEFT JOIN product_dim t2 ON t1.product_code = t2.product_code      
224 WHERE t2.product_sk IS NULL) t1      
225CROSS JOIN      
226(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;    
227    
228-- 装载order维度,
229-- 前一天新增的销售订单号  
230INSERT INTO order_dim    
231SELECT    
232    ROW_NUMBER() OVER (ORDER BY t1.order_number) + t2.sk_max,    
233    t1.order_number,    
234    t1.version,    
235    t1.effective_date,    
236    t1.expiry_date    
237  FROM    
238(    
239SELECT    
240    order_number order_number,    
241    1 version,    
242    status_date effective_date,    
243    '2200-01-01' expiry_date    
244  FROM rds.sales_order, rds.cdc_time    
245 WHERE order_status = 'N' AND entry_date >= last_load AND entry_date < current_load ) t1    
246CROSS JOIN      
247(SELECT COALESCE(MAX(order_sk),0) sk_max FROM order_dim) t2;    
248    
249-- 装载销售订单事实表
250-- 前一天新增的销售订单  
251INSERT INTO sales_order_fact    
252SELECT    
253    order_sk,    
254    customer_sk,    
255    product_sk,    
256    e.order_date_sk,
257    null,
258    null,
259    null,
260    null,
261    null,
262    null,
263    null,
264    null,
265    f.request_delivery_date_sk,
266    order_amount,    
267    quantity    
268  FROM    
269    rds.sales_order a,    
270    order_dim b,    
271    customer_dim c,    
272    product_dim d,    
273    order_date_dim e,  
274    request_delivery_date_dim f,  
275    rds.cdc_time g    
276 WHERE
277    a.order_status = 'N'
278AND a.order_number = b.order_number    
279AND a.customer_number = c.customer_number    
280AND a.status_date >= c.effective_date    
281AND a.status_date < c.expiry_date    
282AND a.product_code = d.product_code    
283AND a.status_date >= d.effective_date    
284AND a.status_date < d.expiry_date    
285AND to_date(a.status_date) = e.order_date  
286AND to_date(a.request_delivery_date) = f.request_delivery_date  
287AND a.entry_date >= g.last_load AND a.entry_date < g.current_load ;    
288
289-- 处理分配库房、打包、配送和收货四个状态
290DROP TABLE IF EXISTS tmp;
291CREATE TABLE tmp AS
292select t0.order_sk order_sk,
293       t0.customer_sk customer_sk,
294       t0.product_sk product_sk,
295       t0.order_date_sk order_date_sk,
296       t2.allocate_date_sk allocate_date_sk,
297       t1.quantity allocate_quantity,
298       t0.packing_date_sk packing_date_sk,
299       t0.packing_quantity packing_quantity,
300       t0.ship_date_sk ship_date_sk,
301       t0.ship_quantity ship_quantity,
302       t0.receive_date_sk receive_date_sk,
303       t0.receive_quantity receive_quantity,
304       t0.request_delivery_date_sk request_delivery_date_sk,
305       t0.order_amount order_amount,
306       t0.order_quantity order_quantity
307  from sales_order_fact t0,
308       rds.sales_order t1,
309       allocate_date_dim t2,
310       order_dim t3,
311       rds.cdc_time t4
312 where t0.order_sk = t3.order_sk
313   and t3.order_number = t1.order_number and t1.order_status = 'A'
314   and to_date(t1.status_date) = t2.allocate_date
315   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;
316
317DELETE FROM sales_order_fact WHERE sales_order_fact.order_sk IN (SELECT order_sk FROM tmp);
318INSERT INTO sales_order_fact SELECT * FROM tmp;
319
320DROP TABLE IF EXISTS tmp;
321CREATE TABLE tmp AS
322select t0.order_sk order_sk,
323       t0.customer_sk customer_sk,
324       t0.product_sk product_sk,
325       t0.order_date_sk order_date_sk,
326       t0.allocate_date_sk allocate_date_sk,
327       t0.allocate_quantity allocate_quantity,
328       t2.packing_date_sk packing_date_sk,
329       t1.quantity packing_quantity,
330       t0.ship_date_sk ship_date_sk,
331       t0.ship_quantity ship_quantity,
332       t0.receive_date_sk receive_date_sk,
333       t0.receive_quantity receive_quantity,
334       t0.request_delivery_date_sk request_delivery_date_sk,
335       t0.order_amount order_amount,
336       t0.order_quantity order_quantity
337  from sales_order_fact t0,
338       rds.sales_order t1,
339       packing_date_dim t2,
340       order_dim t3,
341       rds.cdc_time t4
342 where t0.order_sk = t3.order_sk
343   and t3.order_number = t1.order_number and t1.order_status = 'P'
344   and to_date(t1.status_date) = t2.packing_date
345   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;
346  
347DELETE FROM sales_order_fact WHERE sales_order_fact.order_sk IN (SELECT order_sk FROM tmp);
348INSERT INTO sales_order_fact SELECT * FROM tmp;
349
350DROP TABLE IF EXISTS tmp;
351CREATE TABLE tmp AS
352select t0.order_sk order_sk,
353       t0.customer_sk customer_sk,
354       t0.product_sk product_sk,
355       t0.order_date_sk order_date_sk,
356       t0.allocate_date_sk allocate_date_sk,
357       t0.allocate_quantity allocate_quantity,
358       t0.packing_date_sk packing_date_sk,
359       t0.packing_quantity packing_quantity,
360       t2.ship_date_sk ship_date_sk,
361       t1.quantity ship_quantity,
362       t0.receive_date_sk receive_date_sk,
363       t0.receive_quantity receive_quantity,
364       t0.request_delivery_date_sk request_delivery_date_sk,
365       t0.order_amount order_amount,
366       t0.order_quantity order_quantity
367  from sales_order_fact t0,
368       rds.sales_order t1,
369       ship_date_dim t2,
370       order_dim t3,
371       rds.cdc_time t4
372 where t0.order_sk = t3.order_sk
373   and t3.order_number = t1.order_number and t1.order_status = 'S'
374   and to_date(t1.status_date) = t2.ship_date
375   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;
376  
377DELETE FROM sales_order_fact WHERE sales_order_fact.order_sk IN (SELECT order_sk FROM tmp);
378INSERT INTO sales_order_fact SELECT * FROM tmp;
379
380DROP TABLE IF EXISTS tmp;
381CREATE TABLE tmp AS
382select t0.order_sk order_sk,
383       t0.customer_sk customer_sk,
384       t0.product_sk product_sk,
385       t0.order_date_sk order_date_sk,
386       t0.allocate_date_sk allocate_date_sk,
387       t0.allocate_quantity allocate_quantity,
388       t0.packing_date_sk packing_date_sk,
389       t0.packing_quantity packing_quantity,
390       t0.ship_date_sk ship_date_sk,
391       t0.ship_quantity ship_quantity,
392       t2.receive_date_sk receive_date_sk,
393       t1.quantity receive_quantity,
394       t0.request_delivery_date_sk request_delivery_date_sk,
395       t0.order_amount order_amount,
396       t0.order_quantity order_quantity
397  from sales_order_fact t0,
398       rds.sales_order t1,
399       receive_date_dim t2,
400       order_dim t3,
401       rds.cdc_time t4
402 where t0.order_sk = t3.order_sk
403   and t3.order_number = t1.order_number and t1.order_status = 'R'
404   and to_date(t1.status_date) = t2.receive_date
405   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;
406  
407DELETE FROM sales_order_fact WHERE sales_order_fact.order_sk IN (SELECT order_sk FROM tmp);
408INSERT INTO sales_order_fact SELECT * FROM tmp;
409
410-- 更新时间戳表的last_load字段    
411INSERT OVERWRITE TABLE rds.cdc_time SELECT current_load, current_load FROM rds.cdc_time;
412

1
2
1**3. 测试**  
2

(1)新增两个销售订单


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1USE source;  
2/***        
3新增订单日期为2016年7月20日的2条订单。    
4***/      
5SET @start_date := unix_timestamp('2016-07-20');      
6SET @end_date := unix_timestamp('2016-07-21');  
7SET @request_delivery_date := '2016-07-25';    
8DROP TABLE IF EXISTS temp_sales_order_data;      
9CREATE TABLE temp_sales_order_data AS SELECT * FROM sales_order WHERE 1=0;      
10      
11SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));      
12SET @amount := floor(1000 + rand() * 9000);    
13SET @quantity := floor(10 + rand() * 90);    
14INSERT INTO temp_sales_order_data VALUES (1, 1, 1, 1, @order_date, 'N', @request_delivery_date, @order_date, @amount, @quantity);      
15      
16SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));      
17SET @amount := floor(1000 + rand() * 9000);      
18SET @quantity := floor(10 + rand() * 90);    
19INSERT INTO temp_sales_order_data VALUES (2, 2, 2, 2, @order_date, 'N', @request_delivery_date, @order_date, @amount, @quantity);        
20
21INSERT INTO sales_order      
22select null,
23       (@order_number:=@order_number + 1) + 128,
24       customer_number,
25       product_code,
26       status_date,
27       order_status,
28       request_delivery_date,
29       entry_date,
30       order_amount,
31       quantity
32  from temp_sales_order_data t1,(select @order_number:=0) t2
33 order by t1.status_date;        
34    
35COMMIT ;
36

1
2
1(2)设置cdc_time的日期  
2

1
2
3
1use rds;
2INSERT OVERWRITE TABLE rds.cdc_time SELECT '2016-07-20', '2016-07-21' FROM rds.cdc_time;
3

1
2
1将regular_etl.sql文件中的SET hivevar:cur_date = CURRENT_DATE();行改为SET hivevar:cur_date = '2016-07-21';  
2

(3)执行定期装载脚本


1
2
1./regular_etl.sh
2

1
2
1(4)查询sales_order_fact表里的两个销售订单,确认定期装载成功  
2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1use dw;
2select b.order_number,
3       c.order_date,
4       d.allocate_date,
5       e.packing_date,
6       f.ship_date,
7       g.receive_date
8  from sales_order_fact a
9 inner join order_dim b on a.order_sk = b.order_sk
10  left join order_date_dim c on a.order_date_sk = c.order_date_sk
11  left join allocate_date_dim d on a.allocate_date_sk = d.allocate_date_sk
12  left join packing_date_dim e on a.packing_date_sk = e.packing_date_sk
13  left join ship_date_dim f on a.ship_date_sk = f.ship_date_sk
14  left join receive_date_dim g on a.receive_date_sk = g.receive_date_sk
15 where b.order_number IN (129 , 130);
16

1
2
1        查询结果如下图所示,只有order_date列有值,其它日期都是NULL,因为这两个订单是新增的,并且还没有分配库房、打包、配送或收货。  
2

(5)添加销售订单作为这两个订单的分配库房和/或打包的里程碑


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1USE source;  
2
3SET @start_date := unix_timestamp('2016-07-21');      
4SET @end_date := unix_timestamp('2016-07-22');  
5SET @mid_date := unix_timestamp('2016-07-21 12:00:00');
6SET @request_delivery_date := '2016-07-25';    
7DROP TABLE IF EXISTS temp_sales_order_data;      
8CREATE TABLE temp_sales_order_data AS SELECT * FROM sales_order WHERE 1=0;      
9      
10SET @order_date := from_unixtime(@start_date + rand() * (@mid_date - @start_date));  
11select order_amount,quantity into @amount,@quantity from sales_order where order_number=129;
12INSERT INTO temp_sales_order_data VALUES (1, 129, 1, 1, @order_date, 'A', @request_delivery_date, @order_date, @amount, @quantity);      
13      
14SET @order_date := from_unixtime(@mid_date + rand() * (@end_date - @mid_date));      
15INSERT INTO temp_sales_order_data VALUES (2, 129, 1, 1, @order_date, 'P', @request_delivery_date, @order_date, @amount, @quantity);        
16
17SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));  
18select order_amount,quantity into @amount,@quantity from sales_order where order_number=130;
19INSERT INTO temp_sales_order_data VALUES (3, 130, 2, 2, @order_date, 'A', @request_delivery_date, @order_date, @amount, @quantity);
20
21INSERT INTO sales_order      
22select null,
23       order_number,
24       customer_number,
25       product_code,
26       status_date,
27       order_status,
28       request_delivery_date,
29       entry_date,
30       order_amount,
31       quantity
32  from temp_sales_order_data
33 order by entry_date;
34
35COMMIT ;
36

1
2
1(6)修改cdc_time的日期  
2

1
2
3
1use rds;
2INSERT OVERWRITE TABLE rds.cdc_time SELECT '2016-07-21', '2016-07-22' FROM rds.cdc_time;
3

1
2
1将regular_etl.sql文件中的SET hivevar:cur_date = CURRENT_DATE();行改为SET hivevar:cur_date = '2016-07-22';  
2

(7)执行定期装载脚本


1
2
1./regular_etl.sh
2

1
2
1(8)查询sales_order_fact表里的两个销售订单,确认定期装载成功  
2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1use dw;
2select b.order_number,
3       c.order_date,
4       d.allocate_date,
5       e.packing_date,
6       f.ship_date,
7       g.receive_date
8  from sales_order_fact a
9 inner join order_dim b on a.order_sk = b.order_sk
10  left join order_date_dim c on a.order_date_sk = c.order_date_sk
11  left join allocate_date_dim d on a.allocate_date_sk = d.allocate_date_sk
12  left join packing_date_dim e on a.packing_date_sk = e.packing_date_sk
13  left join ship_date_dim f on a.ship_date_sk = f.ship_date_sk
14  left join receive_date_dim g on a.receive_date_sk = g.receive_date_sk
15 where b.order_number IN (129 , 130);
16

1
2
1        查询结果如下图所示。第一个订单具有了allocate_date和packing_date,第二个只具有allocate_date。  
2

(9)添加销售订单作为这两个订单后面的里程碑:打包、配送和/或收货。注意四个日期可能相同。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1USE source;  
2
3SET @start_date := unix_timestamp('2016-07-22');      
4SET @end_date := unix_timestamp('2016-07-23');  
5SET @mid_date := unix_timestamp('2016-07-22 12:00:00');
6SET @request_delivery_date := '2016-07-25';    
7DROP TABLE IF EXISTS temp_sales_order_data;      
8CREATE TABLE temp_sales_order_data AS SELECT * FROM sales_order WHERE 1=0;      
9      
10SET @order_date := from_unixtime(@start_date + rand() * (@mid_date - @start_date));  
11select order_amount,quantity into @amount,@quantity from sales_order where order_number=129 limit 1;
12INSERT INTO temp_sales_order_data VALUES (1, 129, 1, 1, @order_date, 'S', @request_delivery_date, @order_date, @amount, @quantity);      
13      
14SET @order_date := from_unixtime(@mid_date + rand() * (@end_date - @mid_date));      
15INSERT INTO temp_sales_order_data VALUES (2, 129, 1, 1, @order_date, 'R', @request_delivery_date, @order_date, @amount, @quantity);        
16
17SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));  
18select order_amount,quantity into @amount,@quantity from sales_order where order_number=130 limit 1;
19INSERT INTO temp_sales_order_data VALUES (3, 130, 2, 2, @order_date, 'P', @request_delivery_date, @order_date, @amount, @quantity);
20
21INSERT INTO sales_order      
22select null,
23       order_number,
24       customer_number,
25       product_code,
26       status_date,
27       order_status,
28       request_delivery_date,
29       entry_date,
30       order_amount,
31       quantity
32  from temp_sales_order_data
33 order by entry_date;
34
35COMMIT ;
36

1
2
1(10)修改cdc_time的日期  
2

1
2
3
1use rds;
2INSERT OVERWRITE TABLE rds.cdc_time SELECT '2016-07-22', '2016-07-23' FROM rds.cdc_time;
3

1
2
1将regular_etl.sql文件中的SET hivevar:cur_date = CURRENT_DATE();行改为SET hivevar:cur_date = '2016-07-23';  
2

(11)执行定期装载脚本


1
2
1./regular_etl.sh
2

1
2
1(12)查询sales_order_fact表里的两个销售订单,确认定期装载成功  
2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1use dw;
2select b.order_number,
3       c.order_date,
4       d.allocate_date,
5       e.packing_date,
6       f.ship_date,
7       g.receive_date
8  from sales_order_fact a
9 inner join order_dim b on a.order_sk = b.order_sk
10  left join order_date_dim c on a.order_date_sk = c.order_date_sk
11  left join allocate_date_dim d on a.allocate_date_sk = d.allocate_date_sk
12  left join packing_date_dim e on a.packing_date_sk = e.packing_date_sk
13  left join ship_date_dim f on a.ship_date_sk = f.ship_date_sk
14  left join receive_date_dim g on a.receive_date_sk = g.receive_date_sk
15 where b.order_number IN (129 , 130);
16

1
2
1        查询结果如下图所示。第一个订单号为129的订单,具有了全部日期,这意味着订单已完成(客户已经收货)。第二个订单已经打包,但是还没有配送。  
2

(13)还原
将regular_etl.sql文件中的SET hivevar:cur_date = DATE_ADD(CURRENT_DATE(),2);行改为SET hivevar:cur_date = CURRENT_DATE();

给TA打赏
共{{data.count}}人
人已打赏
安全运维

MongoDB最简单的入门教程之三 使用Java代码往MongoDB里插入数据

2021-12-11 11:36:11

安全运维

Ubuntu上NFS的安装配置

2021-12-19 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索