Java分布式锁之数据库方式实现

释放双眼,带上耳机,听听看~!

之前的文章《Java分布式锁实现》中列举了分布式锁的3种实现方式,分别是基于数据库实现,基于缓存实现和基于zookeeper实现。三种实现方式各有可取之处,本篇文章就详细讲解一下Java分布式锁之基于数据库的实现方式,也是最简单最易理解的实现方式。

首先,先来阐述下“锁”的概念,锁作为一种安全防御工具,既能上锁防止别人打开,又能让持有钥匙的人打开锁,这是锁的基本功能。那再来说一下“分布式锁”,分布式锁是在分布式系统(多个独立运行系统)内的锁,相对来说,这把锁的安全级别以及作用范围更大,所以从设计上就要考虑更多东西。

现在来说,怎么基于数据库实现这把分布式锁。其实说白了就是,把锁作为数据资源存入数据库,当持有这把锁的访问者来决定是否开锁。

以下详细讲解了在多个应用服务里,怎样用数据库去实现分布式锁。

结合案例:

1.客户app取出交易(同一个客户在某一个时间点只能对某种资产做取现操作)

2.交易重试补偿(交易过程服务宕机,扫描重试补偿)

一、数据库的设计

数据库锁表的表结构如下:

field type comment
ID bigint 主键
OUTER_SERIAL_NO varchar 流水号
CUST_NO char 客户号
SOURCE_CODE varchar 锁操作
THREAD_NO varchar 线程号
STATUS char 锁状态
REMARK varchar 备注
CREATED_AT timestamp 创建时间
UPDATED_AT timestamp 更新时间

1
1

作为锁的必要属性有5个:系统流水号,客户号,锁操作,线程号和锁状态,下面来解释一下每种属性

流水号:锁的具体指向,比如可以是产品,可以是交易流水号(后面会说到交易同步锁、交易补偿锁的使用方式)

客户号:客户的唯一标识

锁操作:客户的某种操作,比如客户取现操作,取现补偿重试操作

线程号:当前操作线程的线程号,比如取当前线程的uuid

锁状态:P处理中,F失败,Y成功

二、代码设计

代码的目录结构如下: 

Java分布式锁之数据库方式实现

主要贴一下锁操作的核心代码实现:

锁接口定义:DbLockManager.java

Java分布式锁之数据库方式实现Java分布式锁之数据库方式实现


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1/**
2 * 锁接口 <br>
3 *
4 * @Author fugaoyang
5 *
6 */
7public interface DbLockManager {
8
9    /**
10     * 加锁
11     */
12    boolean lock(String outerSerialNo, String custNo, LockSource source);
13
14    /**
15     * 解锁
16     */
17    void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus);
18
19}
20

View Code

锁接口实现类:DbLockManagerImpl.java

Java分布式锁之数据库方式实现Java分布式锁之数据库方式实现


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
1/**
2 *
3 * 数据库锁实现<br>
4 *
5 * @author fugaoyang
6 *
7 */
8@Service
9public class DbLockManagerImpl implements DbLockManager {
10
11    private final Logger LOG = LoggerFactory.getLogger(this.getClass());
12
13    @Autowired
14    private DbSyncLockMapper lockMapper;
15
16    @Transactional
17    public boolean lock(String outerSerialNo, String custNo, LockSource source) {
18
19        boolean isLock = false;
20        TradeSyncLock lock = null;
21        try {
22            lock = lockMapper.find(outerSerialNo, custNo, source.getCode());
23
24            if (null == lock) {
25                lock = new TradeSyncLock();
26                createLock(lock, outerSerialNo, custNo, source);
27
28                int num = lockMapper.insert(lock);
29                if (num == 1) {
30                    isLock = true;
31                }
32
33                LOG.info(ThreadLogUtils.getLogPrefix() + "加入锁,客户号[{}],锁类型[{}]", custNo, source.getCode());
34                return isLock;
35            }
36
37            // 根据交易类型进行加锁
38            isLock = switchSynsLock(lock, source);
39            LOG.info(ThreadLogUtils.getLogPrefix() + "更新锁,客户号[{}],锁类型[{}]", custNo, source.getCode());
40
41        } catch (Exception e) {
42            LOG.error(ThreadLogUtils.getLogPrefix() + "交易加锁异常, 客户号:" + custNo, e);
43        }
44        return isLock;
45    }
46
47    @Transactional
48    public void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus) {
49
50        try {
51            TradeSyncLock lock = lockMapper.find(outerSerialNo, custNo, source.getCode());
52
53            if (null != lock) {
54                lockMapper.update(lock.getId(), targetStatus.getName(), LockStatus.P.getName(),
55                        ThreadLogUtils.getCurrThreadUuid(), ThreadLogUtils.getCurrThreadUuid());
56            }
57
58            LOG.info(ThreadLogUtils.getLogPrefix() + "释放锁,客户号[{}],锁类型[{}]", custNo, source.getCode());
59        } catch (Exception e) {
60            LOG.error(ThreadLogUtils.getLogPrefix() + "释放锁异常, 客户号:{}", custNo, e);
61        }
62    }
63
64    /**
65     * 匹配加锁
66     */
67    private boolean switchSynsLock(TradeSyncLock lock, LockSource source) {
68        boolean isLock = false;
69
70        switch (source) {
71        case WITHDRAW:
72            ;
73            isLock = tradeSynsLock(lock);
74            break;
75        case WITHDRAW_RETRY:
76            ;
77            isLock = retrySynsLock(lock);
78            break;
79        default:
80            ;
81        }
82        return isLock;
83    }
84
85    /**
86     * 交易同步锁
87     */
88    private boolean tradeSynsLock(TradeSyncLock lock) {
89        // 处理中的不加锁,即不执行交易操作
90        if (LockStatus.P.getName().equals(lock.getStatus())) {
91            return false;
92        }
93
94        int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.S.getName(),
95                ThreadLogUtils.getCurrThreadUuid(), null);
96        if (num == 1) {
97            return true;
98        }
99        return false;
100    }
101
102    /**
103     * 补偿同步锁
104     */
105    private boolean retrySynsLock(TradeSyncLock lock) {
106        // 处理中或处理完成的不加锁,即不执行补偿操作
107        if (LockStatus.P.getName().equals(lock.getStatus()) || LockStatus.S.getName().equals(lock.getStatus())) {
108            return false;
109        }
110
111        int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.F.getName(),
112                ThreadLogUtils.getCurrThreadUuid(), null);
113        if (num == 1) {
114            return true;
115        }
116        return false;
117    }
118
119    private void createLock(TradeSyncLock lock, String outerSerialNo, String custNo, LockSource source) {
120        lock.setOuterSerialNo(outerSerialNo);
121        lock.setCustNo(custNo);
122        lock.setSourceCode(source.getCode());
123        lock.setThreadNo(ThreadLogUtils.getCurrThreadUuid());
124        lock.setStatus(LockStatus.P.getName());
125        lock.setRemark(source.getDesc());
126    }
127
128}
129

