Redis实现分布式锁
1.根据lockKey区进行setnx(set not exist,如果key值为空,则正常设置,返回1,否则不会进行设置并返回0)操作,如果设置成功,表示已经获得锁,否则并没有获取锁。
2.如果没有获得锁,去Redis上拿到该key对应的值,在该key上我们存储一个时间戳(用毫秒表示,t1),为了避免死锁以及其他客户端占用该锁超过一定时间(5秒),使用该客户端当前时间戳,与存储的时间戳作比较。
3.如果没有超过该key的使用时限,返回false,表示其他人正在占用该key,不能强制使用;如果已经超过时限,那我们就可以进行解锁,使用我们的时间戳来代替该字段的值。
4.但是如果在setnx失败后,get该值却无法拿到该字段时,说明操作之前该锁已经被释放,这个时候,最好的办法就是重新执行一遍setnx方法来获取其值以获得该锁。
释放锁:删除redis中key
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 11 public class RedisKeyLock {
2 2 private static Logger logger = Logger.getLogger(RedisKeyLock.class);
3 3 private final static long ACCQUIRE_LOCK_TIMEOUT_IN_MS = 10 * 1000;
4 4 private final static int EXPIRE_IN_SECOND = 5;//锁失效时间
5 5 private final static long WAIT_INTERVAL_IN_MS = 100;
6 6 private static RedisKeyLock lock;
7 7 private JedisPool jedisPool;
8 8 private RedisKeyLock(JedisPool pool){
9 9 this.jedisPool = pool;
1010 }
1111 public static RedisKeyLock getInstance(JedisPool pool){
1212 if(lock == null){
1313 lock = new RedisKeyLock(pool);
1414 }
1515 return lock;
1616 }
1717
1818 public void lock(final String redisKey) {
1919 Jedis resource = null;
2020 try {
2121 long now = System.currentTimeMillis();
2222 resource = jedisPool.getResource();
2323 long timeoutAt = now + ACCQUIRE_LOCK_TIMEOUT_IN_MS;
2424 boolean flag = false;
2525 while (true) {
2626 String expireAt = String.valueOf(now + EXPIRE_IN_SECOND * 1000);
2727 long ret = resource.setnx(redisKey, expireAt);
2828 if (ret == 1) {//已获取锁
2929 flag = true;
3030 break;
3131 } else {//未获取锁,重试获取锁
3232 String oldExpireAt = resource.get(redisKey);
3333 if (oldExpireAt != null && Long.parseLong(oldExpireAt) < now) {
3434 oldExpireAt = resource.getSet(redisKey, expireAt);
3535 if (Long.parseLong(oldExpireAt) < now) {
3636 flag = true;
3737 break;
3838 }
3939 }
4040 }
4141 if (timeoutAt < now) {
4242 break;
4343 }
4444 TimeUnit.NANOSECONDS.sleep(WAIT_INTERVAL_IN_MS);
4545 }
4646 if (!flag) {
4747 throw new RuntimeException("canot acquire lock now ...");
4848 }
4949 } catch (JedisException je) {
5050 logger.error("lock", je);
5151 je.printStackTrace();
5252 if (resource != null) {
5353 jedisPool.returnBrokenResource(resource);
5454 }
5555 } catch (Exception e) {
5656 e.printStackTrace();
5757 logger.error("lock", e);
5858 } finally {
5959 if (resource != null) {
6060 jedisPool.returnResource(resource);
6161 }
6262 }
6363 }
6464 public boolean unlock(final String redisKey) {
6565 Jedis resource = null;
6666 try {
6767 resource = jedisPool.getResource();
6868 resource.del(redisKey);
6969 return true;
7070 } catch (JedisException je) {
7171 je.printStackTrace();
7272 if (resource != null) {
7373 jedisPool.returnBrokenResource(resource);
7474 }
7575 return false;
7676 } catch (Exception e) {
7777 logger.error("lock", e);
7878 return false;
7979 } finally {
8080 if (resource != null) {
8181 jedisPool.returnResource(resource);
8282 }
8383 }
8484 }
8585 }
86
另一个版本:
SET my:lock 随机值 NX PX 30000
这个的NX的意思就是只有key不存在的时候才会设置成功,PX 30000的意思是30秒后锁自动释放。别人创建的时候如果发现已经有了就不能加锁了。
释放锁就是删除key,但是一般可以用lua脚本删除,判断value一样才删除
为啥要用随机值呢?因为如果某个客户端获取到了锁,但是阻塞了很长时间才执行完,此时可能已经自动释放锁了,此时可能别的客户端已经获取到了这个锁,要是你这个时候直接删除key的话会有问题,所以得用随机值加上面的lua脚本来释放锁。(就是根据这个随机值来判断这个锁是不是自己加的)
如果是Redis是单机,会有问题。
因为如果是普通的redis单实例,那就是单点故障。单节点挂了会导致锁失效。
如果是redis普通主从,那redis主从异步复制,如果主节点挂了,key还没同步到从节点,此时从节点切换为主节点,别人就会拿到锁。
RedLock算法
这个场景是假设有一个redis cluster,有5个redis master实例。然后执行如下步骤获取一把锁:
获取当前时间戳,单位是毫秒
跟上面类似,轮流尝试在每个master节点上创建锁,过期时间较短,一般就几十毫秒
尝试在大多数节点上建立一个锁,比如5个节点就要求是3个节点(n / 2 +1)
客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了
要是锁建立失败了,那么就依次删除这个锁
只要别人建立了一把分布式锁,你就得不断轮询去尝试获取锁
Zookeeper实现分布式锁
基于临时顺序节点:
1.客户端调用create()方法创建名为“locknode/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。
2.客户端调用getChildren(“locknode”)方法来获取所有已经创建的子节点。
3.客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。
4.如果创建的节点不是所有节点中序号最小的,那么则监视比自己创建节点的序列号小的最大的节点,进入等待。直到下次监视的子节点变更的时候,再进行子节点的获取,判断是否获取锁。
释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可。
不太严谨的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126 11 public class ZooKeeperDistributedLock implements Watcher{
2 2
3 3 private ZooKeeper zk;
4 4 private String locksRoot= "/locks";
5 5 private String productId;
6 6 private String waitNode;
7 7 private String lockNode;
8 8 private CountDownLatch latch;
9 9 private CountDownLatch connectedLatch = new CountDownLatch(1);
10 10 private int sessionTimeout = 30000;
11 11
12 12 public ZooKeeperDistributedLock(String productId){
13 13 this.productId = productId;
14 14 try {
15 15 String address = "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181";
16 16 zk = new ZooKeeper(address, sessionTimeout, this);
17 17 connectedLatch.await();
18 18 } catch (IOException e) {
19 19 throw new LockException(e);
20 20 } catch (KeeperException e) {
21 21 throw new LockException(e);
22 22 } catch (InterruptedException e) {
23 23 throw new LockException(e);
24 24 }
25 25 }
26 26
27 27 public void process(WatchedEvent event) {
28 28 if(event.getState()==KeeperState.SyncConnected){
29 29 connectedLatch.countDown();
30 30 return;
31 31 }
32 32
33 33 if(this.latch != null) {
34 34 this.latch.countDown();
35 35 }
36 36 }
37 37
38 38 public void acquireDistributedLock() {
39 39 try {
40 40 if(this.tryLock()){
41 41 return;
42 42 }
43 43 else{
44 44 waitForLock(waitNode, sessionTimeout);
45 45 }
46 46 } catch (KeeperException e) {
47 47 throw new LockException(e);
48 48 } catch (InterruptedException e) {
49 49 throw new LockException(e);
50 50 }
51 51 }
52 52
53 53 public boolean tryLock() {
54 54 try {
55 55 // 传入进去的locksRoot + “/” + productId
56 56 // 假设productId代表了一个商品id,比如说1
57 57 // locksRoot = locks
58 58 // /locks/10000000000,/locks/10000000001,/locks/10000000002
59 59 lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
60 60
61 61 // 看看刚创建的节点是不是最小的节点
62 62 // locks:10000000000,10000000001,10000000002
63 63 List<String> locks = zk.getChildren(locksRoot, false);
64 64 Collections.sort(locks);
65 65
66 66 if(lockNode.equals(locksRoot+"/"+ locks.get(0))){
67 67 //如果是最小的节点,则表示取得锁
68 68 return true;
69 69 }
70 70
71 71 //如果不是最小的节点,找到比自己小1的节点
72 72 int previousLockIndex = -1;
73 73 for(int i = 0; i < locks.size(); i++) {
74 74 if(lockNode.equals(locksRoot + “/” + locks.get(i))) {
75 75 previousLockIndex = i - 1;
76 76 break;
77 77 }
78 78 }
79 79
80 80 this.waitNode = locks.get(previousLockIndex);
81 81 } catch (KeeperException e) {
82 82 throw new LockException(e);
83 83 } catch (InterruptedException e) {
84 84 throw new LockException(e);
85 85 }
86 86 return false;
87 87 }
88 88
89 89 private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException {
90 90 Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
91 91 if(stat != null){
92 92 this.latch = new CountDownLatch(1);
93 93 this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null;
94 94 }
95 95 return true;
96 96 }
97 97
98 98 public void unlock() {
99 99 try {
100100 // 删除/locks/10000000000节点
101101 // 删除/locks/10000000001节点
102102 System.out.println("unlock " + lockNode);
103103 zk.delete(lockNode,-1);
104104 lockNode = null;
105105 zk.close();
106106 } catch (InterruptedException e) {
107107 e.printStackTrace();
108108 } catch (KeeperException e) {
109109 e.printStackTrace();
110110 }
111111 }
112112
113113 public class LockException extends RuntimeException {
114114 private static final long serialVersionUID = 1L;
115115 public LockException(String e){
116116 super(e);
117117 }
118118 public LockException(Exception e){
119119 super(e);
120120 }
121121 }
122122
123123 // 如果有一把锁,被多个人给竞争,此时多个人会排队,第一个拿到锁的人会执行,然后释放锁,后面的每个人都会去监听排在自己前面的那个人创建的node上,一旦某个人释放了锁,排在自己后面的人就会被zookeeper给通知,一旦被通知了之后,就ok了,自己就获取到了锁,就可以执行代码了
124124
125125 }
126
另一个版本:
zk分布式锁,就是某个节点尝试创建临时znode,此时创建成功了就获取了这个锁;这个时候别的客户端来创建锁会失败,只能注册个监听器监听这个锁。
释放锁就是删除这个znode,一旦释放掉就会通知客户端,然后有一个等待着的客户端就可以再次重新加锁。
redis分布式锁,其实需要自己不断去尝试获取锁,比较消耗性能
zk分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小
另外一点就是,如果是redis获取锁的那个客户端bug了或者挂了,那么只能等待超时时间之后才能释放锁;而zk的话,因为创建的是临时znode,只要客户端挂了,znode就没了,此时就自动释放锁