5. Snapshots
The preceding experiments demonstrated techniques for extending dimensions. This section discusses two techniques for extending fact tables.
Some users, especially managers, often need to see data as of a specific point in time; in other words, they need a snapshot of the data. Periodic snapshots and accumulating snapshots are two common fact table extension techniques.
A periodic snapshot aggregates a fact table over a defined period as of a given point in time. For example, a monthly sales order periodic snapshot summarizes the total sales order amount as of the end of each month.
An accumulating snapshot tracks changes to a fact over a process. For example, the data warehouse may need to accumulate (store), for each sales order, the dates at which the order was placed and at which its items were packed, shipped, and received, in order to track the order's progress through its lifecycle. A user can then take an accumulating snapshot of the processing status of sales orders at any given point in time.
The following subsections describe periodic and accumulating snapshots in detail.
1. Periodic snapshot
The following uses a month-end summary of sales orders as an example of how to implement a periodic snapshot.
First, a new fact table is needed. The schema in the figure below shows a new fact table named month_end_sales_order_fact.
The table has two measures, month_order_amount and month_order_quantity. They cannot simply be added to the sales_order_fact table, because sales_order_fact and the new measures have different time attributes (the data grain differs): sales_order_fact holds daily records, while the new measures are monthly. Use the following script to create the month_end_sales_order_fact table.
USE dw;

CREATE TABLE month_end_sales_order_fact (
    order_month_sk INT COMMENT 'order month surrogate key',
    product_sk INT COMMENT 'product surrogate key',
    month_order_amount DECIMAL(10,2) COMMENT 'month order amount',
    month_order_quantity INT COMMENT 'month order quantity'
)
CLUSTERED BY (order_month_sk) INTO 8 BUCKETS
STORED AS ORC TBLPROPERTIES ('transactional'='true');
With the month_end_sales_order_fact table created, data must now be loaded into it. The data source for the month-end sales order fact table is the existing sales order fact table. The month_sum.sql file loads the month-end sales order fact table; its contents are as follows.
-- Set variables to support transactions
set hive.support.concurrency=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true;
set hive.compactor.worker.threads=1;

USE dw;

SET hivevar:pre_month_date = add_months(current_date,-1);

delete from month_end_sales_order_fact
 where month_end_sales_order_fact.order_month_sk in
       (select month_sk
          from month_dim
         where month = month(${hivevar:pre_month_date})
           and year = year(${hivevar:pre_month_date}));

insert into month_end_sales_order_fact
select b.month_sk, a.product_sk, sum(order_amount), sum(order_quantity)
  from sales_order_fact a,
       month_dim b,
       order_date_dim d -- view
 where a.order_date_sk = d.order_date_sk
   and b.month = d.month
   and b.year = d.year
   and b.month = month(${hivevar:pre_month_date})
   and b.year = year(${hivevar:pre_month_date})
 group by b.month_sk, a.product_sk ;
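The DELETE statement relies on month_dim to map the previous month's (year, month) pair to a month_sk. That lookup can be previewed on its own before the load runs (a sketch, with the hivevar reference expanded to its definition):

select month_sk, year, month
  from month_dim
 where year = year(add_months(current_date,-1))
   and month = month(add_months(current_date,-1));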
On the first day of each month, after the regular daily sales order load completes, this script is executed to load the previous month's sales orders. To schedule it, the Oozie workflow definition must be modified.
(1) Modify the workflow job configuration file
The modified workflow.xml file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.1" name="regular_etl">
    <start to="fork-node"/>
    <fork name="fork-node">
        <path start="sqoop-customer" />
        <path start="sqoop-product" />
        <path start="sqoop-sales_order" />
    </fork>

    <action name="sqoop-customer">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <arg>import</arg>
            <arg>--connect</arg>
            <arg>jdbc:mysql://cdh1:3306/source?useSSL=false</arg>
            <arg>--username</arg>
            <arg>root</arg>
            <arg>--password</arg>
            <arg>mypassword</arg>
            <arg>--table</arg>
            <arg>customer</arg>
            <arg>--hive-import</arg>
            <arg>--hive-table</arg>
            <arg>rds.customer</arg>
            <arg>--hive-overwrite</arg>
            <file>/tmp/hive-site.xml#hive-site.xml</file>
            <archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>
    <action name="sqoop-product">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <arg>import</arg>
            <arg>--connect</arg>
            <arg>jdbc:mysql://cdh1:3306/source?useSSL=false</arg>
            <arg>--username</arg>
            <arg>root</arg>
            <arg>--password</arg>
            <arg>mypassword</arg>
            <arg>--table</arg>
            <arg>product</arg>
            <arg>--hive-import</arg>
            <arg>--hive-table</arg>
            <arg>rds.product</arg>
            <arg>--hive-overwrite</arg>
            <file>/tmp/hive-site.xml#hive-site.xml</file>
            <archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>
    <action name="sqoop-sales_order">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <command>job --exec myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop</command>
            <file>/tmp/hive-site.xml#hive-site.xml</file>
            <archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>

    <join name="joining" to="hive-node"/>

    <action name="hive-node">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <job-xml>/tmp/hive-site.xml</job-xml>
            <script>/tmp/regular_etl.sql</script>
        </hive>
        <ok to="decision-node"/>
        <error to="fail"/>
    </action>

    <decision name="decision-node">
        <switch>
            <case to="month-sum">
                ${date eq 20}
            </case>
            <default to="end"/>
        </switch>
    </decision>

    <action name="month-sum">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <job-xml>/tmp/hive-site.xml</job-xml>
            <script>/tmp/month_sum.sql</script>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
A decision-node has been added to the file: when the value of the date parameter equals 20, the workflow runs month_sum.sql and then ends; otherwise it ends directly. The value 20 is used here only for testing; in production it would normally be 1, matching the first-of-month schedule. Since month_sum.sql derives the previous month's year and month with add_months(current_date,-1), the script does not have to run on the 1st; any day of the month works. This workflow guarantees that the monthly summary runs only after the daily load has completed, and runs only once a month. The workflow DAG is shown in the figure below.
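The claim that any day of the month works can be checked directly in Hive (a quick sketch; the commented results assume it is run on 2016-07-20):

select add_months(current_date,-1),         -- 2016-06-20
       year(add_months(current_date,-1)),   -- 2016
       month(add_months(current_date,-1));  -- 6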
(2) Modify the coordinator job configuration file
The modified coordinator.xml file is as follows:
<coordinator-app name="regular_etl-coord" frequency="${coord:days(1)}" start="${start}" end="${end}" timezone="${timezone}" xmlns="uri:oozie:coordinator:0.1">
    <action>
        <workflow>
            <app-path>${workflowAppUri}</app-path>
            <configuration>
                <property>
                    <name>jobTracker</name>
                    <value>${jobTracker}</value>
                </property>
                <property>
                    <name>nameNode</name>
                    <value>${nameNode}</value>
                </property>
                <property>
                    <name>queueName</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>date</name>
                    <value>${date}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>
A date property has been added to the file; it passes the value of the date parameter through to workflow.xml.
(3) Modify the coordinator job properties file
The modified job-coord.properties file is as follows:
nameNode=hdfs://cdh2:8020
jobTracker=cdh2:8032
queueName=default
oozie.use.system.libpath=true
oozie.coord.application.path=${nameNode}/user/${user.name}
timezone=UTC
start=2016-07-20T01:30Z
end=2020-12-31T01:30Z
workflowAppUri=${nameNode}/user/${user.name}
Only the values of the start and end properties were changed in this file, for testing purposes.
(4) Deploy the workflow and coordinator job
hdfs dfs -put -f coordinator.xml /user/root/
hdfs dfs -put -f /root/workflow.xml /user/root/
hdfs dfs -put -f /etc/hive/conf.cloudera.hive/hive-site.xml /tmp/
hdfs dfs -put -f /root/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar /tmp/
hdfs dfs -put -f /root/regular_etl.sql /tmp/
hdfs dfs -put -f /root/month_sum.sql /tmp/
oozie job -oozie http://cdh2:11000/oozie -config /root/job-coord.properties -run -D date=`date +"%d"`
The value of the date property is passed with the command line -D parameter; date is the current day of the month, which was the 20th when the job was run.
At 9:30 the workflow started running. After it completed successfully, the month_end_sales_order_fact table contained two rows summarizing the June 2016 sales orders, as shown in the figure below.
The order_month_sk value is 198; the following query confirms that this key corresponds to June 2016.
select year,month from month_dim where month_sk=198;
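The snapshot can also be cross-checked against the detail fact table: aggregating sales_order_fact directly over June 2016 should reproduce the same two rows. A verification sketch that mirrors month_sum.sql with the dates fixed:

select b.month_sk, a.product_sk, sum(a.order_amount), sum(a.order_quantity)
  from sales_order_fact a, month_dim b, order_date_dim d
 where a.order_date_sk = d.order_date_sk
   and b.month = d.month and b.year = d.year
   and b.year = 2016 and b.month = 6
 group by b.month_sk, a.product_sk;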
2. Accumulating snapshot
This subsection shows how to implement an accumulating snapshot on the sales orders.
The accumulating snapshot tracks five sales order milestones: order placed, allocated to warehouse, packed, shipped, and received. The dates of these five milestones and their respective quantities come from the sales order table in the source database. A complete order lifecycle is described by five rows: one for the time the order was placed, one for when the ordered items were allocated to a warehouse, one for when the products were packed, one for when the order was shipped, and one for when the customer received it. Each milestone has a status code: N for a new order, A for allocated, P for packed, S for shipped, and R for received. The sales_order table structure in the source database must be changed accordingly to handle the five different statuses.
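Once the schema changes in step (1) below are applied, one complete lifecycle can be inspected in the source table with a query like the following (illustrative; order number 129 is one of the test orders used later in this subsection):

-- One row per milestone, in chronological order: N, A, P, S, R
SELECT order_number, status_date, order_status, quantity
  FROM sales_order
 WHERE order_number = 129
 ORDER BY status_date;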
(1) Modify the database schema
Run the following script to modify the database schema.
-- MySQL
USE source;
-- Modify the sales order transaction table
ALTER TABLE sales_order
  CHANGE order_date status_date datetime
, ADD order_status VARCHAR(1) AFTER status_date
, CHANGE order_quantity quantity INT;

-- Drop the primary key of the sales_order table
alter table sales_order change order_number order_number int not null;
alter table sales_order drop primary key;

-- Create a new primary key
alter table sales_order add id int unsigned not null auto_increment primary key comment 'primary key' first;

-- Hive
-- No id column is added to rds.sales_order, for two reasons:
-- 1. the column serves only as the incremental check column and need not be stored
-- 2. this avoids re-importing all existing data
use rds;
alter table sales_order
change order_date status_date timestamp comment 'status date';
alter table sales_order
change order_quantity quantity int comment 'quantity';
alter table sales_order
add columns (order_status varchar(1) comment 'order status');

USE dw;
-- Add eight columns to the fact table
alter table sales_order_fact rename to sales_order_fact_old;
create table sales_order_fact
(
    order_sk int comment 'order surrogate key',
    customer_sk int comment 'customer surrogate key',
    product_sk int comment 'product surrogate key',
    order_date_sk int comment 'order date surrogate key',
    allocate_date_sk int comment 'allocate date surrogate key',
    allocate_quantity int comment 'allocate quantity',
    packing_date_sk int comment 'packing date surrogate key',
    packing_quantity int comment 'packing quantity',
    ship_date_sk int comment 'ship date surrogate key',
    ship_quantity int comment 'ship quantity',
    receive_date_sk int comment 'receive date surrogate key',
    receive_quantity int comment 'receive quantity',
    request_delivery_date_sk int comment 'request delivery date surrogate key',
    order_amount decimal(10,2) comment 'order amount',
    order_quantity int comment 'order quantity'
)
CLUSTERED BY (order_sk) INTO 8 BUCKETS
STORED AS ORC TBLPROPERTIES ('transactional'='true');
insert into sales_order_fact
select order_sk,
       customer_sk,
       product_sk,
       order_date_sk,
       null,
       null,
       null,
       null,
       null,
       null,
       null,
       null,
       request_delivery_date_sk,
       order_amount,
       order_quantity
  from sales_order_fact_old;
drop table sales_order_fact_old;

-- Create four date dimension views
CREATE VIEW allocate_date_dim (allocate_date_sk, allocate_date, month, month_name, quarter, year, promo_ind)
AS
SELECT date_sk,
       date,
       month,
       month_name,
       quarter,
       year,
       promo_ind
  FROM date_dim ;

CREATE VIEW packing_date_dim (packing_date_sk, packing_date, month, month_name, quarter, year, promo_ind)
AS
SELECT date_sk,
       date,
       month,
       month_name,
       quarter,
       year,
       promo_ind
  FROM date_dim ;

CREATE VIEW ship_date_dim (ship_date_sk, ship_date, month, month_name, quarter, year, promo_ind)
AS
SELECT date_sk,
       date,
       month,
       month_name,
       quarter,
       year,
       promo_ind
  FROM date_dim ;

CREATE VIEW receive_date_dim (receive_date_sk, receive_date, month, month_name, quarter, year, promo_ind)
AS
SELECT date_sk,
       date,
       month,
       month_name,
       quarter,
       year,
       promo_ind
  FROM date_dim ;
The changes to the source database are as follows: the order_date column is renamed to status_date, a column named order_status is added, and the order_quantity column is renamed to quantity. As its name suggests, the order_status column stores one of N, A, P, S, or R, and it qualifies the meaning of the status_date column. If a record's status is N, status_date is the date the order was placed; if the status is R, status_date is the receipt date. The changes to the data warehouse are as follows: four quantity columns and four date surrogate keys are added to the existing sales_order_fact table; the new columns are allocate_date_sk, allocate_quantity, packing_date_sk, packing_quantity, ship_date_sk, ship_quantity, receive_date_sk, and receive_quantity. In addition, database views role-play the date dimension to generate the four new date surrogate keys.
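Because the four views role-play the same physical date_dim, they are just differently named projections of identical rows. A quick sketch of the equivalence (assuming a row with date_sk = 1 exists):

-- Both queries return the same underlying row; only the column names differ
select allocate_date_sk, allocate_date from allocate_date_dim where allocate_date_sk = 1;
select date_sk, date from date_dim where date_sk = 1;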
(2) Rebuild the Sqoop job
Rebuild the Sqoop job with the following script. Because the source table now contains multiple rows with the same order_number, that column can no longer serve as the check column; the check column is changed to id.
last_value=`sqoop job --show myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop | grep incremental.last.value | awk '{print $3}'`
sqoop job --delete myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop
sqoop job \
--meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop \
--create myjob_incremental_import \
-- \
import \
--connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" \
--table sales_order \
--columns "order_number, customer_number, product_code, status_date, entry_date, order_amount, quantity, request_delivery_date, order_status" \
--hive-import \
--hive-table rds.sales_order \
--incremental append \
--check-column id \
--last-value $last_value
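The switch of check column can be verified directly in MySQL: once milestone rows accumulate, order_number repeats while id remains unique and strictly increasing. A verification sketch:

-- Any rows returned rule out order_number as an incremental check column
SELECT order_number, COUNT(*) AS milestone_rows
  FROM source.sales_order
 GROUP BY order_number
HAVING COUNT(*) > 1;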
(3) Modify the regular load script regular_etl.sql
The regular load HiveQL script must be revised to match the new database schema. The modified script is shown below.
-- Set variables to support transactions
set hive.support.concurrency=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true;
set hive.compactor.worker.threads=1;

USE dw;

-- Set the SCD effective and expiry dates
SET hivevar:cur_date = CURRENT_DATE();
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);
SET hivevar:max_date = CAST('2200-01-01' AS DATE);

-- Set the CDC upper bound time
INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;

-- Load the customer dimension
-- Expire SCD2 rows for deleted records and for changed address-related columns; the <=> operator handles NULL values.
UPDATE customer_dim
   SET expiry_date = ${hivevar:pre_date}
 WHERE customer_dim.customer_sk IN
(SELECT a.customer_sk
   FROM (SELECT customer_sk,
                customer_number,
                customer_street_address,
                customer_zip_code,
                customer_city,
                customer_state,
                shipping_address,
                shipping_zip_code,
                shipping_city,
                shipping_state
           FROM customer_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN
                rds.customer b ON a.customer_number = b.customer_number
  WHERE b.customer_number IS NULL OR
  ( !(a.customer_street_address <=> b.customer_street_address)
  OR !(a.customer_zip_code <=> b.customer_zip_code)
  OR !(a.customer_city <=> b.customer_city)
  OR !(a.customer_state <=> b.customer_state)
  OR !(a.shipping_address <=> b.shipping_address)
  OR !(a.shipping_zip_code <=> b.shipping_zip_code)
  OR !(a.shipping_city <=> b.shipping_city)
  OR !(a.shipping_state <=> b.shipping_state)
  ));

-- Insert new SCD2 rows for changed address-related columns
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    t1.shipping_address,
    t1.shipping_zip_code,
    t1.shipping_city,
    t1.shipping_state,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM
(
SELECT
    t2.customer_number customer_number,
    t2.customer_name customer_name,
    t2.customer_street_address customer_street_address,
    t2.customer_zip_code customer_zip_code,
    t2.customer_city customer_city,
    t2.customer_state customer_state,
    t2.shipping_address shipping_address,
    t2.shipping_zip_code shipping_zip_code,
    t2.shipping_city shipping_city,
    t2.shipping_state shipping_state,
    t1.version + 1 version,
    ${hivevar:pre_date} effective_date,
    ${hivevar:max_date} expiry_date
  FROM customer_dim t1
 INNER JOIN rds.customer t2
    ON t1.customer_number = t2.customer_number
   AND t1.expiry_date = ${hivevar:pre_date}
  LEFT JOIN customer_dim t3
    ON t1.customer_number = t3.customer_number
   AND t3.expiry_date = ${hivevar:max_date}
 WHERE (!(t1.customer_street_address <=> t2.customer_street_address)
    OR !(t1.customer_zip_code <=> t2.customer_zip_code)
    OR !(t1.customer_city <=> t2.customer_city)
    OR !(t1.customer_state <=> t2.customer_state)
    OR !(t1.shipping_address <=> t2.shipping_address)
    OR !(t1.shipping_zip_code <=> t2.shipping_zip_code)
    OR !(t1.shipping_city <=> t2.shipping_city)
    OR !(t1.shipping_state <=> t2.shipping_state)
    )
   AND t3.customer_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;

-- Handle SCD1 on the customer_name column
-- Because the SET clause of Hive's UPDATE does not yet support subqueries, a temporary table holds the
-- records to be updated, and DELETE followed by INSERT stands in for UPDATE.
-- Because SCD1 keeps no history, every record with a changed customer_name is updated here, not just the current version.
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
SELECT
    a.customer_sk,
    a.customer_number,
    b.customer_name,
    a.customer_street_address,
    a.customer_zip_code,
    a.customer_city,
    a.customer_state,
    a.shipping_address,
    a.shipping_zip_code,
    a.shipping_city,
    a.shipping_state,
    a.version,
    a.effective_date,
    a.expiry_date
  FROM customer_dim a, rds.customer b
 WHERE a.customer_number = b.customer_number AND !(a.customer_name <=> b.customer_name);
DELETE FROM customer_dim WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);
INSERT INTO customer_dim SELECT * FROM tmp;

-- Handle new customer records
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    t1.shipping_address,
    t1.shipping_zip_code,
    t1.shipping_city,
    t1.shipping_state,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM
(
SELECT t1.* FROM rds.customer t1 LEFT JOIN customer_dim t2 ON t1.customer_number = t2.customer_number
 WHERE t2.customer_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;

-- Reload the PA customer dimension
TRUNCATE TABLE pa_customer_dim;
INSERT INTO pa_customer_dim
SELECT
  customer_sk
, customer_number
, customer_name
, customer_street_address
, customer_zip_code
, customer_city
, customer_state
, shipping_address
, shipping_zip_code
, shipping_city
, shipping_state
, version
, effective_date
, expiry_date
FROM customer_dim
WHERE customer_state = 'PA' ;

-- Load the product dimension
-- Expire SCD2 rows for deleted records and for changed product_name or product_category columns
UPDATE product_dim
   SET expiry_date = ${hivevar:pre_date}
 WHERE product_dim.product_sk IN
(SELECT a.product_sk
   FROM (SELECT product_sk,product_code,product_name,product_category
           FROM product_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN
                rds.product b ON a.product_code = b.product_code
  WHERE b.product_code IS NULL OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));

-- Insert new SCD2 rows for changed product_name or product_category columns
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM
(
SELECT
    t2.product_code product_code,
    t2.product_name product_name,
    t2.product_category product_category,
    t1.version + 1 version,
    ${hivevar:pre_date} effective_date,
    ${hivevar:max_date} expiry_date
  FROM product_dim t1
 INNER JOIN rds.product t2
    ON t1.product_code = t2.product_code
   AND t1.expiry_date = ${hivevar:pre_date}
  LEFT JOIN product_dim t3
    ON t1.product_code = t3.product_code
   AND t3.expiry_date = ${hivevar:max_date}
 WHERE (t1.product_name <> t2.product_name OR t1.product_category <> t2.product_category) AND t3.product_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;

-- Handle new product records
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM
(
SELECT t1.* FROM rds.product t1 LEFT JOIN product_dim t2 ON t1.product_code = t2.product_code
 WHERE t2.product_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;

-- Load the order dimension:
-- sales order numbers added the previous day
INSERT INTO order_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.order_number) + t2.sk_max,
    t1.order_number,
    t1.version,
    t1.effective_date,
    t1.expiry_date
  FROM
(
SELECT
    order_number order_number,
    1 version,
    status_date effective_date,
    '2200-01-01' expiry_date
  FROM rds.sales_order, rds.cdc_time
 WHERE order_status = 'N' AND entry_date >= last_load AND entry_date < current_load ) t1
CROSS JOIN
(SELECT COALESCE(MAX(order_sk),0) sk_max FROM order_dim) t2;

-- Load the sales order fact table:
-- sales orders added the previous day
INSERT INTO sales_order_fact
SELECT
    order_sk,
    customer_sk,
    product_sk,
    e.order_date_sk,
    null,
    null,
    null,
    null,
    null,
    null,
    null,
    null,
    f.request_delivery_date_sk,
    order_amount,
    quantity
  FROM
    rds.sales_order a,
    order_dim b,
    customer_dim c,
    product_dim d,
    order_date_dim e,
    request_delivery_date_dim f,
    rds.cdc_time g
 WHERE
    a.order_status = 'N'
AND a.order_number = b.order_number
AND a.customer_number = c.customer_number
AND a.status_date >= c.effective_date
AND a.status_date < c.expiry_date
AND a.product_code = d.product_code
AND a.status_date >= d.effective_date
AND a.status_date < d.expiry_date
AND to_date(a.status_date) = e.order_date
AND to_date(a.request_delivery_date) = f.request_delivery_date
AND a.entry_date >= g.last_load AND a.entry_date < g.current_load ;

-- Handle the four statuses: allocated, packed, shipped, and received
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_sk order_sk,
       t0.customer_sk customer_sk,
       t0.product_sk product_sk,
       t0.order_date_sk order_date_sk,
       t2.allocate_date_sk allocate_date_sk,
       t1.quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       allocate_date_dim t2,
       order_dim t3,
       rds.cdc_time t4
 where t0.order_sk = t3.order_sk
   and t3.order_number = t1.order_number and t1.order_status = 'A'
   and to_date(t1.status_date) = t2.allocate_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;

DELETE FROM sales_order_fact WHERE sales_order_fact.order_sk IN (SELECT order_sk FROM tmp);
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_sk order_sk,
       t0.customer_sk customer_sk,
       t0.product_sk product_sk,
       t0.order_date_sk order_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t2.packing_date_sk packing_date_sk,
       t1.quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       packing_date_dim t2,
       order_dim t3,
       rds.cdc_time t4
 where t0.order_sk = t3.order_sk
   and t3.order_number = t1.order_number and t1.order_status = 'P'
   and to_date(t1.status_date) = t2.packing_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;

DELETE FROM sales_order_fact WHERE sales_order_fact.order_sk IN (SELECT order_sk FROM tmp);
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_sk order_sk,
       t0.customer_sk customer_sk,
       t0.product_sk product_sk,
       t0.order_date_sk order_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t2.ship_date_sk ship_date_sk,
       t1.quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       ship_date_dim t2,
       order_dim t3,
       rds.cdc_time t4
 where t0.order_sk = t3.order_sk
   and t3.order_number = t1.order_number and t1.order_status = 'S'
   and to_date(t1.status_date) = t2.ship_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;

DELETE FROM sales_order_fact WHERE sales_order_fact.order_sk IN (SELECT order_sk FROM tmp);
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_sk order_sk,
       t0.customer_sk customer_sk,
       t0.product_sk product_sk,
       t0.order_date_sk order_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t2.receive_date_sk receive_date_sk,
       t1.quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       receive_date_dim t2,
       order_dim t3,
       rds.cdc_time t4
 where t0.order_sk = t3.order_sk
   and t3.order_number = t1.order_number and t1.order_status = 'R'
   and to_date(t1.status_date) = t2.receive_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;

DELETE FROM sales_order_fact WHERE sales_order_fact.order_sk IN (SELECT order_sk FROM tmp);
INSERT INTO sales_order_fact SELECT * FROM tmp;

-- Update the last_load field of the timestamp table
INSERT OVERWRITE TABLE rds.cdc_time SELECT current_load, current_load FROM rds.cdc_time;
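The SCD2 logic above depends on the null-safe <=> comparison mentioned in its comments. Its behavior can be seen in isolation (a minimal sketch; a FROM-less SELECT requires Hive 0.13 or later):

-- With plain =, any comparison against NULL yields NULL and the changed row would be missed
select null <=> null,   -- true
       'a' <=> null,    -- false
       'a' <=> 'a';     -- true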
The following steps test the accumulating snapshot.
(1) Add two new sales orders
USE source;
/***
Add 2 orders with an order date of July 20, 2016.
***/
SET @start_date := unix_timestamp('2016-07-20');
SET @end_date := unix_timestamp('2016-07-21');
SET @request_delivery_date := '2016-07-25';
DROP TABLE IF EXISTS temp_sales_order_data;
CREATE TABLE temp_sales_order_data AS SELECT * FROM sales_order WHERE 1=0;

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
SET @quantity := floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data VALUES (1, 1, 1, 1, @order_date, 'N', @request_delivery_date, @order_date, @amount, @quantity);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
SET @quantity := floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data VALUES (2, 2, 2, 2, @order_date, 'N', @request_delivery_date, @order_date, @amount, @quantity);

INSERT INTO sales_order
select null,
       (@order_number:=@order_number + 1) + 128,
       customer_number,
       product_code,
       status_date,
       order_status,
       request_delivery_date,
       entry_date,
       order_amount,
       quantity
  from temp_sales_order_data t1,(select @order_number:=0) t2
 order by t1.status_date;

COMMIT ;
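Before triggering the incremental import, the two new rows can be spot-checked in MySQL; order numbers 129 and 130 follow from the (@order_number:=@order_number + 1) + 128 expression above:

SELECT order_number, status_date, order_status, order_amount, quantity
  FROM sales_order
 WHERE order_number IN (129, 130);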
(2) Set the CDC time window and the load date

use rds;
INSERT OVERWRITE TABLE rds.cdc_time SELECT '2016-07-20', '2016-07-21' FROM rds.cdc_time;

In the regular_etl.sql file, change the line SET hivevar:cur_date = CURRENT_DATE(); to SET hivevar:cur_date = '2016-07-21';
(3) Run the regular load script
(4) Query the two sales orders in the sales_order_fact table to confirm that the regular load succeeded
use dw;
select b.order_number,
       c.order_date,
       d.allocate_date,
       e.packing_date,
       f.ship_date,
       g.receive_date
  from sales_order_fact a
 inner join order_dim b on a.order_sk = b.order_sk
  left join order_date_dim c on a.order_date_sk = c.order_date_sk
  left join allocate_date_dim d on a.allocate_date_sk = d.allocate_date_sk
  left join packing_date_dim e on a.packing_date_sk = e.packing_date_sk
  left join ship_date_dim f on a.ship_date_sk = f.ship_date_sk
  left join receive_date_dim g on a.receive_date_sk = g.receive_date_sk
 where b.order_number IN (129 , 130);
The query result is shown in the figure below. Only the order_date column has a value; the other dates are all NULL, because these two orders are new and have not yet been allocated, packed, shipped, or received.
(5) Add sales order rows as the warehouse-allocation and/or packing milestones for these two orders
USE source;

SET @start_date := unix_timestamp('2016-07-21');
SET @end_date := unix_timestamp('2016-07-22');
SET @mid_date := unix_timestamp('2016-07-21 12:00:00');
SET @request_delivery_date := '2016-07-25';
DROP TABLE IF EXISTS temp_sales_order_data;
CREATE TABLE temp_sales_order_data AS SELECT * FROM sales_order WHERE 1=0;

SET @order_date := from_unixtime(@start_date + rand() * (@mid_date - @start_date));
select order_amount,quantity into @amount,@quantity from sales_order where order_number=129;
INSERT INTO temp_sales_order_data VALUES (1, 129, 1, 1, @order_date, 'A', @request_delivery_date, @order_date, @amount, @quantity);

SET @order_date := from_unixtime(@mid_date + rand() * (@end_date - @mid_date));
INSERT INTO temp_sales_order_data VALUES (2, 129, 1, 1, @order_date, 'P', @request_delivery_date, @order_date, @amount, @quantity);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
select order_amount,quantity into @amount,@quantity from sales_order where order_number=130;
INSERT INTO temp_sales_order_data VALUES (3, 130, 2, 2, @order_date, 'A', @request_delivery_date, @order_date, @amount, @quantity);

INSERT INTO sales_order
select null,
       order_number,
       customer_number,
       product_code,
       status_date,
       order_status,
       request_delivery_date,
       entry_date,
       order_amount,
       quantity
  from temp_sales_order_data
 order by entry_date;

COMMIT ;
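At this point order 129 should have three source rows (statuses N, A, P) and order 130 two (N, A). A spot-check sketch:

SELECT order_number, order_status, status_date
  FROM sales_order
 WHERE order_number IN (129, 130)
 ORDER BY order_number, status_date;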
(6) Set the CDC time window and the load date

use rds;
INSERT OVERWRITE TABLE rds.cdc_time SELECT '2016-07-21', '2016-07-22' FROM rds.cdc_time;

In the regular_etl.sql file, change the line SET hivevar:cur_date = CURRENT_DATE(); to SET hivevar:cur_date = '2016-07-22';
(7) Run the regular load script
(8) Query the two sales orders in the sales_order_fact table to confirm that the regular load succeeded
use dw;
select b.order_number,
       c.order_date,
       d.allocate_date,
       e.packing_date,
       f.ship_date,
       g.receive_date
  from sales_order_fact a
 inner join order_dim b on a.order_sk = b.order_sk
  left join order_date_dim c on a.order_date_sk = c.order_date_sk
  left join allocate_date_dim d on a.allocate_date_sk = d.allocate_date_sk
  left join packing_date_dim e on a.packing_date_sk = e.packing_date_sk
  left join ship_date_dim f on a.ship_date_sk = f.ship_date_sk
  left join receive_date_dim g on a.receive_date_sk = g.receive_date_sk
 where b.order_number IN (129 , 130);
The query result is shown in the figure below. The first order now has allocate_date and packing_date values; the second has only allocate_date.
(9) Add sales order rows for these two orders' later milestones: packing, shipping, and/or receiving. Note that the four milestone dates may be identical.
USE source;

SET @start_date := unix_timestamp('2016-07-22');
SET @end_date := unix_timestamp('2016-07-23');
SET @mid_date := unix_timestamp('2016-07-22 12:00:00');
SET @request_delivery_date := '2016-07-25';
DROP TABLE IF EXISTS temp_sales_order_data;
CREATE TABLE temp_sales_order_data AS SELECT * FROM sales_order WHERE 1=0;

SET @order_date := from_unixtime(@start_date + rand() * (@mid_date - @start_date));
select order_amount,quantity into @amount,@quantity from sales_order where order_number=129 limit 1;
INSERT INTO temp_sales_order_data VALUES (1, 129, 1, 1, @order_date, 'S', @request_delivery_date, @order_date, @amount, @quantity);

SET @order_date := from_unixtime(@mid_date + rand() * (@end_date - @mid_date));
INSERT INTO temp_sales_order_data VALUES (2, 129, 1, 1, @order_date, 'R', @request_delivery_date, @order_date, @amount, @quantity);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
select order_amount,quantity into @amount,@quantity from sales_order where order_number=130 limit 1;
INSERT INTO temp_sales_order_data VALUES (3, 130, 2, 2, @order_date, 'P', @request_delivery_date, @order_date, @amount, @quantity);

INSERT INTO sales_order
select null,
       order_number,
       customer_number,
       product_code,
       status_date,
       order_status,
       request_delivery_date,
       entry_date,
       order_amount,
       quantity
  from temp_sales_order_data
 order by entry_date;

COMMIT ;
(10) Set the CDC time window and the load date

use rds;
INSERT OVERWRITE TABLE rds.cdc_time SELECT '2016-07-22', '2016-07-23' FROM rds.cdc_time;

In the regular_etl.sql file, change the line SET hivevar:cur_date = CURRENT_DATE(); to SET hivevar:cur_date = '2016-07-23';
(11) Run the regular load script
(12) Query the two sales orders in the sales_order_fact table to confirm that the regular load succeeded
use dw;
select b.order_number,
       c.order_date,
       d.allocate_date,
       e.packing_date,
       f.ship_date,
       g.receive_date
  from sales_order_fact a
 inner join order_dim b on a.order_sk = b.order_sk
  left join order_date_dim c on a.order_date_sk = c.order_date_sk
  left join allocate_date_dim d on a.allocate_date_sk = d.allocate_date_sk
  left join packing_date_dim e on a.packing_date_sk = e.packing_date_sk
  left join ship_date_dim f on a.ship_date_sk = f.ship_date_sk
  left join receive_date_dim g on a.receive_date_sk = g.receive_date_sk
 where b.order_number IN (129 , 130);
The query result is shown in the figure below. The first order, number 129, now has all dates populated, which means the order is complete (the customer has received the goods). The second order has been packed but not yet shipped.
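With all milestone dates populated, the accumulating snapshot supports lag analysis directly. A sketch that computes the days from order to receipt for completed orders, using the views defined earlier:

use dw;
select b.order_number,
       datediff(g.receive_date, c.order_date) days_to_receive
  from sales_order_fact a
 inner join order_dim b on a.order_sk = b.order_sk
 inner join order_date_dim c on a.order_date_sk = c.order_date_sk
 inner join receive_date_dim g on a.receive_date_sk = g.receive_date_sk;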
(13) Restore
In the regular_etl.sql file, change the modified SET hivevar:cur_date line (set to '2016-07-23' in step (10)) back to SET hivevar:cur_date = CURRENT_DATE();