curator分布式锁

释放双眼,带上耳机,听听看~!

长长的一生,总要勤勤恳恳,刻苦专研。把一件事情做到极致,不然,岂非白活了?

之前看kafka的时候发现他用的是curator来做leader选举的,突然灵机一动,咦,这不就是把ZK分布式锁封装了一下嘛,咦,那是不是这个也可以做分布式锁呢?“好奇心害死猫”,赶紧在pom里面dependency一下,抓下来源码来看看到底怎么肥事。先把源码贴上,具体的分析自己先分析一波,分析之后找时间再补充上来!!!

一、InterProcessLock接口

curator分布式锁

1.1 InterProcessMutex类

这个是一个:分布式可重入排他锁

curator分布式锁

curator分布式锁
curator分布式锁

curator分布式锁

JDK里面获取锁是用acquire,这里也是用acquire,你说神奇不神奇,好理解不好理解~

curator分布式锁


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1    private boolean internalLock(long time, TimeUnit unit) throws Exception
2    {
3        /*
4           Note on concurrency: a given lockData instance
5           can be only acted on by a single thread so locking isn't necessary
6        */
7
8        Thread currentThread = Thread.currentThread();
9
10        LockData lockData = threadData.get(currentThread);
11        if ( lockData != null )
12        {
13            // re-entering
14            lockData.lockCount.incrementAndGet();
15            return true;
16        }
17
18        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
19        if ( lockPath != null )
20        {
21            LockData newLockData = new LockData(currentThread, lockPath);
22            threadData.put(currentThread, newLockData);
23            return true;
24        }
25
26        return false;
27    }
28}
29
30

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
1    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
2    {
3        final long      startMillis = System.currentTimeMillis();
4        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
5        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
6        int             retryCount = 0;
7
8        String          ourPath = null;
9        boolean         hasTheLock = false;
10        boolean         isDone = false;
11        while ( !isDone )
12        {
13            isDone = true;
14
15            try
16            {
17                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
18                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
19            }
20            catch ( KeeperException.NoNodeException e )
21            {
22                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
23                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
24                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
25                {
26                    isDone = false;
27                }
28                else
29                {
30                    throw e;
31                }
32            }
33        }
34
35        if ( hasTheLock )
36        {
37            return ourPath;
38        }
39
40        return null;
41    }
42
43

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1
2    @Override
3    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
4    {
5        String ourPath;
6        if ( lockNodeBytes != null )
7        {
8            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
9        }
10        else
11        {
12            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
13        }
14        return ourPath;
15    }
16
17

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
1private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
2    {
3        boolean     haveTheLock = false;
4        boolean     doDelete = false;
5        try
6        {
7            if ( revocable.get() != null )
8            {
9                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
10            }
11
12            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
13            {
14                List<String>        children = getSortedChildren();
15                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
16
17                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
18                if ( predicateResults.getsTheLock() )
19                {
20                    haveTheLock = true;
21                }
22                else
23                {
24                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
25
26                    synchronized(this)
27                    {
28                        try
29                        {
30                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
31                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
32                            if ( millisToWait != null )
33                            {
34                                millisToWait -= (System.currentTimeMillis() - startMillis);
35                                startMillis = System.currentTimeMillis();
36                                if ( millisToWait <= 0 )
37                                {
38                                    doDelete = true;    // timed out - delete our node
39                                    break;
40                                }
41
42                                wait(millisToWait);
43                            }
44                            else
45                            {
46                                wait();
47                            }
48                        }
49                        catch ( KeeperException.NoNodeException e )
50                        {
51                            // it has been deleted (i.e. lock released). Try to acquire again
52                        }
53                    }
54                }
55            }
56        }
57        catch ( Exception e )
58        {
59            ThreadUtils.checkInterrupted(e);
60            doDelete = true;
61            throw e;
62        }
63        finally
64        {
65            if ( doDelete )
66            {
67                deleteOurPath(ourPath);
68            }
69        }
70        return haveTheLock;
71    }
72
73

1
2
3
4
5
6
7
8
9
10
11
12
13
1@Override
2    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
3    {
4        int             ourIndex = children.indexOf(sequenceNodeName);
5        validateOurIndex(sequenceNodeName, ourIndex);
6
7        boolean         getsTheLock = ourIndex < maxLeases;
8        String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
9
10        return new PredicateResults(pathToWatch, getsTheLock);
11    }
12
13

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全运维

深入理解 Linux 内核---Ex2 和 Ex3 文件系统

2021-8-18 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索