Zookeeper实现分布式锁

释放双眼,带上耳机,听听看~!

文章目录

  • 什么是Zookeeper
  • Zookeeper集群机制
  • Zookeeper特性
  • Zookeeper数据结构
  • Zookeeper应用场景
  • Zookeeper的环境搭建(linux)
  • Zookeeper客户端
  • Zookeeper配置文件介绍
  • Java操作Zookeeper
  • Zookeeper的事件通知
  • 使用Zookeeper实现分布式锁的思路
  • Redis实现分布式锁代码实现

什么是Zookeeper


1
2
3
4
5
6
7
8
9
1Zookeeper是一个分布式开源框架,提供了协调分布式应用的基本服务,它向外部应用暴露一组通用服务——分布式同步(Distributed Synchronization)、命名服务(Naming Service)、集群维护(Group Maintenance)等,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。ZooKeeper本身可以以单机模式安装运行,不过它的长处在于通过分布式ZooKeeper集群(一个Leader,多个Follower),基于一定的策略来保证ZooKeeper集群的稳定性和可用性,从而实现分布式应用的可靠性。
21、zookeeper是为别的分布式程序服务的
32、Zookeeper本身就是一个分布式程序(只要有半数以上节点存活,zk就能正常服务)
43、Zookeeper所提供的服务涵盖:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统> 一名称服务等
54、虽然说可以提供各种服务,但是zookeeper在底层其实只提供了两个功能:
6管理(存储,读取)用户程序提交的数据(类似namenode中存放的metadata);
7并为用户程序提供数据节点监听服务;
8
9

Zookeeper集群机制


1
2
3
4
1Zookeeper集群的角色: Leader 和 follower
2只要集群中有半数以上节点存活,集群就能提供服务
3
4

Zookeeper特性


1
2
3
4
5
6
7
8
11、Zookeeper:一个leader,多个follower组成的集群
22、全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的
33、分布式读写,更新请求转发,由leader实施
44、更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行
55、数据更新原子性,一次数据更新要么成功,要么失败
66、实时性,在一定时间范围内,client能读到最新数据
7
8

Zookeeper数据结构


1
2
3
4
5
6
7
8
9
10
11
12
13
14
11、层次化的目录结构,命名符合常规文件系统规范(类似文件系统),如下图所示
22、每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
33、节点Znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点)
4节点类型
5a、Znode有两种类型:
6短暂(ephemeral)(create -e /app1/test1 “test1” 客户端断开连接zk删除ephemeral类型节点)
7持久(persistent) (create -s /app1/test2 “test2” 客户端断开连接zk不删除persistent类型节点)
8b、Znode有四种形式的目录节点(默认是persistent )
9PERSISTENT
10PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )
11EPHEMERAL
12EPHEMERAL_SEQUENTIAL
13
14

Zookeeper实现分布式锁

Zookeeper应用场景


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1统一命名服务
2       分布式环境下,经常需要对应用/服务进行统一命名,便于识别不同服务。类似于域名与ip之间对应关系,域名容易记住。通过名称来获取资源或服务的地址,提供者等信息按照层次结构组织服务/应用名称可将服务名称以及地址信息写到Zookeeper上,客户端通过Zookeeper获取可用服务列表类。
3配置管理
4       分布式环境下,配置文件管理和同步是一个常见问题。一个集群中,所有节点的配置信息是一致的,比如Hadoop。对配置文件修改后,希望能够快速同步到各个节点上配置管理可交由Zookeeper实现。可将配置信息写入Zookeeper的一个znode上。各个节点监听这个znode。一旦znode中的数据被修改,zookeeper将通知各个节点。
5集群管理
6       分布式环境中,实时掌握每个节点的状态是必要的。可根据节点实时状态作出一些调整。Zookeeper可将节点信息写入Zookeeper的一个znode上。监听这个znode可获取它的实时状态变化。典型应用比如Hbase中Master状态监控与选举。
7分布式通知/协调
8       分布式环境中,经常存在一个服务需要知道它所管理的子服务的状态。例如,NameNode须知道各DataNode的状态,JobTracker须知道各TaskTracker的状态。心跳检测机制和信息推送也是可通过Zookeeper实现。
9分布式锁
10       Zookeeper是强一致的。多个客户端同时在Zookeeper上创建相同znode,只有一个创建成功。Zookeeper实现锁的独占性。多个客户端同时在Zookeeper上创建相同znode ,创建成功的那个客户端得到锁,其他客户端等待。Zookeeper 控制锁的时序。各个客户端在某个znode下创建临时znode (类型为CreateMode. EPHEMERAL _SEQUENTIAL),这样,该znode可掌握全局访问时序。
11分布式队列
12       两种队列。当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。(可通过分布式锁实现)
13       同步队列。一个job由多个task组成,只有所有任务完成后,job才运行完成。可为job创建一个/job目录,然后在该目录下,为每个完成的task创建一个临时znode,一旦临时节点数目达到task总数,则job运行完成。
14
15

