Netty游戏服务器实战开发(8):利用redis或者zookeeper实现3pc分布式事务锁(二)。支撑腾讯系列某手游百万级流量公测

释放双眼,带上耳机,听听看~!

导读:在上篇文章中介绍了分布式事务项目的基本原理和工程组件,我们了解到了分布式事务的理论知识。处于实战的经验,我们将理论知识使用到实际项目中。所以我们将借助idea中maven工程 来实战我们的项目。

回到正文:

在上篇文章中我们已经把需要的准备工作做好了。现在我们需要将如何实现分布式3PC事务提交锁。

先睹为快

首先我们先来体验一下事务提交锁的过程,在本项目中我们将在Windows环境下搭建redis环境和zookeeper环境。下面就是我们只需一段分布式加锁程序的过程。一段执行锁发生异常的行为:

定义事物锁的类型:
我们使用分布式事务锁的时候我们需要提供如下几种类型的锁:

  • 1:写锁,WRITE
  • 2:读锁,READ
  • 3:独占写锁,到时间释放:WRITE_TIME
  • 4:强制时间锁,无论获取锁成功,强制时间锁, 到时间时间释放,FORCE_WRITE_TIME

更具上面分析我们将定义一个枚举类来枚举上面所需要的锁。NettyTransactionLockType.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1package com.twjitm.transaction.transaction.enums;
2
3/**
4 * 事物锁类型
5 *
6 * @author twjitm- [Created on 2018-08-27 11:50]
7 * @jdk java version "1.8.0_77"
8 */
9public enum  NettyTransactionLockType {
10    /**独自占有锁 */
11    WRITE,
12    /**读锁*/
13    READ,
14    /** 独自写占有,到时间才会释放锁*/
15    WRITE_TIME,
16    /** 独自写占有,无论获取锁成功,强制时间锁, 到时间时间释放*/
17    FORCE_WRITE_TIME,
18    ;
19}
20
21
22

有了锁的枚举,那么我们将来实现事务实体的行为,试想一下。一个事务的执行过程。由如下行为:

  • 1:是否能够创建锁
  • 2:是否能够尝试提交锁
  • 3:是否能够正式提交锁
  • 4:提交过程是否发生异常
  • 5:发生异常按照不同粒度回滚锁
  • 6:回滚锁是否发生异常
  • 7:是否释放锁
  • 8:释放锁是否发生异常
  • 9:补偿提交锁

特别说明:事务、事务实体、和事务锁不是一个概念。不要混淆了。

根据上面说的我们来定义这些事务实体行为接口。来细化事务的每个阶段。NettyTransactionEntityInterface.java 接口


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
1package com.twjitm.transaction.transaction.entity;
2
3import com.twjitm.transaction.lock.NettyTransactionLockInterface;
4import com.twjitm.transaction.transaction.enums.NettyTransactionCommitResult;
5import com.twjitm.transaction.transaction.exception.NettyTransactionException;
6
7/**
8 * @author twjitm- [Created on 2018-08-27 10:16]
9 * @jdk java version "1.8.0_77"
10 */
11public interface NettyTransactionEntityInterface {
12
13    /**
14     * 事物提交后需要执行的逻辑,提交这个事物主要
15     * 的业务逻辑
16     *
17     * @throws NettyTransactionException
18     */
19    void commit() throws NettyTransactionException;
20
21
22    /**
23     * 回滚,事务在执行过程中发生异常后需要回滚操作的逻辑
24     *
25     * @throws NettyTransactionException
26     */
27    void rollback() throws NettyTransactionException;
28
29    /**
30     * 尝试性提交
31     */
32    NettyTransactionCommitResult tryCommit() throws NettyTransactionException;
33
34
35    /**
36     * 是否可以创建锁
37     *
38     * @return
39     */
40    boolean createNettyTransactionLock(long seconds) throws NettyTransactionException;
41
42    /**
43     * 释放锁
44     *
45     * @return
46     */
47    void releaseNettyTransactionLock();
48
49    /**
50     * 强制释放锁
51     *
52     * @return
53     */
54    void forceReleaseNettyTransactionLock();
55
56    String getInfo();
57
58    /**
59     * 是否需要执行
60     *
61     * @return
62     */
63    boolean needCommit();
64
65    /**
66     * 获取锁内容
67     *
68     * @return
69     */
70    NettyTransactionLockInterface getNettyTransactionLockInterface();
71}
72
73
74

一个事务实体需要由事务的操作。所以我们定义个事务实体来实现事务的行为操作。所以需要实现事务行为接口,由于系统中既对redis实现方式的支持也对zookeeper实现的方式支持。所以我们需要定义两种类型的事务实体。首先我们来描述一下redis实现的事务实体。

