zookeeper
介绍
分布式是指多台不同的服务器中部署不同的服务模块,通过远程调用协同工作,对外提供服务,由于远程调用会出现网络故障等问题,如何保持数据一致性和可用性则成为要解决的问题。而zookeeper是一个分布式服务协调组件,是一个高性能的分布式数据一致性的解决方案。
特性
一致性:数据一致性,数据按照顺序分批入库。
原子性:事务要么成功要么失败,不会局部化
单一视图:客户端连接集群中的任一zk节点,数据都是一致的
可靠性:每次对zk的操作状态都会保存在服务端
实时性:客户端可以读取到zk服务端的最新数据
安装及简单运行
去官网下载压缩包到本地解压,然后把zoo_sample.cfg改为zoo.cfg并按实际情况填写配置文件,在bin目录下运行启动脚本
zookeeper主要目录结构
bin:主要的一些运行命令
conf:存放配置文件
contrib:附加的一些功能
dist-maven:mvn编译后的目录
docs:文档
lib:需要依赖的jar包
recipes:案例demo代码
src:源码
zoo.cfg配置
tickTime:用于计算的时间单位。比如session超时:N*tickTime
initLimit:用于集群,允许从节点连接并同步到master节点的初始化连接时间,以tickTime的倍数表示
syncLimit:用于集群,master主节点与从节点之间发送消息,请求和应答时间长度。(心跳机制)
dataDir:必须配置,数据目录
dataLogDir:日志目录,如果不配置会和dataDir同一个公用目录
clientPort:连接服务器的端口,默认2181
zookeeper基本数据模型
是一个树形结构,类似linux的文件目录结构
每个节点称为znode,它可以有子节点,也可以有数据
节点分为临时节点和永久节点,临时节点在客户端断开后失效
每个zk节点都有各自的版本号,可以通过命令行来显示节点信息
每当节点数据发生变化,那么该节点的版本会累加(乐观锁)
删除/修改过时节点,版本号不匹配会报错
每个zk节点存储的数据不宜过大,几k即可
节点可以设置权限acl,可以通过权限来限制用户的访问、区分环境等等,acl这里就不做介绍了
master节点选举,主节点挂了以后,从节点就会接受工作,并且保证这个节点是唯一的,这也是所谓首脑模式,从而保证我们的集群是高可用的
常见运用
统一配置文件管理,即只需要部署一台服务器,则可以把相同的配置文件同步更新到其他所有服务器,此操作在云计算中用的特别多。
发布与订阅,类似于消息队列,dubbo发布者把数据存在znode上,订阅者读取这个数据
提供分布式锁,分布式环境中不同进程之间争夺资源,类似于多线程中的锁
集群管理,保证集群中数据的强一致性
命令
ls 子节点
ls2 ls+stat
stat 状态信息
czxid:节点被创建的事务ID
ctime: 节点创建时间
mzxid: 最后一次被更新的事务ID
mtime: 节点修改时间
pzxid:子节点列表最后一次被更新的事务ID
cversion:子节点的版本号
dataversion:数据版本号
aclversion:权限版本号
ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0
dataLength:节点存储的数据的长度 numChildren:当前节点的子节点个数
get 数据+stat
session基本原理
客户端与服务端之间的链接存在会话
每个会话都会可以设置一个超时时间
心跳结束,session则过期
session过期,则临时节点znode会被抛弃
心跳机制:客户端向服务端的ping包请求
create:创建 -e临时节点 -s 有序节点
set:修改
delete:删除
watcher机制:
针对每个节点的操作,都有要给监督者->watcher
当监控的某个对象(znode)发生了变换,则触发了watcher事件
zk中的watcher是一次性的,触发后立即销毁
父节点,子节点 增删改都能够触发其watcher
针对不同类型的操作,触发的watcher事件也不同:
节点创建、删除、数据变化事件
注意watcher只可以使用一次,stat、get、ls、ls2后面均可以加watcher进行监听。ls为父节点设置watcher,创建、删除子节点触发:NodeChildrenChanged,修改不会触发
关于zookeeper更详细介绍可以看看这篇文章https://www.cnblogs.com/luxiaoxun/p/4887452.html
下面开始编码的环节
这里的格式也是像前一篇文章一样 {interface:{url:impl}},而在zookeeper上面的格式为 /interface/url:data(具体实现类的名字),
而心跳机制的实现,是通过服务提供者定时向注册中心发送本机地址(心跳数据包),而注册中心的监控则维持一个channelId和具体地址的map,并且通过IdleHandler监听空闲事件,到达一定的空闲次数则认为不活跃,当不活跃时(这里的不活跃条件是5分钟内3次以上没有发送心跳包),zookeeper删除相应的url节点,但后续的逻辑没有继续做,比如:服务提供方在网络稳定后尝试重新发送心跳包,注册中心通过一定的计算(比如在一定时间内的心跳发送率达到一定的值)认为该ip可用了,就尝试重新向zookeeper注册该ip,而且也可以在本地维持一个map存放接口信息,并添加监听事件去更新可用列表,可以优化的点还很多,这里暂时先接入zookeeper并简单地演示通过心跳来移除不稳定服务
这里采用curator作为zookeeper的客户端
首先编写关于zookeeper的业务操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102 1
2/**
3 * @author lulu
4 * @Date 2019/11/18 21:17
5 * 负责实现注册中心具体的业务功能
6 */
7public class ZkRegister {
8 //{接口:{URL:实现类名}},这里可以为每个接口建立子节点,节点名为url地址,值为className
9 private static CuratorFramework client = null;
10
11
12 //通过静态代码块初始化
13 static{
14 init();
15 }
16
17 //初始化链接客户端
18 private static void init() {
19 RetryPolicy retryPolicy = new RetryNTimes(ZKConsts.RETRYTIME, ZKConsts.SLEEP_MS_BEWTEENR_RETRY);
20 client = CuratorFrameworkFactory.builder()
21 .connectString(ZKConsts.ZK_SERVER_PATH)
22 .sessionTimeoutMs(ZKConsts.SESSION_TIMEOUT_MS).retryPolicy(retryPolicy)
23 .namespace(ZKConsts.WORK_SPACE).build();
24 client.start();
25 }
26
27
28 //注册接口、对应服务ip及其实现类
29 public static void register(String interfaceName, URL url, Class implClass) {
30 try {
31 client.create()
32 .creatingParentsIfNeeded()
33 .withMode(CreateMode.EPHEMERAL)
34 .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
35 .forPath(getPath(interfaceName, url.toString()), implClass.getCanonicalName().getBytes());
36 } catch (Exception e) {
37 e.printStackTrace();
38 }
39 }
40
41 //hostname:port,遍历所有interface节点,把对应的url节点去掉
42 public static void remove(String url) {
43 try {
44 List<String> interfaces = client.getChildren().forPath("/");
45 for (String anInterface : interfaces) {
46 List<String> urlList = client.getChildren().forPath(getPath(anInterface));
47 for (String s : urlList) {
48 if (s.equals(url)) {
49 client.delete().forPath(getPath(anInterface, url));
50 }
51 }
52 }
53 } catch (Exception e) {
54 e.printStackTrace();
55 }
56 }
57
58 //获取具体实现类的类名,这里还可以添加一个内部缓存,不用每次都去访问,
59 public static String get(String interfaceName, URL url) {
60 String res = null;
61 try {
62 byte[] bytes = client.getData().forPath(getPath(interfaceName, url.toString()));
63 res = new String(bytes);
64 } catch (Exception e) {
65 e.printStackTrace();
66 }
67 return res;
68
69 }
70
71 //通过接口名获取具体的实现类
72 public static URL random(String interfaceName) {
73 try {
74 List<String> urlList = client.getChildren().forPath(getPath(interfaceName));
75 String[] url = urlList.get(0).split(":");
76 return new URL(url[0], Integer.valueOf(url[1]));
77 } catch (Exception e) {
78 e.printStackTrace();
79 }
80 return null;
81 }
82
83 //生成节点路径
84 private static String getPath(String... args) {
85 StringBuilder builder = new StringBuilder();
86 for (String arg : args) {
87 builder.append("/").append(arg);
88 }
89 return builder.toString();
90 }
91
92
93 public static void closeZKClient() {
94 if (client != null) {
95 client.close();
96 }
97 }
98
99
100}
101
102
心跳监听,在提供服务方使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 1package com.gdut.rpcstudy.demo.register.zk.heartbeat;
2
3import io.netty.bootstrap.Bootstrap;
4import io.netty.channel.*;
5import io.netty.channel.nio.NioEventLoopGroup;
6import io.netty.channel.socket.SocketChannel;
7import io.netty.channel.socket.nio.NioSocketChannel;
8import io.netty.handler.codec.string.StringEncoder;
9
10import java.util.Random;
11import java.util.concurrent.*;
12
13
14/**
15 * @author lulu
16 * @Date 2019/11/18 23:30
17 */
18public class BeatDataSender {
19 private BeatDataSender() {
20
21 }
22 public static void send(String url, String hostName, Integer port) {
23 EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
24 ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
25 try {
26 Bootstrap bootstrap = new Bootstrap();
27 ChannelFuture connect = bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
28 .handler(new ChannelInitializer<SocketChannel>() {
29 @Override
30 protected void initChannel(SocketChannel socketChannel) throws Exception {
31 socketChannel.pipeline().addLast(new StringEncoder())
32 .addLast(new StringEncoder())
33 .addLast(new ChannelInboundHandlerAdapter(){
34 @Override
35 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
36 System.out.println("由于不活跃次数在5分钟内超过2次,链接被关闭");
37 }
38 });
39
40 }
41 })
42 .connect(hostName, port).sync();
43 System.out.println("心跳客户端绑定" + "hostname:" + hostName + "port:" + port);
44 //这里只是演示心跳机制不活跃的情况下重连,普通的做法只需要定时发送本机地址即可
45 service.scheduleAtFixedRate(() -> {
46 if (connect.channel().isActive()) {
47 int time = new Random().nextInt(5);
48 System.out.println(time);
49 if(time >3){
50 System.out.println("发送本机地址:" + url);
51 connect.channel().writeAndFlush(url);
52 }
53 }
54 }, 60, 60, TimeUnit.SECONDS);
55
56 } catch (Exception e) {
57 e.printStackTrace();
58 }
59 }
60
61
62
63}
64
65
注册中心检查心跳
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 1/**
2 * @author lulu
3 * @Date 2019/11/18 22:17
4 * 注册中心心跳检查服务器,通过查看心跳来查看各server是否存活
5 */
6public class ZkServer {
7
8 public static void main(String[] args) {
9
10 NioEventLoopGroup boss = new NioEventLoopGroup();
11 NioEventLoopGroup worker = new NioEventLoopGroup();
12 ServerBootstrap bootstrap = new ServerBootstrap();
13 ConcurrentHashMap<String,String> ChannalIdUrlMap=new ConcurrentHashMap();
14 try {
15 bootstrap.group(boss, worker)
16 .channel(NioServerSocketChannel.class)
17 //存放已完成三次握手的请求的队列的最大长度
18 .option(ChannelOption.SO_BACKLOG, 128)
19 //启用心跳保活
20 .childOption(ChannelOption.SO_KEEPALIVE, true)
21 .childHandler(new ChannelInitializer<SocketChannel>() {
22 @Override
23 protected void initChannel(SocketChannel ch) throws Exception {
24 //string编码器
25 ch.pipeline().addLast(new StringEncoder())
26 //string解码器
27 .addLast(new StringDecoder())
28 //监听链接空闲时间
29 .addLast(new IdleStateHandler(0,0,60))
30 //hearbeat处理器
31 .addLast(new HeartbeatHandler(ChannalIdUrlMap));
32 }
33 });
34 //bind初始化端口是异步的,但调用sync则会同步阻塞等待端口绑定成功
35 ChannelFuture future = bootstrap.bind("127.0.0.1",8888).sync();
36 future.channel().closeFuture().sync();
37 } catch (Exception e) {
38 e.printStackTrace();
39 }finally {
40 boss.shutdownGracefully();
41 worker.shutdownGracefully();
42 }
43 }
44
45}
46
在nettyserver的start方法添加发送心跳
1
2
3 1 BeatDataSender.send(hostName + ":" + port, "127.0.0.1", 8888);
2
3
心跳监听处理逻辑,用于监听心跳服务器的处理,需要和监听链接空闲时间的IdleHandler一起使用,复写eventTriger方法当链接符合给的空闲条件时,对其进行逻辑处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 1
2
3/**
4 * @author lulu
5 * @Date 2019/11/18 22:29
6 */
7public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
8
9 //维护channelId和具体地址的map,当发生变化时对其进行删除
10 private static ConcurrentHashMap<String, String> channelUrlMap;
11
12//活跃次数
13 private int inActiveCount = 0;
14//开始计数时间
15 private long start;
16
17
18 public HeartbeatHandler(ConcurrentHashMap<String, String> map) {
19 HeartbeatHandler.channelUrlMap = map;
20 }
21
22 @Override
23 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
24 String url = msg.toString();
25 String id = ctx.channel().id().asShortText();
26 System.out.println("收到channelId:" + id + "发来信息:" + url);
27 if (channelUrlMap.get(id) == null) {
28 channelUrlMap.put(id, url);
29 }
30
31
32 }
33
34 @Override
35 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
36
37 if (evt instanceof IdleStateEvent) {
38
39 IdleStateEvent state = (IdleStateEvent) evt;
40 if (state.state().equals(IdleState.READER_IDLE)) {
41 System.out.println("读空闲");
42 } else if (state.state().equals(IdleState.WRITER_IDLE)) {
43 System.out.println("写空闲");
44 }
45 //在一定时间内读写空闲才会关闭链接
46 else if (state.state().equals(IdleState.ALL_IDLE)) {
47 if (++inActiveCount == 1) {
48 start = System.currentTimeMillis();
49 }
50 int minute = (int) ((System.currentTimeMillis() - start) / (60 * 1000))+1;
51 System.out.printf("第%d次读写都空闲,计时分钟数%d%n", inActiveCount,minute);
52 //5分钟内出现2次以上不活跃现象,有的话就把它去掉
53 if (inActiveCount > 2 && minute <= 5) {
54 System.out.println("移除不活跃的ip");
55 removeAndClose(ctx);
56 } else {
57 //重新计算
58 if (minute >= 5) {
59 System.out.println("新周期开始");
60 start = 0;
61 inActiveCount = 0;
62 }
63 }
64
65 }
66
67 }
68 }
69
70 //通过ID获取地址,并删除zk上相关的
71 private void removeAndClose(ChannelHandlerContext ctx) {
72 String id = ctx.channel().id().asShortText();
73 String url = channelUrlMap.get(id);
74 //移除不活跃的节点
75 ZkRegister.remove(url);
76 channelUrlMap.remove(id);
77 ctx.channel().close();
78 }
79
80 //当出现异常时关闭链接
81 @Override
82 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
83 removeAndClose(ctx);
84 }
85
86
87 @Override
88 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
89 System.out.println(ctx.channel().id().asShortText() + "注册");
90 }
91
92 @Override
93 public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
94 System.out.println(ctx.channel().id().asShortText() + "注销");
95 }
96}
97
98
由于存放的是classImplName,所以要在handler处理逻辑里加载该类,后面接入spring后可以从一个服务注册的类上获取相应的实现类
1
2
3
4 1 String serviceImplName= ZkRegister.get(invocation.getInterfaceName(),new URL(hostAddress,8080));
2 Class<?> serviceImpl = Class.forName(serviceImplName);
3 Method method=serviceImpl.getMethod(invocation.getMethodName(),invocation.getParamsTypes());
4
客户端获取url
1
2 1 URL url= ZkRegister.random(interfaceClass.getName());
2
此致更改完毕,地址为https://github.com/97lele/rpcstudy/tree/withzk,接下来是把服务端和客户端整合到spring