Zookeeper的环境搭建(linux)

加入说目前我们有三台机器。IP分别为
192.168.110.154
192.168.110.158
192.168.110.159
环境要求


1
2
3
4
5
1必须要有jdk环境,关于linux的jdk的安装,麻烦大家动动可爱的小手,百度下
2# 关闭防火墙
3service iptable stop
4
5

结构


1
2
3
4
1一共三个节点
2(zk服务器集群规模不小于3个节点),要求服务器之间系统时间保持一致。
3
4

上传zk并且解压


1
2
3
4
1进行解压: tar -zxvf zookeeper-3.4.6.tar.gz
2重命名: mv zookeeper-3.4.6 zookeeper
3
4

修改zookeeper环境变量


1
2
3
4
5
6
7
8
9
10
1# 编辑文件
2vi /etc/profile
3export JAVA_HOME=/opt/jdk1.8.0_71
4export ZOOKEEPER_HOME=/usr/local/zookeeper
5export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
6export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH
7# 检测环境变成是否报错
8source /etc/profile
9
10

修改zoo_sample.cfg文件


1
2
3
4
5
6
7
8
9
1cd /usr/local/zookeeper/conf
2vim zoo_sample.cfg
3(1) dataDir=/usr/local/zookeeper/data(注意同时在zookeeper创建data目录)
4(2)最后面添加
5server.0=192.168.110.154:2888:3888 # 2888 底层连接的端口号
6server.1=192.168.110.158:2888:3888
7server.2=192.168.110.159:2888:3888
8
9

创建服务器标识


1
2
3
4
5
6
7
8
9
10
11
12
13
1# 进入到文件目录
2cd /usr/local/zookeeper/data
3# 创建一个文件并且编辑
4vim myid
5# 填写内容
60
7
8# 其他服务也进行相同的操作,myid的编号分别为 Ip对象的server编号
9server.0=192.168.110.154:2888:3888 # 2888 底层连接的端口号
10server.1=192.168.110.158:2888:3888
11server.2=192.168.110.159:2888:3888
12
13

启动Zookeeper


1
2
3
4
5
6
7
8
9
1# 进入路径
2cd /usr/local/zookeeper/bin
3# 启动
4zkServer.sh start
5(注意这里3台机器都要进行启动)
6# 检查状态
7zkServer.sh status(在三个节点上检验zk的mode,一个leader和俩个follower),具体如下图,可以看见每个服务器的状态
8
9

Zookeeper实现分布式锁

Zookeeper客户端


1
2
3
4
5
6
7
8
9
10
11
12
13
1ZooKeeper命令行工具类似于Linux的shell环境,不过功能肯定不及shell啦,但是使用它我们可以简单的对ZooKeeper进行访问,数据创建,数据修改等操作.  使用 zkCli.sh -server 127.0.0.1:2181 连接到 ZooKeeper 服务,连接成功后,系统会输出 ZooKeeper 的相关环境以及配置信息。
2命令行工具的一些简单操作如下:
3•    1. 显示根目录下、文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容
4•    2. 显示根目录下、文件: ls2 / 查看当前节点数据并能看到更新次数等数据
5•    3. 创建文件,并设置初始内容: create /zk "test" 创建一个新的 znode节点“ zk ”以及与它关联的字符串
6•    4. 获取文件内容: get /zk 确认 znode 是否包含我们所创建的字符串
7•    5. 修改文件内容: set /zk "zkbak" 对 zk 所关联的字符串进行设置
8•    6. 删除文件: delete /zk 将刚才创建的 znode 删除
9•    7. 退出客户端: quit
10•   8. 帮助命令: help
11
12
13

Zookeeper配置文件介绍


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1
2# 心跳时间,为了确保连接存在的,以毫秒为单位,最小超时时间为两个心跳时间
3tickTime=2000
4
5
6# 多少个心跳时间内,允许其他server连接并初始化数据,如果ZooKeeper管理的数据较大,则应相应增大这个值
7initLimit=10
8
9
10# 多少个tickTime内,允许follower同步,如果follower落后太多,则会被丢弃。
11syncLimit=5
12
13
14# 用于存放内存数据库快照的文件夹,同时用于集群的myid文件也存在这个文件夹里(注意:一个配置文件只能包含一个dataDir字样,即使它被注释掉了。)
15dataDir=/home/myuser/zooA/data
16
17
18# 服务的监听端口
19clientPort=2181
20
21
22# server.A=B:C:D:
23#A是一个数字,表示这个是第几号服务器,B是这个服务器的ip地址
24#C第一个端口用来集群成员的信息交换,表示的是这个服务器与集群中的Leader服务器交换信息的端口
25#D是在leader挂掉时专门用来进行选举leader所用
26
27server.1=127.0.0.1:2888:3888
28server.2=127.0.0.1:2988:3988  
29server.3=127.0.0.1:2088:3088
30
31# 用于单独设置transaction log的目录,transaction log分离可以避免和普通log还有快照的竞争
32dataLogDir=/home/myuser/zooA/log
33
34

