将MySQL去重操作优化到极致之三弹连发(二):多线程并行执行

释放双眼,带上耳机,听听看~!

        上一篇已经将单条查重语句调整到最优,但该语句是以单线程方式执行。能否利用多处理器,让去重操作多线程并行执行,从而进一步提高速度呢?比如我的实验环境是4处理器,如果使用4个线程同时执行查重sql,理论上应该接近4倍的性能提升。
一、数据分片
我们生成测试数据时,created_time采用每条记录加一秒的方式,也就是最大和在最小的时间差为50万秒,而且数据均匀分布。因此先把数据平均分成4份。

  1. 查询出4份数据的created_time边界值


1
2
3
4
5
6
1select date_add('2017-01-01',interval 125000 second) dt1,
2       date_add('2017-01-01',interval 2*125000 second) dt2,
3       date_add('2017-01-01',interval 3*125000 second) dt3,
4       max(created_time) dt4
5  from t_source;
6

1
2
1        查询结果如图一所示。  
2

图一

  1. 查看每份数据的记录数,确认数据平均分布


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
1select case when created_time >= '2017-01-01'
2             and created_time < '2017-01-02 10:43:20'
3            then '2017-01-01'
4            when created_time >= '2017-01-02 10:43:20'
5             and created_time < '2017-01-03 21:26:40'
6            then '2017-01-02 10:43:20'
7            when created_time >= '2017-01-03 21:26:40'
8             and created_time < '2017-01-05 08:10:00'
9            then '2017-01-03 21:26:40'
10            else '2017-01-05 08:10:00'
11        end min_dt,
12       case when created_time >= '2017-01-01'
13             and created_time < '2017-01-02 10:43:20'
14            then '2017-01-02 10:43:20'
15            when created_time >= '2017-01-02 10:43:20'
16             and created_time < '2017-01-03 21:26:40'
17            then '2017-01-03 21:26:40'
18            when created_time >= '2017-01-03 21:26:40'
19             and created_time < '2017-01-05 08:10:00'
20            then '2017-01-05 08:10:00'
21            else '2017-01-06 18:53:20'
22        end max_dt,
23       count(*)
24  from t_source
25 group by case when created_time >= '2017-01-01'
26             and created_time < '2017-01-02 10:43:20'
27            then '2017-01-01'
28            when created_time >= '2017-01-02 10:43:20'
29             and created_time < '2017-01-03 21:26:40'
30            then '2017-01-02 10:43:20'
31            when created_time >= '2017-01-03 21:26:40'
32             and created_time < '2017-01-05 08:10:00'
33            then '2017-01-03 21:26:40'
34            else '2017-01-05 08:10:00'
35        end,
36       case when created_time >= '2017-01-01'
37             and created_time < '2017-01-02 10:43:20'
38            then '2017-01-02 10:43:20'
39            when created_time >= '2017-01-02 10:43:20'
40             and created_time < '2017-01-03 21:26:40'
41            then '2017-01-03 21:26:40'
42            when created_time >= '2017-01-03 21:26:40'
43             and created_time < '2017-01-05 08:10:00'
44            then '2017-01-05 08:10:00'
45            else '2017-01-06 18:53:20'
46        end;
47

        查询结果如图二所示。

图二

        4份数据的并集应该覆盖整个源数据集,并且数据之间是不重复的。也就是说4份数据的created_time要连续且互斥,连续保证处理全部数据,互斥确保了不需要二次查重。实际上这和时间范围分区的概念类似,或许用分区表更好些,只是这里省略了重建表的步骤。
3. 建立查重的存储过程
有了以上信息我们就可以写出4条语句处理全部数据。为了调用接口尽量简单,建立下面的存储过程。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1delimiter //
2create procedure sp_unique(i smallint)    
3begin    
4    set @a:='0000-00-00 00:00:00';  
5    set @b:=' ';  
6   if (i<4) then
7        insert into t_target  
8        select * from t_source force index (idx_sort)  
9         where created_time >= date_add('2017-01-01',interval (i-1)*125000 second)
10           and created_time < date_add('2017-01-01',interval i*125000 second)
11           and (@a!=created_time or @b!=item_name)
12           and (@a:=created_time) is not null
13           and (@b:=item_name) is not null  
14         order by created_time,item_name;  
15        commit;
16    else
17  insert into t_target  
18        select * from t_source force index (idx_sort)  
19         where created_time >= date_add('2017-01-01',interval (i-1)*125000 second)
20           and created_time <= date_add('2017-01-01',interval i*125000 second)
21           and (@a!=created_time or @b!=item_name)
22           and (@a:=created_time) is not null
23           and (@b:=item_name) is not null  
24         order by created_time,item_name;  
25        commit;
26    end if;  
27end    
28//
29
30delimiter ;
31

        查询的执行计划都如图三所示。

图三

        mysql优化器进行索引范围扫描,并且使用索引条件下推(ICP)优化查询。
二、并行执行
下面分别使用shell后台进程和MySQL Schedule Event实现并行。

  1. shell后台进程

(1)建立duplicate_removal.sh文件,内容如下。


1
2
3
4
5
6
7
8
9
10
11
1#!/bin/bash
2mysql -vvv -u root -p123456 test -e "truncate t_target" &>/dev/null
3date '+%H:%M.%N'
4for y in {1..4}
5do
6  sql="call sp_unique($y)"
7  mysql -vvv -u root -p123456 test -e "$sql" &>par_sql1_$y.log &
8done
9wait
10date '+%H:%M.%N'
11

(2)执行脚本文件


1
2
3
1chmod 755 duplicate_removal.sh
2./duplicate_removal.sh
3

        执行输出入图四所示。

