十五、维度合并
随着数据仓库中维度的增加,我们会发现有些通用的数据存在于多个维度中。例如,客户维度的客户地址相关信息、送货地址相关信息和工厂维度里都有邮编、城市和州。本节说明如何把三个维度里的邮编相关信息合并到一个新的维度。
1. 修改数据仓库模式
为了合并维度,需要改变数据仓库模式。下图显示了修改后的模式。新增了一个zip_code_dim表,sales_order_fact和production_fact表的结构也做了相应的修改。注意图中只显示了与邮编维度相关的表。
zip_code_dim表与两个事实表相关联。这些关系替换了这两个事实表与客户维度、工厂维度的关系。sales_order_fact表需要两个关系,一个关联到客户地址邮编,另一个关联到送货地址邮编。与production_fact表只有一个关系,所以在这个事实表里只增加了工厂地址邮编代理键。
下面的脚本用于修改数据仓库模式,所做的修改如下。
- 创建邮编维度表zip_code_dim。
- 初始装载邮编相关数据。
- 基于zip_code_dim表创建customer_zip_code_dim和shipping_zip_code_dim视图。
- 在sales_order_fact表上增加customer_zip_code_sk和shipping_zip_code_sk列。
- 基于已有的客户邮编和送货邮编初始装载两个邮编代理键。
- 在customer_dim表上删除客户邮编和送货邮编,以及它们对应的城市和州列。
- 在pa_customer_dim表上删除客户的城市、州和邮编列。
- 基于zip_code_dim表创建factory_zip_code_dim视图。
- 给production_fact表增加factory_zip_code_sk列。
- 从现有的工厂邮编装载factory_zip_code_sk值。
- 在factory_dim表上删除工厂邮编,以及它对应的城市和州列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
-- Work in the data warehouse database.
USE dw;

-- Zip code dimension: surrogate key, zip code, city, state, plus the
-- version / effective_date / expiry_date bookkeeping columns.
-- Declared as a bucketed, transactional ORC table.
CREATE TABLE zip_code_dim (
    zip_code_sk    INT,
    zip_code       INT,
    city           VARCHAR(30),
    state          VARCHAR(2),
    version        INT,
    effective_date DATE,
    expiry_date    DATE
)
CLUSTERED BY (zip_code_sk) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
-- Seed the zip code dimension.
-- One multi-row INSERT replaces seven single-row statements, so the load runs
-- as a single statement instead of seven separate ones against the
-- transactional table. Every row is version 1 and valid from 1900-01-01 to
-- 2200-01-01.
INSERT INTO zip_code_dim VALUES
    (1, 17050, 'pittsburgh',     'PA', 1, '1900-01-01', '2200-01-01'),
    (2, 17051, 'mc veytown',     'PA', 1, '1900-01-01', '2200-01-01'),
    (3, 17052, 'mapleton depot', 'PA', 1, '1900-01-01', '2200-01-01'),
    (4, 17053, 'marysville',     'PA', 1, '1900-01-01', '2200-01-01'),
    (5, 17054, 'mattawana',      'PA', 1, '1900-01-01', '2200-01-01'),
    (6, 17055, 'mechanicsburg',  'PA', 1, '1900-01-01', '2200-01-01'),
    (7, 44102, 'cleveland',      'OH', 1, '1900-01-01', '2200-01-01');
-- Role-playing view: zip_code_dim exposed under customer-prefixed column names.
CREATE VIEW customer_zip_code_dim AS
SELECT
    zip_code_sk AS customer_zip_code_sk,
    zip_code    AS customer_zip_code,
    city        AS customer_city,
    state       AS customer_state,
    version,
    effective_date,
    expiry_date
FROM zip_code_dim;
-- Role-playing view: zip_code_dim exposed under shipping-prefixed column names.
CREATE VIEW shipping_zip_code_dim AS
SELECT
    zip_code_sk AS shipping_zip_code_sk,
    zip_code    AS shipping_zip_code,
    city        AS shipping_city,
    state       AS shipping_state,
    version,
    effective_date,
    expiry_date
FROM zip_code_dim;
-- Add the two zip code surrogate keys to the sales order fact table.
-- The table is rebuilt: rename the current table out of the way, create the
-- new layout, copy every row across, then drop the renamed copy.
ALTER TABLE sales_order_fact RENAME TO sales_order_fact_old;

