Apache 开源的curator 基于Zookeeper实现分布式锁以及源码分析

释放双眼,带上耳机,听听看~!

前一段时间,我发表了一篇关于Redis实现分布式锁 分布式环境下利用Redis实现分布式锁,今天我带领大家熟悉用zookeeper实现分布式锁。

在学习分布式锁之前,让我们想一想,在什么业务场景下会用到分布式锁以及设计分布式锁要注意什么?

分布式锁介绍

1、在什么业务场景中会使用到分布式锁

当多个客户端访问服务器上同一个资源的时候,需要保证数据的一致性,比如秒杀系统,举个栗子:

某件商品在系统中的数量是5件,当秒杀时间到来,会有大量的用户抢购这件商品,瞬间会产生非常大的并发。正常的购买流程是:

step1、用户下单

step2、判断商品数量是否足够

step3、如果足够,库存–

step4、如果库存不够,秒杀失败。

假设此时商品只剩余一件,用户A对商品下单,商品数足够,下单成功,系统还没有来得及减库存,用户B也对同一件商品下单,此时商品数仍为1,最后导致系统会库存减两次,导致商品超卖现象。此时就需要对用户下单–>减库存的这一步操作进行加锁,使操作成为原子操作。在单机、单进程环境下,使用JDK的ReentrantLcok或者synchronized完全足够,但由于秒杀系统并发量极大,单机承受不了这样的压力极易宕机,此时就需要多台服务器、多进程支撑起这个业务,单机下的ReentrantLcok或者synchronized在此处毫无用武之地,此时就需要一把分布式锁来保证某个时间段只有一个用户访问共享资源。

2、分布式锁的注意事项

a、高效的获取锁和释放锁

b、在网络不稳定、中断、宕机情况下要自动释放锁,防止自锁

c、有阻塞锁的特性,即使没有获取锁,也会阻塞等待

d、具备非阻塞锁特性,即没有获取到锁,则直接返回获取锁失败

e、具备可重入行,同一线程可多次获得锁

zookeeper实现分布式锁

对于zookeeper,在此就不多介绍,我们可以利用zk的顺序临时节点这一特性来实现分布式锁。思路如下:

1、获取锁时,在zk的目录下创建一个节点,判断该节点的需要在其兄弟节点中是否是最小的,若是最小的,则获取锁成功。

2、若不是最小的,则锁已被占用,需要对比自己小的节点注册监听器,如果锁释放,监听到释放锁事件,判断此时节点在其兄弟节点是不是最小的,如果是,获取锁。

下面我来介绍Apache 开源的curator 实现 Zookeeper 分布式锁以及源码分析。

首先,导入相关的maven


1
2
3
4
5
6
1<dependency>
2   <groupId>org.apache.curator</groupId>
3   <artifactId>curator-recipes</artifactId>
4   <version>2.10.0</version>
5</dependency>
6

再看main方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
1public class Application {
2
3   private static String address = "192.168.1.100:2181";
4   public static void main(String[] args) {
5       //1、重试策略:初试时间为1s 重试3次
6       RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
7       //2、通过工厂创建连接
8       CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);
9       //3、开启连接
10      client.start();
11      //4 分布式锁
12      final InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
13      //读写锁
14      //InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/readwriter");
15      ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
16      for (int i = 0; i < 5; i++) {
17          fixedThreadPool.submit(new Runnable() {
18              @Override
19              public void run() {
20                  boolean flag = false;
21                  try {
22                      //尝试获取锁,最多等待5秒
23                      flag = mutex.acquire(5, TimeUnit.SECONDS);
24                      Thread currentThread = Thread.currentThread();
25                      if(flag){
26                          System.out.println("线程"+currentThread.getId()+"获取锁成功");
27                      }else{
28                          System.out.println("线程"+currentThread.getId()+"获取锁失败");
29                      }
30                      //模拟业务逻辑,延时4秒
31                      Thread.sleep(4000);
32                  } catch (Exception e) {
33                      e.printStackTrace();
34                  } finally{
35                      if(flag){
36                          try {
37                              mutex.release();
38                          } catch (Exception e) {
39                              e.printStackTrace();
40                          }
41                      }
42                  }
43              }
44          });
45      }
46  }
47}
48

上面代码展示得知,


1
2
1public boolean acquire(long time, TimeUnit unit)
2

方法是获得锁的方法,参数是自旋的时间,所以我们分析这个方法的源码。


