Data Warehouse Practice on the Hadoop Ecosystem: Advanced Techniques (15)


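XV. Merging Dimensions
As the number of dimensions in a data warehouse grows, some common data turns out to be present in several of them. For example, the customer address information and the shipping address information in the customer dimension, as well as the factory dimension, all contain a zip code, city, and state. This section shows how to merge the zip code information from these three places into one new dimension.
1. Modifying the data warehouse schema
Merging the dimensions requires changing the data warehouse schema. The figure below shows the modified schema. A new zip_code_dim table is added, and the structures of the sales_order_fact and production_fact tables are changed accordingly. Note that the figure shows only the tables related to the zip code dimension.
The zip_code_dim table is related to both fact tables. For zip code data, these relationships replace the ones that previously went through the customer dimension and the factory dimension. The sales_order_fact table needs two relationships, one for the customer address zip code and one for the shipping address zip code. The production_fact table needs only one, so only the factory address zip code surrogate key is added to that fact table.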
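The two relationships from sales_order_fact to a single zip code table can be tried out in miniature. The following is a minimal sketch, not the article's Hive script: it assumes Python's built-in sqlite3 module as a stand-in for Hive, keeps only a few columns, and the single fact row is made-up sample data. Table and view names follow the article.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE zip_code_dim (zip_code_sk INTEGER, zip_code INTEGER, city TEXT, state TEXT);
INSERT INTO zip_code_dim VALUES (1, 17050, 'pittsburgh', 'PA'), (7, 44102, 'cleveland', 'OH');

-- Two role-playing views over the same physical table.
CREATE VIEW customer_zip_code_dim AS
  SELECT zip_code_sk AS customer_zip_code_sk, zip_code AS customer_zip_code,
         city AS customer_city, state AS customer_state
    FROM zip_code_dim;
CREATE VIEW shipping_zip_code_dim AS
  SELECT zip_code_sk AS shipping_zip_code_sk, zip_code AS shipping_zip_code,
         city AS shipping_city, state AS shipping_state
    FROM zip_code_dim;

-- Simplified fact table (assumption: most columns omitted) with one key per role.
CREATE TABLE sales_order_fact (order_number INTEGER,
                               customer_zip_code_sk INTEGER,
                               shipping_zip_code_sk INTEGER);
INSERT INTO sales_order_fact VALUES (1, 1, 7);
""")

# Each role is joined through its own view, so the column names never collide.
rows = conn.execute("""
SELECT f.order_number, c.customer_city, s.shipping_city
  FROM sales_order_fact f
  JOIN customer_zip_code_dim c ON f.customer_zip_code_sk = c.customer_zip_code_sk
  JOIN shipping_zip_code_dim s ON f.shipping_zip_code_sk = s.shipping_zip_code_sk
""").fetchall()
print(rows)
```

Because each view renames the columns for its role, a query can join the same zip code table twice without aliasing every column by hand.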
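The following script modifies the data warehouse schema. It makes these changes:
- Create the zip code dimension table zip_code_dim.
- Initially load the zip code data.
- Create the customer_zip_code_dim and shipping_zip_code_dim views based on zip_code_dim.
- Add the customer_zip_code_sk and shipping_zip_code_sk columns to sales_order_fact.
- Initially load the two zip code surrogate keys from the existing customer and shipping zip codes.
- Remove the customer and shipping zip code columns, together with their city and state columns, from customer_dim.
- Remove the customer city, state, and zip code columns from pa_customer_dim.
- Create the factory_zip_code_dim view based on zip_code_dim.
- Add the factory_zip_code_sk column to production_fact.
- Load the factory_zip_code_sk values from the existing factory zip codes.
- Remove the factory zip code column, together with its city and state columns, from factory_dim.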


use dw;

-- Create the zip code dimension table
create table zip_code_dim (
    zip_code_sk int,
    zip_code int,
    city varchar(30),
    state varchar(2),
    version int,
    effective_date date,
    expiry_date date
)
clustered by (zip_code_sk) into 8 buckets
stored as orc tblproperties ('transactional'='true');

-- Initial load of the zip code data
insert into zip_code_dim values (1,17050,'pittsburgh','PA',1,'1900-01-01','2200-01-01');
insert into zip_code_dim values (2,17051,'mc veytown','PA',1,'1900-01-01','2200-01-01');
insert into zip_code_dim values (3,17052,'mapleton depot','PA',1,'1900-01-01','2200-01-01');
insert into zip_code_dim values (4,17053,'marysville','PA',1,'1900-01-01','2200-01-01');
insert into zip_code_dim values (5,17054,'mattawana','PA',1,'1900-01-01','2200-01-01');
insert into zip_code_dim values (6,17055,'mechanicsburg','PA',1,'1900-01-01','2200-01-01');
insert into zip_code_dim values (7,44102,'cleveland','OH',1,'1900-01-01','2200-01-01');

-- Create the customer and shipping zip code views
create view customer_zip_code_dim (customer_zip_code_sk, customer_zip_code, customer_city, customer_state, version, effective_date, expiry_date) as
    select
        zip_code_sk,
        zip_code,
        city,
        state,
        version,
        effective_date,
        expiry_date
    from
        zip_code_dim;

create view shipping_zip_code_dim (shipping_zip_code_sk, shipping_zip_code, shipping_city, shipping_state, version, effective_date, expiry_date) as
    select
        zip_code_sk,
        zip_code,
        city,
        state,
        version,
        effective_date,
        expiry_date
    from
        zip_code_dim;

-- Add the zip code surrogate keys to sales_order_fact (rebuild the table)
alter table sales_order_fact rename to sales_order_fact_old;
create table sales_order_fact(
   order_number int COMMENT 'order number',
   customer_sk int COMMENT 'customer surrogate key',
   customer_zip_code_sk int COMMENT 'customer zip code sk',
   shipping_zip_code_sk int COMMENT 'shipping zip code sk',
   product_sk int COMMENT 'product surrogate key',
   sales_order_attribute_sk int COMMENT 'sales order attribute surrogate key',
   order_date_sk int COMMENT 'order date surrogate key',
   entry_date_sk int COMMENT 'entry date surrogate key',
   allocate_date_sk int COMMENT 'allocate date surrogate key',
   allocate_quantity int COMMENT 'allocate quantity',
   packing_date_sk int COMMENT 'packing date surrogate key',
   packing_quantity int COMMENT 'packing quantity',
   ship_date_sk int COMMENT 'ship date surrogate key',
   ship_quantity int COMMENT 'ship quantity',
   receive_date_sk int COMMENT 'receive date surrogate key',
   receive_quantity int COMMENT 'receive quantity',
   request_delivery_date_sk int COMMENT 'request delivery date surrogate key',
   order_amount decimal(10,2) COMMENT 'order amount',
   order_quantity int COMMENT 'order quantity')
clustered by (order_number) into 8 buckets
stored as orc tblproperties ('transactional'='true');
insert into sales_order_fact
select order_number,
       customer_sk,
       null,
       null,
       product_sk,
       sales_order_attribute_sk,
       order_date_sk,
       entry_date_sk,
       allocate_date_sk,
       allocate_quantity,
       packing_date_sk,
       packing_quantity,
       ship_date_sk,
       ship_quantity,
       receive_date_sk,
       receive_quantity,
       request_delivery_date_sk,
       order_amount,
       order_quantity
  from sales_order_fact_old;
drop table sales_order_fact_old;

-- Initial load of the two zip code surrogate keys
drop table if exists tmp;
create table tmp as
select t1.order_number,
       t1.customer_sk,
       t2.customer_zip_code_sk,
       t3.shipping_zip_code_sk,
       t1.product_sk,
       t1.sales_order_attribute_sk,
       t1.order_date_sk,
       t1.entry_date_sk,
       t1.allocate_date_sk,
       t1.allocate_quantity,
       t1.packing_date_sk,
       t1.packing_quantity,
       t1.ship_date_sk,
       t1.ship_quantity,
       t1.receive_date_sk,
       t1.receive_quantity,
       t1.request_delivery_date_sk,
       t1.order_amount,
       t1.order_quantity
  from sales_order_fact t1
  left join
(select a.order_number order_number,c.customer_zip_code_sk customer_zip_code_sk
  from sales_order_fact a,
       customer_dim b,
       customer_zip_code_dim c
 where a.customer_sk = b.customer_sk
   and b.customer_zip_code = c.customer_zip_code) t2 on t1.order_number = t2.order_number
  left join
(select a.order_number order_number,c.shipping_zip_code_sk shipping_zip_code_sk
  from sales_order_fact a,
       customer_dim b,
       shipping_zip_code_dim c
 where a.customer_sk = b.customer_sk
   and b.shipping_zip_code = c.shipping_zip_code) t3 on t1.order_number = t3.order_number;
delete from sales_order_fact where sales_order_fact.order_number in (select order_number from tmp);
insert into sales_order_fact select * from tmp;

-- Remove the zip code, city, and state columns from customer_dim (rebuild the table)
alter table customer_dim rename to customer_dim_old;
create table customer_dim
(customer_sk int COMMENT 'surrogate key',
 customer_number int COMMENT 'number',
 customer_name varchar(50) COMMENT 'name',
 customer_street_address varchar(50) COMMENT 'address',
 shipping_address varchar(50) COMMENT 'shipping_address',
 version int COMMENT 'version',
 effective_date date COMMENT 'effective date',
 expiry_date date COMMENT 'expiry date')
clustered by (customer_sk) into 8 buckets
stored as orc tblproperties ('transactional'='true');
insert into customer_dim
select customer_sk,
       customer_number,
       customer_name,
       customer_street_address,
       shipping_address,
       version,
       effective_date,
       expiry_date
  from customer_dim_old;
drop table customer_dim_old;

-- Remove the city, state, and zip code columns from pa_customer_dim (rebuild the table)
alter table pa_customer_dim rename to pa_customer_dim_old;
create table pa_customer_dim
(customer_sk int,
 customer_number int,
 customer_name varchar(50),
 customer_street_address varchar(50),
 shipping_address varchar(50),
 version int,
 effective_date date,
 expiry_date date)
clustered by (customer_sk) into 8 buckets
stored as orc tblproperties ('transactional'='true');
insert into pa_customer_dim
select customer_sk,
       customer_number,
       customer_name,
       customer_street_address,
       shipping_address,
       version,
       effective_date,
       expiry_date
  from pa_customer_dim_old;
drop table pa_customer_dim_old;

-- Create the factory zip code view
create view factory_zip_code_dim (factory_zip_code_sk, factory_zip_code, factory_city, factory_state, version, effective_date, expiry_date) as
    select
        zip_code_sk,
        zip_code,
        city,
        state,
        version,
        effective_date,
        expiry_date
    from
        zip_code_dim;

-- Add the factory_zip_code_sk column to production_fact (rebuild the table)
alter table production_fact rename to production_fact_old;
create table production_fact
(product_sk int,
 production_date_sk int,
 factory_sk int,
 factory_zip_code_sk int,
 production_quantity int);
-- Initial load of the factory zip code surrogate key
insert into production_fact
select a.product_sk,
       a.production_date_sk,
       a.factory_sk,
       c.factory_zip_code_sk,
       a.production_quantity
  from production_fact_old a,
       factory_dim b,
       factory_zip_code_dim c
 where a.factory_sk = b.factory_sk
   and b.factory_zip_code = c.factory_zip_code;
drop table production_fact_old;

-- Remove the factory zip code, city, and state columns from factory_dim
alter table factory_dim rename to factory_dim_old;
create table factory_dim
(factory_sk int,
 factory_code int,
 factory_name varchar(30),
 factory_street_address varchar(50),
 version int,
 effective_date date,
 expiry_date date)
clustered by (factory_sk) into 8 buckets
stored as orc tblproperties ('transactional'='true');
insert into factory_dim
select factory_sk,
       factory_code,
       factory_name,
       factory_street_address,
       version,
       effective_date,
       expiry_date
  from factory_dim_old;
drop table factory_dim_old;

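After running the schema modification script, you can query the customer_zip_code_dim, shipping_zip_code_dim, and factory_zip_code_dim dimension views and the sales_order_fact and production_fact fact tables to confirm that the zip codes have been separated out successfully.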
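One way to make this confirmation concrete is to look for fact rows whose zip code surrogate keys are missing or do not resolve to a dimension row. This is a hedged sketch rather than part of the original procedure: it assumes Python's sqlite3 module in place of Hive, a cut-down schema, and made-up sample rows (order 2 has a NULL key, order 3 points at a nonexistent key).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE zip_code_dim (zip_code_sk INTEGER, zip_code INTEGER);
INSERT INTO zip_code_dim VALUES (1, 17050), (2, 17051);

CREATE TABLE sales_order_fact (order_number INTEGER,
                               customer_zip_code_sk INTEGER,
                               shipping_zip_code_sk INTEGER);
INSERT INTO sales_order_fact VALUES (1, 1, 2), (2, NULL, 1), (3, 2, 9);
""")

# Orders for which either zip code key fails to match a dimension row.
unresolved = conn.execute("""
SELECT f.order_number
  FROM sales_order_fact f
  LEFT JOIN zip_code_dim c ON f.customer_zip_code_sk = c.zip_code_sk
  LEFT JOIN zip_code_dim s ON f.shipping_zip_code_sk = s.zip_code_sk
 WHERE c.zip_code_sk IS NULL OR s.zip_code_sk IS NULL
 ORDER BY f.order_number
""").fetchall()
print(unresolved)
```

An empty result would indicate that every fact row carries two resolvable zip code keys. A check like this is worth running because the initial load uses left joins, which leave a key NULL when a zip code is absent from zip_code_dim.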

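2. Modifying the regular load script
The regular load needs changes in three places:
- Remove all zip-code-related columns from the customer dimension load, because the customer dimension no longer holds customer or shipping zip code information.
- Reference the surrogate keys from the customer zip code view and the shipping zip code view when loading the fact table.
- Change the pa_customer_dim load, because the customer zip code now has to be obtained through customer_zip_code_sk in the sales order fact table.
The modified regular_etl.sql script is shown below.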


-- Set up the environment and the time window
!run /root/set_time.sql

-- Load the customer dimension
-- Expire SCD2 rows for deleted records and for changes in the address columns; the <=> operator handles NULL values.
UPDATE customer_dim
   SET expiry_date = ${hivevar:pre_date}
 WHERE customer_dim.customer_sk IN
(SELECT a.customer_sk
   FROM (SELECT customer_sk,
                customer_number,
                customer_street_address,
                shipping_address
           FROM customer_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN
                rds.customer b ON a.customer_number = b.customer_number
          WHERE b.customer_number IS NULL OR
          (  !(a.customer_street_address <=> b.customer_street_address)
          OR !(a.shipping_address <=> b.shipping_address)
          ));

-- Insert the new SCD2 rows for changes in the customer_street_address and shipping_address columns
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.shipping_address,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM
(
SELECT
    t2.customer_number customer_number,
    t2.customer_name customer_name,
    t2.customer_street_address customer_street_address,
    t2.shipping_address shipping_address,
    t1.version + 1 version,
    ${hivevar:pre_date} effective_date,
    ${hivevar:max_date} expiry_date
 FROM customer_dim t1
INNER JOIN rds.customer t2
   ON t1.customer_number = t2.customer_number
  AND t1.expiry_date = ${hivevar:pre_date}
 LEFT JOIN customer_dim t3
   ON t1.customer_number = t3.customer_number
  AND t3.expiry_date = ${hivevar:max_date}
WHERE (!(t1.customer_street_address <=> t2.customer_street_address)
   OR  !(t1.shipping_address <=> t2.shipping_address)
   )
  AND t3.customer_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;

-- Handle SCD1 on the customer_name column
-- The SET clause of Hive's UPDATE does not support subqueries yet, so the rows to update are staged in a temporary table and the update is done as DELETE followed by INSERT.
-- SCD1 keeps no history, so every dimension row whose customer_name changed is updated, not only the current version.
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
SELECT
    a.customer_sk,
    a.customer_number,
    b.customer_name,
    a.customer_street_address,
    a.shipping_address,
    a.version,
    a.effective_date,
    a.expiry_date
  FROM customer_dim a, rds.customer b
 WHERE a.customer_number = b.customer_number AND !(a.customer_name <=> b.customer_name);
DELETE FROM customer_dim WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);
INSERT INTO customer_dim SELECT * FROM tmp;

-- Insert new customer records
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.shipping_address,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM
(
SELECT t1.* FROM rds.customer t1 LEFT JOIN customer_dim t2 ON t1.customer_number = t2.customer_number
 WHERE t2.customer_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;

-- Load the product dimension
-- Expire SCD2 rows for deleted records and for changes in the product_name and product_category columns
UPDATE product_dim
   SET expiry_date = ${hivevar:pre_date}
 WHERE product_dim.product_sk IN
(SELECT a.product_sk
   FROM (SELECT product_sk,product_code,product_name,product_category
           FROM product_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN
                rds.product b ON a.product_code = b.product_code
          WHERE b.product_code IS NULL OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));

-- Insert the new SCD2 rows for changes in the product_name and product_category columns
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM
(
SELECT
    t2.product_code product_code,
    t2.product_name product_name,
    t2.product_category product_category,
    t1.version + 1 version,
    ${hivevar:pre_date} effective_date,
    ${hivevar:max_date} expiry_date
 FROM product_dim t1
INNER JOIN rds.product t2
   ON t1.product_code = t2.product_code
  AND t1.expiry_date = ${hivevar:pre_date}
 LEFT JOIN product_dim t3
   ON t1.product_code = t3.product_code
  AND t3.expiry_date = ${hivevar:max_date}
WHERE (t1.product_name <> t2.product_name OR t1.product_category <> t2.product_category) AND t3.product_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;

-- Insert new product records
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM
(
SELECT t1.* FROM rds.product t1 LEFT JOIN product_dim t2 ON t1.product_code = t2.product_code
 WHERE t2.product_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;

-- Load the product_count_fact table
insert overwrite table product_count_fact
select product_sk,date_sk
  from (select a.product_sk product_sk,
               a.product_code product_code,
               b.date_sk date_sk,
               row_number() over (partition by a.product_code order by b.date_sk) rn
          from product_dim a,date_dim b
         where a.effective_date = b.date) t
 where rn = 1;

-- Load the sales order fact table
-- Sales orders added on the previous day
INSERT INTO sales_order_fact
SELECT
    a.order_number,
    customer_sk,
    i.customer_zip_code_sk,
    j.shipping_zip_code_sk,
    product_sk,
    g.sales_order_attribute_sk,
    e.order_date_sk,
    h.entry_date_sk,
    null,
    null,
    null,
    null,
    null,
    null,
    null,
    null,
    f.request_delivery_date_sk,
    order_amount,
    quantity
  FROM
    rds.sales_order a,
    customer_dim c,
    product_dim d,
    order_date_dim e,
    request_delivery_date_dim f,
    sales_order_attribute_dim g,
    entry_date_dim h,
    customer_zip_code_dim i,
    shipping_zip_code_dim j,
    rds.customer k,
    rds.cdc_time l
 WHERE
    a.order_status = 'N'
AND a.customer_number = c.customer_number
AND a.status_date >= c.effective_date
AND a.status_date < c.expiry_date
AND a.customer_number = k.customer_number
AND k.customer_zip_code = i.customer_zip_code
AND a.status_date >= i.effective_date
AND a.status_date <= i.expiry_date
AND k.shipping_zip_code = j.shipping_zip_code
AND a.status_date >= j.effective_date
AND a.status_date <= j.expiry_date
AND a.product_code = d.product_code
AND a.status_date >= d.effective_date
AND a.status_date < d.expiry_date
AND to_date(a.status_date) = e.order_date
AND to_date(a.entry_date) = h.entry_date
AND to_date(a.request_delivery_date) = f.request_delivery_date
AND a.verification_ind = g.verification_ind
AND a.credit_check_flag = g.credit_check_flag
AND a.new_customer_ind = g.new_customer_ind
AND a.web_order_flag = g.web_order_flag
AND a.entry_date >= l.last_load AND a.entry_date < l.current_load ;

-- Reload the PA customer dimension
TRUNCATE TABLE pa_customer_dim;
INSERT INTO pa_customer_dim
SELECT DISTINCT a.*
  FROM customer_dim a,
       sales_order_fact b,
       customer_zip_code_dim c
 WHERE c.customer_state = 'PA'
   AND b.customer_zip_code_sk = c.customer_zip_code_sk
   AND a.customer_sk = b.customer_sk;

-- Handle the four statuses: allocated, packed, shipped, and received
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.customer_zip_code_sk,
       t0.shipping_zip_code_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t0.entry_date_sk entry_date_sk,
       t2.allocate_date_sk allocate_date_sk,
       t1.quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       allocate_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'A'
   and to_date(t1.status_date) = t2.allocate_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;

DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.customer_zip_code_sk,
       t0.shipping_zip_code_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t0.entry_date_sk entry_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t2.packing_date_sk packing_date_sk,
       t1.quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       packing_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'P'
   and to_date(t1.status_date) = t2.packing_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;

DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.customer_zip_code_sk,
       t0.shipping_zip_code_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t0.entry_date_sk entry_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t2.ship_date_sk ship_date_sk,
       t1.quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       ship_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'S'
   and to_date(t1.status_date) = t2.ship_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;

DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.customer_zip_code_sk,
       t0.shipping_zip_code_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t0.entry_date_sk entry_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t2.receive_date_sk receive_date_sk,
       t1.quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       receive_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'R'
   and to_date(t1.status_date) = t2.receive_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;

DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);
INSERT INTO sales_order_fact SELECT * FROM tmp;

-- Update the last_load field of the timestamp table
INSERT OVERWRITE TABLE rds.cdc_time SELECT current_load, current_load FROM rds.cdc_time;
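The new-order load in the script above resolves each order's zip codes to surrogate keys by going from the order to the source customer row and then to the zip code dimension, keeping only dimension rows whose effective period covers the order's status date. The sketch below isolates that lookup. It is an illustration under assumptions: Python's sqlite3 stands in for Hive, dates are stored as ISO strings so that string comparison matches date order, the tables carry only the columns needed, and the customer and order rows are made-up sample data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE zip_code_dim (zip_code_sk INTEGER, zip_code INTEGER,
                           effective_date TEXT, expiry_date TEXT);
INSERT INTO zip_code_dim VALUES
  (1, 17050, '1900-01-01', '2200-01-01'),
  (6, 17055, '1900-01-01', '2200-01-01');

-- Stand-ins for rds.customer and rds.sales_order (sample rows only).
CREATE TABLE customer (customer_number INTEGER, customer_zip_code INTEGER,
                       shipping_zip_code INTEGER);
INSERT INTO customer VALUES (4, 17055, 17055), (1, 17050, 17055);
CREATE TABLE sales_order (order_number INTEGER, customer_number INTEGER,
                          status_date TEXT);
INSERT INTO sales_order VALUES (144, 4, '2016-08-08'), (145, 1, '2016-08-08');
""")

# One join per role; each join also checks the dimension row's validity period.
rows = conn.execute("""
SELECT o.order_number, cz.zip_code_sk, sz.zip_code_sk
  FROM sales_order o
  JOIN customer k ON o.customer_number = k.customer_number
  JOIN zip_code_dim cz ON k.customer_zip_code = cz.zip_code
   AND o.status_date >= cz.effective_date AND o.status_date <= cz.expiry_date
  JOIN zip_code_dim sz ON k.shipping_zip_code = sz.zip_code
   AND o.status_date >= sz.effective_date AND o.status_date <= sz.expiry_date
 ORDER BY o.order_number
""").fetchall()
print(rows)
```

The zip codes come from the source customer table rather than from customer_dim, because customer_dim no longer stores them after the schema change.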

3. Testing the modified regular load

Some preparation is needed before running the modified regular load script. First, make the following two changes to the customer data in the source:
- Change the customer and shipping zip codes of customer number 4 from 17050 to 17055.
- Add a new customer with number 15.
Use the following statements to make these changes:

update source.customer
   set customer_street_address = '9999 Louise Dr.',
       customer_zip_code = 17055,
       customer_city = 'Pittsburgh',
       shipping_address = '9999 Louise Dr.',
       shipping_zip_code = 17055,
       shipping_city = 'Pittsburgh'
 where customer_number = 4;

insert into source.customer
values(15, 'Super Stores', '1000 Woodland St.', 17055, 'Pittsburgh', 'PA', '1000 Woodland St.', 17055, 'Pittsburgh', 'PA');

COMMIT;

Before loading the new customer data, query the latest customer and shipping zip codes. The output of this query can later be compared with the changed information. The query is as follows.

use dw;
SELECT order_date_sk odsk,
       customer_number cn,
       customer_zip_code czc,
       shipping_zip_code szc
  FROM customer_zip_code_dim a,
       shipping_zip_code_dim b,
       sales_order_fact c,
       customer_dim d
 WHERE a.customer_zip_code_sk = c.customer_zip_code_sk
   AND b.shipping_zip_code_sk = c.shipping_zip_code_sk
   AND d.customer_sk = c.customer_sk;

Next, use the following statements to add two sales orders.

SET @order_date := from_unixtime(unix_timestamp('2016-08-08 00:00:01') + rand() * (unix_timestamp('2016-08-08 12:00:00') - unix_timestamp('2016-08-08 00:00:01')));
SET @amount := floor(1000 + rand() * 9000);
SET @quantity := floor(10 + rand() * 90);

INSERT INTO source.sales_order VALUES
  (null, 144, 4, 3, 'Y', 'Y', 'Y', 'N',  @order_date, 'N', '2016-08-10',
        @order_date, @amount, @quantity);

SET @order_date := from_unixtime(unix_timestamp('2016-08-08 12:00:00') + rand() * (unix_timestamp('2016-08-09 00:00:00') - unix_timestamp('2016-08-08 12:00:00')));
SET @amount := floor(1000 + rand() * 9000);
SET @quantity := floor(10 + rand() * 90);

INSERT INTO source.sales_order VALUES
  (null, 145, 15, 4, 'Y', 'N', 'Y', 'N', @order_date, 'N', '2016-08-10',
       @order_date, @amount, @quantity);
commit;
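The SET statements above generate test data by adding a random fraction of the window length to the window start, and by scaling a random number into a fixed range for the amount and quantity. For readers who prepare test data outside MySQL, the same arithmetic might look like the sketch below. The function name random_timestamp and the fixed seed are illustrative assumptions, not part of the original scripts.

```python
import random
from datetime import datetime, timedelta


def random_timestamp(start: datetime, end: datetime, rng: random.Random) -> datetime:
    """Return a timestamp between start and end: start plus a random share of the span."""
    span_seconds = (end - start).total_seconds()
    return start + timedelta(seconds=rng.random() * span_seconds)


rng = random.Random(0)  # fixed seed only so that reruns are repeatable

window_start = datetime(2016, 8, 8, 0, 0, 1)
window_end = datetime(2016, 8, 8, 12, 0, 0)

order_date = random_timestamp(window_start, window_end, rng)
amount = 1000 + int(rng.random() * 9000)   # mirrors floor(1000 + rand() * 9000)
quantity = 10 + int(rng.random() * 90)     # mirrors floor(10 + rand() * 90)

print(order_date, amount, quantity)
```

Passing the random generator in explicitly keeps the helper deterministic under a seed, which is convenient when the same test orders need to be regenerated.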

Use the following SQL command to modify the time window.

INSERT OVERWRITE TABLE rds.cdc_time SELECT '2016-08-08', '2016-08-08' FROM rds.cdc_time;

Run the regular load with the following command.

./regular_etl.sh

Query the customer_dim table to confirm that the two changed customers, numbers 4 and 15, have been loaded correctly.

select customer_sk csk,
       customer_number cnum,
       customer_name cnam,
       customer_street_address csd,
       shipping_address sd,
       version,
       effective_date,
       expiry_date
  from dw.customer_dim
 where customer_number in (4, 15);

The query result is shown in the figure below.

Query the two new sales orders in the sales_order_fact table to confirm that the zip codes have been loaded correctly.

use dw;
select a.order_number onum,
       f.customer_number cnum,
       b.customer_zip_code czc,
       c.shipping_zip_code szc,
       g.product_code pc,
       d.order_date od,
       e.entry_date ed,
       a.order_amount,
       a.order_quantity
  from sales_order_fact a,
       customer_zip_code_dim b,
       shipping_zip_code_dim c,
       order_date_dim d,
       entry_date_dim e,
       customer_dim f,
       product_dim g
 where a.order_number IN (144, 145)
   and a.customer_sk = f.customer_sk
   and a.product_sk = g.product_sk
   and a.customer_zip_code_sk = b.customer_zip_code_sk
   and a.shipping_zip_code_sk = c.shipping_zip_code_sk
   and a.order_date_sk = d.order_date_sk
   and a.entry_date_sk = e.entry_date_sk;

The query result is shown in the figure below.

Query the pa_customer_dim table to confirm that the PA customers have been loaded correctly.

select customer_sk csk,
       customer_number cnum,
       customer_name cnam,
       customer_street_address csa,
       shipping_address sad,
       version,
       effective_date,
       expiry_date
  from dw.pa_customer_dim;

The query result is shown in the figure below.
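Since customer_dim no longer stores a state, the PA subset is reached through the fact table: a customer qualifies when at least one of its orders carries a customer_zip_code_sk that points at a PA zip code. The following sketch reproduces that join shape on a tiny data set. It assumes Python's sqlite3 instead of Hive, a reduced set of columns, and made-up sample rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE zip_code_dim (zip_code_sk INTEGER, zip_code INTEGER, state TEXT);
INSERT INTO zip_code_dim VALUES (6, 17055, 'PA'), (7, 44102, 'OH');

CREATE TABLE customer_dim (customer_sk INTEGER, customer_number INTEGER);
INSERT INTO customer_dim VALUES (1, 4), (2, 8), (3, 15);

-- Customer sk 1 has two PA orders, sk 2 has an OH order, sk 3 has no orders.
CREATE TABLE sales_order_fact (order_number INTEGER, customer_sk INTEGER,
                               customer_zip_code_sk INTEGER);
INSERT INTO sales_order_fact VALUES (144, 1, 6), (145, 1, 6), (146, 2, 7);
""")

# DISTINCT collapses customers that have several qualifying orders.
pa_customers = conn.execute("""
SELECT DISTINCT a.customer_sk, a.customer_number
  FROM customer_dim a
  JOIN sales_order_fact b ON a.customer_sk = b.customer_sk
  JOIN zip_code_dim c ON b.customer_zip_code_sk = c.zip_code_sk
 WHERE c.state = 'PA'
 ORDER BY a.customer_sk
""").fetchall()
print(pa_customers)
```

One consequence of this join shape is that a customer without any sales order does not appear in the subset, whatever its address, because there is no fact row to link it to a zip code.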

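4. Modifying the regular production load
As with the changes to the regular data warehouse load, all zip-code-related columns have to be removed from the factory dimension load, and the factory zip code surrogate key has to be used when loading the production fact table. The modified regular_etl_daily_production.sql script is shown below.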


-- Set up the environment and the time window
!run /root/set_time.sql

-- Factory information rarely changes and its history usually need not be kept, so SCD1 is used
drop table if exists tmp;
create table tmp as
select a.factory_sk,
       a.factory_code,
       b.factory_name,
       b.factory_street_address,
       a.version,
       a.effective_date,
       a.expiry_date
  from factory_dim a,rds.factory_master b
 where a.factory_code = b.factory_code and
     !(a.factory_name <=> b.factory_name
   and a.factory_street_address <=> b.factory_street_address
   );

delete from factory_dim where factory_dim.factory_sk in (select factory_sk from tmp);
insert into factory_dim select * from tmp;

-- Add new factories
INSERT INTO factory_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.factory_code) + t2.sk_max,
    t1.factory_code,
    t1.factory_name,
    t1.factory_street_address,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM
(
SELECT t1.* FROM rds.factory_master t1 LEFT JOIN factory_dim t2 ON t1.factory_code = t2.factory_code
 WHERE t2.factory_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(factory_sk),0) sk_max FROM factory_dim) t2;

-- Load the daily production fact table
INSERT INTO production_fact
SELECT
  b.product_sk
, c.date_sk
, d.factory_sk
, e.factory_zip_code_sk
, production_quantity
FROM
  rds.daily_production a
, product_dim b
, date_dim c
, factory_dim d
, factory_zip_code_dim e
, rds.factory_master f
WHERE
    production_date = ${hivevar:pre_date}
AND a.product_code = b.product_code
AND a.production_date >= b.effective_date
AND a.production_date <= b.expiry_date
AND a.factory_code = f.factory_code
AND f.factory_zip_code = e.factory_zip_code
AND a.production_date >= e.effective_date
AND a.production_date < e.expiry_date
AND a.production_date = c.date
AND a.factory_code = d.factory_code ;
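The SCD1 step in the script above stages the changed rows in tmp and then deletes and reinserts them, using the null-safe <=> operator so that a change from or to NULL is still detected. The sketch below shows why the null-safe comparison matters. It is an approximation under assumptions: Python's sqlite3 replaces Hive, SQLite's IS NOT plays the role of !(a <=> b), the tables are trimmed to four columns, and the factory rows are made-up sample data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE factory_dim (factory_sk INTEGER, factory_code INTEGER,
                          factory_name TEXT, factory_street_address TEXT);
INSERT INTO factory_dim VALUES (1, 1, 'First Factory', NULL),
                               (2, 2, 'Second Factory', '2 Main St.');

-- Stand-in for rds.factory_master: factory 1 now has an address.
CREATE TABLE factory_master (factory_code INTEGER, factory_name TEXT,
                             factory_street_address TEXT);
INSERT INTO factory_master VALUES (1, 'First Factory', '1 Main St.'),
                                  (2, 'Second Factory', '2 Main St.');
""")

# A plain <> comparison yields NULL when one side is NULL, so the change is missed.
missed = conn.execute("""
SELECT COUNT(*)
  FROM factory_dim a JOIN factory_master b ON a.factory_code = b.factory_code
 WHERE a.factory_name <> b.factory_name
    OR a.factory_street_address <> b.factory_street_address
""").fetchone()[0]

conn.executescript("""
-- Stage changed rows with a null-safe comparison, then delete and reinsert them.
CREATE TABLE tmp AS
SELECT a.factory_sk, a.factory_code, b.factory_name, b.factory_street_address
  FROM factory_dim a JOIN factory_master b ON a.factory_code = b.factory_code
 WHERE a.factory_name IS NOT b.factory_name
    OR a.factory_street_address IS NOT b.factory_street_address;

DELETE FROM factory_dim WHERE factory_sk IN (SELECT factory_sk FROM tmp);
INSERT INTO factory_dim SELECT * FROM tmp;
""")

rows = conn.execute("SELECT * FROM factory_dim ORDER BY factory_sk").fetchall()
print(missed, rows)
```

Here the ordinary comparison finds no changed rows, while the null-safe version picks up factory 1 and overwrites its address in place, which is the SCD1 behavior the script aims for.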

5. Testing the modified regular production load

Add a new factory.

insert into source.factory_master
values (5,'Fifth Factory','90909 McNicholds Blvd.',17055,'Pittsburgh','PA');
commit;

Add three daily production records to the daily_production table.

INSERT INTO source.daily_production VALUES
  (1, '2016-08-08', 3, 400 )
, (3, '2016-08-08', 4, 200 )
, (5, '2016-08-08', 5, 100 );
commit;

Modify the time window.

INSERT OVERWRITE TABLE rds.cdc_time SELECT '2016-08-08', '2016-08-08' FROM rds.cdc_time;

Run the regular production load.

./regular_etl_daily_production.sh

Query factory_dim to confirm that the load is correct.

select factory_sk,
       factory_code,
       factory_name,
       factory_street_address,
       version,
       effective_date,
       expiry_date
  from dw.factory_dim;

The query result is shown in the figure below.

Query the production_fact table to confirm that the three new daily production records were loaded correctly.

use dw;
select e.product_code pc,
       b.date,
       c.factory_code fc,
       d.factory_zip_code fzc,
       a.production_quantity qty
  from production_fact a,
       date_dim b,
       factory_dim c,
       factory_zip_code_dim d,
       product_dim e
 where a.product_sk = e.product_sk
   and a.production_date_sk = b.date_sk
   and a.factory_sk = c.factory_sk
   and a.factory_zip_code_sk = d.factory_zip_code_sk;

The query result is shown in the figure below.
