**I. Adding Columns**
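The most common extension to a data warehouse is adding columns to an existing dimension table and fact table. This section shows how to add columns to the customer dimension table and the sales order fact table, apply SCD2 to the new columns, and change the scheduled load script to match. Suppose we need to add a shipping address attribute to the customer dimension and a quantity measure to the sales order fact table.
First, look at how the schema changes when the columns are added.
The modified source database schema is shown in the figure below.
The modified DW database schema is shown in the figure below.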
1. Modify the database schema
Use the following SQL script to modify the source database schema.

    USE source;

    ALTER TABLE customer
      ADD shipping_address VARCHAR(50) AFTER customer_state
    , ADD shipping_zip_code INT AFTER shipping_address
    , ADD shipping_city VARCHAR(30) AFTER shipping_zip_code
    , ADD shipping_state VARCHAR(2) AFTER shipping_city ;

    ALTER TABLE sales_order
      ADD order_quantity INT AFTER order_amount ;

Use the following HiveQL script to modify the RDS database schema.

    USE rds;

    ALTER TABLE customer ADD COLUMNS
      (shipping_address VARCHAR(50) COMMENT 'shipping_address'
    , shipping_zip_code INT COMMENT 'shipping_zip_code'
    , shipping_city VARCHAR(30) COMMENT 'shipping_city'
    , shipping_state VARCHAR(2) COMMENT 'shipping_state') ;

    ALTER TABLE sales_order ADD COLUMNS
      (order_quantity INT COMMENT 'order_quantity') ;

Use the following HiveQL script to modify the DW database schema.

    USE dw;

    -- Rebuild the customer dimension table with the new columns
    ALTER TABLE customer_dim RENAME TO customer_dim_old;
    CREATE TABLE customer_dim (
      customer_sk INT COMMENT 'surrogate key',
      customer_number INT COMMENT 'number',
      customer_name VARCHAR(50) COMMENT 'name',
      customer_street_address VARCHAR(50) COMMENT 'address',
      customer_zip_code INT COMMENT 'zipcode',
      customer_city VARCHAR(30) COMMENT 'city',
      customer_state VARCHAR(2) COMMENT 'state',
      shipping_address VARCHAR(50) COMMENT 'shipping_address',
      shipping_zip_code INT COMMENT 'shipping_zip_code',
      shipping_city VARCHAR(30) COMMENT 'shipping_city',
      shipping_state VARCHAR(2) COMMENT 'shipping_state',
      version INT COMMENT 'version',
      effective_date DATE COMMENT 'effective date',
      expiry_date DATE COMMENT 'expiry date'
    )
    CLUSTERED BY (customer_sk) INTO 8 BUCKETS
    STORED AS ORC TBLPROPERTIES ('transactional'='true');

    -- Copy the existing rows; the four new shipping columns start as NULL
    INSERT INTO customer_dim
    SELECT customer_sk,
           customer_number,
           customer_name,
           customer_street_address,
           customer_zip_code,
           customer_city,
           customer_state,
           NULL,
           NULL,
           NULL,
           NULL,
           version,
           effective_date,
           expiry_date
      FROM customer_dim_old;
    DROP TABLE customer_dim_old;

    -- Rebuild the sales order fact table with the new measure
    ALTER TABLE sales_order_fact RENAME TO sales_order_fact_old;
    CREATE TABLE sales_order_fact (
      order_sk INT COMMENT 'order surrogate key',
      customer_sk INT COMMENT 'customer surrogate key',
      product_sk INT COMMENT 'product surrogate key',
      order_date_sk INT COMMENT 'date surrogate key',
      order_amount DECIMAL(10, 2) COMMENT 'order amount',
      order_quantity INT COMMENT 'order_quantity'
    )
    CLUSTERED BY (order_sk) INTO 8 BUCKETS
    STORED AS ORC TBLPROPERTIES ('transactional'='true');

    -- Copy the existing rows; order_quantity starts as NULL
    INSERT INTO sales_order_fact SELECT *, NULL FROM sales_order_fact_old;
    DROP TABLE sales_order_fact_old;

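The part of the code above that modifies the DW schema looks odd: columns can be added to a table directly, so why create a new table and reload the data into it? The reason is that older Hive versions have many problems with schema changes on ORC tables, adding columns in particular, and only creating a new table and reorganizing the data works reliably. The simple example below makes this clear.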

    use test;
    drop table if exists t1;
    create table t1(c1 int, c2 string)
    clustered by (c1) into 8 buckets
    stored as orc TBLPROPERTIES ('transactional'='true');
    insert into t1 values (1,'aaa');
    alter table t1 add columns (c3 string) ;
    update t1 set c2='ccc' where c1=1;
    select * from t1;

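The code above creates an ORC table, inserts one row, adds a column, updates the data, and finally queries it. These are ordinary operations in a relational database, yet the final query fails, as shown in the figure below.
This example was run on Hive 1.1.0. According to JIRA, the ORC schema-change problem was fixed in 2.0.0; see https://issues.apache.org/jira/browse/HIVE-11981
Note: on older Hive versions, be careful when changing the schema of an ORC table, especially when adding columns. With large data volumes, rebuilding the table takes a long time and a lot of space.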
2. Rebuild the Sqoop job
Use the following script to rebuild the Sqoop job so that it also imports the order_quantity column.

    last_value=`sqoop job --show myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop | grep incremental.last.value | awk '{print $3}'`
    sqoop job --delete myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop
    sqoop job \
    --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop \
    --create myjob_incremental_import \
    -- \
    import \
    --connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" \
    --table sales_order \
    --columns "order_number, customer_number, product_code, order_date, entry_date, order_amount, order_quantity" \
    --hive-import \
    --hive-table rds.sales_order \
    --incremental append \
    --check-column order_number \
    --last-value $last_value

Here $last_value is the maximum value of the check column after the previous ETL run.
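The first line of the script above yields an empty string if the job is missing or its description is formatted differently, and the job has already been deleted by the time the empty value reaches `--last-value`. The following is a minimal sketch of a guard, assuming the property is printed as `incremental.last.value = <n>` (which is what the `awk '{print $3}'` call relies on). The function name `extract_last_value` and the sample text are hypothetical and only serve the illustration.

```shell
# Sketch: read incremental.last.value from `sqoop job --show` output and
# refuse to go on unless it is a plain non-negative integer.
# Assumption: the property is printed as "incremental.last.value = <n>",
# which is what the awk '{print $3}' call in the script above relies on.
extract_last_value() {
  # Reads the job description on stdin; prints the value or returns 1.
  value=$(grep -F 'incremental.last.value' | awk '{print $3}' | head -n 1)
  case "$value" in
    '' | *[!0-9]*)
      echo "could not read incremental.last.value" >&2
      return 1
      ;;
    *)
      printf '%s\n' "$value"
      ;;
  esac
}

# Hypothetical excerpt of a job description, for illustration only.
sample_output='some.other.property = x
incremental.last.value = 116'

last_value=$(printf '%s\n' "$sample_output" | extract_last_value) || exit 1
echo "last_value=$last_value"
```

In the rebuild script, a check like this would sit between the `--show` call and the `--delete` call, so the job is deleted only after its last value has been captured.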
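3. Modify the regular load file regular_etl.sql
After the database schema changes, the regular load HiveQL script already in use must change as well. The modified script is shown below.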

    -- Settings required for transaction support
    set hive.support.concurrency=true;
    set hive.exec.dynamic.partition.mode=nonstrict;
    set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
    set hive.compactor.initiator.on=true;
    set hive.compactor.worker.threads=1;

    USE dw;

    -- Set the SCD effective and expiry dates
    SET hivevar:cur_date = CURRENT_DATE();
    SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);
    SET hivevar:max_date = CAST('2200-01-01' AS DATE);

    -- Set the CDC upper bound
    INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;

    -- Load the customer dimension
    -- Expire SCD2 rows for deleted customers and for changes in the address columns;
    -- the <=> operator handles NULL values
    UPDATE customer_dim
       SET expiry_date = ${hivevar:pre_date}
     WHERE customer_dim.customer_sk IN
    (SELECT a.customer_sk
       FROM (SELECT customer_sk,
                    customer_number,
                    customer_street_address,
                    customer_zip_code,
                    customer_city,
                    customer_state,
                    shipping_address,
                    shipping_zip_code,
                    shipping_city,
                    shipping_state
               FROM customer_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN
                    rds.customer b ON a.customer_number = b.customer_number
      WHERE b.customer_number IS NULL OR
      (  !(a.customer_street_address <=> b.customer_street_address)
      OR !(a.customer_zip_code <=> b.customer_zip_code)
      OR !(a.customer_city <=> b.customer_city)
      OR !(a.customer_state <=> b.customer_state)
      OR !(a.shipping_address <=> b.shipping_address)
      OR !(a.shipping_zip_code <=> b.shipping_zip_code)
      OR !(a.shipping_city <=> b.shipping_city)
      OR !(a.shipping_state <=> b.shipping_state)
      ));

    -- Insert the new SCD2 rows for changes in the address columns
    INSERT INTO customer_dim
    SELECT
        ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
        t1.customer_number,
        t1.customer_name,
        t1.customer_street_address,
        t1.customer_zip_code,
        t1.customer_city,
        t1.customer_state,
        t1.shipping_address,
        t1.shipping_zip_code,
        t1.shipping_city,
        t1.shipping_state,
        t1.version,
        t1.effective_date,
        t1.expiry_date
    FROM
    (
    SELECT
        t2.customer_number customer_number,
        t2.customer_name customer_name,
        t2.customer_street_address customer_street_address,
        t2.customer_zip_code customer_zip_code,
        t2.customer_city customer_city,
        t2.customer_state customer_state,
        t2.shipping_address shipping_address,
        t2.shipping_zip_code shipping_zip_code,
        t2.shipping_city shipping_city,
        t2.shipping_state shipping_state,
        t1.version + 1 version,
        ${hivevar:pre_date} effective_date,
        ${hivevar:max_date} expiry_date
     FROM customer_dim t1
    INNER JOIN rds.customer t2
       ON t1.customer_number = t2.customer_number
      AND t1.expiry_date = ${hivevar:pre_date}
     LEFT JOIN customer_dim t3
       ON t1.customer_number = t3.customer_number
      AND t3.expiry_date = ${hivevar:max_date}
    WHERE (!(t1.customer_street_address <=> t2.customer_street_address)
       OR  !(t1.customer_zip_code <=> t2.customer_zip_code)
       OR  !(t1.customer_city <=> t2.customer_city)
       OR  !(t1.customer_state <=> t2.customer_state)
       OR  !(t1.shipping_address <=> t2.shipping_address)
       OR  !(t1.shipping_zip_code <=> t2.shipping_zip_code)
       OR  !(t1.shipping_city <=> t2.shipping_city)
       OR  !(t1.shipping_state <=> t2.shipping_state)
       )
      AND t3.customer_sk IS NULL) t1
    CROSS JOIN
    (SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;

    -- Apply SCD1 to the customer_name column
    -- Hive's UPDATE does not yet support subqueries in the SET clause, so a temporary
    -- table holds the rows to change and DELETE followed by INSERT replaces the UPDATE
    -- SCD1 keeps no history, so every dimension row whose customer_name changed is
    -- updated, not only the current version
    DROP TABLE IF EXISTS tmp;
    CREATE TABLE tmp AS
    SELECT
        a.customer_sk,
        a.customer_number,
        b.customer_name,
        a.customer_street_address,
        a.customer_zip_code,
        a.customer_city,
        a.customer_state,
        a.shipping_address,
        a.shipping_zip_code,
        a.shipping_city,
        a.shipping_state,
        a.version,
        a.effective_date,
        a.expiry_date
      FROM customer_dim a, rds.customer b
     WHERE a.customer_number = b.customer_number AND !(a.customer_name <=> b.customer_name);
    DELETE FROM customer_dim WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);
    INSERT INTO customer_dim SELECT * FROM tmp;

    -- Insert newly added customers
    INSERT INTO customer_dim
    SELECT
        ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
        t1.customer_number,
        t1.customer_name,
        t1.customer_street_address,
        t1.customer_zip_code,
        t1.customer_city,
        t1.customer_state,
        t1.shipping_address,
        t1.shipping_zip_code,
        t1.shipping_city,
        t1.shipping_state,
        1,
        ${hivevar:pre_date},
        ${hivevar:max_date}
    FROM
    (
    SELECT t1.* FROM rds.customer t1 LEFT JOIN customer_dim t2 ON t1.customer_number = t2.customer_number
     WHERE t2.customer_sk IS NULL) t1
    CROSS JOIN
    (SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;

    -- Load the product dimension
    -- Expire SCD2 rows for deleted products and for changes in product_name or product_category
    UPDATE product_dim
       SET expiry_date = ${hivevar:pre_date}
     WHERE product_dim.product_sk IN
    (SELECT a.product_sk
       FROM (SELECT product_sk,product_code,product_name,product_category
               FROM product_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN
                    rds.product b ON a.product_code = b.product_code
      WHERE b.product_code IS NULL OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));

    -- Insert the new SCD2 rows for changes in product_name or product_category
    INSERT INTO product_dim
    SELECT
        ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
        t1.product_code,
        t1.product_name,
        t1.product_category,
        t1.version,
        t1.effective_date,
        t1.expiry_date
    FROM
    (
    SELECT
        t2.product_code product_code,
        t2.product_name product_name,
        t2.product_category product_category,
        t1.version + 1 version,
        ${hivevar:pre_date} effective_date,
        ${hivevar:max_date} expiry_date
     FROM product_dim t1
    INNER JOIN rds.product t2
       ON t1.product_code = t2.product_code
      AND t1.expiry_date = ${hivevar:pre_date}
     LEFT JOIN product_dim t3
       ON t1.product_code = t3.product_code
      AND t3.expiry_date = ${hivevar:max_date}
    WHERE (t1.product_name <> t2.product_name OR t1.product_category <> t2.product_category) AND t3.product_sk IS NULL) t1
    CROSS JOIN
    (SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;

    -- Insert newly added products
    INSERT INTO product_dim
    SELECT
        ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
        t1.product_code,
        t1.product_name,
        t1.product_category,
        1,
        ${hivevar:pre_date},
        ${hivevar:max_date}
    FROM
    (
    SELECT t1.* FROM rds.product t1 LEFT JOIN product_dim t2 ON t1.product_code = t2.product_code
     WHERE t2.product_sk IS NULL) t1
    CROSS JOIN
    (SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;

    -- Load the order dimension
    INSERT INTO order_dim
    SELECT
        ROW_NUMBER() OVER (ORDER BY t1.order_number) + t2.sk_max,
        t1.order_number,
        t1.version,
        t1.effective_date,
        t1.expiry_date
      FROM
    (
    SELECT
        order_number order_number,
        1 version,
        order_date effective_date,
        '2200-01-01' expiry_date
      FROM rds.sales_order, rds.cdc_time
     WHERE entry_date >= last_load AND entry_date < current_load ) t1
    CROSS JOIN
    (SELECT COALESCE(MAX(order_sk),0) sk_max FROM order_dim) t2;

    -- Load the sales order fact table
    INSERT INTO sales_order_fact
    SELECT
        order_sk,
        customer_sk,
        product_sk,
        date_sk,
        order_amount,
        order_quantity
      FROM
        rds.sales_order a,
        order_dim b,
        customer_dim c,
        product_dim d,
        date_dim e,
        rds.cdc_time f
     WHERE
        a.order_number = b.order_number
    AND a.customer_number = c.customer_number
    AND a.order_date >= c.effective_date
    AND a.order_date < c.expiry_date
    AND a.product_code = d.product_code
    AND a.order_date >= d.effective_date
    AND a.order_date < d.expiry_date
    AND to_date(a.order_date) = e.date
    AND a.entry_date >= f.last_load AND a.entry_date < f.current_load ;

    -- Update the last_load field of the timestamp table
    INSERT OVERWRITE TABLE rds.cdc_time SELECT current_load, current_load FROM rds.cdc_time;

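The fact load above joins each order to the dimension version whose validity window contains the order date (`order_date >= effective_date AND order_date < expiry_date`). As a rough illustration outside Hive, the sketch below applies the same half-open window test with awk to a few made-up rows. The function name `lookup_customer_sk` and the `dim_rows` sample data are hypothetical and do not come from the warehouse.

```shell
# Hypothetical sample of customer_dim:
# customer_sk, customer_number, effective_date, expiry_date.
# Customer 1 has an expired version (sk 1) and a current version (sk 10).
dim_rows='1 1 2016-03-01 2016-07-11
2 2 2016-03-01 2200-01-01
10 1 2016-07-11 2200-01-01'

# Print the surrogate key of the version that was valid on the order date.
# ISO dates (YYYY-MM-DD) sort correctly as plain strings, so awk's string
# comparison is enough for the window test.
lookup_customer_sk() {
  # $1 = customer_number, $2 = order date (YYYY-MM-DD)
  printf '%s\n' "$dim_rows" |
    awk -v num="$1" -v day="$2" \
      '$2 == num && day >= $3 && day < $4 { print $1 }'
}

lookup_customer_sk 1 2016-05-01   # date inside the expired version's window
lookup_customer_sk 1 2016-07-12   # date inside the current version's window
```

Because the window is closed on the left and open on the right, an order dated exactly on the changeover day matches only the newer version, so no order can pick up two surrogate keys for the same customer.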
(1) Run the following SQL script to prepare the customer and sales order test data.

    USE source;

    /***
    Changes to the customer data:
    set the shipping address of the eight existing customers
    add customer 9
    ***/
    UPDATE customer SET
      shipping_address = customer_street_address
    , shipping_zip_code = customer_zip_code
    , shipping_city = customer_city
    , shipping_state = customer_state ;

    INSERT INTO customer
    (customer_name
    , customer_street_address
    , customer_zip_code
    , customer_city
    , customer_state
    , shipping_address
    , shipping_zip_code
    , shipping_city
    , shipping_state)
    VALUES
    ('Online Distributors'
    , '2323 Louise Dr.'
    , 17055
    , 'Pittsburgh'
    , 'PA'
    , '2323 Louise Dr.'
    , 17055
    , 'Pittsburgh'
    , 'PA') ;

    /***
    Add 9 orders with an order date of July 12, 2016.
    ***/
    SET @start_date := unix_timestamp('2016-07-12');
    SET @end_date := unix_timestamp('2016-07-13');
    DROP TABLE IF EXISTS temp_sales_order_data;
    CREATE TABLE temp_sales_order_data AS SELECT * FROM sales_order WHERE 1=0;

    SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
    SET @amount := floor(1000 + rand() * 9000);
    SET @quantity := floor(10 + rand() * 90);
    INSERT INTO temp_sales_order_data VALUES (117, 1, 1, @order_date, @order_date, @amount, @quantity);

    SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
    SET @amount := floor(1000 + rand() * 9000);
    SET @quantity := floor(10 + rand() * 90);
    INSERT INTO temp_sales_order_data VALUES (118, 2, 2, @order_date, @order_date, @amount, @quantity);

    SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
    SET @amount := floor(1000 + rand() * 9000);
    SET @quantity := floor(10 + rand() * 90);
    INSERT INTO temp_sales_order_data VALUES (119, 3, 3, @order_date, @order_date, @amount, @quantity);

    SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
    SET @amount := floor(1000 + rand() * 9000);
    SET @quantity := floor(10 + rand() * 90);
    INSERT INTO temp_sales_order_data VALUES (120, 4, 4, @order_date, @order_date, @amount, @quantity);

    SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
    SET @amount := floor(1000 + rand() * 9000);
    SET @quantity := floor(10 + rand() * 90);
    INSERT INTO temp_sales_order_data VALUES (121, 5, 1, @order_date, @order_date, @amount, @quantity);

    SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
    SET @amount := floor(1000 + rand() * 9000);
    SET @quantity := floor(10 + rand() * 90);
    INSERT INTO temp_sales_order_data VALUES (122, 6, 2, @order_date, @order_date, @amount, @quantity);

    SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
    SET @amount := floor(1000 + rand() * 9000);
    SET @quantity := floor(10 + rand() * 90);
    INSERT INTO temp_sales_order_data VALUES (123, 7, 3, @order_date, @order_date, @amount, @quantity);

    SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
    SET @amount := floor(1000 + rand() * 9000);
    SET @quantity := floor(10 + rand() * 90);
    INSERT INTO temp_sales_order_data VALUES (124, 8, 4, @order_date, @order_date, @amount, @quantity);

    SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
    SET @amount := floor(1000 + rand() * 9000);
    SET @quantity := floor(10 + rand() * 90);
    INSERT INTO temp_sales_order_data VALUES (125, 9, 1, @order_date, @order_date, @amount, @quantity);

    INSERT INTO sales_order
    SELECT NULL,customer_number,product_code,order_date,entry_date,order_amount,order_quantity FROM temp_sales_order_data ORDER BY order_date;

    COMMIT ;

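The nine order blocks in the script above differ only in the order number, the customer number, and the product code. As a minimal sketch, the same statements can be emitted from a loop instead of being written out by hand. The function name `generate_order_inserts` is hypothetical. The starting order number 117 and the product code cycle 1, 2, 3, 4 are read off the script above.

```shell
# Emit the nine SET/INSERT blocks of the test data script from a loop.
# Order numbers run from 117 to 125, customer numbers from 1 to 9, and
# product codes cycle through 1,2,3,4 as in the hand-written blocks.
generate_order_inserts() {
  first_order=117
  i=1
  while [ "$i" -le 9 ]; do
    order_number=$((first_order + i - 1))
    product_code=$(( (i - 1) % 4 + 1 ))
    cat <<EOF
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
SET @quantity := floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data VALUES ($order_number, $i, $product_code, @order_date, @order_date, @amount, @quantity);

EOF
    i=$((i + 1))
  done
}

generate_order_inserts
```

The generated text can stand in for the nine blocks. The statements that set `@start_date` and `@end_date` and create `temp_sales_order_data` still have to run first, and the final `INSERT INTO sales_order` afterwards.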
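The modified sales order source data is shown in the figure below.
(2) Run the regular load and check the results.
Run the regular load, then query the customer dimension table as follows.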

    use dw;
    select customer_number no,
           customer_name name,
           shipping_city,
           shipping_zip_code zip,
           shipping_state st,
           version ver,
           effective_date eff,
           expiry_date exp
      from customer_dim;

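The new records of existing customers have a shipping address, while their old (expired) records do not. Customer 9 is newly added and has a shipping address, as shown in the figure below.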

    select order_sk o_sk,
           customer_sk c_sk,
           product_sk p_sk,
           order_date_sk od_sk,
           order_amount amt,
           order_quantity qty
      from sales_order_fact
    cluster by o_sk;

Only the 9 new orders have a quantity; the older sales data does not, as shown in the figure below.

    select * from rds.cdc_time;

The last load date in the timestamp table has been updated, as shown in the figure below.
This completes the changes needed to add the columns.