基于Hadoop生态圈的数据仓库实践——进阶技术(十)

释放双眼,带上耳机,听听看~!

十、杂项维度
本节讨论杂项维度。简单地说,杂项维度就是一种所包含的数据只有很少可能值的维度。例如销售订单,它可能有很多离散数据(yes-no这种类型的值),如:
verification_ind(如果订单已经被审核,值为yes);credit_check_flag(表示此订单的客户信用状态是否已经检查);new_customer_ind(如果这是新客户的首个订单,值为yes);web_order_flag(表示此订单是否是通过网络在线提交的订单)。这类数据常被用于增强销售分析,其特点是属性可能很多但每种属性的可能值很少,适合用称为杂项维度的特殊维度类型存储。
1. 新增销售订单属性杂项维度
给现有的数据仓库新增一个销售订单杂项维度,需要新增一个名为sales_order_attribute_dim的维度表。下图显示了增加杂项维度表后的数据仓库模式(这里只显示了和销售订单属性相关的表)。
新的维度表包括四个yes-no列:verification_ind、credit_check_flag、new_customer_ind和web_order_flag。每个列可以有两个可能值中的一个(Y 或 N),因此sales_order_attribute_dim表最多有16(2^4)行。可以预装载这个维度,并且只需装载一次。
注意,如果知道某种组合是不可能出现的,就不需要装载这种组合。执行下面的脚本修改数据库模式。这个脚本做了五项工作:建立sales_order_attribute_dim表,向表中预装载全部16种可能的数据,给销售订单事实表添加杂项维度代理键,给源数据库里的sales_order表增加对应的四个属性列,并给rds库里的销售订单过渡表增加同样的四个属性列。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
USE dw;

-- Junk dimension for sales orders: one surrogate key per combination of the
-- four Y/N flags, plus the version / effective / expiry columns.
-- Stored as a bucketed ORC table with transactions enabled.
CREATE TABLE sales_order_attribute_dim (
    sales_order_attribute_sk INT     COMMENT 'sales order attribute surrogate key',
    verification_ind         CHAR(1) COMMENT 'verification index, Y or N',
    credit_check_flag        CHAR(1) COMMENT 'credit check flag, Y or N',
    new_customer_ind         CHAR(1) COMMENT 'new customer index, Y or N',
    web_order_flag           CHAR(1) COMMENT 'web order flag, Y or N',
    version                  INT     COMMENT 'version',
    effective_date           DATE    COMMENT 'effective date',
    expiry_date              DATE    COMMENT 'expiry date'
)
CLUSTERED BY (sales_order_attribute_sk) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
16  
-- Pre-load the junk dimension with all 16 (2^4) combinations of the four
-- Y/N flags. Column order: sales_order_attribute_sk, verification_ind,
-- credit_check_flag, new_customer_ind, web_order_flag, version,
-- effective_date, expiry_date.
--
-- The effective date is '1900-01-01'. The earlier literal '1900-00-00' has
-- month 00 and day 00, which is not a calendar date, so it cannot be stored
-- faithfully in a DATE column.
--
-- All rows go in through one multi-row INSERT so that the transactional table
-- is written once instead of sixteen times.
INSERT INTO sales_order_attribute_dim VALUES
    (1,  'Y', 'N', 'N', 'N', 1, '1900-01-01', '2200-01-01'),
    (2,  'Y', 'Y', 'N', 'N', 1, '1900-01-01', '2200-01-01'),
    (3,  'Y', 'Y', 'Y', 'N', 1, '1900-01-01', '2200-01-01'),
    (4,  'Y', 'Y', 'Y', 'Y', 1, '1900-01-01', '2200-01-01'),
    (5,  'Y', 'N', 'Y', 'N', 1, '1900-01-01', '2200-01-01'),
    (6,  'Y', 'N', 'Y', 'Y', 1, '1900-01-01', '2200-01-01'),
    (7,  'Y', 'N', 'N', 'Y', 1, '1900-01-01', '2200-01-01'),
    (8,  'Y', 'Y', 'N', 'Y', 1, '1900-01-01', '2200-01-01'),
    (9,  'N', 'N', 'N', 'N', 1, '1900-01-01', '2200-01-01'),
    (10, 'N', 'Y', 'N', 'N', 1, '1900-01-01', '2200-01-01'),
    (11, 'N', 'Y', 'Y', 'N', 1, '1900-01-01', '2200-01-01'),
    (12, 'N', 'Y', 'Y', 'Y', 1, '1900-01-01', '2200-01-01'),
    (13, 'N', 'N', 'Y', 'N', 1, '1900-01-01', '2200-01-01'),
    (14, 'N', 'N', 'Y', 'Y', 1, '1900-01-01', '2200-01-01'),
    (15, 'N', 'N', 'N', 'Y', 1, '1900-01-01', '2200-01-01'),
    (16, 'N', 'Y', 'N', 'Y', 1, '1900-01-01', '2200-01-01');
