Netty游戏服务器实战开发(9):利用redis或者zookeeper实现3pc分布式事务锁(三)。支撑腾讯系列某手游百万级流量公测

释放双眼,带上耳机,听听看~!

导读:在编写前面两篇文章的时候,博主其实已经把这个项目做完了,因为不可能一边写文章,一遍写项目。所以我都是把项目做完之后然后通过自己测试之后确保没有问题就来编写文章。

最近估计大家都听说了“华夏银行技术处长编写病毒植入系统,盗窃 700 余万受审”按键沸沸扬扬。很多人就想知道,如此可爱的程序员宝宝如何走向犯罪的道路,所以不管出于人性和社会道德,技术人员还是不要轻易的在系统中留下后门程序。否者被查出来一生清白毁于一旦。但是,从技术的脚步出发,整个事件属于跨行转账交易分布式事务在多阶段提交过程中出现了数据的不一致性,其整个过程涉及到了跨行转账,清算系统的关键流程。

在前几篇文章的介绍中我们已经了解到如何利用redis或者是zookeeper来实现分布式事务锁,也了解了分布式所务锁的基本原理已经2pc提交和3pc提交的区别。但是有没有小伙伴思考过这样一个问题?我们有了这些分布式事务锁组件。我们使用在项目中会带来什么影响?会不会增加系统的复杂度和效率问题?带着这些问题,我们来一一解答说明。

我们使用分布式事务锁给系统带来什么影响?

我们都知道传统本地事务有如下特性:

  • 1:原子性
  • 2:一致性
  • 3:隔离性
  • 4:持久性

传统单机应用一般都会使用一个关系型数据库,好处是应用可以使用 ACID transactions。为保证一致性我们只需要:开始一个事务,改变(插入,删除,更新)很多行,然后提交事务(如果有异常时回滚事务)。更进一步,借助开发平台中的数据访问技术和框架(如Spring),我们需要做的事情更少,只需要关注数据本身的改变。

在分布式系统中,事务的特性在满足传统本地事务的同时,还必须满足还同时具有2PC或者3PC提交等特性。关于他们的基本原理,我们在第一篇文章就已经做了详细的介绍。有需要的小伙伴可以查看。

当我们的系统逐渐庞大的时候,我们会考虑采用微服务的形式进行设计与开发。在微服务系统中,每个服务都访问自己的独立的数据库。而对其他系统都是采用API接口形式提供服务。对外全是黑盒。所以为了保证每个系统中的数据一致性。我们必须采用分布式事务锁来保证数据的一致性。

而我们的分布式事务锁都是采用全服共享数据来实现的,每个系统都会增加多余的网络IO连接到数据中心,所以当系统庞大的时候,分布式事务锁支持中心压力就会巨大,系统瓶颈可能就应为分布式事务锁申请而导致系统性能下降。

数据的一致性?

数据一致性是分布式系统中最重要的问题。在数据有多份副本的情况下,如果网络、服务器或者软件出现故障,会导致部分副本写入成功,部分副本写入失败。这就造成各个副本之间的数据不一致,数据内容冲突。造成事实上的数据不一致。

从一致性的本质来看,是要保证在一个业务逻辑中包含的服务要么都成功,要么都失败。怎么选择方向呢?保证成功还是保证失败呢?业务模式决定了我们的选择。实现最终一致性有三种模式:可靠事件模式、业务补偿模式、TCC模式。

可靠事件模式:

在游戏服务器中,玩家充值是一个常规的操作,而且也是游戏盈利的根本,所以,游戏中的充值订单是一个具有高危的操作,玩家充值的记录、日志相关的都得进行校验保存。就拿充值来说,一个玩家在游戏服众创建了一个订单,例如更新一个业务实体,游戏服务会向消息代理发布一个事件。消息代理会向订阅事件的支付服务推送事件,当订阅这些事件的支付服务接收此事件时,就可以完成自己的业务,也可能会引发更多的事件发布。

如订单服务创建一个待支付的订单,发布一个“创建订单”的事件。

支付服务消费“创建订单”事件,支付完成后发布一个“支付完成”事件。

订单服务消费“支付完成”事件,订单状态更新为待出库。