AbstractNettyTransactionEntity.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
1package com.twjitm.transaction.transaction.entity;
2
3import com.twjitm.transaction.lock.NettyTransactionLock;
4import com.twjitm.transaction.lock.NettyTransactionLockInterface;
5import com.twjitm.transaction.lock.NettyTransactionReadLock;
6import com.twjitm.transaction.service.redis.NettyTransactionRedisService;
7import com.twjitm.transaction.transaction.enums.NettyTransactionEntityCause;
8import com.twjitm.transaction.transaction.enums.NettyTransactionLockType;
9import com.twjitm.transaction.transaction.exception.NettyTransactionException;
10
11import java.util.BitSet;
12
13/**
14 * 基于redis实现分布式事物锁
15 * 抽象事物实体类,所有的事物锁实体必须继承
16 * 本抽象类,实现自己的commit后的方法和rollback方法等用户实现方法
17 * <p>
18 * 这些方法不是创建锁或者回滚锁方法,而是实现的是业务方法。需要注意的是
19 * 不能再次方法中创建和锁字段一样的字段,否者造成数据一致性错误
20 *
21 * @author twjitm- [Created on 2018-08-27 11:48]
22 * @jdk java version "1.8.0_77"
23 */
24public abstract class AbstractNettyTransactionEntity implements NettyTransactionEntityInterface {
25    /**
26     * 进度设置集合 主要用于rollback
27     */
28    private BitSet progressBitSet;
29
30    /**
31     * 事务锁
32     */
33    private NettyTransactionLockInterface nettyTransactionLock;
34
35    /**
36     * 锁类型
37     */
38    private NettyTransactionLockType nettyTransactionLockType;
39
40    /**
41     * 锁的正向标志(主要用于读取的时候)
42     */
43    private boolean rejectFlag = false;
44
45    public AbstractNettyTransactionEntity(NettyTransactionEntityCause cause,
46                                          String key,
47                                          NettyTransactionRedisService redisService) {
48        this.progressBitSet = new BitSet();
49        this.nettyTransactionLock = new NettyTransactionLock(key, redisService, cause);
50        this.nettyTransactionLockType = NettyTransactionLockType.WRITE;
51    }
52
53    /**
54     * 抽象分布式事物实体
55     *
56     * @param cause                    事物产生原因
57     * @param key                      事物key
58     * @param redisService             事物支持的redis服务
59     * @param nettyTransactionLockType 事物锁类型
60     */
61    public AbstractNettyTransactionEntity(NettyTransactionEntityCause cause,
62                                          String key,
63                                          NettyTransactionRedisService redisService,
64                                          NettyTransactionLockType nettyTransactionLockType) {
65        this.progressBitSet = new BitSet();
66        if (nettyTransactionLockType.equals(NettyTransactionLockType.READ)) {
67            this.nettyTransactionLock = new NettyTransactionReadLock(key, redisService, cause);
68            this.nettyTransactionLockType = NettyTransactionLockType.READ;
69        } else {
70            this.nettyTransactionLock = new NettyTransactionLock(key, redisService, cause);
71            this.nettyTransactionLockType = NettyTransactionLockType.WRITE;
72        }
73    }
74
75    public AbstractNettyTransactionEntity(NettyTransactionEntityCause cause,
76                                          String key,
77                                          NettyTransactionRedisService redisService,
78                                          NettyTransactionLockType nettyTransactionLockType, int lockTime) {
79        this.progressBitSet = new BitSet();
80        if (nettyTransactionLockType.equals(NettyTransactionLockType.READ)) {
81            this.nettyTransactionLock = new NettyTransactionReadLock(key, redisService, cause);
82            this.nettyTransactionLockType = NettyTransactionLockType.READ;
83        }
84        //独占锁
85        else if (nettyTransactionLockType.equals(NettyTransactionLockType.FORCE_WRITE_TIME)) {
86            this.nettyTransactionLock = new NettyTransactionLock(key, redisService, cause, lockTime, true);
87        } else {
88            //非独占锁
89            this.nettyTransactionLock = new NettyTransactionLock(key, redisService, cause, lockTime, false);
90        }
91        this.nettyTransactionLockType = nettyTransactionLockType;
92    }
93
94
95    /**
96     * 是否能够创建事物锁
97     *
98     * @param seconds
99     * @return
100     * @throws NettyTransactionException
101     */
102    @Override
103    public boolean createNettyTransactionLock(long seconds) throws NettyTransactionException {
104        boolean result = nettyTransactionLock.create(seconds);
105        if (rejectFlag) {
106            result = !result;
107        }
108        return result;
109    }
110
111    public void setRejectFlag(boolean rejectFlag) {
112        this.rejectFlag = rejectFlag;
113    }
114
115    /**
116     * 释放锁
117     */
118    @Override
119    public void releaseNettyTransactionLock() {
120        if (this.nettyTransactionLockType.equals(NettyTransactionLockType.FORCE_WRITE_TIME) || this.nettyTransactionLockType.equals(NettyTransactionLockType.WRITE_TIME)) {
121            return;
122        }
123        this.nettyTransactionLock.destroy();
124    }
125
126    /**
127     * 记录事务提交的进度,用于回滚操作。
128     * 根据进度进行不同程度的回滚
129     *
130     * @param step
131     */
132    public void setTransactionCommitProgress(int step) {
133        if (progressBitSet != null) {
134            progressBitSet.set(step);
135        }
136    }
137
138    /**
139     * 检查事物锁所处于的进度状态
140     *
141     * @param step
142     * @return
143     */
144    public boolean checkTransactionCommitProgress(int step) {
145        return this.progressBitSet.get(step);
146    }
147
148
149    @Override
150    public String getInfo() {
151        return this.nettyTransactionLock.getInfo();
152    }
153
154    /**
155     * 强制释放锁
156     */
157    @Override
158    public void forceReleaseNettyTransactionLock() {
159        this.nettyTransactionLock.destroy();
160    }
161
162
163    @Override
164    public boolean needCommit() {
165        return !this.nettyTransactionLockType.equals(NettyTransactionLockType.READ);
166    }
167
168    @Override
169    public NettyTransactionLockInterface getNettyTransactionLockInterface() {
170        return this.nettyTransactionLock;
171    }
172
173    public BitSet getProgressBitSet() {
174        return progressBitSet;
175    }
176}
177
178
179

代码中的描述一样。所有的具体事务类型都必须要继承此类。保证每个事务都具有基本的操作。而更具实现类来具体实现每个事务产生的不同结果做不同处理。

下面我们来描述一下使用zookeeper的方式来实现分布式事务实体。同样事务锁也具有相同的行为。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
1package com.twjitm.transaction.transaction.entity;
2
3import com.twjitm.transaction.lock.NettyTransactionLockInterface;
4import com.twjitm.transaction.lock.NettyTransactionZkLock;
5import com.twjitm.transaction.service.zookeeper.NettyTransactionZookeeperService;
6import com.twjitm.transaction.transaction.enums.NettyTransactionEntityCause;
7import com.twjitm.transaction.transaction.enums.NettyTransactionLockType;
8import com.twjitm.transaction.transaction.exception.NettyTransactionException;
9
10import java.util.BitSet;
11
12/**
13 * 基于zookeeper 实现的分布式事物锁,、
14 * <p>
15 * zookeeper 分布式锁仅仅支持独占锁模式
16 *
17 * @author twjtim- [Created on 2018-08-29 16:18]
18 * @jdk java version "1.8.0_77"
19 */
20public abstract class AbstractNettyTransactionZkEntity implements NettyTransactionEntityInterface {
21
22    /**
23     * 进度设置集合 主要用于rollback
24     */
25    private BitSet progressBitSet;
26
27    /**
28     * 事务锁
29     */
30    private NettyTransactionLockInterface nettyTransactionLock;
31
32    /**
33     * 锁类型
34     */
35    private NettyTransactionLockType nettyTransactionLockType;
36
37
38    /**
39     * 构建一个zookeeper类型的独占锁实体对象
40     *
41     * @param cause
42     * @param key
43     * @param zookeeperService
44     */
45    public AbstractNettyTransactionZkEntity(NettyTransactionEntityCause cause,
46                                            String key,
47                                            NettyTransactionZookeeperService
48                                                    zookeeperService) {
49        this.progressBitSet = new BitSet();
50        this.nettyTransactionLock = new NettyTransactionZkLock(key,
51                zookeeperService, cause);
52        this.nettyTransactionLockType = NettyTransactionLockType.WRITE;
53
54    }
55
56
57    /**
58     * 创建一个锁
59     *
60     * @param seconds
61     * @return
62     * @throws NettyTransactionException
63     */
64    @Override
65    public boolean createNettyTransactionLock(long seconds) throws NettyTransactionException {
66
67        return this.nettyTransactionLock.create(seconds);
68    }
69
70
71    /**
72     * 记录事务提交的进度,用于回滚操作。
73     * 根据进度进行不同程度的回滚
74     *
75     * @param step
76     */
77    public void setTransactionCommitProgress(int step) {
78        if (progressBitSet != null) {
79            progressBitSet.set(step);
80        }
81    }
82
83
84    /**
85     * 检查事物锁所处于的进度状态
86     *
87     * @param step
88     * @return
89     */
90    public boolean checkTransactionCommitProgress(int step) {
91
92        return this.progressBitSet.get(step);
93    }
94
95
96    /**
97     * 释放一个锁请求
98     */
99    @Override
100    public void releaseNettyTransactionLock() {
101        this.nettyTransactionLock.destroy();
102    }
103
104    @Override
105    public void forceReleaseNettyTransactionLock() {
106        this.nettyTransactionLock.destroy();
107    }
108
109    @Override
110    public String getInfo() {
111        return this.nettyTransactionLock.getInfo() + this.nettyTransactionLockType.name();
112    }
113
114    @Override
115    public boolean needCommit() {
116        return !this.nettyTransactionLockType.equals(NettyTransactionLockType.READ);
117    }
118
119    @Override
120    public NettyTransactionLockInterface getNettyTransactionLockInterface() {
121        return this.nettyTransactionLock;
122    }
123}
124
125
126