34
-- Add the junk-dimension key to the sales order fact table.
-- The table is rebuilt in four steps (rename, create, copy, drop); the new
-- sales_order_attribute_sk column is placed directly after product_sk.
ALTER TABLE sales_order_fact RENAME TO sales_order_fact_old;

CREATE TABLE sales_order_fact (
    order_number             INT           COMMENT 'order number',
    customer_sk              INT           COMMENT 'customer surrogate key',
    product_sk               INT           COMMENT 'product surrogate key',
    sales_order_attribute_sk INT           COMMENT 'sales order attribute surrogate key',
    order_date_sk            INT           COMMENT 'order date surrogate key',
    allocate_date_sk         INT           COMMENT 'allocate date surrogate key',
    allocate_quantity        INT           COMMENT 'allocate quantity',
    packing_date_sk          INT           COMMENT 'packing date surrogate key',
    packing_quantity         INT           COMMENT 'packing quantity',
    ship_date_sk             INT           COMMENT 'ship date surrogate key',
    ship_quantity            INT           COMMENT 'ship quantity',
    receive_date_sk          INT           COMMENT 'receive date surrogate key',
    receive_quantity         INT           COMMENT 'receive quantity',
    request_delivery_date_sk INT           COMMENT 'request delivery date surrogate key',
    order_amount             DECIMAL(10,2) COMMENT 'order amount',
    order_quantity           INT           COMMENT 'order quantity'
)
CLUSTERED BY (order_number) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- Copy the existing facts across; they have no attribute key, so the new
-- column is filled with NULL.
INSERT INTO TABLE sales_order_fact
SELECT
    old.order_number,
    old.customer_sk,
    old.product_sk,
    NULL,
    old.order_date_sk,
    old.allocate_date_sk,
    old.allocate_quantity,
    old.packing_date_sk,
    old.packing_quantity,
    old.ship_date_sk,
    old.ship_quantity,
    old.receive_date_sk,
    old.receive_quantity,
    old.request_delivery_date_sk,
    old.order_amount,
    old.order_quantity
FROM sales_order_fact_old old;

DROP TABLE sales_order_fact_old;
75
-- Add the four flag columns to the sales_order table of the source database,
-- each one positioned after the previous column, starting after product_code.
USE source;
ALTER TABLE sales_order
    ADD COLUMN verification_ind  CHAR(1) AFTER product_code,
    ADD COLUMN credit_check_flag CHAR(1) AFTER verification_ind,
    ADD COLUMN new_customer_ind  CHAR(1) AFTER credit_check_flag,
    ADD COLUMN web_order_flag    CHAR(1) AFTER new_customer_ind;

-- Add the same four columns to the sales order staging table in rds.
USE rds;
ALTER TABLE sales_order ADD COLUMNS (
    verification_ind  CHAR(1) COMMENT 'verification index, Y or N',
    credit_check_flag CHAR(1) COMMENT 'credit check flag, Y or N',
    new_customer_ind  CHAR(1) COMMENT 'new customer index, Y or N',
    web_order_flag    CHAR(1) COMMENT 'web order flag, Y or N'
);
93

1
2
1 **2. 重建Sqoop作业**  
2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Re-create the saved incremental Sqoop job so that it also imports the four
# new flag columns, carrying over the position the old job had reached.
meta_connect="jdbc:hsqldb:hsql://cdh2:16000/sqoop"

# Read the saved incremental position BEFORE the old job definition is deleted.
last_value=$(sqoop job --show myjob_incremental_import --meta-connect "$meta_connect" \
    | grep incremental.last.value | awk '{print $3}')

if [ -z "$last_value" ]; then
    # Deleting the job without a known position would lose the incremental
    # state, so leave the existing job untouched and report the problem.
    echo "ERROR: could not read incremental.last.value of myjob_incremental_import; job left unchanged" >&2
