ZooKeeper分布式锁实现java例子,附完整可运行源代码

释放双眼,带上耳机,听听看~!

本原创入门教程,涵盖ZooKeeper核心内容,通过实例和大量图表,结合实战,帮助学习者理解和运用,任何问题欢迎留言。

目录:

  • zookeeper介绍与核心概念
  • 安装和使用
  • ZooKeeper分布式锁实现
  • ZooKeeper框架Curator分布式锁实现及源代码分析
  • zookeeper 开发实战(java客户端)

分布式锁有多种实现方式,比如通过数据库、redis都可实现。作为分布式协同工具ZooKeeper,当然也有着标准的实现方式。本文介绍在zookeeper中如何实现排他锁。

 

1.0版本

首先我们先介绍一个简单的zookeeper实现分布式锁的思路:

  1. 用zookeeper中一个

临时节点代表锁,比如在
/exlusive_lock下创建临时子节点
/exlusive_lock/lock。

  1. 所有客户端

争相创建此节点,但只有一个客户端创建成功。

  1. 创建成功代表

获取锁成功,此客户端执行业务逻辑

  1. 未创建成功的客户端,

监听
/exlusive_lock
变更

  1. 获取锁的客户端执行完成后,删除

/exlusive_lock/lock,表示
锁被释放

  1. 锁被释放后,其他监听/exlusive_lock变更的客户端

得到通知,再次争相创建临时子节点
/exlusive_lock/lock。此时相当于回到了第2步。

我们的程序按照上述逻辑直至抢占到锁,执行完业务逻辑。

上述是较为简单的分布式锁实现方式。能够应付一般使用场景,但存在着如下两个问题:

1、锁的获取顺序和最初客户端争抢顺序不一致,这不是一个公平锁。每次锁获取都是当次最先抢到锁的客户端。

2、羊群效应,所有没有抢到锁的客户端都会监听/exlusive_lock变更。当并发客户端很多的情况下,所有的客户端都会接到通知去争抢锁,此时就出现了羊群效应。

为了解决上面的问题,我们重新设计。

2.0版本

我们在2.0版本中,让每个客户端在
/exlusive_lock下创建的临时节点为有序节点,这样每个客户端都在
/exlusive_lock下有自己对应的锁节点,而序号排在最前面的节点,代表对应的客户端获取锁成功。排在后面的客户端监听自己前面一个节点,那么在他前序客户端执行完成后,他将得到通知,获得锁成功。逻辑修改如下:

  1. 每个客户端往/exlusive_lock下创建有序临时节点/exlusive_lock/lock_。创建成功后/exlusive_lock下面会有每个客户端对应的节点,如/exlusive_lock/lock_000000001
  2. 客户端取得/exlusive_lock下子节点,并进行排序,判断排在最前面的是否为自己。
  3. 如果自己的锁节点在第一位,代表获取锁成功,此客户端执行业务逻辑
  4. 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点lock_000000002,那么则监听lock_000000001.
  5. 当前一位锁节点(lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(lock_000000002)的逻辑。
  6. 监听客户端重新执行第2步逻辑,判断自己是否获得了锁。

如此修改后,每个客户端只关心自己前序锁是否释放,所以每次只会有一个客户端得到通知。而且,所有客户端的执行顺序和最初锁创建的顺序是一致的。解决了1.0版本的两个问题。

接下来我们看看代码如何实现。

LockSample类

此类是分布式锁类,实现了2个分布式锁的相关方法:

1、获取锁

2、释放锁

主要程序逻辑围绕着这两个方法的实现,特别是获取锁的逻辑。我们先看一下该类的成员变量:


1
2
3
4
5
1private ZooKeeper zkClient;
2private static final String LOCK_ROOT_PATH = "/Locks";
3private static final String LOCK_NODE_NAME = "Lock_";
4private String lockPath;
5

定义了zkClient,用来操作zookeeper。

锁的根路径,及自增节点的前缀。
此处生产环境应该由客户端传入。

当前锁的路径。

构造方法


1
2
3
4
5
6
7
8
9
10
11
12
13
1public LockSample() throws IOException {
2    zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {
3        @Override
4        public void process(WatchedEvent event) {
5            if(event.getState()== Event.KeeperState.Disconnected){
6                System.out.println("失去连接");
7
8            }
9        }
10    });
11}
12
13

创建zkClient,同时创建了状态监听。此监听可以去掉,这里只是打印出失去连接状态。

获取锁实现

暴露出来的获取锁的方法为acquireLock(),逻辑很简单:


1
2
3
4
5
6
7
1public  void acquireLock() throws InterruptedException, KeeperException {
2    //创建锁节点
3    createLock();
4    //尝试获取锁
5    attemptLock();
6}
7

首先创建锁节点,然后尝试去取锁。真正的逻辑都在这两个方法中。

createLock()

先判断锁的根节点/Locks是否存在,不存在的话创建。然后在
/Locks下创建有序临时节点,并设置当前的锁路径变量lockPath。

