十一、多重星型模式
从“进阶技术”开始,已经通过增加列和表扩展了数据仓库,在进阶技术(五) “快照”里增加了第二个事实表,month_end_sales_order_fact表。这之后数据仓库模式就有了两个事实表(第一个是在开始建立数据仓库时创建的sales_order_fact表)。有了这两个事实表的数据仓库就是一个标准的双星型模式。
本节将在现有的维度数据仓库上再增加一个新的星型结构。与现有的与销售关联的星型结构不同,新的星型结构关注的是产品业务领域。新的星型结构有一个事实表和一个维度表,用于存储数据仓库中的产品数据。
1. 一个新的星型模式
下图显示了扩展后的数据仓库模式。
模式中有三个星型结构。sales_order_fact表是第一个星型结构的事实表,与其相关的维度表是customer_dim、product_dim、date_dim和sales_order_attribute_dim表。month_end_sales_order_fact表是第二个星型结构的事实表。product_dim和month_dim是其对应的维度表。第一个和第二个星型结构共享product_dim维度表。第二个星型结构的事实表和月份维度数据分别来自于第一个星型结构的事实表和date_dim维度表。它们不从源数据获得数据。第三个星型模式的事实表是新建的production_fact表。它的维度除了存储在已有的date_dim和product_dim表,还有一个新的factory_dim表。第三个星型结构的数据来自源数据。
执行下面的脚本建立第三个星型模式中的新表和对应的源数据表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| 1-- 在MySQL源库上建立工厂表和每日产品表
2USE source;
3CREATE TABLE factory_master (
4 factory_code INT,
5 factory_name CHAR(30),
6 factory_street_address CHAR(50),
7 factory_zip_code INT(5),
8 factory_city CHAR(30),
9 factory_state CHAR(2)
10);
11alter table factory_master add primary key (factory_code);
12
13CREATE TABLE daily_production (
14 product_code INT,
15 production_date DATE,
16 factory_code INT,
17 production_quantity INT
18);
19
20ALTER TABLE daily_production ADD FOREIGN KEY (factory_code)
21REFERENCES factory_master(factory_code) ON DELETE CASCADE ON UPDATE CASCADE ;
22ALTER TABLE daily_production ADD FOREIGN KEY (product_code)
23REFERENCES product(product_code) ON DELETE CASCADE ON UPDATE CASCADE ;
24alter table daily_production add primary key (product_code,production_date,factory_code);
25
26-- 在Hive的rds库上建立相应的过渡表
27USE rds;
28CREATE TABLE factory_master (
29 factory_code INT,
30 factory_name VARCHAR(30),
31 factory_street_address VARCHAR(50),
32 factory_zip_code INT,
33 factory_city VARCHAR(30),
34 factory_state VARCHAR(2)
35);
36
37CREATE TABLE daily_production (
38 product_code INT,
39 production_date DATE,
40 factory_code INT,
41 production_quantity INT
42);
43
44-- 在Hive的dw库上建立相应的维度表和事实表
45USE dw;
46CREATE TABLE factory_dim (
47 factory_sk INT,
48 factory_code INT,
49 factory_name VARCHAR(30),
50 factory_street_address VARCHAR(50),
51 factory_zip_code INT,
52 factory_city VARCHAR(30),
53 factory_state VARCHAR(2),
54 version int,
55 effective_date DATE,
56 expiry_date DATE
57)
58clustered by (factory_sk) into 8 buckets
59stored as orc tblproperties ('transactional'='true');
60
61CREATE TABLE production_fact (
62 product_sk INT
63, production_date_sk INT
64, factory_sk INT
65, production_quantity INT
66);
67 |
(1)新建抽取作业
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| 1# 建立增量抽取每日产品表的作业,以production_date作为检查列,初始值是'1900-01-01'
2last_value='1900-01-01'
3sqoop job --delete myjob_incremental_import_daily_production --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop
4sqoop job \
5--meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop \
6--create myjob_incremental_import_daily_production \
7-- \
8import \
9--connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" \
10--table daily_production \
11--columns "product_code,production_date,factory_code,production_quantity" \
12--hive-import \
13--hive-table rds.daily_production \
14--incremental append \
15--check-column production_date \
16--last-value $last_value
17 |
1 2
| 1 新建定期装载每日产品脚本文件regular_etl_daily_production.sh,内容如下。
2 |
1 2 3 4 5 6 7 8 9
| 1#!/bin/bash
2# 全量抽取工厂表
3sqoop import --connect jdbc:mysql://cdh1:3306/source?useSSL=false --username root --password mypassword --table factory_master --hive-impo
4rt --hive-table rds.factory_master --hive-overwrite
5# 增量抽取每日产品表
6sqoop job --exec myjob_incremental_import_daily_production --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop
7# 调用regular_etl_daily_production.sql文件执行定期装载
8beeline -u jdbc:hive2://cdh2:10000/dw -f regular_etl_daily_production.sql
9 |
1 2
| 1 为了和其它定期装载脚本共用环境和时间窗口设置,新建一个set_time.sql脚本,内容如下。
2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| 1-- 设置变量以支持事务
2set hive.support.concurrency=true;
3set hive.exec.dynamic.partition.mode=nonstrict;
4set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
5set hive.compactor.initiator.on=true;
6set hive.compactor.worker.threads=1;
7
8USE dw;
9
10-- 设置SCD的生效时间和过期时间
11SET hivevar:cur_date = CURRENT_DATE();
12SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);
13SET hivevar:max_date = CAST('2200-01-01' AS DATE);
14
15-- 设置CDC的上限时间
16INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;
17 |
1 2
| 1 新建regular_etl_daily_production.sql脚本文件内容如下。
2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| 1-- 设置环境与时间窗口
2!run /root/set_time.sql
3
4-- 工厂信息很少修改,一般不需要保留历史,所以使用SCD1
5drop table if exists tmp;
6create table tmp as
7select a.factory_sk,
8 a.factory_code,
9 b.factory_name,
10 b.factory_street_address,
11 b.factory_zip_code,
12 b.factory_city,
13 b.factory_state,
14 a.version,
15 a.effective_date,
16 a.expiry_date
17 from factory_dim a,rds.factory_master b
18 where a.factory_code = b.factory_code and
19 !(a.factory_name <=> b.factory_name
20 and a.factory_street_address <=> b.factory_street_address
21 and a.factory_zip_code <=> b.factory_zip_code
22 and a.factory_city <=> b.factory_city
23 and a.factory_state <=> b.factory_state);
24
25delete from factory_dim where factory_dim.factory_sk in (select factory_sk from tmp);
26insert into factory_dim select * from tmp;
27
28-- 添加新的工厂信息
29INSERT INTO factory_dim
30SELECT
31 ROW_NUMBER() OVER (ORDER BY t1.factory_code) + t2.sk_max,
32 t1.factory_code,
33 t1.factory_name,
34 t1.factory_street_address,
35 t1.factory_zip_code,
36 t1.factory_city,
37 t1.factory_state,
38 1,
39 ${hivevar:pre_date},
40 ${hivevar:max_date}
41FROM
42(
43SELECT t1.* FROM rds.factory_master t1 LEFT JOIN factory_dim t2 ON t1.factory_code = t2.factory_code
44 WHERE t2.factory_sk IS NULL) t1
45CROSS JOIN
46(SELECT COALESCE(MAX(factory_sk),0) sk_max FROM factory_dim) t2;
47
48-- 装载每日产品事实表
49INSERT INTO production_fact
50SELECT
51 b.product_sk
52, c.date_sk
53, d.factory_sk
54, production_quantity
55FROM
56 rds.daily_production a
57, product_dim b
58, date_dim c
59, factory_dim d
60WHERE
61 production_date = ${hivevar:pre_date}
62AND a.product_code = b.product_code
63AND a.production_date >= b.effective_date
64AND a.production_date <= b.expiry_date
65AND a.production_date = c.date
66AND a.factory_code = d.factory_code ;
67 |
到目前为止已经讨论了第三个星型结构里的所有表,现在做一些测试。首先需要一些工厂信息。执行下面的语句向源数据库的factory_master表中装载四个工厂信息。
1 2 3 4 5 6 7 8 9 10 11 12 13
| 1USE source;
2INSERT INTO factory_master VALUES
3 ( 1, 'First Factory', '11111 Lichtman St.', 17050,
4 'Mechanicsburg', 'PA' )
5, ( 2, 'Second Factory', '22222 Stobosky Ave.', 17055, 'Pittsburgh',
6 'PA' )
7, ( 3, 'Third Factory', '33333 Fritze Rd.', 17050, 'Mechanicsburg',
8 'PA' )
9, ( 4, 'Fourth Factory', '44444 Jenzen Blvd.', 17055, 'Pittsburgh',
10 'PA' );
11
12COMMIT ;
13 |
1 2
| 1 执行下面的语句向源数据库的daily_production表添加数据。
2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| 1USE source;
2set @yesterday:=date_sub(current_date, interval 1 day);
3INSERT INTO daily_production VALUES
4 (1, @yesterday, 4, 100 )
5, (2, @yesterday, 3, 200 )
6, (3, @yesterday, 2, 300 )
7, (4, @yesterday, 1, 400 )
8, (1, @yesterday, 1, 400 )
9, (2, @yesterday, 2, 300 )
10, (3, @yesterday, 3, 200 )
11, (4, @yesterday, 4, 100 );
12
13COMMIT;
14 |
1 2
| 1 现在已经做好了测试产品定期装载的准备,使用下面的命令执行定期装载作业。
2 |
1 2
| 1./regular_etl_daily_production.sh
2 |
1 2
| 1 使用下面的SQL语句查询production_fact表,确认每天产品数据的定期装载是正确的。
2 |
1 2 3 4 5 6
| 1select product_sk product_sk,
2 production_date_sk date_sk,
3 factory_sk factory_sk,
4 production_quantity quantity
5 from dw.production_fact;
6 |
1 2
| 1 查询结果如下图所示,可以看到已经正确装载了8条每日产品记录。
2 |
为了确认工厂维度上成功应用了SCD1,使用下面的语句查询factory_dim表。
1 2 3 4 5 6 7 8 9 10 11 12
| 1select factory_sk sk,
2 factory_code c,
3 factory_name name,
4 factory_street_address address,
5 factory_zip_code zip,
6 factory_city city,
7 factory_state state,
8 version ver,
9 effective_date,
10 expiry_date
11 from dw.factory_dim a;
12 |
使用下面的语句修改源库中factory_master的数据。
1 2 3 4 5
| 1use source;
2update factory_master set factory_street_address= (case when factory_code=2 then '24242 Bunty La.' else '37373 Burbank Dr.' end)
3 where factory_code in (2,3);
4commit;
5 |
1 2
| 1./regular_etl_daily_production.sh
2 |
1 2
| 1 再次查询factory_dim表,查询结果如下图所示。
2 |
可以看到第二和第三个工厂已经正确修改了地址。