Java操作Zookeeper

依赖


1
2
3
4
5
6
7
1<dependency>
2           <groupId>org.apache.zookeeper</groupId>
3           <artifactId>zookeeper</artifactId>
4           <version>3.4.6</version>
5       </dependency>
6
7

客户端


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
1public class ZookeeperDemo {
2   /**
3    * 集群连接地址
4    */
5   private static final String CONNECT_ADDR = "192.168.110.138:2181,192.168.110.147:2181,192.168.110.148:2181";
6   /**
7    * session超时时间
8    */
9   private static final int SESSION_OUTTIME = 2000;
10  /**
11   * 信号量,阻塞程序执行,用户等待zookeeper连接成功,发送成功信号,
12   */
13  private static final CountDownLatch countDownLatch = new CountDownLatch(1);
14
15  public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
16      ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher() {
17
18          public void process(WatchedEvent event) {
19              // 获取时间的状态
20              KeeperState keeperState = event.getState();
21              EventType tventType = event.getType();
22              // 如果是建立连接
23              if (KeeperState.SyncConnected == keeperState) {
24                  if (EventType.None == tventType) {
25                      // 如果建立连接成功,则发送信号量,让后阻塞程序向下执行  类似notify
26                      countDownLatch.countDown();
27                      System.out.println("zk 建立连接");
28                  }
29              }
30          }
31
32      });
33      // 进行阻塞 类似join
34      countDownLatch.await();
35      //创建父节点
36//        String result = zk.create("/testRott", "12245465".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
37//        System.out.println("result:" + result);
38      //创建子节点
39      String result = zk.create("/testRott/children", "children 12245465".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
40      System.out.println("result:"+result);
41      zk.close();
42  }
43
44}
45
46

Zookeeper的事件通知

Watcher


1
2
3
4
5
1在ZooKeeper中,接口类Watcher用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,
2包含KeeperState和EventType两个枚举类,分别代表了通知状态和事件类型,对节点的CRUD进行监听
3同时定义了事件的回调方法:process(WatchedEvent event)。
4
5

代码


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
1public class ZkClientWatcher implements Watcher {
2   // 集群连接地址
3   private static final String CONNECT_ADDRES = "192.168.110.159:2181,192.168.110.160:2181,192.168.110.162:2181";
4   // 会话超时时间
5   private static final int SESSIONTIME = 2000;
6   // 信号量,让zk在连接之前等待,连接成功后才能往下走.
7   private static final CountDownLatch countDownLatch = new CountDownLatch(1);
8   private static String LOG_MAIN = "【main】 ";
9   private ZooKeeper zk;
10
11  public void createConnection(String connectAddres, int sessionTimeOut) {
12      try {
13          zk = new ZooKeeper(connectAddres, sessionTimeOut, this);
14          System.out.println(LOG_MAIN + "zk 开始启动连接服务器....");
15          countDownLatch.await();
16      } catch (Exception e) {
17          e.printStackTrace();
18      }
19  }
20
21  public boolean createPath(String path, String data) {
22      try {
23          this.exists(path, true);
24          this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
25          System.out.println(LOG_MAIN + "节点创建成功, Path:" + path + ",data:" + data);
26      } catch (Exception e) {
27          e.printStackTrace();
28          return false;
29      }
30      return true;
31  }
32
33  /**
34   * 判断指定节点是否存在
35   *
36   * @param path
37   *            节点路径
38   */
39  public Stat exists(String path, boolean needWatch) {
40      try {
41          return this.zk.exists(path, needWatch);
42      } catch (Exception e) {
43          e.printStackTrace();
44          return null;
45      }
46  }
47
48  public boolean updateNode(String path,String data) throws KeeperException, InterruptedException {
49      exists(path, true);
50      this.zk.setData(path, data.getBytes(), -1);
51      return false;
52  }
53
54  public void process(WatchedEvent watchedEvent) {
55
56      // 获取事件状态
57      KeeperState keeperState = watchedEvent.getState();
58      // 获取事件类型
59      EventType eventType = watchedEvent.getType();
60      // zk 路径
61      String path = watchedEvent.getPath();
62      System.out.println("进入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path);
63      // 判断是否建立连接
64      if (KeeperState.SyncConnected == keeperState) {
65          if (EventType.None == eventType) {
66              // 如果建立建立成功,让后程序往下走
67              System.out.println(LOG_MAIN + "zk 建立连接成功!");
68              countDownLatch.countDown();
69          } else if (EventType.NodeCreated == eventType) {
70              System.out.println(LOG_MAIN + "事件通知,新增node节点" + path);
71          } else if (EventType.NodeDataChanged == eventType) {
72              System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被修改....");
73          }
74          else if (EventType.NodeDeleted == eventType) {
75              System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被删除....");
76          }
77
78      }
79      System.out.println("--------------------------------------------------------");
80  }
81
82  public static void main(String[] args) throws KeeperException, InterruptedException {
83      ZkClientWatcher zkClientWatcher = new ZkClientWatcher();
84      zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME);
85//        boolean createResult = zkClientWatcher.createPath("/p15", "pa-644064");
86      zkClientWatcher.updateNode("/pa2","7894561");
87  }
88
89}
90
91
92