CREATE TABLE sales_order_fact (
    order_number             INT           COMMENT 'order number',
    customer_sk              INT           COMMENT 'customer surrogate key',
    customer_zip_code_sk     INT           COMMENT 'customer zip code sk',
    shipping_zip_code_sk     INT           COMMENT 'shipping zip code sk',
    product_sk               INT           COMMENT 'product surrogate key',
    sales_order_attribute_sk INT           COMMENT 'sales order attribute surrogate key',
    order_date_sk            INT           COMMENT 'order date surrogate key',
    entry_date_sk            INT           COMMENT 'entry date surrogate key',
    allocate_date_sk         INT           COMMENT 'allocate date surrogate key',
    allocate_quantity        INT           COMMENT 'allocate quantity',
    packing_date_sk          INT           COMMENT 'packing date surrogate key',
    packing_quantity         INT           COMMENT 'packing quantity',
    ship_date_sk             INT           COMMENT 'ship date surrogate key',
    ship_quantity            INT           COMMENT 'ship quantity',
    receive_date_sk          INT           COMMENT 'receive date surrogate key',
    receive_quantity         INT           COMMENT 'receive quantity',
    request_delivery_date_sk INT           COMMENT 'request delivery date surrogate key',
    order_amount             DECIMAL(10,2) COMMENT 'order amount',
    order_quantity           INT           COMMENT 'order quantity'
)
CLUSTERED BY (order_number) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- Copy the existing facts; the two new zip code keys are left NULL here.
INSERT INTO sales_order_fact
SELECT
    old_fact.order_number,
    old_fact.customer_sk,
    NULL,
    NULL,
    old_fact.product_sk,
    old_fact.sales_order_attribute_sk,
    old_fact.order_date_sk,
    old_fact.entry_date_sk,
    old_fact.allocate_date_sk,
    old_fact.allocate_quantity,
    old_fact.packing_date_sk,
    old_fact.packing_quantity,
    old_fact.ship_date_sk,
    old_fact.ship_quantity,
    old_fact.receive_date_sk,
    old_fact.receive_quantity,
    old_fact.request_delivery_date_sk,
    old_fact.order_amount,
    old_fact.order_quantity
FROM sales_order_fact_old old_fact;

DROP TABLE sales_order_fact_old;
-- Initial load of the two zip code surrogate keys on sales_order_fact.
-- Each fact row is matched to its customer_dim row through customer_sk, and
-- that row's customer / shipping zip codes are looked up in the two zip code
-- views. LEFT JOINs keep fact rows that have no matching customer or zip
-- code; the corresponding key then stays NULL.
-- The dimensions are joined directly to the fact row instead of going through
-- subqueries that re-scan sales_order_fact and join back on order_number:
-- this reads the fact table once and cannot multiply rows when an
-- order_number occurs more than once.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    f.order_number,
    f.customer_sk,
    cz.customer_zip_code_sk,
    sz.shipping_zip_code_sk,
    f.product_sk,
    f.sales_order_attribute_sk,
    f.order_date_sk,
    f.entry_date_sk,
    f.allocate_date_sk,
    f.allocate_quantity,
    f.packing_date_sk,
    f.packing_quantity,
    f.ship_date_sk,
    f.ship_quantity,
    f.receive_date_sk,
    f.receive_quantity,
    f.request_delivery_date_sk,
    f.order_amount,
    f.order_quantity
FROM sales_order_fact f
LEFT JOIN customer_dim c
    ON f.customer_sk = c.customer_sk
LEFT JOIN customer_zip_code_dim cz
    ON c.customer_zip_code = cz.customer_zip_code
LEFT JOIN shipping_zip_code_dim sz
    ON c.shipping_zip_code = sz.shipping_zip_code;

-- Replace the affected fact rows with the versions that carry the keys
-- (delete + insert; tmp has the same column order as sales_order_fact).
DELETE FROM sales_order_fact
 WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);

INSERT INTO sales_order_fact
SELECT * FROM tmp;
-- Rebuild customer_dim with only the columns listed below; the customer and
-- shipping zip code / city / state columns are not carried over.
ALTER TABLE customer_dim RENAME TO customer_dim_old;

CREATE TABLE customer_dim (
    customer_sk             INT         COMMENT 'surrogate key',
    customer_number         INT         COMMENT 'number',
    customer_name           VARCHAR(50) COMMENT 'name',
    customer_street_address VARCHAR(50) COMMENT 'address',
    shipping_address        VARCHAR(50) COMMENT 'shipping_address',
    version                 INT         COMMENT 'version',
    effective_date          DATE        COMMENT 'effective date',
    expiry_date             DATE        COMMENT 'expiry date'
)
CLUSTERED BY (customer_sk) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- Copy the retained columns from the renamed table, then drop it.
INSERT INTO customer_dim
SELECT
    old_dim.customer_sk,
    old_dim.customer_number,
    old_dim.customer_name,
    old_dim.customer_street_address,
    old_dim.shipping_address,
    old_dim.version,
    old_dim.effective_date,
    old_dim.expiry_date
FROM customer_dim_old old_dim;

DROP TABLE customer_dim_old;
-- Rebuild pa_customer_dim with the same reduced column set as customer_dim
-- (no zip code, city or state columns).
ALTER TABLE pa_customer_dim RENAME TO pa_customer_dim_old;

CREATE TABLE pa_customer_dim (
    customer_sk             INT,
    customer_number         INT,
    customer_name           VARCHAR(50),
    customer_street_address VARCHAR(50),
    shipping_address        VARCHAR(50),
    version                 INT,
    effective_date          DATE,
    expiry_date             DATE
)
CLUSTERED BY (customer_sk) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- Copy the retained columns from the renamed table, then drop it.
INSERT INTO pa_customer_dim
SELECT
    old_dim.customer_sk,
    old_dim.customer_number,
    old_dim.customer_name,
    old_dim.customer_street_address,
    old_dim.shipping_address,
    old_dim.version,
    old_dim.effective_date,
    old_dim.expiry_date
FROM pa_customer_dim_old old_dim;

DROP TABLE pa_customer_dim_old;
-- Role-playing view: zip_code_dim exposed under factory-prefixed column names.
CREATE VIEW factory_zip_code_dim AS
SELECT
    zip_code_sk AS factory_zip_code_sk,
    zip_code    AS factory_zip_code,
    city        AS factory_city,
    state       AS factory_state,
    version,
    effective_date,
    expiry_date
