方案一:
利用ZooKeeper中同一路径下不能重复创建同名节点的特性(这里使用临时节点)来实现一个分布式锁
流程:
- 查看目标节点是否已经存在,如果已经存在,说明锁已被占用,那么等待锁。
- 如果不存在,创建一个临时(EPHEMERAL)节点,创建成功即表示已经占有锁。
- 如果创建失败(节点已存在),说明锁已经被其他客户端抢先占有,那么同样等待锁。
- 当持有者释放锁,或者其Session超时的时候,该临时节点被删除,通过Watcher唤醒之前等待锁的线程重新争抢锁。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152 1package com.codertom.params.engine;
2
3import com.google.common.base.Strings;
4import org.apache.zookeeper.*;
5import java.io.IOException;
6import java.util.concurrent.ExecutorService;
7import java.util.concurrent.Executors;
8import java.util.concurrent.locks.Lock;
9
10/**
11 * Zookeepr实现分布式锁
12 */
13public class LockTest {
14
15 private String zkQurom = "localhost:2181";
16
17 private String lockNameSpace = "/mylock";
18
19 private String nodeString = lockNameSpace + "/test1";
20
21 private Lock mainLock;
22
23 private ZooKeeper zk;
24
25 public LockTest(){
26 try {
27 zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
28 @Override
29 public void process(WatchedEvent watchedEvent) {
30 System.out.println("Receive event "+watchedEvent);
31 if(Event.KeeperState.SyncConnected == watchedEvent.getState())
32 System.out.println("connection is established...");
33 }
34 });
35 } catch (IOException e) {
36 e.printStackTrace();
37 }
38
39
40 }
41
42 private void ensureRootPath() throws InterruptedException {
43 try {
44 if (zk.exists(lockNameSpace,true)==null){
45 zk.create(lockNameSpace,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
46 }
47 } catch (KeeperException e) {
48 e.printStackTrace();
49 }
50 }
51
52 private void watchNode(String nodeString, final Thread thread) throws InterruptedException {
53 try {
54 zk.exists(nodeString, new Watcher() {
55 @Override
56 public void process(WatchedEvent watchedEvent) {
57 System.out.println( "==" + watchedEvent.toString());
58 if(watchedEvent.getType() == Event.EventType.NodeDeleted){
59 System.out.println("Threre is a Thread released Lock==============");
60 thread.interrupt();
61 }
62 try {
63 zk.exists(nodeString,new Watcher() {
64 @Override
65 public void process(WatchedEvent watchedEvent) {
66 System.out.println( "==" + watchedEvent.toString());
67 if(watchedEvent.getType() == Event.EventType.NodeDeleted){
68 System.out.println("Threre is a Thread released Lock==============");
69 thread.interrupt();
70 }
71 try {
72 zk.exists(nodeString,true);
73 } catch (KeeperException e) {
74 e.printStackTrace();
75 } catch (InterruptedException e) {
76 e.printStackTrace();
77 }
78 }
79
80 });
81 } catch (KeeperException e) {
82 e.printStackTrace();
83 } catch (InterruptedException e) {
84 e.printStackTrace();
85 }
86 }
87
88 });
89 } catch (KeeperException e) {
90 e.printStackTrace();
91 }
92 }
93
94 /**
95 * 获取锁
96 * @return
97 * @throws InterruptedException
98 */
99 public boolean lock() throws InterruptedException {
100 String path = null;
101 ensureRootPath();
102 watchNode(nodeString,Thread.currentThread());
103 while (true) {
104 try {
105 path = zk.create(nodeString, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
106 } catch (KeeperException e) {
107 System.out.println(Thread.currentThread().getName() + " getting Lock but can not get");
108 try {
109 Thread.sleep(5000);
110 }catch (InterruptedException ex){
111 System.out.println("thread is notify");
112 }
113 }
114 if (!Strings.nullToEmpty(path).trim().isEmpty()) {
115 System.out.println(Thread.currentThread().getName() + " get Lock...");
116 return true;
117 }
118 }
119 }
120
121 /**
122 * 释放锁
123 */
124 public void unlock(){
125 try {
126 zk.delete(nodeString,-1);
127 System.out.println("Thread.currentThread().getName() + release Lock...");
128 } catch (InterruptedException e) {
129 e.printStackTrace();
130 } catch (KeeperException e) {
131 e.printStackTrace();
132 }
133 }
134
135 public static void main(String args[]) throws InterruptedException {
136 ExecutorService service = Executors.newFixedThreadPool(10);
137 for (int i = 0;i<4;i++){
138 service.execute(()-> {
139 LockTest test = new LockTest();
140 try {
141 test.lock();
142 Thread.sleep(3000);
143 } catch (InterruptedException e) {
144 e.printStackTrace();
145 }
146 test.unlock();
147 });
148 }
149 service.shutdown();
150 }
151}
152
其实上面的实现有优点也有缺点:
优点:
实现比较简单,有通知机制,能提供较快的响应,有点类似ReentrantLock的等待/唤醒思想(但这个锁本身不可重入)。对于持有者异常退出、节点未能删除的场景,由Session超时保证节点最终能够被删除掉。
缺点:
重量级,同时在竞争者较多的情况下会有“惊群”(herd effect)问题。
“惊群”是指:一个节点被删除时,所有订阅了该节点Watcher的客户端都会同时收到回调,并一起去争抢锁,而最终只有一个能成功,其余请求全部白费,这对ZK集群是十分不利的。所以需要避免这种现象的发生。
方案二:
采用另一种方式,创建临时顺序节点,解决了惊群问题
实现流程:
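- 我们将锁抽象成目录,多个线程在此目录下创建临时顺序节点(EPHEMERAL_SEQUENTIAL)。因为ZK会为我们保证节点的顺序性,所以可以利用节点的顺序进行锁的判断。
- 首先创建顺序节点,然后获取当前目录下最小的节点,判断最小节点是不是当前节点。如果是,那么获取锁成功;如果不是,那么获取锁失败。
- 获取锁失败的节点找到比自己小的前一个顺序节点,只对此节点注册监听,当该节点删除的时候通知当前节点。这样每次释放锁只会唤醒一个等待者。
- 当unlock的时候删除节点之后会通知下一个节点。被通知的节点需要重新获取子节点列表,确认自己已经是最小节点后才算拿到锁,因为前一个节点也可能是因为Session过期而消失,并不代表锁已经释放。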
实现代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148 1package com.codertom.params.engine;
2
3import com.google.common.base.Strings;
4import org.apache.zookeeper.*;
5import org.apache.zookeeper.data.Stat;
6import java.io.IOException;
7import java.util.Collections;
8import java.util.List;
9import java.util.concurrent.ExecutorService;
10import java.util.concurrent.Executors;
11
12/**
13 * Created by zhiming on 2017-02-05.
14 */
15public class FairLockTest {
16
17 private String zkQurom = "localhost:2181";
18
19 private String lockName = "/mylock";
20
21 private String lockZnode = null;
22
23 private ZooKeeper zk;
24
25 public FairLockTest(){
26 try {
27 zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
28 @Override
29 public void process(WatchedEvent watchedEvent) {
30 System.out.println("Receive event "+watchedEvent);
31 if(Event.KeeperState.SyncConnected == watchedEvent.getState())
32 System.out.println("connection is established...");
33 }
34 });
35 } catch (IOException e) {
36 e.printStackTrace();
37 }
38
39
40 }
41
42 private void ensureRootPath(){
43 try {
44 if (zk.exists(lockName,true)==null){
45 zk.create(lockName,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
46 }
47 } catch (Exception e) {
48 e.printStackTrace();
49 }
50 }
51 /**
52 * 获取锁
53 * @return
54 * @throws InterruptedException
55 */
56 public void lock(){
57 String path = null;
58 ensureRootPath();
59 try {
60 path = zk.create(lockName+"/mylock_", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
61 lockZnode = path;
62 List<String> minPath = zk.getChildren(lockName,false);
63 System.out.println(minPath);
64 Collections.sort(minPath);
65 System.out.println(minPath.get(0)+" and path "+path);
66 if (!Strings.nullToEmpty(path).trim().isEmpty()&&!Strings.nullToEmpty(minPath.get(0)).trim().isEmpty()&&path.equals(lockName+"/"+minPath.get(0))) {
67 System.out.println(Thread.currentThread().getName() + " get Lock...");
68 return;
69 }
70 String watchNode = null;
71 for (int i=minPath.size()-1;i>=0;i--){
72 if(minPath.get(i).compareTo(path.substring(path.lastIndexOf("/") + 1))<0){
73 watchNode = minPath.get(i);
74 break;
75 }
76 }
77
78 if (watchNode!=null){
79 final String watchNodeTmp = watchNode;
80 final Thread thread = Thread.currentThread();
81 Stat stat = zk.exists(lockName + "/" + watchNodeTmp,new Watcher() {
82 @Override
83 public void process(WatchedEvent watchedEvent) {
84 if(watchedEvent.getType() == Event.EventType.NodeDeleted){
85 thread.interrupt();
86 }
87 try {
88 zk.exists(lockName + "/" + watchNodeTmp,true);
89 } catch (KeeperException e) {
90 e.printStackTrace();
91 } catch (InterruptedException e) {
92 e.printStackTrace();
93 }
94 }
95
96 });
97 if(stat != null){
98 System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + lockName + "/" + watchNode);
99 }
100 }
101 try {
102 Thread.sleep(1000000000);
103 }catch (InterruptedException ex){
104 System.out.println(Thread.currentThread().getName() + " notify");
105 System.out.println(Thread.currentThread().getName() + " get Lock...");
106 return;
107 }
108
109 } catch (Exception e) {
110 e.printStackTrace();
111 }
112 }
113
114 /**
115 * 释放锁
116 */
117 public void unlock(){
118 try {
119 System.out.println(Thread.currentThread().getName() + "release Lock...");
120 zk.delete(lockZnode,-1);
121 } catch (InterruptedException e) {
122 e.printStackTrace();
123 } catch (KeeperException e) {
124 e.printStackTrace();
125 }
126 }
127
128
129
130 public static void main(String args[]) throws InterruptedException {
131 ExecutorService service = Executors.newFixedThreadPool(10);
132 for (int i = 0;i<4;i++){
133 service.execute(()-> {
134 FairLockTest test = new FairLockTest();
135 try {
136 test.lock();
137 Thread.sleep(3000);
138 } catch (InterruptedException e) {
139 e.printStackTrace();
140 }
141 test.unlock();
142 });
143 }
144 service.shutdown();
145 }
146
147}
148
同样上面的程序也有几点需要注意:
- Zookeeper的API没有提供直接的获取上一个节点或者最小节点的API需要我们自己实现。
- 使用了interrupt做线程的唤醒,这样不科学,因为不想将JVM的lock引进来所以没有用countdownlatch来做流程控制。
- Watch也是要重新设置的,这里使用了Watch的复用,所以代码简单些。
其实上面的实现还是很复杂的,因为你需要反复地去关注Watcher。实现一个Demo可以,但要做一个生产环境可用的Lock并不容易,因为代码中的bug在生产环境上可能引发很严重的问题。
其实对于Zookeeper的一些常用功能,已经有一些成熟的包实现了,比如Curator。Curator的确足够强大,不仅封装了Zookeeper的常用API,也包装了很多常用场景(Recipe)的实现。但是它的编程风格其实还是比较难以接受的。
方案三:
使用Curator实现一个分布式锁
代码:
1
2
3
4
5
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.4.1</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 1import org.apache.curator.framework.CuratorFramework;
2import org.apache.curator.framework.CuratorFrameworkFactory;
3import org.apache.curator.framework.recipes.locks.InterProcessMutex;
4import org.apache.curator.retry.ExponentialBackoffRetry;
5import org.slf4j.Logger;
6import org.slf4j.LoggerFactory;
7
8import java.util.concurrent.TimeUnit;
9
10/**
11 * classname:DistributedLock
12 * desc:基于zookeeper的开源客户端Cruator实现分布式锁
13 * author:simonsfan
14 */
15public class DistributedLock {
16 public static Logger log = LoggerFactory.getLogger(DistributedLock.class);
17 private InterProcessMutex interProcessMutex; //可重入排它锁
18 private String lockName; //竞争资源标志
19 private String root = "/distributed/lock/";//根节点
20 private static CuratorFramework curatorFramework;
21 private static String ZK_URL = "zookeeper1.tq.master.cn:2181,zookeeper3.tq.master.cn:2181,zookeeper2.tq.master.cn:2181,zookeeper4.tq.master.cn:2181,zookeeper5.tq.master.cn:2181";
22 static{
23 curatorFramework= CuratorFrameworkFactory.newClient(ZK_URL,new ExponentialBackoffRetry(1000,3));
24 curatorFramework.start();
25 }
26
27 /**
28 * 实例化
29 * @param lockName
30 */
31 public DistributedLock(String lockName){
32 try {
33 this.lockName = lockName;
34 interProcessMutex = new InterProcessMutex(curatorFramework, root + lockName);
35 }catch (Exception e){
36 log.error("initial InterProcessMutex exception="+e);
37 }
38 }
39
40 /**
41 * 获取锁
42 */
43 public void acquireLock(){
44 int flag = 0;
45 try {
46 //重试2次,每次最大等待2s,也就是最大等待4s
47 while (!interProcessMutex.acquire(2, TimeUnit.SECONDS)){
48 flag++;
49 if(flag>1){ //重试两次
50 break;
51 }
52 }
53 } catch (Exception e) {
54 log.error("distributed lock acquire exception="+e);
55 }
56 if(flag>1){
57 log.info("Thread:"+Thread.currentThread().getId()+" acquire distributed lock busy");
58 }else{
59 log.info("Thread:"+Thread.currentThread().getId()+" acquire distributed lock success");
60 }
61 }
62
63 /**
64 * 释放锁
65 */
66 public void releaseLock(){
67 try {
68 if(interProcessMutex != null && interProcessMutex.isAcquiredInThisProcess()){
69 interProcessMutex.release();
70 curatorFramework.delete().inBackground().forPath(root+lockName);
71 log.info("Thread:"+Thread.currentThread().getId()+" release distributed lock success");
72 }
73 }catch (Exception e){
74 log.info("Thread:"+Thread.currentThread().getId()+" release distributed lock exception="+e);
75 }
76 }
77}
78
业务层使用时要记得释放锁(最好放在finally中)。要特别注意的是 interProcessMutex.acquire(2, TimeUnit.SECONDS)方法,它可以设定单次等待时间,再加上重试次数,即最大排队等待时间 = 单次acquire等待时间 × 尝试次数(这里是2s × 2次 = 4s)。这两个值一定要设置好,因为使用了分布式锁之后,接口的TPS就下降了,没获取到锁的请求都在等待/重试。如果这里设置的最大等待时间是4s,而此时并发进来1000个请求,4秒内处理不完这1000个请求怎么办呢?所以一定要设置好重试次数及单次等待时间,根据自己接口的压测结果设置合理的阈值,避免业务流转发生问题!