功能和redis实现的方式是一样的,只不过底层支持的方式不太一样而已。

定义完事务实体后,我们来描述一下事务。事务实体是提供事务行为的描述,而事务本身的属性还需要在事务本身上进行定义。所以我们来定义一个抽象的事务。在定义抽象事物之前我们也要描述事物的具体操作。

有如下定义:

  • 0:创建
  • 1:尝试提交
  • 2:提交
  • 3:回滚
  • 4:释放

同样我们定义一个事务接口来描述上面的抽象事物的行为。NettyTransactionInterface


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
1package com.twjitm.transaction.transaction;
2
3import com.twjitm.transaction.transaction.enums.NettyTransactionCause;
4import com.twjitm.transaction.transaction.exception.NettyTransactionException;
5
6/**
7 * @author twjitm- [Created on 2018-08-27 10:07]
8 * @jdk java version "1.8.0_77"
9 */
10public interface NettyTransactionInterface {
11    /** 激活,构造*/
12    int ACTIVE = 0;
13    /** 尝试提交*/
14  int TRYCOMMITED = 1;
15    /** 正式提交*/
16    int COMMITED = 2;
17    /** 正式回滚*/
18    int ROLLEDBACK = 3;
19
20    /**
21     * 事务提交
22     * @throws NettyTransactionException
23     */
24     void commit() throws NettyTransactionException;
25
26    /**
27     * 事务回滚
28     * @throws NettyTransactionException
29     */
30     void rollback() throws NettyTransactionException;
31
32    /**
33     * 是否可以提交
34     * @return
35     */
36     boolean canCommit();
37
38    /**
39     * 尝试性提交
40     */
41     void tryCommit() throws NettyTransactionException;
42
43    /**
44     * 获取事务原因
45     * @return
46     */
47     NettyTransactionCause getCause();
48
49    /**
50     * 是否可以创建锁
51     * @return
52     */
53     boolean createNettyTransactionLock() throws NettyTransactionException;
54
55
56    /**
57     * 释放锁
58     * @return
59     */
60     void releaseNettyTransactionLock();
61}
62
63
64

在接口中我们还做了抽象事物状态的定义。描述事物执行到某个阶段。接下来我们需要定义一个抽象类来实现这个接口


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
1package com.twjitm.transaction.transaction;
2
3import com.twjitm.transaction.transaction.entity.NettyTransactionEntityInterface;
4import com.twjitm.transaction.transaction.enums.NettyTransactionCause;
5import com.twjitm.transaction.transaction.enums.NettyTransactionCommitResult;
6
7import java.util.ArrayList;
8import java.util.List;
9
10/**
11 * @author twjitm- [Created on 2018-08-27 10:13]
12 * @jdk java version "1.8.0_77"
13 */
14public abstract class AbstractNettyTransaction implements NettyTransactionInterface {
15
16    /**
17     * 当前执行状态
18     */
19    protected int state;
20    /**
21     * 事务实体 可以批量提交事物
22     */
23    public List<NettyTransactionEntityInterface> entities;
24    /**
25     * 事务原因
26     */
27    private NettyTransactionCause cause;
28
29    /**
30     * 游戏事务提交结果
31     */
32    protected NettyTransactionCommitResult transactionTryCommitResult;
33
34    public AbstractNettyTransaction(NettyTransactionCause cause) {
35        this.cause = cause;
36        this.entities = new ArrayList<>();
37        transactionTryCommitResult = NettyTransactionCommitResult.SUCCESS;
38        this.state = ACTIVE;
39    }
40
41    public void addEntity(NettyTransactionEntityInterface entity) {
42        entities.add(entity);
43    }
44
45    @Override
46    public NettyTransactionCause getCause() {
47        return cause;
48    }
49
50    @Override
51    public boolean canCommit() {
52        return transactionTryCommitResult.equals(NettyTransactionCommitResult.SUCCESS);
53    }
54
55    public NettyTransactionCommitResult getTransactionTryCommitResult() {
56        return transactionTryCommitResult;
57    }
58}
59
60
61