FROM zip_code_dim;
-- Rebuild production_fact with the new factory_zip_code_sk column.
ALTER TABLE production_fact RENAME TO production_fact_old;

CREATE TABLE production_fact (
    product_sk          INT,
    production_date_sk  INT,
    factory_sk          INT,
    factory_zip_code_sk INT,
    production_quantity INT
);

-- Initial load of the factory zip code surrogate key.
-- The factory's zip code is read from factory_dim and looked up in the
-- factory zip code view. LEFT JOINs are used so that a fact row whose factory
-- or zip code has no match is still copied (with a NULL key) rather than
-- silently lost when production_fact_old is dropped below.
INSERT INTO production_fact
SELECT
    p.product_sk,
    p.production_date_sk,
    p.factory_sk,
    z.factory_zip_code_sk,
    p.production_quantity
FROM production_fact_old p
LEFT JOIN factory_dim f
    ON p.factory_sk = f.factory_sk
LEFT JOIN factory_zip_code_dim z
    ON f.factory_zip_code = z.factory_zip_code;

DROP TABLE production_fact_old;
-- Rebuild factory_dim with only the columns listed below: factory_code is
-- kept, while the factory zip code, city and state columns are not carried
-- over.
ALTER TABLE factory_dim RENAME TO factory_dim_old;

CREATE TABLE factory_dim (
    factory_sk             INT,
    factory_code           INT,
    factory_name           VARCHAR(30),
    factory_street_address VARCHAR(50),
    version                INT,
    effective_date         DATE,
    expiry_date            DATE
)
CLUSTERED BY (factory_sk) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- Copy the retained columns from the renamed table, then drop it.
INSERT INTO factory_dim
SELECT
    old_dim.factory_sk,
    old_dim.factory_code,
    old_dim.factory_name,
    old_dim.factory_street_address,
    old_dim.version,
    old_dim.effective_date,
    old_dim.expiry_date
FROM factory_dim_old old_dim;

DROP TABLE factory_dim_old;
执行完修改数据仓库模式的脚本后,可以查询customer_zip_code_dim、shipping_zip_code_dim、factory_zip_code_dim维度视图和sales_order_fact、production_fact事实表,确认邮编已经被成功分离。
2. 修改定期装载脚本
定期装载有三个地方的修改:
- 删除客户维度装载里所有邮编信息相关的列,因为客户维度里不再有客户邮编和送货邮编相关信息。
- 在事实表装载中引用客户邮编视图和送货邮编视图中的代理键。
- 修改pa_customer_dim装载,因为需要通过销售订单事实表的customer_zip_code_sk获取客户邮编。

修改后的regular_etl.sql脚本如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
-- Set up the environment and the load time window.
-- NOTE(review): presumably defines the pre_date / max_date hivevars used
-- below — confirm in set_time.sql.
!run /root/set_time.sql

-- Load the customer dimension.
-- SCD2 expiry: close the current version (expiry_date = max_date) of every
-- customer that no longer exists in rds.customer, or whose street address or
-- shipping address differs from the source. <=> is null-safe equality, so a
-- change to or from NULL also counts as a difference.
UPDATE customer_dim
   SET expiry_date = ${hivevar:pre_date}
 WHERE customer_dim.customer_sk IN (
        SELECT cur.customer_sk
          FROM (SELECT customer_sk,
                       customer_number,
                       customer_street_address,
                       shipping_address
                  FROM customer_dim
                 WHERE expiry_date = ${hivevar:max_date}) cur
          LEFT JOIN rds.customer src
            ON cur.customer_number = src.customer_number
         WHERE src.customer_number IS NULL
            OR NOT (cur.customer_street_address <=> src.customer_street_address)
            OR NOT (cur.shipping_address <=> src.shipping_address)
       );
-- SCD2 new versions for address changes.
-- For each customer whose version was expired with expiry_date = pre_date,
-- whose address differs from the source, and who has no current version
-- (expiry_date = max_date), insert a new row: version + 1, effective from
-- pre_date, open-ended to max_date. New surrogate keys continue after the
-- current maximum customer_sk.
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY nv.customer_number) + mx.sk_max,
    nv.customer_number,
    nv.customer_name,
    nv.customer_street_address,
    nv.shipping_address,
    nv.version,
    nv.effective_date,
    nv.expiry_date
FROM (
    SELECT
        src.customer_number         AS customer_number,
        src.customer_name           AS customer_name,
        src.customer_street_address AS customer_street_address,
        src.shipping_address        AS shipping_address,
        expired.version + 1         AS version,
        ${hivevar:pre_date}         AS effective_date,
        ${hivevar:max_date}         AS expiry_date
    FROM customer_dim expired
    INNER JOIN rds.customer src
        ON expired.customer_number = src.customer_number
       AND expired.expiry_date = ${hivevar:pre_date}
    LEFT JOIN customer_dim cur
        ON expired.customer_number = cur.customer_number
       AND cur.expiry_date = ${hivevar:max_date}
    WHERE (NOT (expired.customer_street_address <=> src.customer_street_address)
           OR NOT (expired.shipping_address <=> src.shipping_address))
      AND cur.customer_sk IS NULL
) nv
CROSS JOIN (
    SELECT COALESCE(MAX(customer_sk), 0) AS sk_max
    FROM customer_dim
) mx;
-- SCD1 on customer_name.
-- Hive's UPDATE ... SET does not accept a subquery, so the rows to change are
-- staged in tmp and applied as delete + insert instead of an update.
-- SCD1 keeps no history, so every version of a customer whose name differs
-- from the source is rewritten, not only the current version.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    dim.customer_sk,
    dim.customer_number,
    src.customer_name,
    dim.customer_street_address,
    dim.shipping_address,
    dim.version,
    dim.effective_date,
    dim.expiry_date