代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1private void createLock() throws KeeperException, InterruptedException {
2    //如果根节点不存在,则创建根节点
3    Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
4    if (stat == null) {
5        zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
6    }
7
8    // 创建EPHEMERAL_SEQUENTIAL类型节点
9    String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
10            Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
11            CreateMode.EPHEMERAL_SEQUENTIAL);
12    System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);
13    this.lockPath=lockPath;
14}
15

attemptLock()

这是最核心的方法,客户端尝试去获取锁,是对2.0版本逻辑的实现,这里就不再重复逻辑,直接看代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
1private void attemptLock() throws KeeperException, InterruptedException {
2    // 获取Lock所有子节点,按照节点序号排序
3    List<String> lockPaths = null;
4
5    lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);
6
7    Collections.sort(lockPaths);
8
9    int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
10
11    // 如果lockPath是序号最小的节点,则获取锁
12    if (index == 0) {
13        System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);
14        return ;
15    } else {
16        // lockPath不是序号最小的节点,监听前一个节点
17        String preLockPath = lockPaths.get(index - 1);
18
19        Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
20
21        // 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁
22        if (stat == null) {
23            attemptLock();
24        } else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock
25            System.out.println(" 等待前锁释放,prelocakPath:"+preLockPath);
26            synchronized (watcher) {
27                watcher.wait();
28            }
29            attemptLock();
30        }
31    }
32}
33

注意这一行代码


1
2
1Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
2

我们在获取前一个节点的时候,同时设置了监听watcher。如果前锁存在,则阻塞主线程。

watcher定义代码如下:


1
2
3
4
5
6
7
8
9
10
1private Watcher watcher = new Watcher() {
2    @Override
3    public void process(WatchedEvent event) {
4        System.out.println(event.getPath() + " 前锁释放");
5        synchronized (this) {
6            notifyAll();
7        }
8    }
9};
10

watcher只是notifyAll,让主线程继续执行,以便再次调用attemptLock(),去尝试获取lock。如果没有异常情况的话,此时当前客户端应该能够成功获取锁。

释放锁实现

释放锁原语实现很简单,参照releaseLock()方法。代码如下:


1
2
3
4
5
6
1public void releaseLock() throws KeeperException, InterruptedException {
2    zkClient.delete(lockPath, -1);
3    zkClient.close();
4    System.out.println(" 锁释放:" + lockPath);
5}
6

关于分布式锁的代码到此就讲解完了,我们再看下客户端如何使用它。

我们创建一个TicketSeller类,作为客户端来使用分布式锁。

 TicketSeller类

sell()

不带锁的业务逻辑方法,代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
1private void sell(){
2    System.out.println("售票开始");
3    // 线程随机休眠数毫秒,模拟现实中的费时操作
4    int sleepMillis = (int) (Math.random() * 2000);
5    try {
6        //代表复杂逻辑执行了一段时间
7        Thread.sleep(sleepMillis);
8    } catch (InterruptedException e) {
9        e.printStackTrace();
10    }
11    System.out.println("售票结束");
12}
13

仅是为了演示,sleep了一段时间。

sellTicketWithLock()

此方法中,加锁后执行业务逻辑,代码如下:


1
2
3
4
5
6
7
1public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
2    LockSample lock = new LockSample();
3    lock.acquireLock();
4    sell();
5    lock.releaseLock();
6}
7

测试入口

接下来我们写一个main函数做测试:


1
2
3
4
5
6
7
1public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
2    TicketSeller ticketSeller = new TicketSeller();
3    for(int i=0;i<1000;i++){
4        ticketSeller.sellTicketWithLock();
5    }
6}
7

main函数中我们循环调用
ticketSeller.sellTicketWithLock(),执行加锁后的卖票逻辑。

测试方法

1、先启动一个java程序运行,可以看到日志输出如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1main 锁创建: /Locks/Lock_0000000391
2main 锁获得, lockPath: /Locks/Lock_0000000391
3售票开始
4售票结束
5 锁释放:/Locks/Lock_0000000391
6main 锁创建: /Locks/Lock_0000000392
7main 锁获得, lockPath: /Locks/Lock_0000000392
8售票开始
9售票结束
10 锁释放:/Locks/Lock_0000000392
11main 锁创建: /Locks/Lock_0000000393
12main 锁获得, lockPath: /Locks/Lock_0000000393
13售票开始
14售票结束
15 锁释放:/Locks/Lock_0000000393
16

可见每次执行都是按照锁的顺序执行,而且由于只有一个进程,并没有锁的争抢发生。

2、我们再启动一个同样的程序,锁的争抢此时发生了,可以看到双方的日志输出如下:

程序1:


1
2
3
4
5
6
7
8
9
10
11
12
1main 锁获得, lockPath: /Locks/Lock_0000000471
2售票开始
3售票结束
4 锁释放:/Locks/Lock_0000000471
5main 锁创建: /Locks/Lock_0000000473
6 等待前锁释放,prelocakPath:Lock_0000000472
7/Locks/Lock_0000000472 前锁释放
8main 锁获得, lockPath: /Locks/Lock_0000000473
9售票开始
10售票结束
11 锁释放:/Locks/Lock_0000000473
12