真正的事物描述是把抽象类中没有实现的接口都实现了。具有全部功能的NettyTransaction


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
1package com.twjitm.transaction.transaction;
2
3import com.twjitm.transaction.config.GlobalConstants;
4import com.twjitm.transaction.transaction.entity.NettyTransactionEntityInterface;
5import com.twjitm.transaction.transaction.enums.NettyTransactionCause;
6import com.twjitm.transaction.transaction.enums.NettyTransactionCommitResult;
7import com.twjitm.transaction.transaction.exception.NettyTransactionException;
8import com.twjitm.transaction.utils.TimeUtil;
9
10
11/**
12 * @author twjitm- [Created on 2018-08-27 10:39]
13 * @jdk java version "1.8.0_77"
14 */
15public class NettyTransaction extends AbstractNettyTransaction {
16
17
18    /**
19     * 创建事务锁等待时间
20     */
21    private long waitTime;
22
23    /**
24     * 创建事务实体:
25     *
26     * @param cause    事务产生原因
27     * @param waitTime 等待时间
28     */
29    public NettyTransaction(NettyTransactionCause cause, long waitTime) {
30        super(cause);
31        this.waitTime = waitTime;
32    }
33
34    /**
35     * 创建事务实体:
36     *
37     * @param cause 事务产生原因
38     */
39    public NettyTransaction(NettyTransactionCause cause) {
40        super(cause);
41        //默认锁最长等待时间,防止网络延迟,服务器宕机等情况。
42        this.waitTime = GlobalConstants.Lock.TRAINSTACTION_LOCK_KEY_MAX_LIFE;
43    }
44
45    @Override
46    public void commit() throws NettyTransactionException {
47        if (state != TRYCOMMITED) {
48            throw new NettyTransactionException();
49        }
50        this.state = COMMITED;
51        for (NettyTransactionEntityInterface entity : entities) {
52            if (!entity.needCommit()) {
53                continue;
54            }
55            entity.commit();
56        }
57
58
59    }
60
61    @Override
62    public void rollback() throws NettyTransactionException {
63        state = ROLLEDBACK;
64        for (NettyTransactionEntityInterface entity : entities) {
65            entity.rollback();
66        }
67    }
68
69    @Override
70    public void tryCommit() throws NettyTransactionException {
71        if (state != ACTIVE) {
72            throw new NettyTransactionException();
73        }
74        this.state = TRYCOMMITED;
75        for (NettyTransactionEntityInterface entity : entities) {
76            if (!entity.needCommit()) {
77                continue;
78            }
79            //重复提交没有成功
80            NettyTransactionCommitResult transactionCommitResult = entity.tryCommit();
81            if (!transactionCommitResult.equals(NettyTransactionCommitResult.SUCCESS)) {
82                this.transactionTryCommitResult = transactionCommitResult;
83                break;
84            }
85        }
86
87    }
88
89    /**
90     * 是否可以创建一个分布式事物锁
91     *
92     * @return
93     * @throws NettyTransactionException
94     */
95    @Override
96    public boolean createNettyTransactionLock() throws NettyTransactionException {
97        if (state != ACTIVE) {
98            throw new NettyTransactionException();
99        }
100        long startSecond = TimeUtil.getSeconds();
101        boolean createFlag;
102        if (waitTime > 0) {
103            while (true) {
104                long currSeconds = TimeUtil.getSeconds();
105                createFlag = createNettyTransactionLock(currSeconds);
106                if (createFlag) {
107                    break;
108                }
109                try {
110                    Thread.sleep(TimeUtil.SECOND);
111                } catch (Throwable e) {
112
113                }
114                currSeconds = TimeUtil.getSeconds();
115                if (startSecond + waitTime < currSeconds) {
116                    createFlag = false;
117                    break;
118                }
119            }
120        } else {
121            startSecond = TimeUtil.getSeconds();
122            createFlag = createNettyTransactionLock(startSecond);
123        }
124        return createFlag;
125    }
126
127
128    private boolean createNettyTransactionLock(long currSeconds) throws NettyTransactionException {
129        boolean createFlag = false;
130        for (NettyTransactionEntityInterface entity : entities) {
131            try {
132                createFlag = entity.createNettyTransactionLock(currSeconds);
133            } catch (Exception e) {
134                throw new NettyTransactionException(e.getMessage());
135            }
136            if (!createFlag) {
137                break;
138            }
139        }
140        return createFlag;
141    }
142
143    /**
144     * 释放锁
145     */
146    @Override
147    public void releaseNettyTransactionLock() {
148        for (NettyTransactionEntityInterface entity : entities) {
149            entity.releaseNettyTransactionLock();
150        }
151    }
152
153    @Override
154    public String toString() {
155        StringBuffer buffer = new StringBuffer();
156        buffer.append("transaction ");
157        buffer.append(getCause());
158        buffer.append(":");
159        for (int i = 0; i < entities.size(); i++) {
160            NettyTransactionEntityInterface entity = entities.get(i);
161            buffer.append(entity.getInfo());
162            if (i < entities.size() - 1) {
163                buffer.append(",");
164            }
165        }
166        return buffer.toString();
167
168
169    }
170}
171
172
173

上面就是有关事务、事务实体核心功能的描述,下面我们继续来介绍事务锁的描述和定义,同样的方法我们在事务锁上的操作进行分析,事务锁实体,其实和jdk自带的锁在描述上来说是差不多的。都是为了保护数据而产生的一种行为的定义。

因此我们在项目中的lock包中定义如下类

锁的行为有:

  • 1:创建
  • 2:注销
  • 3:设置内容

根据上面定义的事务锁行为。我们有如下接口的描述NettyTransactionLockInterface.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1package com.twjitm.transaction.lock;
2
3
4import com.twjitm.transaction.transaction.exception.NettyTransactionException;
5
6/**
7 * 事务锁接口
8 * <p>
9 *     本类面向的是锁。主要是为事物实体提供原子操作
10 *     利用redis 的原子操作实现分布式事物锁,
11 *     该抽象接口定义了锁的基本操作。
12 * </p>
13 */
14public interface NettyTransactionLockInterface {
15    /**
16     * 销毁
17     */
18     void destroy();
19
20    /**
21     * 创建
22     * @return
23     */
24     boolean create(long seconds)  throws NettyTransactionException;
25
26    /**
27     * 获取信息
28     * @return
29     */
30     String getInfo();
31
32    /**
33     * 设置内容
34     */
35     void setContent(String lockContent);
36
37}
38
39
40
41