FROM customer_dim dim
INNER JOIN rds.customer src
    ON dim.customer_number = src.customer_number
WHERE NOT (dim.customer_name <=> src.customer_name);

DELETE FROM customer_dim
 WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);

INSERT INTO customer_dim
SELECT * FROM tmp;
-- New customers: source rows whose customer_number has no row at all in
-- customer_dim. Inserted as version 1, effective from pre_date, open-ended to
-- max_date, with surrogate keys continuing after the current maximum.
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY added.customer_number) + mx.sk_max,
    added.customer_number,
    added.customer_name,
    added.customer_street_address,
    added.shipping_address,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM (
    SELECT
        src.customer_number,
        src.customer_name,
        src.customer_street_address,
        src.shipping_address
    FROM rds.customer src
    LEFT JOIN customer_dim dim
        ON src.customer_number = dim.customer_number
    WHERE dim.customer_sk IS NULL
) added
CROSS JOIN (
    SELECT COALESCE(MAX(customer_sk), 0) AS sk_max
    FROM customer_dim
) mx;
-- Load the product dimension.
-- SCD2 expiry: close the current version (expiry_date = max_date) of every
-- product that no longer exists in rds.product, or whose product_name or
-- product_category differs from the source.
-- The comparison uses the null-safe <=> operator, as the customer load does:
-- with a plain <>, a change to or from NULL evaluates to NULL and would go
-- undetected.
UPDATE product_dim
   SET expiry_date = ${hivevar:pre_date}
 WHERE product_dim.product_sk IN (
        SELECT a.product_sk
          FROM (SELECT product_sk,
                       product_code,
                       product_name,
                       product_category
                  FROM product_dim
                 WHERE expiry_date = ${hivevar:max_date}) a
          LEFT JOIN rds.product b
            ON a.product_code = b.product_code
         WHERE b.product_code IS NULL
            OR NOT (a.product_name <=> b.product_name)
            OR NOT (a.product_category <=> b.product_category)
       );

-- SCD2 new versions for product_name / product_category changes.
-- The change test must stay identical to the one in the expiry statement
-- above; otherwise a product could be expired without getting a new version.
-- A new row is inserted only when no current version (expiry_date = max_date)
-- exists for the product.
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM (
    SELECT
        t2.product_code     AS product_code,
        t2.product_name     AS product_name,
        t2.product_category AS product_category,
        t1.version + 1      AS version,
        ${hivevar:pre_date} AS effective_date,
        ${hivevar:max_date} AS expiry_date
    FROM product_dim t1
    INNER JOIN rds.product t2
        ON t1.product_code = t2.product_code
       AND t1.expiry_date = ${hivevar:pre_date}
    LEFT JOIN product_dim t3
        ON t1.product_code = t3.product_code
       AND t3.expiry_date = ${hivevar:max_date}
    WHERE (NOT (t1.product_name <=> t2.product_name)
           OR NOT (t1.product_category <=> t2.product_category))
      AND t3.product_sk IS NULL
) t1
CROSS JOIN (
    SELECT COALESCE(MAX(product_sk), 0) AS sk_max
    FROM product_dim
) t2;
-- New products: source rows whose product_code has no row at all in
-- product_dim. Inserted as version 1, effective from pre_date, open-ended to
-- max_date, with surrogate keys continuing after the current maximum.
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY added.product_code) + mx.sk_max,
    added.product_code,
    added.product_name,
    added.product_category,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM (
    SELECT
        src.product_code,
        src.product_name,
        src.product_category
    FROM rds.product src
    LEFT JOIN product_dim dim
        ON src.product_code = dim.product_code
    WHERE dim.product_sk IS NULL
) added
CROSS JOIN (
    SELECT COALESCE(MAX(product_sk), 0) AS sk_max
    FROM product_dim
) mx;
-- Reload product_count_fact: for each product_code keep the product_dim row
-- whose effective_date maps to the smallest date_sk, and store its
-- product_sk together with that date_sk.
INSERT OVERWRITE TABLE product_count_fact
SELECT
    ranked.product_sk,
    ranked.date_sk
FROM (
    SELECT
        p.product_sk   AS product_sk,
        p.product_code AS product_code,
        d.date_sk      AS date_sk,
        ROW_NUMBER() OVER (PARTITION BY p.product_code ORDER BY d.date_sk) AS rn
    FROM product_dim p
    INNER JOIN date_dim d
        ON p.effective_date = d.date
) ranked
WHERE ranked.rn = 1;
-- Load the sales order fact table: orders newly entered in the load window
-- (order_status = 'N', entry_date in [last_load, current_load)).
-- The customer and shipping zip codes come from rds.customer and are resolved
-- to surrogate keys through the two zip code views. The eight milestone
-- columns (allocate / packing / ship / receive date keys and quantities) are
-- inserted as NULL.
-- All dimension validity ranges are half-open (>= effective_date AND
-- < expiry_date). The zip code views previously used <= expiry_date, which
-- would match two versions of a zip code on a boundary date and duplicate the
-- fact row.
-- ON clauses hold only equality conditions; range predicates stay in WHERE.
INSERT INTO sales_order_fact
SELECT
    a.order_number,
    c.customer_sk,
    i.customer_zip_code_sk,
    j.shipping_zip_code_sk,
    d.product_sk,
    g.sales_order_attribute_sk,
    e.order_date_sk,
    h.entry_date_sk,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    f.request_delivery_date_sk,
    a.order_amount,
    a.quantity
