上一篇已经将单条查重语句调整到最优,但该语句是以单线程方式执行。能否利用多处理器,让去重操作多线程并行执行,从而进一步提高速度呢?比如我的实验环境是4处理器,如果使用4个线程同时执行查重sql,理论上应该接近4倍的性能提升。
一、数据分片
我们生成测试数据时,created_time采用每条记录加一秒的方式,也就是最大和在最小的时间差为50万秒,而且数据均匀分布。因此先把数据平均分成4份。
-
查询出4份数据的created_time边界值
1
2
3
4
5
6 1select date_add('2017-01-01',interval 125000 second) dt1,
2 date_add('2017-01-01',interval 2*125000 second) dt2,
3 date_add('2017-01-01',interval 3*125000 second) dt3,
4 max(created_time) dt4
5 from t_source;
6
1
2 1 查询结果如图一所示。
2
图一
-
查看每份数据的记录数,确认数据平均分布
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 1select case when created_time >= '2017-01-01'
2 and created_time < '2017-01-02 10:43:20'
3 then '2017-01-01'
4 when created_time >= '2017-01-02 10:43:20'
5 and created_time < '2017-01-03 21:26:40'
6 then '2017-01-02 10:43:20'
7 when created_time >= '2017-01-03 21:26:40'
8 and created_time < '2017-01-05 08:10:00'
9 then '2017-01-03 21:26:40'
10 else '2017-01-05 08:10:00'
11 end min_dt,
12 case when created_time >= '2017-01-01'
13 and created_time < '2017-01-02 10:43:20'
14 then '2017-01-02 10:43:20'
15 when created_time >= '2017-01-02 10:43:20'
16 and created_time < '2017-01-03 21:26:40'
17 then '2017-01-03 21:26:40'
18 when created_time >= '2017-01-03 21:26:40'
19 and created_time < '2017-01-05 08:10:00'
20 then '2017-01-05 08:10:00'
21 else '2017-01-06 18:53:20'
22 end max_dt,
23 count(*)
24 from t_source
25 group by case when created_time >= '2017-01-01'
26 and created_time < '2017-01-02 10:43:20'
27 then '2017-01-01'
28 when created_time >= '2017-01-02 10:43:20'
29 and created_time < '2017-01-03 21:26:40'
30 then '2017-01-02 10:43:20'
31 when created_time >= '2017-01-03 21:26:40'
32 and created_time < '2017-01-05 08:10:00'
33 then '2017-01-03 21:26:40'
34 else '2017-01-05 08:10:00'
35 end,
36 case when created_time >= '2017-01-01'
37 and created_time < '2017-01-02 10:43:20'
38 then '2017-01-02 10:43:20'
39 when created_time >= '2017-01-02 10:43:20'
40 and created_time < '2017-01-03 21:26:40'
41 then '2017-01-03 21:26:40'
42 when created_time >= '2017-01-03 21:26:40'
43 and created_time < '2017-01-05 08:10:00'
44 then '2017-01-05 08:10:00'
45 else '2017-01-06 18:53:20'
46 end;
47
查询结果如图二所示。
图二
4份数据的并集应该覆盖整个源数据集,并且数据之间是不重复的。也就是说4份数据的created_time要连续且互斥,连续保证处理全部数据,互斥确保了不需要二次查重。实际上这和时间范围分区的概念类似,或许用分区表更好些,只是这里省略了重建表的步骤。
3. 建立查重的存储过程
有了以上信息我们就可以写出4条语句处理全部数据。为了调用接口尽量简单,建立下面的存储过程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 1delimiter //
2create procedure sp_unique(i smallint)
3begin
4 set @a:='0000-00-00 00:00:00';
5 set @b:=' ';
6 if (i<4) then
7 insert into t_target
8 select * from t_source force index (idx_sort)
9 where created_time >= date_add('2017-01-01',interval (i-1)*125000 second)
10 and created_time < date_add('2017-01-01',interval i*125000 second)
11 and (@a!=created_time or @b!=item_name)
12 and (@a:=created_time) is not null
13 and (@b:=item_name) is not null
14 order by created_time,item_name;
15 commit;
16 else
17 insert into t_target
18 select * from t_source force index (idx_sort)
19 where created_time >= date_add('2017-01-01',interval (i-1)*125000 second)
20 and created_time <= date_add('2017-01-01',interval i*125000 second)
21 and (@a!=created_time or @b!=item_name)
22 and (@a:=created_time) is not null
23 and (@b:=item_name) is not null
24 order by created_time,item_name;
25 commit;
26 end if;
27end
28//
29
30delimiter ;
31
查询的执行计划都如图三所示。
图三
mysql优化器进行索引范围扫描,并且使用索引条件下推(ICP)优化查询。
二、并行执行
下面分别使用shell后台进程和MySQL Schedule Event实现并行。
- shell后台进程
(1)建立duplicate_removal.sh文件,内容如下。
1
2
3
4
5
6
7
8
9
10
11 1#!/bin/bash
2mysql -vvv -u root -p123456 test -e "truncate t_target" &>/dev/null
3date '+%H:%M.%N'
4for y in {1..4}
5do
6 sql="call sp_unique($y)"
7 mysql -vvv -u root -p123456 test -e "$sql" &>par_sql1_$y.log &
8done
9wait
10date '+%H:%M.%N'
11
(2)执行脚本文件
1
2
3 1chmod 755 duplicate_removal.sh
2./duplicate_removal.sh
3
执行输出入图四所示。
图四
这种方法用时3.4秒,并行执行的4个过程调用分别用时如图五所示。
图五
可以看到,每个过程的执行时间均不到3.4秒,因为是并行执行,总的过程执行时间也小于3.4秒,
比单线程sql速度提高了近3倍。
2. MySQL Schedule Event
吴老师也用到了并行,但他是利用MySQL自带的Schedule Event功能实现的,代码应该和下面的类似。
(1)建立事件历史日志表
1
2
3
4
5
6
7
8
9
10
11
12 1-- 用于查看事件执行时间等信息
2create table t_event_history (
3 dbname varchar(128) not null default '',
4 eventname varchar(128) not null default '',
5 starttime datetime(3) not null default '0000-00-00 00:00:00',
6 endtime datetime(3) default null,
7 issuccess int(11) default null,
8 duration int(11) default null,
9 errormessage varchar(512) default null,
10 randno int(11) default null
11);
12
(2)修改event_scheduler参数
1
2 1set global event_scheduler = 1;
2
(3)为每个并发线程创建一个事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119 1delimiter //
2create event ev1 on schedule at current_timestamp + interval 1 hour on completion preserve disable do
3begin
4 declare r_code char(5) default '00000';
5 declare r_msg text;
6 declare v_error integer;
7 declare v_starttime datetime default now(3);
8 declare v_randno integer default floor(rand()*100001);
9
10 insert into t_event_history (dbname,eventname,starttime,randno)
11 #作业名
12 values(database(),'ev1', v_starttime,v_randno);
13
14 begin
15 #异常处理段
16 declare continue handler for sqlexception
17 begin
18 set v_error = 1;
19 get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;
20 end;
21
22 #此处为实际调用的用户程序过程
23 call sp_unique(1);
24 end;
25
26 update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;
27
28end
29//
30
31create event ev2 on schedule at current_timestamp + interval 1 hour on completion preserve disable do
32begin
33 declare r_code char(5) default '00000';
34 declare r_msg text;
35 declare v_error integer;
36 declare v_starttime datetime default now(3);
37 declare v_randno integer default floor(rand()*100001);
38
39 insert into t_event_history (dbname,eventname,starttime,randno)
40 #作业名
41 values(database(),'ev2', v_starttime,v_randno);
42
43 begin
44 #异常处理段
45 declare continue handler for sqlexception
46 begin
47 set v_error = 1;
48 get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;
49 end;
50
51 #此处为实际调用的用户程序过程
52 call sp_unique(2);
53 end;
54
55 update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;
56
57end
58//
59
60create event ev3 on schedule at current_timestamp + interval 1 hour on completion preserve disable do
61begin
62 declare r_code char(5) default '00000';
63 declare r_msg text;
64 declare v_error integer;
65 declare v_starttime datetime default now(3);
66 declare v_randno integer default floor(rand()*100001);
67
68 insert into t_event_history (dbname,eventname,starttime,randno)
69 #作业名
70 values(database(),'ev3', v_starttime,v_randno);
71
72 begin
73 #异常处理段
74 declare continue handler for sqlexception
75 begin
76 set v_error = 1;
77 get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;
78 end;
79
80 #此处为实际调用的用户程序过程
81 call sp_unique(3);
82 end;
83
84 update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;
85
86end
87//
88
89create event ev4 on schedule at current_timestamp + interval 1 hour on completion preserve disable do
90begin
91 declare r_code char(5) default '00000';
92 declare r_msg text;
93 declare v_error integer;
94 declare v_starttime datetime default now(3);
95 declare v_randno integer default floor(rand()*100001);
96
97 insert into t_event_history (dbname,eventname,starttime,randno)
98 #作业名
99 values(database(),'ev4', v_starttime,v_randno);
100
101 begin
102 #异常处理段
103 declare continue handler for sqlexception
104 begin
105 set v_error = 1;
106 get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;
107 end;
108
109 #此处为实际调用的用户程序过程
110 call sp_unique(4);
111 end;
112
113 update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;
114
115end
116//
117
118delimiter ;
119
说明:为了记录每个事件执行的时间,在事件定义中增加了操作日志表的逻辑,因为每个事件中只多执行了一条insert,一条update,4个事件总共多执行8条很简单的语句,对测试的影响可以忽略不计。执行时间精确到毫秒。
(4)触发事件执行
1
2 1mysql -vvv -u root -p123456 test -e "truncate t_target;alter event ev1 on schedule at current_timestamp enable;alter event ev2 on schedule at current_timestamp enable;alter event ev3 on schedule at current_timestamp enable;alter event ev4 on schedule at current_timestamp enable;"
2
说明:该命令行顺序触发了4个事件,但不会等前一个执行完才执行下一个,而是立即向下执行。从图六的输出也可以清楚地看到这一点。因此四次过程调用是并行执行的。
图六
(5)查看事件执行日志
1
2 1select * from t_event_history;
2
查询结果如图7所示。
图七
可以看到,每个过程的执行均为3.5秒,又因为是并行执行的,因此
总的执行之间也是3.5秒,优化效果和shell后台进程方式几乎相同。
参考:
Increasing slow query performance with the parallel query execution
Mysql Event 调度历史记录