ETCD分布式锁实现选主机制(Golang)
为什么要写这篇文章
做架构的时候,涉及到系统的一个功能,有一个服务必须在指定的节点执行,并且需要有个节点来做任务分发,想了半天,那就搞个主节点做这事呗,所以就有了这篇文章的诞生,我把踩的坑和收获记录下来,方便未来查看和各位兄弟们参考。
选主机制是什么
举个例子,分布式系统内,好几台机器,总得分个三六九等,发号施令的时候总得有个带头大哥站出来,告诉其他小弟我们今天要干嘛干嘛之类的,这个大哥就是master节点,master节点一般都是做信息处理分发,或者重要服务运行之类的。所以,选主机制就是,选一个master出来,这个master可用,并且可以顺利发消息给其他小弟,其他小弟也认为你是master,就可以了。
ETCD的分布式锁是什么
首先认为一点,它是唯一的,全局的,一个key值
为什么一定要强调这个唯一和全局呢,因为分布式锁就是指定只能让一个客户端访问这个key值,其他的没法访问,这样才能保证它的唯一性。
再一个,认为分布式锁是一个限时的,会过期的的key值
你创建了一个key,要保证访问它的客户端时刻online,类似一个“心跳”的机制,如果持有锁的客户端崩溃了,那么key值在过期后会被删除,其他的客户端也可以继续抢key,继续接力,实现高可用。
选主机制怎么设计
其实主要的逻辑前面都说清楚了,我在这里叙述下我该怎么做。
我们假设有三个节点,node1,node2,node3
-
三个节点都去创建一个全局的唯一key /dev/lock
-
谁先创建成功谁就是master主节点
-
其他节点持续待命继续获取,主节点继续续租key值(key值会过期)
持有key的节点down机,key值过期被删,其他节点创key成功,继续接力。
ETCD分布式锁简单实现
看一下ETCD的golang代码,还是给出了如何去实现一个分布式锁,这个比较简单,我先写一个简单的Demo说下几个接口的功能
-
创建锁
1
2
3
4
5
6
7
8
9
10 1kv = clientv3.NewKV(client)
2txn = kv.Txn(context.TODO())
3txn.If(clientv3.Compare(clientv3.CreateRevision("/dev/lock"),"=",0)).Then(
4clientv3.OpPut("/dev/lock","占用",clientv3.WithLease(leaseId))).Else(clientv3.OpGet("/dev/lock"))
5txnResponse,err = txn.Commit()
6if err !=nil{
7 fmt.Println(err)
8 return
9 }
10
-
判断是否抢到锁
1
2
3
4
5
6 1if txnResponse.Succeeded {
2 fmt.Println("抢到锁了")
3 } else {
4 fmt.Println("没抢到锁",txnResponse.Responses[0].GetResponseRange().Kvs[0].Value)
5 }
6
-
续租逻辑
1
2
3
4
5
6
7
8
9
10
11
12 1for {
2 select {
3 case leaseKeepAliveResponse = <-leaseKeepAliveChan:
4 if leaseKeepAliveResponse != nil{
5 fmt.Println("续租成功,leaseID :",leaseKeepAliveResponse.ID)
6 }else {
7 fmt.Println("续租失败")
8 }
9 }
10 time.Sleep(time.Second*1)
11 }
12
我的实现逻辑
首先我的逻辑就是,大家一起抢,谁抢到谁就一直续,要是不续了就另外的老哥上,能者居之嘛!我上一下我的实现代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118 1package main
2
3import (
4 "fmt"
5 "context"
6 "time"
7 //"reflect"
8
9 "go.etcd.io/etcd/clientv3"
10)
11
12var (
13 lease clientv3.Lease
14 ctx context.Context
15 cancelFunc context.CancelFunc
16 leaseId clientv3.LeaseID
17 leaseGrantResponse *clientv3.LeaseGrantResponse
18 leaseKeepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
19 leaseKeepAliveResponse *clientv3.LeaseKeepAliveResponse
20 txn clientv3.Txn
21 txnResponse *clientv3.TxnResponse
22 kv clientv3.KV
23)
24
25type ETCD struct {
26 client *clientv3.Client
27 cfg clientv3.Config
28 err error
29}
30
31// 创建ETCD连接服务
32func New(endpoints ...string) (*ETCD, error) {
33 cfg := clientv3.Config{
34 Endpoints: endpoints,
35 DialTimeout: time.Second * 5,
36 }
37
38 client, err := clientv3.New(cfg)
39 if err != nil {
40 fmt.Println("连接ETCD失败")
41 return nil, err
42 }
43
44 etcd := &ETCD{
45 cfg: cfg,
46 client: client,
47 }
48
49 fmt.Println("连接ETCD成功")
50 return etcd, nil
51}
52
53// 抢锁逻辑
54func (etcd *ETCD) Newleases_lock(ip string) (error) {
55 lease := clientv3.NewLease(etcd.client)
56 leaseGrantResponse, err := lease.Grant(context.TODO(), 5)
57 if err != nil {
58 fmt.Println(err)
59 return err
60 }
61 leaseId := leaseGrantResponse.ID
62 ctx, cancelFunc := context.WithCancel(context.TODO())
63 defer cancelFunc()
64 defer lease.Revoke(context.TODO(), leaseId)
65 leaseKeepAliveChan, err := lease.KeepAlive(ctx, leaseId)
66 if err != nil {
67 fmt.Println(err)
68 return err
69 }
70
71 // 初始化锁
72 kv := clientv3.NewKV(etcd.client)
73 txn := kv.Txn(context.TODO())
74 txn.If(clientv3.Compare(clientv3.CreateRevision("/dev/lock"), "=", 0)).Then(
75 clientv3.OpPut("/dev/lock", ip, clientv3.WithLease(leaseId))).Else(
76 clientv3.OpGet("/dev/lock"))
77 txnResponse, err := txn.Commit()
78 if err != nil {
79 fmt.Println(err)
80 return err
81 }
82 // 判断是否抢锁成功
83 if txnResponse.Succeeded {
84 fmt.Println("抢到锁了")
85 fmt.Println("选定主节点", ip)
86 // 续租节点
87 for {
88 select {
89 case leaseKeepAliveResponse = <-leaseKeepAliveChan:
90 if leaseKeepAliveResponse != nil {
91 fmt.Println("续租成功,leaseID :", leaseKeepAliveResponse.ID)
92 } else {
93 fmt.Println("续租失败")
94 }
95
96 }
97 }
98 } else {
99 // 继续回头去抢,不停请求
100 fmt.Println("没抢到锁", txnResponse.Responses[0].GetResponseRange().Kvs[0].Value)
101 fmt.Println("继续抢")
102 time.Sleep(time.Second * 1)
103 }
104 return nil
105}
106
107func main(){
108 // 连接ETCD
109 etcd, err := New("xxxxxxxx:2379")
110 if err != nil {
111 fmt.Println(err)
112 }
113 // 设定无限循环
114 for {
115 etcd.Newleases_lock("node1")
116 }
117}
118
总结
相关代码写入到github当中,其中的地址是
https://github.com/Alexanderklau/Go_poject/tree/master/Go-Etcd/lock_work
实现这个功能废了不少功夫,好久没写go了,自己太菜了,如果有老哥发现问题请联系我,好改正。