FROM rds.sales_order a
INNER JOIN customer_dim c
    ON a.customer_number = c.customer_number
INNER JOIN rds.customer k
    ON a.customer_number = k.customer_number
INNER JOIN customer_zip_code_dim i
    ON k.customer_zip_code = i.customer_zip_code
INNER JOIN shipping_zip_code_dim j
    ON k.shipping_zip_code = j.shipping_zip_code
INNER JOIN product_dim d
    ON a.product_code = d.product_code
INNER JOIN order_date_dim e
    ON to_date(a.status_date) = e.order_date
INNER JOIN entry_date_dim h
    ON to_date(a.entry_date) = h.entry_date
INNER JOIN request_delivery_date_dim f
    ON to_date(a.request_delivery_date) = f.request_delivery_date
INNER JOIN sales_order_attribute_dim g
    ON a.verification_ind = g.verification_ind
   AND a.credit_check_flag = g.credit_check_flag
   AND a.new_customer_ind = g.new_customer_ind
   AND a.web_order_flag = g.web_order_flag
CROSS JOIN rds.cdc_time l
WHERE a.order_status = 'N'
  AND a.status_date >= c.effective_date
  AND a.status_date <  c.expiry_date
  AND a.status_date >= i.effective_date
  AND a.status_date <  i.expiry_date
  AND a.status_date >= j.effective_date
  AND a.status_date <  j.expiry_date
  AND a.status_date >= d.effective_date
  AND a.status_date <  d.expiry_date
  AND a.entry_date >= l.last_load
  AND a.entry_date <  l.current_load;
-- Reload the PA customer dimension: every customer_dim row referenced by a
-- sales order fact whose customer zip code key resolves to state 'PA'.
-- DISTINCT collapses customers that appear on several orders.
TRUNCATE TABLE pa_customer_dim;

INSERT INTO pa_customer_dim
SELECT DISTINCT cust.*
FROM customer_dim cust
INNER JOIN sales_order_fact fact
    ON cust.customer_sk = fact.customer_sk
INNER JOIN customer_zip_code_dim zip
    ON fact.customer_zip_code_sk = zip.customer_zip_code_sk
WHERE zip.customer_state = 'PA';
-- Allocation milestone (order_status = 'A').
-- For fact rows with an allocation record entered in the load window, stage a
-- copy in tmp with allocate_date_sk / allocate_quantity filled in, then swap
-- it into the fact table with delete + insert.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    fact.order_number             AS order_number,
    fact.customer_sk              AS customer_sk,
    fact.customer_zip_code_sk,
    fact.shipping_zip_code_sk,
    fact.product_sk               AS product_sk,
    fact.sales_order_attribute_sk,
    fact.order_date_sk            AS order_date_sk,
    fact.entry_date_sk            AS entry_date_sk,
    dt.allocate_date_sk           AS allocate_date_sk,
    src.quantity                  AS allocate_quantity,
    fact.packing_date_sk          AS packing_date_sk,
    fact.packing_quantity         AS packing_quantity,
    fact.ship_date_sk             AS ship_date_sk,
    fact.ship_quantity            AS ship_quantity,
    fact.receive_date_sk          AS receive_date_sk,
    fact.receive_quantity         AS receive_quantity,
    fact.request_delivery_date_sk AS request_delivery_date_sk,
    fact.order_amount             AS order_amount,
    fact.order_quantity           AS order_quantity
FROM sales_order_fact fact
INNER JOIN rds.sales_order src
    ON fact.order_number = src.order_number
INNER JOIN allocate_date_dim dt
    ON to_date(src.status_date) = dt.allocate_date
CROSS JOIN rds.cdc_time win
WHERE src.order_status = 'A'
  AND src.entry_date >= win.last_load
  AND src.entry_date <  win.current_load;

DELETE FROM sales_order_fact
 WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);

INSERT INTO sales_order_fact
SELECT * FROM tmp;
-- Packing milestone (order_status = 'P').
-- For fact rows with a packing record entered in the load window, stage a
-- copy in tmp with packing_date_sk / packing_quantity filled in, then swap it
-- into the fact table with delete + insert.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    fact.order_number             AS order_number,
    fact.customer_sk              AS customer_sk,
    fact.customer_zip_code_sk,
    fact.shipping_zip_code_sk,
    fact.product_sk               AS product_sk,
    fact.sales_order_attribute_sk,
    fact.order_date_sk            AS order_date_sk,
    fact.entry_date_sk            AS entry_date_sk,
    fact.allocate_date_sk         AS allocate_date_sk,
    fact.allocate_quantity        AS allocate_quantity,
    dt.packing_date_sk            AS packing_date_sk,
    src.quantity                  AS packing_quantity,
    fact.ship_date_sk             AS ship_date_sk,
    fact.ship_quantity            AS ship_quantity,
    fact.receive_date_sk          AS receive_date_sk,
    fact.receive_quantity         AS receive_quantity,
    fact.request_delivery_date_sk AS request_delivery_date_sk,
    fact.order_amount             AS order_amount,
    fact.order_quantity           AS order_quantity