还记得我们前面描述的锁的种类吗?我们有写锁,读锁,强占锁,超时锁等,下面我们来实现这些锁的具体实体类。NettyTransactionLock


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
1package com.twjitm.transaction.lock;
2
3import com.twjitm.transaction.service.redis.NettyTransactionRedisService;
4import com.twjitm.transaction.transaction.enums.NettyTransactionEntityCause;
5import com.twjitm.transaction.transaction.enums.NettyTransactionLockStateEnum;
6import com.twjitm.transaction.transaction.exception.NettyTransactionException;
7import com.twjitm.transaction.utils.TimeUtil;
8import org.slf4j.Logger;
9import org.slf4j.LoggerFactory;
10
11/**
12 * netty 事物锁,基于redis实现
13 * <pre>
14 * 写锁
15 * </pre>
16 *
17 * @author twjitm- [Created on 2018-08-27 12:05]
18 * @jdk java version "1.8.0_77"
19 */
20public class NettyTransactionLock implements NettyTransactionLockInterface {
21
22
23    private Logger logger = LoggerFactory.getLogger(NettyTransactionLock.class);
24    /**
25     * 事物锁关键字
26     */
27    private String lockKey;
28    /**
29     * 事物锁创建需要的redis服务
30     */
31    private NettyTransactionRedisService redisService;
32
33    /**
34     * 事务支出的zookeeper服务器
35     *
36     *
37     */
38
39
40    /**
41     * 事物产生实体的原因
42     */
43    private NettyTransactionEntityCause cause;
44
45    /**
46     * 事物锁状态
47     */
48    private NettyTransactionLockStateEnum lockStateEnum;
49
50    /**
51     * 锁时长
52     */
53    private int lockTime;
54
55    /**
56     * 锁强制标识
57     */
58    private boolean forceFlag;
59
60    /**
61     * 锁内容
62     */
63    private String lockContext = "";
64
65    /**
66     * 初始化一个写锁,默认锁时间为系统配置时间。
67     *
68     * @param lockKey
69     * @param redisService
70     * @param cause
71     */
72    public NettyTransactionLock(String lockKey, NettyTransactionRedisService redisService, NettyTransactionEntityCause cause) {
73        super();
74        this.lockKey = lockKey;
75        this.redisService = redisService;
76        this.cause = cause;
77        this.lockStateEnum = NettyTransactionLockStateEnum.INIT;
78        this.lockTime = TimeUtil.MINUTE_SECOND;
79
80    }
81
82
83    /**
84     * 初始化一个写锁。并且指定锁时间,是否具有强制性。
85     *
86     * @param lockKey
87     * @param redisService
88     * @param cause
89     * @param lockTime
90     * @param forceFlag
91     */
92    public NettyTransactionLock(String lockKey, NettyTransactionRedisService redisService, NettyTransactionEntityCause cause, int lockTime, boolean forceFlag) {
93        super();
94        this.lockKey = lockKey;
95        this.redisService = redisService;
96        this.cause = cause;
97        this.lockStateEnum = NettyTransactionLockStateEnum.INIT;
98        this.lockTime = lockTime;
99        this.forceFlag = forceFlag;
100
101
102    }
103
104    /**
105     * 初始化一个具有内容的写锁,并且制定锁时间和强制性标志。以及所内容
106     *
107     * @param lockKey
108     * @param redisService
109     * @param cause
110     * @param lockTime
111     * @param forceFlag
112     * @param lockContext
113     */
114    public NettyTransactionLock(String lockKey, NettyTransactionRedisService redisService, NettyTransactionEntityCause cause, int lockTime, boolean forceFlag, String lockContext) {
115        super();
116        this.lockKey = lockKey;
117        this.redisService = redisService;
118        this.cause = cause;
119        this.lockStateEnum = NettyTransactionLockStateEnum.INIT;
120        this.lockTime = lockTime;
121        this.forceFlag = forceFlag;
122        this.lockContext = lockContext;
123    }
124
125
126    /**
127     * <p>
128     * 注销一个锁。
129     * 锁的注销是有条件的。锁不能再初始化的时候和创建的时候注销。
130     * 只能这个锁创建成功后才能注销。创建一个锁。将必须使用这个锁。若创建一个锁不进行使用
131     * 的话,将无法注销这个锁。只能等锁时间过期后才能自动注销锁
132     * </p>
133     */
134    @Override
135    public void destroy() {
136        if (this.lockStateEnum.equals(NettyTransactionLockStateEnum.INIT) ||
137                this.lockStateEnum.equals(
138                        NettyTransactionLockStateEnum.CREATE)) {
139            return;
140        }
141        boolean destroyFlag = true;
142        if (!lockContext.equals("".trim())) {
143            destroyFlag = checkLockContext();
144        }
145        String realLockKey = getLockKey(lockKey, cause);
146        if (destroyFlag) {
147            boolean delete = redisService.deleteKey(realLockKey);
148            if (!delete) {
149                logger.info("居然没有删除掉这个key=" + realLockKey);
150            }
151        }
152    }
153
154    /**
155     * 检测锁内容
156     *
157     * @return
158     */
159    private boolean checkLockContext() {
160        boolean checkFlag = false;
161        String content = redisService.getString(getLockKey(lockKey, cause));
162        if (content != null) {
163            checkFlag = content.equals(this.lockContext);
164        }
165        return checkFlag;
166
167    }
168
169
170    /**
171     * 获取锁可以
172     *
173     * @param lockKey
174     * @param cause
175     * @return
176     */
177    public String getLockKey(String lockKey, NettyTransactionEntityCause cause) {
178        return lockKey + "#" + cause.getCause();
179    }
180
181
182    /**
183     * 创建分布式事物锁,创建一个分布式事物锁的代价是比较高的,
184     * 应为需要将请求消息发送到对应的redis服务器或者是zookeeper服务器
185     * 但是当我们逻辑服务器和redis不在同一台服务器的时候,我们需要走网络层
186     * 连接,相当于开启一个tcp连接通道。这条通道主要是为了我们能够与redis或者zookeeper
187     * 服务器进行通讯,为可防止单点问题,我们可以将redis做成集群模式,同样zookeeper也
188     * 一样。当然在这个地方我们默认使用redis实现分布式所务锁,当别的逻辑服务器申请锁的
189     * 的时候也会进行创建。利用redis的原子性,保证本锁的原子性。
190     *
191     * @param seconds
192     * @return
193     * @throws NettyTransactionException
194     */
195    @Override
196    public boolean create(long seconds) throws NettyTransactionException {
197        this.lockStateEnum = NettyTransactionLockStateEnum.CREATE;
198        boolean createFlag;
199        String realLockKey = getLockKey(lockKey, cause);
200        try {
201            //设置锁标识
202            createFlag = redisService.setNxString(realLockKey, lockContext, lockTime);
203            if (createFlag) {
204                this.lockStateEnum = NettyTransactionLockStateEnum.SUCCESS;
205                logger.info("创建锁成功");
206                redisService.expire(realLockKey, lockTime);
207            } else {
208                if (forceFlag) {
209                    this.lockStateEnum = NettyTransactionLockStateEnum.SUCCESS;
210                    redisService.setString(realLockKey, lockContext, lockTime);
211                    redisService.expire(realLockKey, lockTime);
212                    createFlag = true;
213                    logger.info("创建强制锁:" + realLockKey + ",过期时间长度为: " + lockTime);
214                } else {
215                    createFlag = false;
216                    logger.info("创建锁失败" + realLockKey + ",过期时间为: " + lockTime);
217                }
218            }
219
220        } catch (Exception e) {
221            throw new NettyTransactionException("创建锁发生意想不到的错误,请检查");
222        }
223        return createFlag;
224    }
225
226    @Override
227    public String getInfo() {
228        return lockKey + cause + checkLockContext() + lockTime;
229    }
230
231    @Override
232    public void setContent(String lockContent) {
233        this.lockContext = lockContent;
234    }
235}
236
237
238

其实代码中已经有很详细的介绍了。每个部分的功能都可以在注释中可以看到。正如事务锁的创建。代码做了详细的描述。在这就省略了。