else
    sqoop job --delete myjob_incremental_import --meta-connect "$meta_connect"

    # NOTE(review): the database password is embedded in the JDBC URL below;
    # consider Sqoop's --password-file option instead.
    sqoop job \
        --meta-connect "$meta_connect" \
        --create myjob_incremental_import \
        -- \
        import \
        --connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" \
        --table sales_order \
        --columns "order_number, customer_number, product_code, status_date, entry_date, order_amount, quantity, request_delivery_date, order_status, verification_ind, credit_check_flag, new_customer_ind, web_order_flag" \
        --hive-import \
        --hive-table rds.sales_order \
        --incremental append \
        --check-column id \
        --last-value "$last_value"
fi
16

1
2
1 **3. 修改定期装载脚本**  
2

由于有了一个新的维度,必须修改定期装载脚本。下面显示了修改后的regular_etl.sql脚本文件内容。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
-- Session settings needed to work with transactional tables.
SET hive.support.concurrency=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;

USE dw;

-- SCD dates used throughout the script:
--   cur_date = today
--   pre_date = yesterday (used as expiry / effective date for changed rows)
--   max_date = open-ended expiry date marking the current version
SET hivevar:cur_date = CURRENT_DATE();
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);
SET hivevar:max_date = CAST('2200-01-01' AS DATE);

-- Set the upper bound of the CDC window (current_load) to today,
-- keeping last_load as it is.
INSERT OVERWRITE TABLE rds.cdc_time
SELECT
    last_load,
    ${hivevar:cur_date}
FROM rds.cdc_time;
17    
-- Load the customer dimension.
-- SCD2 expiry: close the current version of every customer that no longer
-- exists in rds.customer or whose address / shipping columns differ from the
-- staging row. The <=> operator is used so that NULL values compare safely.
UPDATE customer_dim
   SET expiry_date = ${hivevar:pre_date}
 WHERE customer_dim.customer_sk IN (
    SELECT cur.customer_sk
      FROM customer_dim cur
      LEFT JOIN rds.customer src
        ON cur.customer_number = src.customer_number
     WHERE cur.expiry_date = ${hivevar:max_date}
       AND (   src.customer_number IS NULL
            OR NOT (cur.customer_street_address <=> src.customer_street_address)
            OR NOT (cur.customer_zip_code <=> src.customer_zip_code)
            OR NOT (cur.customer_city <=> src.customer_city)
            OR NOT (cur.customer_state <=> src.customer_state)
            OR NOT (cur.shipping_address <=> src.shipping_address)
            OR NOT (cur.shipping_zip_code <=> src.shipping_zip_code)
            OR NOT (cur.shipping_city <=> src.shipping_city)
            OR NOT (cur.shipping_state <=> src.shipping_state)
           )
 );
46    
-- SCD2 new versions: for every customer whose version was expired as of
-- pre_date, whose staging row differs in an address / shipping column and who
-- has no current version, insert a new current row.
-- Surrogate keys continue from the current maximum customer_sk.
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY chg.customer_number) + mx.sk_max,
    chg.customer_number,
    chg.customer_name,
    chg.customer_street_address,
    chg.customer_zip_code,
    chg.customer_city,
    chg.customer_state,
    chg.shipping_address,
    chg.shipping_zip_code,
    chg.shipping_city,
    chg.shipping_state,
    chg.version,
    chg.effective_date,
    chg.expiry_date