FROM sales_order_fact fact
INNER JOIN rds.sales_order src
    ON fact.order_number = src.order_number
INNER JOIN packing_date_dim dt
    ON to_date(src.status_date) = dt.packing_date
CROSS JOIN rds.cdc_time win
WHERE src.order_status = 'P'
  AND src.entry_date >= win.last_load
  AND src.entry_date <  win.current_load;

DELETE FROM sales_order_fact
 WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);

INSERT INTO sales_order_fact
SELECT * FROM tmp;
-- Shipping milestone (order_status = 'S').
-- For fact rows with a shipping record entered in the load window, stage a
-- copy in tmp with ship_date_sk / ship_quantity filled in, then swap it into
-- the fact table with delete + insert.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    fact.order_number             AS order_number,
    fact.customer_sk              AS customer_sk,
    fact.customer_zip_code_sk,
    fact.shipping_zip_code_sk,
    fact.product_sk               AS product_sk,
    fact.sales_order_attribute_sk,
    fact.order_date_sk            AS order_date_sk,
    fact.entry_date_sk            AS entry_date_sk,
    fact.allocate_date_sk         AS allocate_date_sk,
    fact.allocate_quantity        AS allocate_quantity,
    fact.packing_date_sk          AS packing_date_sk,
    fact.packing_quantity         AS packing_quantity,
    dt.ship_date_sk               AS ship_date_sk,
    src.quantity                  AS ship_quantity,
    fact.receive_date_sk          AS receive_date_sk,
    fact.receive_quantity         AS receive_quantity,
    fact.request_delivery_date_sk AS request_delivery_date_sk,
    fact.order_amount             AS order_amount,
    fact.order_quantity           AS order_quantity
FROM sales_order_fact fact
INNER JOIN rds.sales_order src
    ON fact.order_number = src.order_number
INNER JOIN ship_date_dim dt
    ON to_date(src.status_date) = dt.ship_date
CROSS JOIN rds.cdc_time win
WHERE src.order_status = 'S'
  AND src.entry_date >= win.last_load
  AND src.entry_date <  win.current_load;

DELETE FROM sales_order_fact
 WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);

INSERT INTO sales_order_fact
SELECT * FROM tmp;
-- Receiving milestone (order_status = 'R').
-- For fact rows with a receiving record entered in the load window, stage a
-- copy in tmp with receive_date_sk / receive_quantity filled in, then swap it
-- into the fact table with delete + insert.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    fact.order_number             AS order_number,
    fact.customer_sk              AS customer_sk,
    fact.customer_zip_code_sk,
    fact.shipping_zip_code_sk,
    fact.product_sk               AS product_sk,
    fact.sales_order_attribute_sk,
    fact.order_date_sk            AS order_date_sk,
    fact.entry_date_sk            AS entry_date_sk,
    fact.allocate_date_sk         AS allocate_date_sk,
    fact.allocate_quantity        AS allocate_quantity,
    fact.packing_date_sk          AS packing_date_sk,
    fact.packing_quantity         AS packing_quantity,
    fact.ship_date_sk             AS ship_date_sk,
    fact.ship_quantity            AS ship_quantity,
    dt.receive_date_sk            AS receive_date_sk,
    src.quantity                  AS receive_quantity,
    fact.request_delivery_date_sk AS request_delivery_date_sk,
    fact.order_amount             AS order_amount,
    fact.order_quantity           AS order_quantity
FROM sales_order_fact fact
INNER JOIN rds.sales_order src
    ON fact.order_number = src.order_number
INNER JOIN receive_date_dim dt
    ON to_date(src.status_date) = dt.receive_date
CROSS JOIN rds.cdc_time win
WHERE src.order_status = 'R'
  AND src.entry_date >= win.last_load
  AND src.entry_date <  win.current_load;

DELETE FROM sales_order_fact
 WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);

INSERT INTO sales_order_fact
SELECT * FROM tmp;
-- Advance the timestamp table: last_load takes the value of current_load
-- (both columns are written from current_load).
INSERT OVERWRITE TABLE rds.cdc_time
SELECT
    win.current_load,
    win.current_load
FROM rds.cdc_time win;
执行修改后的定期装载脚本前,需要做一些准备工作。首先对源数据的客户信息做以下两处修改:
- 客户编号4的客户邮编和送货邮编从17050改为17055。
- 新增一个编号15的客户。

使用下面的语句进行修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
-- Source-side test data.
-- Customer 4: new street / shipping address, both zip codes set to 17055.
UPDATE source.customer
   SET customer_street_address = '9999 Louise Dr.',
       customer_zip_code       = 17055,
       customer_city           = 'Pittsburgh',
       shipping_address        = '9999 Louise Dr.',
       shipping_zip_code       = 17055,
       shipping_city           = 'Pittsburgh'
 WHERE customer_number = 4;

-- New customer 15.
INSERT INTO source.customer
VALUES (15, 'Super Stores', '1000 Woodland St.', 17055, 'Pittsburgh', 'PA', '1000 Woodland St.', 17055, 'Pittsburgh', 'PA');