上面介绍的是正常的情况下完成的。但是这个过程并不是每次都是能够成功的,当我们创建订单失败时,或者订单消费者-支付服务失败的时候我们会出现这种情况。

这中情况出现的数据不一致性的地方在于:某个游戏服务在更新了业务实体后发布事件却失败;虽然分布式游戏服务发布事件成功,但是消息代理未能正确推送事件到订阅的支付服务;接受事件的支付服务重复消费了事件。

可靠事件模式在于保证可靠事件投递和避免重复消费,可靠事件投递定义为:
(a)每个服务原子性的业务操作和发布事件

(b)消息代理确保事件传递至少一次。避免重复消费要求服务实现幂等性,如支付服务不能因为重复收到事件而多次支付。

因为现在流行的消息队列都实现了事件的持久化和at least once的投递模式,(b)特性(消息代理确保事件投递至少一次)。

一般情况下上面的方法能够运行得很好,如果我们的分布式游戏服务是RPC类的服务我们需要更加小心,可能出现的问题在于,(1)过滤服务在业务处理完成后才将事件结果存储到事件存储中,但是在业务处理完成前有可能就已经收到重复事件,由于是RPC服务也不能依赖数据库的唯一性约束;(2)业务服务的处理结果可能出现位置状态,一般出现在正常提交请求但是没有收到响应的时候。

对于问题(1)可以按步骤记录事件处理过程,比如事件的记录事件的处理过程为“接收”、“发送请求”、“收到应答”、“处理完成”。好处是过滤服务能及时的发现重复事件,进一步还能根据事件状态作不同的处理。

对于问题(2)可以通过一次额外的查询请求来确定事件的实际处理状态,要注意额外的查询会带来更长时间的延时,更进一步可能某些RPC服务根本不提供查询接口。此时只能选择接收暂时的不一致,时候采用对账和人工接入的方式来保证一致性。

业务补偿模式

补偿模式使用一个额外的协调服务来协调各个需要保证一致性的微服务,协调服务按顺序调用各个服务,如果某个服务调用异常(包括业务异常和技术异常)就取消之前所有已经调用成功的服务。

  • 业务异常通常是指在数据部分不满足发生条件达不到。
  • 技术异常通常是指程序在执行过程中发生异常导致后续代码不能继续执行。

通常在游戏服务服务器中我们可能会有这样的一个需求,玩家需要创建战队,但是需要花费多少金币或者钻石来创建战队,通常情况下我们需要先扣除玩家的金币或者钻石,然后才提交战队信息。

在我们把钱币扣除以后,在创建战队的过程中发生不可以预知的异常,我们需要通过补偿式的将玩家的钱进行恢复。
为了降低开发的复杂性和提高效率,协调服务实现为一个通用的补偿框架。补偿框架提供服务编排和自动完成补偿的能力。