使用Zookeeper实现分布式锁的思路

分布式锁使用Zookeeper,会先zk上创建一个临时节点(有效期),使用临时节点资源,因为节点不允许重复,如果能创建节点成功,则生成订单号,如果失败,则创建是订单号失败,等待。在释放锁的时候,可以通过他给他的设置的有效期自动释放,或者自己关闭,然后其他节点就可以创建订单号啦。
核心思想:zk节点唯一,创建就能获取到锁,获取不到就等待

Redis实现分布式锁代码实现


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
1
2#####生成订单号######
3import java.text.SimpleDateFormat;
4import java.util.Date;
5
6//生成订单号
7public class OrderNumGenerator {
8   private static int count = 0;
9    //生成订单号
10  public String getOrderNumber() {
11      SimpleDateFormat smt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
12      return smt.format(new Date()) + "-" + ++count;
13  }
14
15}
16#####订单业务逻辑######
17public class OrderService implements Runnable {
18  private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
19  private static Object oj = new Object();
20  private Lock lock = new ZookeeperDistrbuteLock();
21
22  public void run() {
23      getNumber();
24  }
25
26  public void getNumber() {
27      // synchronized (oj) {
28      lock.getLock();
29      String orderNumber = orderNumGenerator.getOrderNumber();
30      System.out.println("获取订单号:" + orderNumber);
31      lock.unLock();
32      // }
33
34  }
35
36  public static void main(String[] args) {
37      for (int i = 0; i < 100; i++) {
38          new Thread(new OrderService()).start();
39      }
40  }
41
42}
43
44#####lock接口 ######
45public interface Lock {
46  // 获取锁
47  public void getLock();
48    // 释放锁
49  public void unLock();
50}
51
52#####ZookeeperAbstractLock抽象类接口 ######
53public abstract class ZookeeperAbstractLock implements Lock {
54  private static final String CONNECT_ADDRES = "192.168.110.159:2181,192.168.110.160:2181,192.168.110.162:2181";
55
56  protected ZkClient zkClient = new ZkClient(CONNECT_ADDRES);
57  protected String PATH = "/lock";
58
59  public void getLock() {
60      // 如果当前节点已经存在,则等待
61      if (tryLock()) {
62          System.out.println("获取到锁 get");
63      } else {
64          // 等待
65          waitLock();
66          // 重新获取锁
67          getLock();
68      }
69  }
70
71  protected abstract void waitLock();
72
73  protected abstract boolean tryLock();
74
75  public void unLock() {
76      if (zkClient != null) {
77          zkClient.close();
78      }
79      System.out.println("已经释放锁...");
80  }
81#####ZookeeperAbstractLock抽象类接口 ######
82//实现锁
83public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock {
84  private CountDownLatch countDownLatch = new CountDownLatch(1);
85
86  @Override
87  protected boolean tryLock() {
88      try {
89          zkClient.createEphemeral(PATH);
90          // 创建成功
91          return true;
92      } catch (Exception e) {
93          // 创建失败
94          return false;
95      }
96
97  }
98
99  @Override
100 protected void waitLock() {
101     try {
102         IZkDataListener iZkDataListener = new IZkDataListener() {
103
104             public void handleDataDeleted(String path) throws Exception {
105                 // 唤醒等待线程, 继续往下走.
106                 if (countDownLatch != null) {
107                     countDownLatch.countDown();
108                 }
109             }
110
111             public void handleDataChange(String path, Object data) throws Exception {
112
113             }
114         };
115         // 注册到zk监听中
116         zkClient.subscribeDataChanges(PATH, iZkDataListener);
117         if (zkClient.exists(PATH)) {
118             countDownLatch = new CountDownLatch(1);
119
120             // 等待
121             countDownLatch.await();
122
123         }
124         // 删除事件通知
125         zkClient.unsubscribeDataChanges(PATH, iZkDataListener);
126     } catch (Exception e) {
127         // TODO: handle exception
128     }
129 }
130
131}
132
133
134

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全经验

高并发架构实战(九) Spring Boot集Kafka

2021-11-28 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索