可以看到Lock_0000000471执行完成后,该进程获取的锁为Lock_0000000473,这说明Lock_0000000472被另外一个进程创建了。此时Lock_0000000473在等待前锁释放。Lock_0000000472释放后,Lock_0000000473才获得锁,然后才执行业务逻辑。

我们再看程序2的日志:


1
2
3
4
5
6
7
8
9
10
11
12
1main 锁获得, lockPath: /Locks/Lock_0000000472
2售票开始
3售票结束
4 锁释放:/Locks/Lock_0000000472
5main 锁创建: /Locks/Lock_0000000474
6 等待前锁释放,prelocakPath:Lock_0000000473
7/Locks/Lock_0000000473 前锁释放
8main 锁获得, lockPath: /Locks/Lock_0000000474
9售票开始
10售票结束
11 锁释放:/Locks/Lock_0000000474
12

可以看到,确实是进程2获取了Lock_0000000472。

zookeeper实现分布式锁就先讲到这。
注意代码只做演示用,并不适合生产环境使用。

代码清单如下:

1、LockSample


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
1import org.apache.zookeeper.*;
2import org.apache.zookeeper.data.Stat;
3
4import java.io.IOException;
5import java.util.Collections;
6import java.util.List;
7
8public class LockSample {
9
10    //ZooKeeper配置信息
11    private ZooKeeper zkClient;
12    private static final String LOCK_ROOT_PATH = "/Locks";
13    private static final String LOCK_NODE_NAME = "Lock_";
14    private String lockPath;
15
16    // 监控lockPath的前一个节点的watcher
17    private Watcher watcher = new Watcher() {
18        @Override
19        public void process(WatchedEvent event) {
20            System.out.println(event.getPath() + " 前锁释放");
21            synchronized (this) {
22                notifyAll();
23            }
24
25        }
26    };
27
28    public LockSample() throws IOException {
29        zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {
30            @Override
31            public void process(WatchedEvent event) {
32                if(event.getState()== Event.KeeperState.Disconnected){
33                    System.out.println("失去连接");
34
35                }
36            }
37        });
38    }
39
40    //获取锁的原语实现.
41    public  void acquireLock() throws InterruptedException, KeeperException {
42        //创建锁节点
43        createLock();
44        //尝试获取锁
45        attemptLock();
46    }
47
48    //创建锁的原语实现。在lock节点下创建该线程的锁节点
49    private void createLock() throws KeeperException, InterruptedException {
50        //如果根节点不存在,则创建根节点
51        Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
52        if (stat == null) {
53            zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
54        }
55
56        // 创建EPHEMERAL_SEQUENTIAL类型节点
57        String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
58                Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
59                CreateMode.EPHEMERAL_SEQUENTIAL);
60        System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);
61        this.lockPath=lockPath;
62    }
63
64    private void attemptLock() throws KeeperException, InterruptedException {
65        // 获取Lock所有子节点,按照节点序号排序
66        List<String> lockPaths = null;
67
68        lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);
69
70        Collections.sort(lockPaths);
71
72        int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
73
74        // 如果lockPath是序号最小的节点,则获取锁
75        if (index == 0) {
76            System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);
77            return ;
78        } else {
79            // lockPath不是序号最小的节点,监控前一个节点
80            String preLockPath = lockPaths.get(index - 1);
81
82            Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
83
84            // 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁
85            if (stat == null) {
86                attemptLock();
87            } else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock
88                System.out.println(" 等待前锁释放,prelocakPath:"+preLockPath);
89                synchronized (watcher) {
90                    watcher.wait();
91                }
92                attemptLock();
93            }
94        }
95    }
96
97    //释放锁的原语实现
98    public void releaseLock() throws KeeperException, InterruptedException {
99        zkClient.delete(lockPath, -1);
100        zkClient.close();
101        System.out.println(" 锁释放:" + lockPath);
102    }
103
104
105}
106

2、TicketSeller


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1import org.apache.zookeeper.KeeperException;
2
3import java.io.IOException;
4
5public class TicketSeller {
6    private void sell(){
7        System.out.println("售票开始");
8        // 线程随机休眠数毫秒,模拟现实中的费时操作
9        int sleepMillis = (int) (Math.random() * 2000);
10        try {
11            //代表复杂逻辑执行了一段时间
12            Thread.sleep(sleepMillis);
13        } catch (InterruptedException e) {
14            e.printStackTrace();
15        }
16        System.out.println("售票结束");
17    }
18
19    public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
20        LockSample lock = new LockSample();
21        lock.acquireLock();
22        sell();
23        lock.releaseLock();
24    }
25
26    public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
27        TicketSeller ticketSeller = new TicketSeller();
28        for(int i=0;i<1000;i++){
29            ticketSeller.sellTicketWithLock();
30
31        }
32    }
33}
34

 

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全资讯

良心网剧《鬼吹灯》主演靳东:玩命拍出来的

2016-12-27 10:01:28

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索