View Code

获取当前线程号以及打印uuid工具类ThreadLogUtils.Java

Java分布式锁之数据库方式实现Java分布式锁之数据库方式实现


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
1/**
2 *
3 * 线程处理<br>
4 *
5 * @author fugaoyang
6 *
7 */
8public class ThreadLogUtils {
9
10    private static ThreadLogUtils instance = null;
11
12    private ThreadLogUtils() {
13        setInstance(this);
14    }
15
16    // 初始化标志
17    private static final Object __noop = new Object();
18    private static ThreadLocal<Object> __flag = new InheritableThreadLocal<Object>() {
19        @Override
20        protected Object initialValue() {
21            return null;
22        }
23    };
24
25    // 当前线程的UUID信息,主要用于打印日志;
26    private static ThreadLocal<String> currLogUuid = new InheritableThreadLocal<String>() {
27        @Override
28        protected String initialValue() {
29            return UUID.randomUUID().toString()/* .toUpperCase() */;
30        }
31    };
32
33    private static ThreadLocal<String> currThreadUuid = new ThreadLocal<String>() {
34        @Override
35        protected String initialValue() {
36            return UUIDGenerator.getUuid();
37        }
38    };
39
40    public static void clear(Boolean isNew) {
41        if (isNew) {
42
43            currLogUuid.remove();
44
45            __flag.remove();
46
47            currThreadUuid.remove();
48
49        }
50    }
51
52    public static String getCurrLogUuid() {
53        if (!isInitialized()) {
54            throw new IllegalStateException("TLS未初始化");
55        }
56
57        return currLogUuid.get();
58    }
59
60    public static String getCurrThreadUuid() {
61        return currThreadUuid.get();
62    }
63
64    public static void clearCurrThreadUuid() {
65        currThreadUuid.remove();
66    }
67
68    public static String getLogPrefix() {
69        if (!isInitialized()) {
70            return "";
71        }
72
73        return "<uuid=" + getCurrLogUuid() + ">";
74    }
75
76    private static boolean isInitialized() {
77        return __flag.get() != null;
78    }
79
80    /**
81     * 初始化上下文,如果已经初始化则返回false,否则返回true<br/>
82     *
83     * @return
84     */
85    public static boolean initialize() {
86        if (isInitialized()) {
87            return false;
88        }
89
90        __flag.set(__noop);
91        return true;
92    }
93
94    private static void setInstance(ThreadLogUtils instance) {
95        ThreadLogUtils.instance = instance;
96    }
97
98    public static ThreadLogUtils getInstance() {
99        return instance;
100    }
101
102}
103

View Code

两种锁的实现的大致思路如下:

1.交易同步锁

当一个客户在app取现,第一次进入时,会插入一条当前线程,状态是P,操作是取现的锁,取现成功后根据当前线程号会更新成功;

当一个客户同时多个取现操作时,只有一个取现操作会加锁成功,其它会加锁失败;

当一个客户已经在取现中,这时数据库已经有一条状态P的锁,该客户同时又做了取现,这个取现动作会尝试加锁而退出;

2.交易重试补偿锁

1.当一个客户取现加锁成功,因调用第三方支付接口超时时,后台会对该笔交易重新发起重试打款操作,这时会新加一条当前交易流水号,当前线程号,状态是P,操作是取现重试的锁,重试的支付结果是成功的话,更新该条锁数据为Y状态,否则更新该条数据为F状态;

2.当重试支付失败后,再去重试打款时,发现锁的状态是F,这时把F更新为P,继续重试,根据重试结果更新锁状态。

上面实现的是一个最基本的数据库分布式锁,满足的并发量也是基于数据库所能扛得住的,性能基本可以满足普通的交易量。

后续可以优化的部分:

1.当一个用户同时多次获取lock时,因为目前是用的乐观锁,只会有一个加锁成功,可以优化成加入while(true)循环获取lock,当失败次数到达指定次数时退出,当前的操作结束。

2.当锁表数据量随着时间增大时,可以考虑按用户对锁表进行分表分库,以减小数据库方面的压力。

3.对锁的操作可以抽象出来,作为抽象实现,比如具体的取现操作只关心取现这个业务实现。

 

因为时间有限,写的比较仓促,希望大家有问题可以提出,相互探讨~~

完整示例代码后续会更新到github。

 

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全技术

Node.js Web 模块

2021-12-21 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索