要实现补偿过程,我们需要做到两点:
首先要确定失败的步骤和状态,从而确定需要补偿的范围。
在项目中我们通过记录事务执行到某个具体步骤,根据步骤进行不同程度的补偿恢复。
如下代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
1/**
2 * 基于redis实现分布式事物锁
3 * 抽象事物实体类,所有的事物锁实体必须继承
4 * 本抽象类,实现自己的commit后的方法和rollback方法等用户实现方法
5 * <p>
6 * 这些方法不是创建锁或者回滚锁方法,而是实现的是业务方法。需要注意的是
7 * 不能再次方法中创建和锁字段一样的字段,否者造成数据一致性错误
8 * @jdk java version "1.8.0_77"
9 */
10public abstract class AbstractNettyTransactionEntity implements NettyTransactionEntityInterface {
11    /**
12     * 进度设置集合 主要用于rollback
13     */
14    private BitSet progressBitSet;
15
16    /**
17     * 事务锁
18     */
19    private NettyTransactionLockInterface nettyTransactionLock;
20
21    /**
22     * 锁类型
23     */
24    private NettyTransactionLockType nettyTransactionLockType;
25
26    /**
27     * 锁的正向标志(主要用于读取的时候)
28     */
29    private boolean rejectFlag = false;
30
31    public AbstractNettyTransactionEntity(NettyTransactionEntityCause cause,
32                                          String key,
33                                          NettyTransactionRedisService redisService) {
34        this.progressBitSet = new BitSet();
35        this.nettyTransactionLock = new NettyTransactionLock(key, redisService, cause);
36        this.nettyTransactionLockType = NettyTransactionLockType.WRITE;
37    }
38
39    /**
40     * 抽象分布式事物实体
41     *
42     * @param cause                    事物产生原因
43     * @param key                      事物key
44     * @param redisService             事物支持的redis服务
45     * @param nettyTransactionLockType 事物锁类型
46     */
47    public AbstractNettyTransactionEntity(NettyTransactionEntityCause cause,
48                                          String key,
49                                          NettyTransactionRedisService redisService,
50                                          NettyTransactionLockType nettyTransactionLockType) {
51        this.progressBitSet = new BitSet();
52        if (nettyTransactionLockType.equals(NettyTransactionLockType.READ)) {
53            this.nettyTransactionLock = new NettyTransactionReadLock(key, redisService, cause);
54            this.nettyTransactionLockType = NettyTransactionLockType.READ;
55        } else {
56            this.nettyTransactionLock = new NettyTransactionLock(key, redisService, cause);
57            this.nettyTransactionLockType = NettyTransactionLockType.WRITE;
58        }
59    }
60
61    public AbstractNettyTransactionEntity(NettyTransactionEntityCause cause,
62                                          String key,
63                                          NettyTransactionRedisService redisService,
64                                          NettyTransactionLockType nettyTransactionLockType, int lockTime) {
65        this.progressBitSet = new BitSet();
66        if (nettyTransactionLockType.equals(NettyTransactionLockType.READ)) {
67            this.nettyTransactionLock = new NettyTransactionReadLock(key, redisService, cause);
68            this.nettyTransactionLockType = NettyTransactionLockType.READ;
69        }
70        //独占锁
71        else if (nettyTransactionLockType.equals(NettyTransactionLockType.FORCE_WRITE_TIME)) {
72            this.nettyTransactionLock = new NettyTransactionLock(key, redisService, cause, lockTime, true);
73        } else {
74            //非独占锁
75            this.nettyTransactionLock = new NettyTransactionLock(key, redisService, cause, lockTime, false);
76        }
77        this.nettyTransactionLockType = nettyTransactionLockType;
78    }
79
80
81    /**
82     * 是否能够创建事物锁
83     *
84     * @param seconds
85     * @return
86     * @throws NettyTransactionException
87     */
88    @Override
89    public boolean createNettyTransactionLock(long seconds) throws NettyTransactionException {
90        boolean result = nettyTransactionLock.create(seconds);
91        if (rejectFlag) {
92            result = !result;
93        }
94        return result;
95    }
96
97    public void setRejectFlag(boolean rejectFlag) {
98        this.rejectFlag = rejectFlag;
99    }
100
101    /**
102     * 释放锁
103     */
104    @Override
105    public void releaseNettyTransactionLock() {
106        if (this.nettyTransactionLockType.equals(NettyTransactionLockType.FORCE_WRITE_TIME) || this.nettyTransactionLockType.equals(NettyTransactionLockType.WRITE_TIME)) {
107            return;
108        }
109        this.nettyTransactionLock.destroy();
110
111
112    }
113
114    /**
115     * 记录事务提交的进度,用于回滚操作。
116     * 根据进度进行不同程度的回滚
117     *
118     * @param step
119     */
120    public void setTransactionCommitProgress(int step) {
121        if (progressBitSet != null) {
122            progressBitSet.set(step);
123        }
124    }
125
126
127    /**
128     * 检查事物锁所处于的进度状态
129     *
130     * @param step
131     * @return
132     */
133    public boolean checkTransactionCommitProgress(int step) {
134        return this.progressBitSet.get(step);
135    }
136
137
138    @Override
139    public String getInfo() {
140        return this.nettyTransactionLock.getInfo();
141    }
142
143    /**
144     * 强制释放锁
145     */
146    @Override
147    public void forceReleaseNettyTransactionLock() {
148        this.nettyTransactionLock.destroy();
149    }
150
151
152    @Override
153    public boolean needCommit() {
154        return !this.nettyTransactionLockType.equals(NettyTransactionLockType.READ);
155    }
156
157    @Override
158    public NettyTransactionLockInterface getNettyTransactionLockInterface() {
159        return this.nettyTransactionLock;
160    }
161
162    public BitSet getProgressBitSet() {
163        return progressBitSet;
164    }
165}
166
167
168