FROM (
    SELECT
        src.customer_number,
        src.customer_name,
        src.customer_street_address,
        src.customer_zip_code,
        src.customer_city,
        src.customer_state,
        src.shipping_address,
        src.shipping_zip_code,
        src.shipping_city,
        src.shipping_state,
        expired.version + 1 AS version,
        ${hivevar:pre_date} AS effective_date,
        ${hivevar:max_date} AS expiry_date
    FROM customer_dim expired
    INNER JOIN rds.customer src
       ON expired.customer_number = src.customer_number
    -- cur finds an existing current version; rows are kept only when none exists
    LEFT JOIN customer_dim cur
       ON expired.customer_number = cur.customer_number
      AND cur.expiry_date = ${hivevar:max_date}
    WHERE expired.expiry_date = ${hivevar:pre_date}
      AND cur.customer_sk IS NULL
      AND (   NOT (expired.customer_street_address <=> src.customer_street_address)
           OR NOT (expired.customer_zip_code <=> src.customer_zip_code)
           OR NOT (expired.customer_city <=> src.customer_city)
           OR NOT (expired.customer_state <=> src.customer_state)
           OR NOT (expired.shipping_address <=> src.shipping_address)
           OR NOT (expired.shipping_zip_code <=> src.shipping_zip_code)
           OR NOT (expired.shipping_city <=> src.shipping_city)
           OR NOT (expired.shipping_state <=> src.shipping_state)
          )
) chg
CROSS JOIN (
    SELECT COALESCE(MAX(customer_sk), 0) AS sk_max
    FROM customer_dim
) mx;
99    
-- SCD1 on customer_name.
-- Hive's UPDATE ... SET does not support subqueries yet, so the rows to change
-- are staged in a temporary table and the update is done as DELETE + INSERT.
-- SCD1 keeps no history, so every version of a customer whose name changed is
-- rewritten, not only the current one.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    dim.customer_sk,
    dim.customer_number,
    src.customer_name,          -- new name taken from the staging table
    dim.customer_street_address,
    dim.customer_zip_code,
    dim.customer_city,
    dim.customer_state,
    dim.shipping_address,
    dim.shipping_zip_code,
    dim.shipping_city,
    dim.shipping_state,
    dim.version,
    dim.effective_date,
    dim.expiry_date
FROM customer_dim dim
INNER JOIN rds.customer src
   ON dim.customer_number = src.customer_number
WHERE NOT (dim.customer_name <=> src.customer_name);

DELETE FROM customer_dim
 WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);

INSERT INTO customer_dim
SELECT * FROM tmp;
124    
-- New customers: staging rows whose customer_number does not exist in the
-- dimension at all are inserted as version 1, effective from pre_date and
-- open-ended (max_date). Surrogate keys continue from the current maximum.
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY added.customer_number) + mx.sk_max,
    added.customer_number,
    added.customer_name,
    added.customer_street_address,
    added.customer_zip_code,
    added.customer_city,
    added.customer_state,
    added.shipping_address,
    added.shipping_zip_code,
    added.shipping_city,
    added.shipping_state,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM (
    SELECT src.*
      FROM rds.customer src
      LEFT JOIN customer_dim dim
        ON src.customer_number = dim.customer_number
     WHERE dim.customer_sk IS NULL
) added
CROSS JOIN (
    SELECT COALESCE(MAX(customer_sk), 0) AS sk_max
    FROM customer_dim
) mx;
148    
-- Reload the PA customer dimension: empty it, then copy every customer_dim
-- row whose customer_state is 'PA'.
TRUNCATE TABLE pa_customer_dim;

INSERT INTO pa_customer_dim
SELECT
    customer_sk,
    customer_number,
    customer_name,
    customer_street_address,
    customer_zip_code,
    customer_city,
    customer_state,
    shipping_address,
    shipping_zip_code,
    shipping_city,
    shipping_state,
    version,
    effective_date,
    expiry_date
FROM customer_dim
WHERE customer_state = 'PA';
169    
-- Load the product dimension.
--
-- Change detection on product_name / product_category uses the NULL-safe
-- operator <=> (negated), as the customer dimension load does. A plain <>
-- evaluates to NULL when either side is NULL, so a change from or to NULL
-- would be missed. The expiry step and the new-version step use the same
-- predicate, so a product that gets expired also gets its new version.

-- Step 1 (SCD2 expiry): close the current version of every product that no
-- longer exists in rds.product or whose name / category changed.
UPDATE product_dim
   SET expiry_date = ${hivevar:pre_date}
 WHERE product_dim.product_sk IN (
    SELECT cur.product_sk
      FROM (SELECT product_sk, product_code, product_name, product_category
              FROM product_dim
             WHERE expiry_date = ${hivevar:max_date}) cur
      LEFT JOIN rds.product src
        ON cur.product_code = src.product_code
     WHERE src.product_code IS NULL
        OR !(cur.product_name <=> src.product_name)
        OR !(cur.product_category <=> src.product_category)
 );

-- Step 2 (SCD2 new versions): for products expired as of pre_date whose
-- staging row differs and that have no current version, insert the new
-- current row. Surrogate keys continue from the current maximum product_sk.
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY chg.product_code) + mx.sk_max,
    chg.product_code,
    chg.product_name,
    chg.product_category,
    chg.version,
    chg.effective_date,
    chg.expiry_date