COMMIT;
现在,在装载新的客户数据前,先查询当前的客户邮编和送货邮编。后面可以用改变后的信息和此查询的输出作对比。查询语句如下。
1 2 3 4 5 6 7 8 9 10 11 12 13
USE dw;

-- Customer and shipping zip code for every sales order fact row, resolved
-- through the two zip code views.
SELECT
    fact.order_date_sk     AS odsk,
    cust.customer_number   AS cn,
    cz.customer_zip_code   AS czc,
    sz.shipping_zip_code   AS szc
FROM sales_order_fact fact
INNER JOIN customer_zip_code_dim cz
    ON cz.customer_zip_code_sk = fact.customer_zip_code_sk
INNER JOIN shipping_zip_code_dim sz
    ON sz.shipping_zip_code_sk = fact.shipping_zip_code_sk
INNER JOIN customer_dim cust
    ON cust.customer_sk = fact.customer_sk;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
-- Source-side test data: two new sales orders (144 and 145).
-- Each order gets a random timestamp, a random amount of
-- floor(1000 + rand() * 9000) and a random quantity of floor(10 + rand() * 90).

-- Order 144: timestamp between 2016-08-08 00:00:01 and 2016-08-08 12:00:00.
SET @order_date := from_unixtime(
        unix_timestamp('2016-08-08 00:00:01')
        + rand() * (unix_timestamp('2016-08-08 12:00:00') - unix_timestamp('2016-08-08 00:00:01')));
SET @amount := floor(1000 + rand() * 9000);
SET @quantity := floor(10 + rand() * 90);

INSERT INTO source.sales_order
VALUES (null, 144, 4, 3, 'Y', 'Y', 'Y', 'N', @order_date, 'N', '2016-08-10',
        @order_date, @amount, @quantity);

-- Order 145: timestamp between 2016-08-08 12:00:00 and 2016-08-09 00:00:00.
SET @order_date := from_unixtime(
        unix_timestamp('2016-08-08 12:00:00')
        + rand() * (unix_timestamp('2016-08-09 00:00:00') - unix_timestamp('2016-08-08 12:00:00')));
SET @amount := floor(1000 + rand() * 9000);
SET @quantity := floor(10 + rand() * 90);

INSERT INTO source.sales_order
VALUES (null, 145, 15, 4, 'Y', 'N', 'Y', 'N', @order_date, 'N', '2016-08-10',
        @order_date, @amount, @quantity);

COMMIT;
1 2
-- Reset both columns of the timestamp table to 2016-08-08 before the test run.
INSERT OVERWRITE TABLE rds.cdc_time
SELECT
    '2016-08-08',
    '2016-08-08'
FROM rds.cdc_time;
查询customer_dim表,确认两个改变的客户,即编号4和15的客户,已经正确装载。
1 2 3 4 5 6 7 8 9 10 11
-- All customer_dim versions of customers 4 and 15.
SELECT
    dim.customer_sk             AS csk,
    dim.customer_number         AS cnum,
    dim.customer_name           AS cnam,
    dim.customer_street_address AS csd,
    dim.shipping_address        AS sd,
    dim.version,
    dim.effective_date,
    dim.expiry_date
FROM dw.customer_dim dim
WHERE dim.customer_number IN (4, 15);
查询sales_order_fact表里的两条新销售订单,确认邮编已经正确装载。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
USE dw;

-- Orders 144 and 145 with their surrogate keys resolved back to customer
-- number, zip codes, product code and order / entry dates.
SELECT
    fact.order_number    AS onum,
    cust.customer_number AS cnum,
    cz.customer_zip_code AS czc,
    sz.shipping_zip_code AS szc,
    prod.product_code    AS pc,
    od.order_date        AS od,
    ed.entry_date        AS ed,
    fact.order_amount,
    fact.order_quantity
FROM sales_order_fact fact
INNER JOIN customer_dim cust
    ON fact.customer_sk = cust.customer_sk
INNER JOIN product_dim prod
    ON fact.product_sk = prod.product_sk
INNER JOIN customer_zip_code_dim cz
    ON fact.customer_zip_code_sk = cz.customer_zip_code_sk
INNER JOIN shipping_zip_code_dim sz
    ON fact.shipping_zip_code_sk = sz.shipping_zip_code_sk
INNER JOIN order_date_dim od
    ON fact.order_date_sk = od.order_date_sk
INNER JOIN entry_date_dim ed
    ON fact.entry_date_sk = ed.entry_date_sk
WHERE fact.order_number IN (144, 145);
查询pa_customer_dim表,确认PA客户正确装载。
1 2 3 4 5 6 7 8 9 10
-- Full contents of the PA customer dimension.
SELECT
    pa.customer_sk             AS csk,
    pa.customer_number         AS cnum,
    pa.customer_name           AS cnam,
    pa.customer_street_address AS csa,
    pa.shipping_address        AS sad,
    pa.version,
    pa.effective_date,
    pa.expiry_date
FROM dw.pa_customer_dim pa;
4. 修改产品定期装载
类似于对定期数据仓库装载的修改,需要删除工厂维度导入里所有与邮编相关的列,并在产品事实表导入时使用工厂邮编代理键。修改后的regular_etl_daily_production.sql脚本如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
-- Set up the environment and the load time window.
-- NOTE(review): presumably defines the pre_date / max_date hivevars used
-- later in this script — confirm in set_time.sql.
!run /root/set_time.sql