读锁


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
1package com.twjitm.transaction.lock;
2
3import com.twjitm.transaction.service.redis.NettyTransactionRedisService;
4import com.twjitm.transaction.transaction.enums.NettyTransactionEntityCause;
5import com.twjitm.transaction.transaction.enums.NettyTransactionLockStateEnum;
6import com.twjitm.transaction.transaction.exception.NettyTransactionException;
7import org.apache.commons.lang.StringUtils;
8import org.slf4j.Logger;
9import org.slf4j.LoggerFactory;
10
11/**
12 * netty 游戏事物锁,基于redis实现的分布式游戏事务锁
13 * <pre>
14 *     读锁
15 * </pre>
16 *
17 * @author twjitm - [Created on 2018-08-27 12:14]
18 * @jdk java version "1.8.0_77"
19 */
20public class NettyTransactionReadLock implements NettyTransactionLockInterface {
21    private Logger logger = LoggerFactory.getLogger(NettyTransactionReadLock.class);
22    /**
23     * 事物锁key
24     */
25    private String lockKey;
26    /**
27     * 锁提供的redis
28     */
29    private NettyTransactionRedisService redisService;
30    /**
31     * 事物锁参数原因
32     */
33    private NettyTransactionEntityCause cause;
34
35    /**
36     * 分布式读锁状态
37     */
38    private NettyTransactionLockStateEnum lockState;
39
40    /**
41     * 分布式读锁内容
42     */
43    private String lockContext;
44
45
46    public NettyTransactionReadLock(String lockKey, NettyTransactionRedisService redisService, NettyTransactionEntityCause cause) {
47        super();
48        this.lockKey = lockKey;
49        this.redisService = redisService;
50        this.cause = cause;
51        this.lockState = NettyTransactionLockStateEnum.INIT;
52
53
54    }
55
56    /**
57     * 注销一个锁
58     */
59    @Override
60    public void destroy() {
61        if (this.lockState == NettyTransactionLockStateEnum.INIT || this.lockState == NettyTransactionLockStateEnum.CREATE) {
62            return;
63        }
64        boolean exists = redisService.exists(getLockKey(lockKey, cause));
65        if (exists && !StringUtils.isEmpty(lockContext)) {
66            exists = this.checkLockContext();
67            if (exists) {
68                redisService.deleteKey(getLockKey(lockKey, cause));
69            }
70        }
71
72
73    }
74
75    @Override
76    public boolean create(long seconds) throws NettyTransactionException {
77        this.lockState = NettyTransactionLockStateEnum.CREATE;
78        //检测值是否存在
79        boolean exists = redisService.exists(getLockKey(lockKey, cause));
80        //检测内容是否为空
81        if (exists && !StringUtils.isEmpty(lockContext)) {
82            exists = this.checkLockContext();
83        }
84        return exists;
85    }
86
87
88    private boolean checkLockContext() {
89
90        boolean checkFlag = false;
91        String realLockKey = getLockKey(lockKey, cause);
92        String content = redisService.getString(realLockKey);
93        if (!StringUtils.isEmpty(content)) {
94            logger.info("read content realLockKey:" + realLockKey);
95            checkFlag = content.equals(this.lockContext);
96        }
97        return checkFlag;
98    }
99
100    /**
101     * 获取锁可以
102     *
103     * @param lockKey
104     * @param cause
105     * @return
106     */
107    public String getLockKey(String lockKey, NettyTransactionEntityCause cause) {
108        return lockKey + "#" + cause.getCause();
109    }
110
111
112    @Override
113    public String getInfo() {
114        return this.lockKey + this.cause + this.lockContext;
115    }
116
117    @Override
118    public void setContent(String lockContent) {
119        this.lockContext = lockContent;
120    }
121}
122
123
124

基于zookeeper实现的写锁。NettyTransactionZkLock


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
1package com.twjitm.transaction.lock;
2
3import com.twjitm.transaction.service.zookeeper.NettyTransactionZookeeperService;
4import com.twjitm.transaction.transaction.enums.NettyTransactionEntityCause;
5import com.twjitm.transaction.transaction.enums.NettyTransactionLockStateEnum;
6import com.twjitm.transaction.transaction.exception.NettyTransactionException;
7import org.slf4j.Logger;
8import org.slf4j.LoggerFactory;
9
10/**
11 * 基于zookeeper分布式事务锁实体,zookeeper 实现分布式锁
12 * 基于zookeeper实现分布式锁存在的缺点:
13 * 由于zookeeper天生的特性,我们在创建节点的时候最好创建临时节点
14 * 防止长期占用锁,造成死锁。由于未知原因,可能程序释放锁失败。
15 *
16 * @author twjitm- [Created on 2018-08-29 14:52]
17 */
18public class NettyTransactionZkLock implements NettyTransactionLockInterface {
19
20    private Logger logger = LoggerFactory.getLogger(NettyTransactionZkLock.class);
21    /**
22     * 事物锁关键字
23     */
24    private String lockKey;
25    /**
26     * 事物锁创建需要的zookeeper服务
27     */
28
29    private NettyTransactionZookeeperService zookeeperService;
30
31    /**
32     * 事物锁参数原因
33     */
34    private NettyTransactionEntityCause cause;
35
36    /**
37     * 事物锁装填
38     */
39    private NettyTransactionLockStateEnum lockStateEnum;
40
41    /**
42     * 分布式读锁内容
43     */
44    private String lockContext="";
45
46
47    public NettyTransactionZkLock(String lockKey, NettyTransactionZookeeperService
48            zookeeperService,
49                                  NettyTransactionEntityCause cause) {
50        super();
51        this.lockKey = lockKey;
52        this.zookeeperService = zookeeperService;
53        this.cause = cause;
54        this.lockContext="";
55    }
56
57    public NettyTransactionZkLock(String lockKey,
58                                  NettyTransactionZookeeperService zookeeperService,
59                                  NettyTransactionEntityCause cause,
60                                  NettyTransactionLockStateEnum lockState) {
61        super();
62        this.lockKey = lockKey;
63        this.zookeeperService = zookeeperService;
64        this.cause = cause;
65        this.lockStateEnum = lockState;
66    }
67
68    public NettyTransactionZkLock(String lockKey, NettyTransactionZookeeperService
69            zookeeperService,
70                                  NettyTransactionEntityCause cause,
71                                  NettyTransactionLockStateEnum lockState, String
72                                          lockContext) {
73        super();
74        this.lockKey = lockKey;
75        this.zookeeperService = zookeeperService;
76        this.cause = cause;
77        this.lockStateEnum = lockState;
78        this.lockContext = lockContext;
79    }
80
81    /**
82     * 注销一个锁
83     */
84    @Override
85    public void destroy() {
86        //这两种状态不能注销锁
87        if (this.lockStateEnum.equals(NettyTransactionLockStateEnum.INIT) ||
88                this.lockStateEnum.equals(
89                        NettyTransactionLockStateEnum.CREATE)) {
90            return;
91        }
92        String realLockKey = getLockKey(lockKey, cause);
93        boolean delete = zookeeperService.deleteNode(realLockKey);
94        if (!delete) {
95            logger.info("居然没有删除掉这个key=" + realLockKey);
96        }
97    }
98
99
100    /**
101     * 创建锁节点
102     *
103     * @param lockKey
104     * @param cause
105     * @return
106     */
107    public String getLockKey(String lockKey, NettyTransactionEntityCause cause) {
108        return lockKey + "_" + cause.getCause();
109    }
110
111
112    /**
113     * 创建锁
114     *
115     * @param
116     * @return
117     * @throws NettyTransactionException
118     */
119    @Override
120    public boolean create(long seconds) throws NettyTransactionException {
121        this.lockStateEnum = NettyTransactionLockStateEnum.CREATE;
122        boolean createFlag;
123        String realKey = getLockKey(lockKey, cause);
124        //创建节点
125        createFlag = zookeeperService.createNode(realKey, lockContext);
126        if (createFlag) {
127            this.lockStateEnum = NettyTransactionLockStateEnum.SUCCESS;
128            logger.info("创建锁成功" + this.getInfo());
129        } else{
130            logger.info("获得锁失败" + this.getInfo());
131        }
132        return createFlag;
133    }
134
135    @Override
136    public String getInfo() {
137        return this.lockKey + cause.getCause() + this.lockStateEnum.name() +
138                this.lockContext;
139    }
140
141    @Override
142    public void setContent(String lockContent) {
143        this.lockContext = lockContent;
144    }
145}
146
147
148