FROM (
    SELECT
        src.product_code product_code,
        src.product_name product_name,
        src.product_category product_category,
        expired.version + 1 version,
        ${hivevar:pre_date} effective_date,
        ${hivevar:max_date} expiry_date
    FROM product_dim expired
    INNER JOIN rds.product src
       ON expired.product_code = src.product_code
      AND expired.expiry_date = ${hivevar:pre_date}
    LEFT JOIN product_dim cur
       ON expired.product_code = cur.product_code
      AND cur.expiry_date = ${hivevar:max_date}
    WHERE (   !(expired.product_name <=> src.product_name)
           OR !(expired.product_category <=> src.product_category))
      AND cur.product_sk IS NULL
) chg
CROSS JOIN
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) mx;

-- Step 3 (new products): staging rows whose product_code is not in the
-- dimension at all are inserted as version 1.
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY added.product_code) + mx.sk_max,
    added.product_code,
    added.product_name,
    added.product_category,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM (
    SELECT src.*
      FROM rds.product src
      LEFT JOIN product_dim dim
        ON src.product_code = dim.product_code
     WHERE dim.product_sk IS NULL
) added
CROSS JOIN
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) mx;
227    
-- Load the sales order fact table.
-- Sales orders added the previous day: staging rows with order_status 'N'
-- whose entry_date falls inside the CDC window [last_load, current_load).
-- Customer and product keys are taken from the dimension version whose
-- validity range contains the order's status_date; the junk-dimension key is
-- looked up by matching all four flags.
INSERT INTO sales_order_fact
SELECT
    so.order_number,
    cust.customer_sk,
    prod.product_sk,
    attr.sales_order_attribute_sk,
    od.order_date_sk,
    NULL,   -- allocate_date_sk
    NULL,   -- allocate_quantity
    NULL,   -- packing_date_sk
    NULL,   -- packing_quantity
    NULL,   -- ship_date_sk
    NULL,   -- ship_quantity
    NULL,   -- receive_date_sk
    NULL,   -- receive_quantity
    rdd.request_delivery_date_sk,
    so.order_amount,
    so.quantity
FROM rds.sales_order so
INNER JOIN customer_dim cust
   ON so.customer_number = cust.customer_number
INNER JOIN product_dim prod
   ON so.product_code = prod.product_code
INNER JOIN order_date_dim od
   ON to_date(so.status_date) = od.order_date
INNER JOIN request_delivery_date_dim rdd
   ON to_date(so.request_delivery_date) = rdd.request_delivery_date
INNER JOIN sales_order_attribute_dim attr
   ON so.verification_ind = attr.verification_ind
  AND so.credit_check_flag = attr.credit_check_flag
  AND so.new_customer_ind = attr.new_customer_ind
  AND so.web_order_flag = attr.web_order_flag
CROSS JOIN rds.cdc_time win
WHERE so.order_status = 'N'
  AND so.status_date >= cust.effective_date
  AND so.status_date < cust.expiry_date
  AND so.status_date >= prod.effective_date
  AND so.status_date < prod.expiry_date
  AND so.entry_date >= win.last_load
  AND so.entry_date < win.current_load;
271
-- Process the four follow-up statuses: allocate, pack, ship and receive.
-- Allocate: for fact rows whose order has a status 'A' staging row inside the
-- CDC window, build the replacement row with allocate_date_sk and
-- allocate_quantity filled in, then swap it in (DELETE + INSERT via tmp).
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    fact.order_number,
    fact.customer_sk,
    fact.product_sk,
    fact.sales_order_attribute_sk,
    fact.order_date_sk,
    dt.allocate_date_sk,
    stg.quantity AS allocate_quantity,
    fact.packing_date_sk,
    fact.packing_quantity,
    fact.ship_date_sk,
    fact.ship_quantity,
    fact.receive_date_sk,
    fact.receive_quantity,
    fact.request_delivery_date_sk,
    fact.order_amount,
    fact.order_quantity
FROM sales_order_fact fact
INNER JOIN rds.sales_order stg
   ON fact.order_number = stg.order_number
INNER JOIN allocate_date_dim dt
   ON to_date(stg.status_date) = dt.allocate_date
CROSS JOIN rds.cdc_time win
WHERE stg.order_status = 'A'
  AND stg.entry_date >= win.last_load
  AND stg.entry_date < win.current_load;

DELETE FROM sales_order_fact
 WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);