从代码中可以体会到我们通过记录事务执行到某个具体的步骤时,若发生异常我们更具不同的粒度进行回滚补偿。

其次要能提供补偿操作使用到的业务数据。

比如一个支付在游戏服务的补偿操作要求参数包括支付时的业务流水id、账号和金额。理论上说实际完成补偿操作可以根据唯一的业务流水id就可以。在创建战队这种操作的时候我们更可能的记录玩家操作的日志流水,通过日志分析最后补偿的数据是否具有一致性。提高系统的安全性。

分布式微服务实现补偿操作不是简单的回退到业务发生时的状态,因为可能还有其他的并发的请求同时更改了状态。一般都使用逆操作的方式完成补偿。

  • 补偿过程不需要严格按照与业务发生的相反顺序执行,可以依据工作服务的重用程度优先执行,甚至是可以并发的执行。

  • 有些服务的补偿过程是有依赖关系的,被依赖服务的补偿操作没有成功就要及时终止补偿过程。

  • 如果在一个业务中包含的工作服务不是都提供了补偿操作,那我们编排服务时应该把提供补偿操作的服务放在前面,这样当后面的工作服务错误时还有机会补偿。

  • 设计工作服务的补偿接口时应该以协调服务请求的业务要素作为条件,不要以工作服务的应答要素作为条件。因为还存在超时需要补偿的情况,这时补偿框架就没法提供补偿需要的业务要素。

TCC模式(Try-Confirm-Cancel)

一个完整的TCC业务由一个主业务服务和若干个从业务服务组成,主业务服务发起并完成整个业务活动,TCC模式要求从服务提供三个接口:Try、Confirm、Cancel。

  1. Try:完成所有业务检查 预留必须业务资源2) Confirm:真正执行业务 不作任何业务检查 只使用Try阶段预留的业务资源 Confirm操作满足幂等性3) Cancel: 释放Try阶段预留的业务资源 Cancel操作满足幂等性整个TCC业务分成两个阶段完成。

第一阶段:主业务服务分别调用所有从业务的try操作,并在活动管理器中登记所有从业务服务。当所有从业务服务的try操作都调用成功或者某个从业务服务的try操作失败,进入第二阶段。

第二阶段:活动管理器根据第一阶段的执行结果来执行confirm或cancel操作。如果第一阶段所有try操作都成功,则活动管理器调用所有从业务活动的confirm操作。否则调用所有从业务服务的cancel操作。