到此,有关事物锁的定义就完成。基础组件定义完成之后我们需要对外提供事务锁服务。这是我们系统所具有的意义。对外提供分布式事务锁,是我们的核心功能。上诉的功能执行为了实现这个分布式事务锁功能的必备组件,因此我们定义一个服务接口。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
1package com.twjitm.transaction.service.transaction;
2
3import com.twjitm.transaction.transaction.entity.AbstractNettyTransactionEntity;
4import com.twjitm.transaction.transaction.entity.AbstractNettyTransactionZkEntity;
5import com.twjitm.transaction.transaction.enums.NettyTransactionCause;
6import com.twjitm.transaction.transaction.enums.NettyTransactionCommitResult;
7
8/**
9 * 事物对外提供的服务
10 * 可以批量提交事物
11 *
12 * @author twjitm- [Created on 2018-08-27 15:42]
13 * @jdk java version "1.8.0_77"
14 */
15public interface NettyTransactionService {
16
17    /**
18     * redis 模式提交事务
19     * @param cause
20     * @param abstractGameTransactionEntity
21     * @return
22     */
23    NettyTransactionCommitResult commitTransaction(NettyTransactionCause cause, AbstractNettyTransactionEntity... abstractGameTransactionEntity);
24
25    /**
26     * redis 模式提交事务
27     * @param cause
28     * @param waitTime
29     * @param abstractGameTransactionEntity
30     * @return
31     */
32    NettyTransactionCommitResult commitTransaction(NettyTransactionCause cause, long waitTime, AbstractNettyTransactionEntity... abstractGameTransactionEntity);
33
34    /**
35     * zookeeper 模式来提交事物锁
36     *
37     * @param cause
38     * @param abstractNettyTransactionZkEntities
39     * @return
40     */
41
42    NettyTransactionCommitResult commitTransaction(NettyTransactionCause cause, AbstractNettyTransactionZkEntity... abstractNettyTransactionZkEntities);
43
44
45}
46
47
48

接口定义的几个方法都是具有相同的意义。都是提交事物。只不过一个是redis实现的,一个是zookeeper实现的。接口定义好了我们需要实现接口中的方法。下面是本系统的最核心的关键之处。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
1package com.twjitm.transaction.service.transaction;
2
3import com.twjitm.transaction.transaction.NettyTransaction;
4import com.twjitm.transaction.transaction.entity.AbstractNettyTransactionEntity;
5import com.twjitm.transaction.transaction.entity.AbstractNettyTransactionZkEntity;
6import com.twjitm.transaction.transaction.entity.NettyTransactionEntityInterface;
7import com.twjitm.transaction.transaction.enums.NettyTransactionCause;
8import com.twjitm.transaction.transaction.enums.NettyTransactionCommitResult;
9import com.twjitm.transaction.transaction.exception.NettyTransactionException;
10import org.slf4j.Logger;
11import org.slf4j.LoggerFactory;
12import org.springframework.stereotype.Service;
13
14/**
15 * 事物提交服务
16 *
17 * @author twjitm- [Created on 2018-08-27 15:44]
18 * @jdk java version "1.8.0_77"
19 */
20@Service
21public class NettyTransactionServiceImpl implements NettyTransactionService {
22    Logger logger = LoggerFactory.getLogger(NettyTransactionServiceImpl.class);
23
24    @Override
25    public NettyTransactionCommitResult commitTransaction(NettyTransactionCause transactionCause, AbstractNettyTransactionEntity... abstractGameTransactionEntity) {
26        NettyTransaction transaction = new NettyTransaction(transactionCause);
27        return commitTransaction(transaction, abstractGameTransactionEntity);
28    }
29
30    @Override
31    public NettyTransactionCommitResult commitTransaction(NettyTransactionCause gameTransactionCause, long waitTime, AbstractNettyTransactionEntity... abstractGameTransactionEntity) {
32        NettyTransaction transaction = new NettyTransaction(gameTransactionCause, waitTime);
33        return commitTransaction(transaction, abstractGameTransactionEntity);
34    }
35
36    @Override
37    public NettyTransactionCommitResult commitTransaction(NettyTransactionCause cause, AbstractNettyTransactionZkEntity... abstractNettyTransactionZkEntities) {
38        NettyTransaction transaction = new NettyTransaction(cause);
39
40        return commitTransaction(transaction, abstractNettyTransactionZkEntities);
41    }
42
43    /**
44     * 二阶段和三阶段的区别
45     * http://www.hollischuang.com/archives/681
46     *
47     * @param transaction                     事务
48     * @param abstractGameTransactionEntities 事务实体集和
49     * @return                                事务执行返回结果
50     */
51    private NettyTransactionCommitResult commitTransaction(NettyTransaction transaction, NettyTransactionEntityInterface... abstractGameTransactionEntities) {
52        NettyTransactionCommitResult tryCommitResult = NettyTransactionCommitResult.SUCCESS;
53        for (NettyTransactionEntityInterface entityInterface : abstractGameTransactionEntities) {
54            transaction.addEntity(entityInterface);
55        }
56        try {
57            //如果能够创建分布式服务器锁
58            if (transaction.createNettyTransactionLock()) {
59                logger.info("成功获得锁: " + transaction.toString());
60                logger.info("尝试提交锁: " + transaction.toString());
61                transaction.tryCommit();
62                if (transaction.canCommit()) {
63                    logger.info("正式提交锁: " + transaction.toString());
64                    transaction.commit();
65                    logger.info("提交锁成功: " + transaction.toString());
66                } else {
67                    logger.info("重复提交锁: " + transaction.toString());
68                    tryCommitResult = transaction.getTransactionTryCommitResult();
69                    logger.info("重复提交锁失败: " + transaction.toString());
70                }
71            } else {
72                logger.info("获得锁失败: " + transaction.toString());
73                tryCommitResult = NettyTransactionCommitResult.LOCK_ERROR;
74            }
75        } catch (Exception e) {
76            logger.info("提交锁发生异常: " + transaction.toString());
77            try {
78                logger.info("开始回滚锁: " + transaction.toString());
79                transaction.rollback();
80                logger.info("回滚锁成功: " + transaction.toString());
81            } catch (NettyTransactionException e1) {
82                e1.printStackTrace();
83                logger.info("回滚锁发生异常: " + transaction.toString());
84            }
85            //异常事务原因
86            tryCommitResult = NettyTransactionCommitResult.COMMON_ERROR;
87            if (e instanceof NettyTransactionException) {
88                NettyTransactionException exception = (NettyTransactionException) e;
89                NettyTransactionCommitResult tempGameTransactionTryCommitResult =
90                        exception.getResult();
91                if (tempGameTransactionTryCommitResult != null) {
92                    tryCommitResult = tempGameTransactionTryCommitResult;
93                }
94            }
95
96        } finally {
97            //释放锁
98            logger.info("释放锁开始: " + transaction.toString());
99            transaction.releaseNettyTransactionLock();
100            logger.info("释放锁成功: " + transaction.toString());
101        }
102        return tryCommitResult;
103    }
104
105
106}
107
108
109