INSERT INTO sales_order_fact
SELECT * FROM tmp;
301
-- Pack: for fact rows whose order has a status 'P' staging row inside the CDC
-- window, build the replacement row with packing_date_sk and packing_quantity
-- filled in, then swap it in (DELETE + INSERT via tmp).
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    fact.order_number,
    fact.customer_sk,
    fact.product_sk,
    fact.sales_order_attribute_sk,
    fact.order_date_sk,
    fact.allocate_date_sk,
    fact.allocate_quantity,
    dt.packing_date_sk,
    stg.quantity AS packing_quantity,
    fact.ship_date_sk,
    fact.ship_quantity,
    fact.receive_date_sk,
    fact.receive_quantity,
    fact.request_delivery_date_sk,
    fact.order_amount,
    fact.order_quantity
FROM sales_order_fact fact
INNER JOIN rds.sales_order stg
   ON fact.order_number = stg.order_number
INNER JOIN packing_date_dim dt
   ON to_date(stg.status_date) = dt.packing_date
CROSS JOIN rds.cdc_time win
WHERE stg.order_status = 'P'
  AND stg.entry_date >= win.last_load
  AND stg.entry_date < win.current_load;

DELETE FROM sales_order_fact
 WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);

INSERT INTO sales_order_fact
SELECT * FROM tmp;
330
-- Ship: for fact rows whose order has a status 'S' staging row inside the CDC
-- window, build the replacement row with ship_date_sk and ship_quantity
-- filled in, then swap it in (DELETE + INSERT via tmp).
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    fact.order_number,
    fact.customer_sk,
    fact.product_sk,
    fact.sales_order_attribute_sk,
    fact.order_date_sk,
    fact.allocate_date_sk,
    fact.allocate_quantity,
    fact.packing_date_sk,
    fact.packing_quantity,
    dt.ship_date_sk,
    stg.quantity AS ship_quantity,
    fact.receive_date_sk,
    fact.receive_quantity,
    fact.request_delivery_date_sk,
    fact.order_amount,
    fact.order_quantity
FROM sales_order_fact fact
INNER JOIN rds.sales_order stg
   ON fact.order_number = stg.order_number
INNER JOIN ship_date_dim dt
   ON to_date(stg.status_date) = dt.ship_date
CROSS JOIN rds.cdc_time win
WHERE stg.order_status = 'S'
  AND stg.entry_date >= win.last_load
  AND stg.entry_date < win.current_load;

DELETE FROM sales_order_fact
 WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);

INSERT INTO sales_order_fact
SELECT * FROM tmp;
359
-- Receive: for fact rows whose order has a status 'R' staging row inside the
-- CDC window, build the replacement row with receive_date_sk and
-- receive_quantity filled in, then swap it in (DELETE + INSERT via tmp).
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    fact.order_number,
    fact.customer_sk,
    fact.product_sk,
    fact.sales_order_attribute_sk,
    fact.order_date_sk,
    fact.allocate_date_sk,
    fact.allocate_quantity,
    fact.packing_date_sk,
    fact.packing_quantity,
    fact.ship_date_sk,
    fact.ship_quantity,
    dt.receive_date_sk,
    stg.quantity AS receive_quantity,
    fact.request_delivery_date_sk,
    fact.order_amount,
    fact.order_quantity
FROM sales_order_fact fact
INNER JOIN rds.sales_order stg
   ON fact.order_number = stg.order_number
INNER JOIN receive_date_dim dt
   ON to_date(stg.status_date) = dt.receive_date
CROSS JOIN rds.cdc_time win
WHERE stg.order_status = 'R'
  AND stg.entry_date >= win.last_load
  AND stg.entry_date < win.current_load;

DELETE FROM sales_order_fact
 WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);

INSERT INTO sales_order_fact
SELECT * FROM tmp;
388
-- Advance the CDC window: last_load takes the value of the current_load that
-- was just processed.
INSERT OVERWRITE TABLE rds.cdc_time
SELECT
    current_load,
    current_load
FROM rds.cdc_time;
391

1
2
1 **4. 测试修改后的定期装载**  
2

(1)使用下面的脚本添加八个销售订单。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
USE source;

-- Empty staging table with the same columns as sales_order.
DROP TABLE IF EXISTS temp_sales_order_data;
CREATE TABLE temp_sales_order_data AS
SELECT * FROM sales_order WHERE 1=0;

-- Every test order gets a random timestamp between these two bounds.
SET @start_date = unix_timestamp('2016-07-31');
SET @end_date = unix_timestamp('2016-08-01');