图四

        
这种方法用时3.4秒,并行执行的4个过程调用分别用时如图五所示。

图五

        可以看到,每个过程的执行时间均不到3.4秒,因为是并行执行,总的过程执行时间也小于3.4秒,
比单线程sql速度提高了近3倍。
2. MySQL Schedule Event
吴老师也用到了并行,但他是利用MySQL自带的Schedule Event功能实现的,代码应该和下面的类似。

(1)建立事件历史日志表


1
2
3
4
5
6
7
8
9
10
11
12
1-- 用于查看事件执行时间等信息
2create table t_event_history  (  
3   dbname  varchar(128) not null default '',  
4   eventname  varchar(128) not null default '',  
5   starttime  datetime(3) not null default '0000-00-00 00:00:00',  
6   endtime  datetime(3) default null,  
7   issuccess  int(11) default null,  
8   duration  int(11) default null,  
9   errormessage  varchar(512) default null,  
10   randno  int(11) default null
11);  
12

(2)修改event_scheduler参数


1
2
1set global event_scheduler = 1;
2

(3)为每个并发线程创建一个事件


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
1delimiter //
2create event ev1 on schedule at current_timestamp + interval 1 hour on completion preserve disable do
3begin
4    declare r_code char(5) default '00000';  
5    declare r_msg text;  
6    declare v_error integer;  
7    declare v_starttime datetime default now(3);  
8    declare v_randno integer default floor(rand()*100001);  
9      
10    insert into t_event_history (dbname,eventname,starttime,randno)
11    #作业名   
12    values(database(),'ev1', v_starttime,v_randno);    
13    
14    begin  
15        #异常处理段  
16        declare continue handler for sqlexception    
17        begin  
18            set v_error = 1;  
19            get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;  
20        end;  
21          
22        #此处为实际调用的用户程序过程  
23        call sp_unique(1);  
24    end;  
25      
26    update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;  
27      
28end
29//    
30
31create event ev2 on schedule at current_timestamp + interval 1 hour on completion preserve disable do
32begin
33    declare r_code char(5) default '00000';  
34    declare r_msg text;  
35    declare v_error integer;  
36    declare v_starttime datetime default now(3);  
37    declare v_randno integer default floor(rand()*100001);  
38      
39    insert into t_event_history (dbname,eventname,starttime,randno)
40    #作业名   
41    values(database(),'ev2', v_starttime,v_randno);    
42    
43    begin  
44        #异常处理段  
45        declare continue handler for sqlexception    
46        begin  
47            set v_error = 1;  
48            get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;  
49        end;  
50          
51        #此处为实际调用的用户程序过程  
52        call sp_unique(2);  
53    end;  
54      
55    update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;  
56      
57end
58//  
59
60create event ev3 on schedule at current_timestamp + interval 1 hour on completion preserve disable do
61begin
62    declare r_code char(5) default '00000';  
63    declare r_msg text;  
64    declare v_error integer;  
65    declare v_starttime datetime default now(3);  
66    declare v_randno integer default floor(rand()*100001);  
67      
68    insert into t_event_history (dbname,eventname,starttime,randno)
69    #作业名   
70    values(database(),'ev3', v_starttime,v_randno);    
71    
72    begin  
73        #异常处理段  
74        declare continue handler for sqlexception    
75        begin  
76            set v_error = 1;  
77            get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;  
78        end;  
79          
80        #此处为实际调用的用户程序过程  
81        call sp_unique(3);  
82    end;  
83      
84    update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;  
85      
86end
87//  
88
89create event ev4 on schedule at current_timestamp + interval 1 hour on completion preserve disable do
90begin
91    declare r_code char(5) default '00000';  
92    declare r_msg text;  
93    declare v_error integer;  
94    declare v_starttime datetime default now(3);  
95    declare v_randno integer default floor(rand()*100001);  
96      
97    insert into t_event_history (dbname,eventname,starttime,randno)
98    #作业名   
99    values(database(),'ev4', v_starttime,v_randno);    
100    
101    begin  
102        #异常处理段  
103        declare continue handler for sqlexception    
104        begin  
105            set v_error = 1;  
106            get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;  
107        end;  
108          
109        #此处为实际调用的用户程序过程  
110        call sp_unique(4);  
111    end;  
112      
113    update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;  
114      
115end
116//
117
118delimiter ;  
119

        说明:为了记录每个事件执行的时间,在事件定义中增加了操作日志表的逻辑,因为每个事件中只多执行了一条insert,一条update,4个事件总共多执行8条很简单的语句,对测试的影响可以忽略不计。执行时间精确到毫秒。

(4)触发事件执行


1
2
1mysql -vvv -u root -p123456 test -e "truncate t_target;alter event ev1 on schedule at current_timestamp enable;alter event ev2 on schedule at current_timestamp enable;alter event ev3 on schedule at current_timestamp enable;alter event ev4 on schedule at current_timestamp enable;"
2

        说明:该命令行顺序触发了4个事件,但不会等前一个执行完才执行下一个,而是立即向下执行。从图六的输出也可以清楚地看到这一点。因此四次过程调用是并行执行的。

图六
(5)查看事件执行日志


1
2
1select * from t_event_history;
2

        查询结果如图7所示。

图七

        可以看到,每个过程的执行均为3.5秒,又因为是并行执行的,因此
总的执行之间也是3.5秒,优化效果和shell后台进程方式几乎相同。
参考:
Increasing slow query performance with the parallel query execution
Mysql Event 调度历史记录

给TA打赏
共{{data.count}}人
人已打赏
安全运维

OpenSSH-8.7p1离线升级修复安全漏洞

2021-10-23 10:13:25

安全运维

设计模式的设计原则

2021-12-12 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索