需要注意的是第二阶段confirm或cancel操作本身也是满足最终一致性的过程,在调用confirm或cancel的时候也可能因为某种原因(比如网络)导致调用失败,所以需要活动管理支持重试的能力,同时这也就要求confirm和cancel操作具有幂等性。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
1private NettyTransactionCommitResult commitTransaction(NettyTransaction transaction, NettyTransactionEntityInterface... abstractGameTransactionEntities) {
2        NettyTransactionCommitResult tryCommitResult = NettyTransactionCommitResult.SUCCESS;
3        for (NettyTransactionEntityInterface entityInterface : abstractGameTransactionEntities) {
4            transaction.addEntity(entityInterface);
5        }
6        try {
7            //如果能够创建分布式服务器锁
8            if (transaction.createNettyTransactionLock()) {
9                logger.info("成功获得锁: " + transaction.toString());
10                logger.info("尝试提交锁: " + transaction.toString());
11                transaction.tryCommit();
12                if (transaction.canCommit()) {
13                    logger.info("正式提交锁: " + transaction.toString());
14                    transaction.commit();
15                    logger.info("提交锁成功: " + transaction.toString());
16                } else {
17                    logger.info("重复提交锁: " + transaction.toString());
18                    tryCommitResult = transaction.getTransactionTryCommitResult();
19                    logger.info("重复提交锁失败: " + transaction.toString());
20                }
21            } else {
22                logger.info("获得锁失败: " + transaction.toString());
23                tryCommitResult = NettyTransactionCommitResult.LOCK_ERROR;
24            }
25        } catch (Exception e) {
26            logger.info("提交锁发生异常: " + transaction.toString());
27            try {
28                logger.info("开始回滚锁: " + transaction.toString());
29                transaction.rollback();
30                logger.info("回滚锁成功: " + transaction.toString());
31            } catch (NettyTransactionException e1) {
32                e1.printStackTrace();
33                logger.info("回滚锁发生异常: " + transaction.toString());
34            }
35            //异常事务原因
36            tryCommitResult = NettyTransactionCommitResult.COMMON_ERROR;
37            if (e instanceof NettyTransactionException) {
38                NettyTransactionException exception = (NettyTransactionException) e;
39                NettyTransactionCommitResult tempGameTransactionTryCommitResult =
40                        exception.getResult();
41                if (tempGameTransactionTryCommitResult != null) {
42                    tryCommitResult = tempGameTransactionTryCommitResult;
43                }
44            }
45
46        } finally {
47            //释放锁
48            logger.info("释放锁开始: " + transaction.toString());
49            transaction.releaseNettyTransactionLock();
50            logger.info("释放锁成功: " + transaction.toString());
51        }
52        return tryCommitResult;
53    }
54
55

在补偿模式中一个比较明显的缺陷是,没有隔离性。从第一个工作服务步骤开始一直到所有工作服务完成(或者补偿过程完成),不一致是对其他服务可见的。另外最终一致性的保证还充分的依赖了协调服务的健壮性,如果协调服务异常,就没法达到一致性。

TCC模式在一定程度上弥补了上述的缺陷,在TCC模式中直到明确的confirm动作,所有的业务操作都是隔离的(由业务层面保证)。另外工作服务可以通过指定try操作的超时时间,主动的cancel预留的业务资源,从而实现自治的微服务。

TCC模式和补偿模式一样需要需要有协调服务和工作服务,协调服务也可以作为通用服务一般实现为框架。与补偿模式不同的是TCC服务框架不需要记录详细的业务流水,完成confirm和cancel操作的业务要素由业务服务提供。

TCC模式也不能百分百保证一致性,如果业务服务向TCC服务框架提交confirm后,TCC服务框架向某个工作服务提交confirm失败(比如网络故障),那么就会出现不一致,一般称为heuristic exception。

需要说明的是为保证业务成功率,业务服务向TCC服务框架提交confirm以及TCC服务框架向工作服务提交confirm/cancel时都要支持重试,这也就要confirm/cancel的实现必须具有幂等性。如果业务服务向TCC服务框架提交confirm/cancel失败,不会导致不一致,因为服务最后都会超时而取消。

另外heuristic exception是不可杜绝的,但是可以通过设置合适的超时时间,以及重试频率和监控措施使得出现这个异常的可能性降低到很小。如果出现了heuristic exception是可以通过人工的手段补救的。

如果有些业务由于瞬时的网络故障或调用超时等问题,通过上文所讲的3种模式一般都能得到很好的解决。但是在当今云计算环境下,很多服务是依赖于外部系统的可用性情况,在一些重要的业务场景下还需要周期性的对账来保证真实的一致性。

总结:
分布式事务,本质上是对多个数据库的事务进行统一控制,按照控制力度可以分为:不控制、部分控制和完全控制。不控制就是不引入分布式事务,部分控制就是各种变种的两阶段提交,包括上面提到的消息事务、业务补偿、TCC模式,而完全控制就是完全实现两阶段提交。部分控制的好处是并发量和性能很好,缺点是数据一致性减弱了,完全控制则是牺牲了性能,保障了一致性,具体用哪种方式,最终还是取决于业务场景。作为技术人员,一定不能忘了技术是为业务服务的,不要为了技术而技术,针对不同业务进行技术选型也是一种很重要的能力。

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全网络

基于netty-socketio的web推送服务

2021-8-18 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索