-- Eight test orders. Each one draws a fresh random timestamp (inserted twice
-- per row), a random amount (1000 + rand() * 9000, floored) and a random
-- quantity (10 + rand() * 90, floored). All have status 'N' and request
-- delivery date 2016-08-05; the four Y/N flags differ from order to order.

-- Order 1: flags Y, Y, N, Y
SET @order_date = from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount = floor(1000 + rand() * 9000);
SET @quantity = floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data
VALUES (1, 133, 1, 1, 'Y', 'Y', 'N', 'Y', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity);

-- Order 2: flags N, N, N, N
SET @order_date = from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount = floor(1000 + rand() * 9000);
SET @quantity = floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data
VALUES (2, 134, 2, 2, 'N', 'N', 'N', 'N', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity);

-- Order 3: flags Y, Y, N, N
SET @order_date = from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount = floor(1000 + rand() * 9000);
SET @quantity = floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data
VALUES (3, 135, 3, 3, 'Y', 'Y', 'N', 'N', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity);

-- Order 4: flags Y, N, N, N
SET @order_date = from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount = floor(1000 + rand() * 9000);
SET @quantity = floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data
VALUES (4, 136, 4, 4, 'Y', 'N', 'N', 'N', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity);

-- Order 5: flags N, Y, Y, Y
SET @order_date = from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount = floor(1000 + rand() * 9000);
SET @quantity = floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data
VALUES (5, 137, 11, 1, 'N', 'Y', 'Y', 'Y', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity);

-- Order 6: flags N, Y, Y, N
SET @order_date = from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount = floor(1000 + rand() * 9000);
SET @quantity = floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data
VALUES (6, 138, 12, 2, 'N', 'Y', 'Y', 'N', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity);

-- Order 7: flags Y, Y, Y, N
SET @order_date = from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount = floor(1000 + rand() * 9000);
SET @quantity = floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data
VALUES (7, 139, 13, 3, 'Y', 'Y', 'Y', 'N', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity);

-- Order 8: flags Y, N, Y, N
SET @order_date = from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount = floor(1000 + rand() * 9000);
SET @quantity = floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data
VALUES (8, 140, 14, 4, 'Y', 'N', 'Y', 'N', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity);
47
-- Copy the staged rows into sales_order ordered by status_date.
-- The first column is inserted as NULL (presumably an auto-generated id;
-- confirm against the table definition). The second column is numbered with
-- @rn, which starts at 132, so the first value assigned is 133.
INSERT INTO sales_order
SELECT
    NULL,
    @rn := @rn + 1,
    customer_number,
    product_code,
    verification_ind,
    credit_check_flag,
    new_customer_ind,
    web_order_flag,
    status_date,
    order_status,
    request_delivery_date,
    entry_date,
    order_amount,
    quantity
FROM temp_sales_order_data staged,
     (SELECT @rn := 132) init
ORDER BY staged.status_date;

COMMIT;
67

1
2
1 (2)执行定期装载  
2

1
2
# Run the periodic load.
./regular_etl.sh
2

1
2
1 (3)验证结果  
2

可以使用下面的分析性查询确认装载正确。该查询分析出新客户订单中检查了信用状态的订单所占的比例。


1
2
3
4
5
6
7
8
USE dw;

-- Among fact rows flagged as new-customer orders, the percentage whose credit
-- check flag is 'Y', rounded and formatted with a trailing ' % '.
SELECT CONCAT(ROUND(flags.checked / (flags.checked + flags.not_checked) * 100),' % ')
FROM (
    SELECT
        SUM(CASE WHEN attr.credit_check_flag = 'Y' THEN 1 ELSE 0 END) AS checked,
        SUM(CASE WHEN attr.credit_check_flag = 'N' THEN 1 ELSE 0 END) AS not_checked
    FROM sales_order_fact fact
    INNER JOIN sales_order_attribute_dim attr
       ON fact.sales_order_attribute_sk = attr.sales_order_attribute_sk
    WHERE attr.new_customer_ind = 'Y'
) flags;
8

查询结果如下图所示。

基于Hadoop生态圈的数据仓库实践——进阶技术(十)

给TA打赏
共{{data.count}}人
人已打赏
安全运维

MongoDB最简单的入门教程之三 使用Java代码往MongoDB里插入数据

2021-12-11 11:36:11

安全运维

Ubuntu上NFS的安装配置

2021-12-19 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索