-- Factory attributes rarely change and no history is needed, so SCD1 is used.
-- Rows whose factory_name or factory_street_address differs from
-- rds.factory_master (null-safe comparison) are staged in tmp with the source
-- values, then swapped into factory_dim with delete + insert.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    dim.factory_sk,
    dim.factory_code,
    src.factory_name,
    src.factory_street_address,
    dim.version,
    dim.effective_date,
    dim.expiry_date
FROM factory_dim dim
INNER JOIN rds.factory_master src
    ON dim.factory_code = src.factory_code
WHERE NOT (dim.factory_name <=> src.factory_name)
   OR NOT (dim.factory_street_address <=> src.factory_street_address);

DELETE FROM factory_dim
 WHERE factory_dim.factory_sk IN (SELECT factory_sk FROM tmp);

INSERT INTO factory_dim
SELECT * FROM tmp;
-- New factories: rds.factory_master rows whose factory_code has no row in
-- factory_dim. Inserted as version 1, effective from pre_date, open-ended to
-- max_date, with surrogate keys continuing after the current maximum.
INSERT INTO factory_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY added.factory_code) + mx.sk_max,
    added.factory_code,
    added.factory_name,
    added.factory_street_address,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM (
    SELECT
        src.factory_code,
        src.factory_name,
        src.factory_street_address
    FROM rds.factory_master src
    LEFT JOIN factory_dim dim
        ON src.factory_code = dim.factory_code
    WHERE dim.factory_sk IS NULL
) added
CROSS JOIN (
    SELECT COALESCE(MAX(factory_sk), 0) AS sk_max
    FROM factory_dim
) mx;
-- Load the daily production fact table for production_date = pre_date.
-- The factory zip code comes from rds.factory_master and is resolved to a
-- surrogate key through the factory zip code view.
-- Dimension validity ranges are half-open (>= effective_date AND
-- < expiry_date). The product range previously used <= expiry_date: the SCD2
-- product load gives the expired version expiry_date = pre_date and the new
-- version effective_date = pre_date, so on the day a product changes a
-- production row matched both versions and was loaded twice.
-- ON clauses hold only equality conditions; range predicates stay in WHERE.
INSERT INTO production_fact
SELECT
    b.product_sk,
    c.date_sk,
    d.factory_sk,
    e.factory_zip_code_sk,
    a.production_quantity
FROM rds.daily_production a
INNER JOIN product_dim b
    ON a.product_code = b.product_code
INNER JOIN date_dim c
    ON a.production_date = c.date
INNER JOIN factory_dim d
    ON a.factory_code = d.factory_code
INNER JOIN rds.factory_master f
    ON a.factory_code = f.factory_code
INNER JOIN factory_zip_code_dim e
    ON f.factory_zip_code = e.factory_zip_code
WHERE a.production_date = ${hivevar:pre_date}
  AND a.production_date >= b.effective_date
  AND a.production_date <  b.expiry_date
  AND a.production_date >= e.effective_date
  AND a.production_date <  e.expiry_date;
5. 测试修改后的产品定期装载
添加一个新的工厂信息。
1 2 3 4
-- Source-side test data: new factory 5.
INSERT INTO source.factory_master
VALUES (5, 'Fifth Factory', '90909 McNicholds Blvd.', 17055, 'Pittsburgh', 'PA');

COMMIT;
向daily_production表里添加三个日常产品记录。
1 2 3 4 5 6
-- Source-side test data: three daily production records dated 2016-08-08.
INSERT INTO source.daily_production
VALUES
    (1, '2016-08-08', 3, 400),
    (3, '2016-08-08', 4, 200),
    (5, '2016-08-08', 5, 100);

COMMIT;
1 2
-- Reset both columns of the timestamp table to 2016-08-08 before the test run.
INSERT OVERWRITE TABLE rds.cdc_time
SELECT
    '2016-08-08',
    '2016-08-08'
FROM rds.cdc_time;
1 2
# Run the daily production load script from the current directory.
./regular_etl_daily_production.sh
查询factory_dim,确认导入是正确的。
1 2 3 4 5 6 7 8 9
-- Full contents of the factory dimension.
SELECT
    dim.factory_sk,
    dim.factory_code,
    dim.factory_name,
    dim.factory_street_address,
    dim.version,
    dim.effective_date,
    dim.expiry_date
FROM dw.factory_dim dim;
查询production_fact表确认三个新的日常产品被正确装载。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
USE dw;

-- Production facts with their surrogate keys resolved back to product code,
-- calendar date, factory code and factory zip code.
SELECT
    prod.product_code       AS pc,
    cal.date,
    fac.factory_code        AS fc,
    zip.factory_zip_code    AS fzc,
    fact.production_quantity AS qty
FROM production_fact fact
INNER JOIN product_dim prod
    ON fact.product_sk = prod.product_sk
INNER JOIN date_dim cal
    ON fact.production_date_sk = cal.date_sk
INNER JOIN factory_dim fac
    ON fact.factory_sk = fac.factory_sk
INNER JOIN factory_zip_code_dim zip
    ON fact.factory_zip_code_sk = zip.factory_zip_code_sk;
查询结果如下图所示。