由于篇幅原因,我们就不统统描述全部代码了。其实代码已经做了详细的介绍。相信有点java基础的同学都能够看明白。只不过此工程描述的是一种思想而已。
到此核心代码基本介绍完。那我们就来测试一下。前面在先睹为快的地方的时候我们已经看到了。下面我们来详细说明一下如何使用分布式事务锁框架。

如何使用

互斥锁
首先我们定义一个实体。MutexEntity需要继承AbstractNettyTransactionEntity实现为实现的方法。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
1package com.twjitm.transaction.entity;
2
3import com.twjitm.transaction.service.redis.NettyTransactionRedisService;
4import com.twjitm.transaction.transaction.entity.AbstractNettyTransactionEntity;
5import com.twjitm.transaction.transaction.enums.NettyTransactionCommitResult;
6import com.twjitm.transaction.transaction.enums.NettyTransactionEntityCause;
7import com.twjitm.transaction.transaction.exception.NettyTransactionException;
8
9import java.util.BitSet;
10
11/**
12 * 互斥锁测试 实体
13 *
14 * @author twjtim- [Created on 2018-08-27 18:12]
15 * @jdk java version "1.8.0_77"
16 */
17public class MutexEntity extends AbstractNettyTransactionEntity {
18
19    NettyTransactionRedisService redisService;
20    String testKey;
21
22    public MutexEntity(NettyTransactionEntityCause cause, String key,
23                       NettyTransactionRedisService redisService) {
24        super(cause, key, redisService);
25        this.redisService = redisService;
26        this.testKey = key;
27    }
28
29    @Override
30    public void commit() throws NettyTransactionException {
31        redisService.setString("mutex_test", "twjitm");
32        throw new NullPointerException();
33    }
34
35    @Override
36    public void rollback() throws NettyTransactionException {
37        BitSet bitset = getProgressBitSet();
38        for (int i = 0; i < bitset.size(); i++) {
39            //@TODO 不同粒度回滚
40        }
41        redisService.deleteKey("mutex_test");
42    }
43
44    @Override
45    public NettyTransactionCommitResult tryCommit() throws NettyTransactionException {
46
47        return NettyTransactionCommitResult.SUCCESS;
48    }
49}
50
51
52

在测试中我们模拟在提交锁阶段抛出一个空指针异常,在回滚阶段我们来做一些操作。
测试类


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
1package com.twjitm.transaction.mtex;
2
3import com.twjitm.transaction.entity.MutexEntity;
4import com.twjitm.transaction.service.redis.impl.NettyTransactionRedisServiceImpl;
5import com.twjitm.transaction.service.transaction.NettyTransactionServiceImpl;
6import com.twjitm.transaction.spring.TestSpring;
7import com.twjitm.transaction.transaction.enums.NettyTransactionCause;
8import com.twjitm.transaction.transaction.enums.NettyTransactionCommitResult;
9import com.twjitm.transaction.transaction.enums.NettyTransactionEntityCause;
10import org.springframework.context.support.ClassPathXmlApplicationContext;
11import org.springframework.util.Assert;
12
13/**
14 * 互斥锁测试
15 *
16 * @author twjitm- [Created on 2018-08-28 10:31]
17 */
18public class TestMutex {
19
20    public static void main(String[] args) {
21
22
23
24        ClassPathXmlApplicationContext applicationContext = TestSpring.initSpring();
25        NettyTransactionRedisServiceImpl nettyTransactionRedisService =
26                (NettyTransactionRedisServiceImpl) applicationContext.getBean
27                        ("nettyTransactionRedisService");
28
29        NettyTransactionServiceImpl nettyTransactionService = (NettyTransactionServiceImpl) applicationContext.getBean("nettyTransactionServiceImpl");
30
31
32        NettyTransactionEntityCause cause = new NettyTransactionEntityCause("mutex");
33
34        MutexEntity mutexEntity = new MutexEntity(cause, "mutex", nettyTransactionRedisService);
35        NettyTransactionCause transactionCause = new NettyTransactionCause("mutex");
36
37        NettyTransactionCommitResult result =
38                nettyTransactionService.commitTransaction(transactionCause, mutexEntity);
39        System.out.println(result.getResult());
40        Assert.isTrue(true,"");
41    }
42}
43
44
45

用一个简单的main函数启动系统,测试功能。
执行结果:

下面我们通过一个流程图来简单总结本系统

项目源码开源道我的GitHub,有需要的同学可以下载。欢迎star

分布式事务锁源码

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全经验

微服务技术栈

2021-11-28 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索