长长的一生,总要勤勤恳恳,刻苦专研。把一件事情做到极致,不然,岂非白活了?
之前看kafka的时候发现他用的是curator来做leader选举的,突然灵机一动,咦,这不就是把ZK分布式锁封装了一下嘛,咦,那是不是这个也可以做分布式锁呢?“好奇心害死猫”,赶紧在pom里面dependency一下,抓下来源码来看看到底怎么肥事。先把源码贴上,具体的分析自己先分析一波,分析之后找时间再补充上来!!!
一、InterProcessLock接口
1.1 InterProcessMutex类
这个是一个:分布式可重入排他锁
JDK里面获取锁是用acquire,这里也是用acquire,你说神奇不神奇,好理解不好理解~
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 1 private boolean internalLock(long time, TimeUnit unit) throws Exception
2 {
3 /*
4 Note on concurrency: a given lockData instance
5 can be only acted on by a single thread so locking isn't necessary
6 */
7
8 Thread currentThread = Thread.currentThread();
9
10 LockData lockData = threadData.get(currentThread);
11 if ( lockData != null )
12 {
13 // re-entering
14 lockData.lockCount.incrementAndGet();
15 return true;
16 }
17
18 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
19 if ( lockPath != null )
20 {
21 LockData newLockData = new LockData(currentThread, lockPath);
22 threadData.put(currentThread, newLockData);
23 return true;
24 }
25
26 return false;
27 }
28}
29
30
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 1 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
2 {
3 final long startMillis = System.currentTimeMillis();
4 final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
5 final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
6 int retryCount = 0;
7
8 String ourPath = null;
9 boolean hasTheLock = false;
10 boolean isDone = false;
11 while ( !isDone )
12 {
13 isDone = true;
14
15 try
16 {
17 ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
18 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
19 }
20 catch ( KeeperException.NoNodeException e )
21 {
22 // gets thrown by StandardLockInternalsDriver when it can't find the lock node
23 // this can happen when the session expires, etc. So, if the retry allows, just try it all again
24 if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
25 {
26 isDone = false;
27 }
28 else
29 {
30 throw e;
31 }
32 }
33 }
34
35 if ( hasTheLock )
36 {
37 return ourPath;
38 }
39
40 return null;
41 }
42
43
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 1
2 @Override
3 public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
4 {
5 String ourPath;
6 if ( lockNodeBytes != null )
7 {
8 ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
9 }
10 else
11 {
12 ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
13 }
14 return ourPath;
15 }
16
17
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 1private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
2 {
3 boolean haveTheLock = false;
4 boolean doDelete = false;
5 try
6 {
7 if ( revocable.get() != null )
8 {
9 client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
10 }
11
12 while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
13 {
14 List<String> children = getSortedChildren();
15 String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
16
17 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
18 if ( predicateResults.getsTheLock() )
19 {
20 haveTheLock = true;
21 }
22 else
23 {
24 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
25
26 synchronized(this)
27 {
28 try
29 {
30 // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
31 client.getData().usingWatcher(watcher).forPath(previousSequencePath);
32 if ( millisToWait != null )
33 {
34 millisToWait -= (System.currentTimeMillis() - startMillis);
35 startMillis = System.currentTimeMillis();
36 if ( millisToWait <= 0 )
37 {
38 doDelete = true; // timed out - delete our node
39 break;
40 }
41
42 wait(millisToWait);
43 }
44 else
45 {
46 wait();
47 }
48 }
49 catch ( KeeperException.NoNodeException e )
50 {
51 // it has been deleted (i.e. lock released). Try to acquire again
52 }
53 }
54 }
55 }
56 }
57 catch ( Exception e )
58 {
59 ThreadUtils.checkInterrupted(e);
60 doDelete = true;
61 throw e;
62 }
63 finally
64 {
65 if ( doDelete )
66 {
67 deleteOurPath(ourPath);
68 }
69 }
70 return haveTheLock;
71 }
72
73
1
2
3
4
5
6
7
8
9
10
11
12
13 1@Override
2 public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
3 {
4 int ourIndex = children.indexOf(sequenceNodeName);
5 validateOurIndex(sequenceNodeName, ourIndex);
6
7 boolean getsTheLock = ourIndex < maxLeases;
8 String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
9
10 return new PredicateResults(pathToWatch, getsTheLock);
11 }
12
13