1
2
3
4
1public boolean acquire(long time, TimeUnit unit) throws Exception {
2    return this.internalLock(time, unit);
3}
4

可见,acquire调用的是internalLock方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1private boolean internalLock(long time, TimeUnit unit) throws Exception {
2        Thread currentThread = Thread.currentThread();
3        //获得当前线程的锁
4        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
5        //如果锁不为空,当前线程已经获得锁,可重入锁,lockCount++
6        if (lockData != null) {
7            lockData.lockCount.incrementAndGet();
8            return true;
9        } else {
10            //获取锁,返回锁的节点路径
11            String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
12            if (lockPath != null) {
13                //向当前锁的map集合添加一个记录
14                InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
15                this.threadData.put(currentThread, newLockData);
16                return true;
17            } else {
18                return false;//获取锁失败
19            }
20        }
21    }
22

下面是threadData的数据结构,是一个Map结构,key是当前线程,value是当前线程和锁的节点的一个封装对象。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;
2
3private static class LockData {
4        final Thread owningThread;
5        final String lockPath;
6        final AtomicInteger lockCount;
7
8        private LockData(Thread owningThread, String lockPath) {
9            this.lockCount = new AtomicInteger(1);
10            this.owningThread = owningThread;
11            this.lockPath = lockPath;
12        }
13}
14

由internalLock方法可看到,最重要的方法是attemptLock方法


1
2
1String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
2

 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
2        long startMillis = System.currentTimeMillis();
3        //将等待时间转化为毫秒
4        Long millisToWait = unit != null ? unit.toMillis(time) : null;
5        byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
6        //重试次数
7        int retryCount = 0;
8        String ourPath = null;
9        boolean hasTheLock = false;
10        boolean isDone = false;
11
12        while(!isDone) {
13            isDone = true;
14
15            try {
16                //在当前path下创建临时有序节点
17                ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
18                //判断是不是序号最小的节点,如果是返回true,否则阻塞等待
19                hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
20            } catch (NoNodeException var14) {
21                if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
22                    throw var14;
23                }
24
25                isDone = false;
26            }
27        }
28        //返回当前锁的节点路径
29
30        return hasTheLock ? ourPath : null;
31    }
32

下面来看internalLockLoop方法,判断是不是最小节点的方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
1private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
2        boolean haveTheLock = false;
3        boolean doDelete = false;
4
5        try {
6            if (this.revocable.get() != null) {
7                ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
8            }
9            //自旋
10            while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
11                //获得所有子节点
12                List<String> children = this.getSortedChildren();
13                String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
14                PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
15                //判断是否是最小节点
16                if (predicateResults.getsTheLock()) {
17                    haveTheLock = true;
18                } else {
19                    //给比自己小的节点设置监听器
20                    String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
21                    //同步,是为了实现公平锁
22                    synchronized(this) {
23                        try {
24                            ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
25                            //如果等待时间==null,一直阻塞等待
26                            if (millisToWait == null) {
27                                this.wait();
28                            } else {
29                                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
30                                startMillis = System.currentTimeMillis();
31                                //等待超时时间
32                                if (millisToWait > 0L) {
33                                    this.wait(millisToWait);
34                                } else {
35                                    doDelete = true;//如果超时则删除锁
36                                    break;
37                                }
38                            }
39                        } catch (NoNodeException var19) {
40                            ;
41                        }
42                    }
43                }
44            }
45        } catch (Exception var21) {
46            ThreadUtils.checkInterrupted(var21);
47            doDelete = true;
48            throw var21;
49        } finally {
50            if (doDelete) {
51                //如果锁超时,删除锁
52                this.deleteOurPath(ourPath);
53            }
54
55        }
56
57        return haveTheLock;
58    }
59

释放锁:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1public void release() throws Exception {
2        Thread currentThread = Thread.currentThread();
3        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
4        //当前线程没有获取锁,不能释放
5        if (lockData == null) {
6            throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
7        } else {
8            int newLockCount = lockData.lockCount.decrementAndGet();
9            if (newLockCount <= 0) {
10                if (newLockCount < 0) {
11                    throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
12                } else {
13                    //释放锁
14                    try {
15                        this.internals.releaseLock(lockData.lockPath);
16                    } finally {
17                        this.threadData.remove(currentThread);
18                    }
19
20                }
21            }
22        }
23    }
24

好了,这就是我这次的分享内容,如有错请提出,谢谢。

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全技术

Linux Shell命令行快捷键

2021-8-18 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索