基于netty、zookeeper手写RPC框架之二——接入zookeeper作为注册中心,添加心跳机制

释放双眼,带上耳机,听听看~!

zookeeper

介绍

分布式是指多台不同的服务器中部署不同的服务模块,通过远程调用协同工作,对外提供服务,由于远程调用会出现网络故障等问题,如何保持数据一致性和可用性则成为要解决的问题。而zookeeper是一个分布式服务协调组件,是一个高性能的分布式数据一致性的解决方案。

特性

一致性:数据一致性,数据按照顺序分批入库。

原子性:事务要么成功要么失败,不会局部化

单一视图:客户端连接集群中的任一zk节点,数据都是一致的

可靠性:每次对zk的操作状态都会保存在服务端

实时性:客户端可以读取到zk服务端的最新数据

安装及简单运行

去官网下载压缩包到本地解压,然后把zoo_sample.cfg改为zoo.cfg并按实际情况填写配置文件,在bin目录下运行启动脚本

zookeeper主要目录结构

bin:主要的一些运行命令

conf:存放配置文件

contrib:附加的一些功能

dist-maven:mvn编译后的目录

docs:文档

lib:需要依赖的jar包

recipes:案例demo代码

src:源码

zoo.cfg配置

tickTime:用于计算的时间单位。比如session超时:N*tickTime

initLimit:用于集群,允许从节点连接并同步到master节点的初始化连接时间,以tickTime的倍数表示

syncLimit:用于集群,master主节点与从节点之间发送消息,请求和应答时间长度。(心跳机制)

dataDir:必须配置,数据目录

dataLogDir:日志目录,如果不配置会和dataDir同一个公用目录

clientPort:连接服务器的端口,默认2181

zookeeper基本数据模型

是一个树形结构,类似linux的文件目录结构

每个节点称为znode,它可以有子节点,也可以有数据

节点分为临时节点和永久节点,临时节点在客户端断开后失效

每个zk节点都有各自的版本号,可以通过命令行来显示节点信息

每当节点数据发生变化,那么该节点的版本会累加(乐观锁)

删除/修改过时节点,版本号不匹配会报错

每个zk节点存储的数据不宜过大,几k即可

节点可以设置权限acl,可以通过权限来限制用户的访问、区分环境等等,acl这里就不做介绍了

master节点选举,主节点挂了以后,从节点就会接受工作,并且保证这个节点是唯一的,这也是所谓首脑模式,从而保证我们的集群是高可用的

常见运用

统一配置文件管理,即只需要部署一台服务器,则可以把相同的配置文件同步更新到其他所有服务器,此操作在云计算中用的特别多。

发布与订阅,类似于消息队列,dubbo发布者把数据存在znode上,订阅者读取这个数据

提供分布式锁,分布式环境中不同进程之间争夺资源,类似于多线程中的锁

集群管理,保证集群中数据的强一致性

命令

ls 子节点

ls2 ls+stat

stat 状态信息

czxid:节点被创建的事务ID

ctime: 节点创建时间

mzxid: 最后一次被更新的事务ID

mtime: 节点修改时间

pzxid:子节点列表最后一次被更新的事务ID

cversion:子节点的版本号

dataversion:数据版本号

aclversion:权限版本号

ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0

dataLength:节点存储的数据的长度 numChildren:当前节点的子节点个数

get 数据+stat

session基本原理

客户端与服务端之间的链接存在会话

每个会话都会可以设置一个超时时间

心跳结束,session则过期

session过期,则临时节点znode会被抛弃

心跳机制:客户端向服务端的ping包请求

create:创建 -e临时节点 -s 有序节点

set:修改

delete:删除

watcher机制:

针对每个节点的操作,都有要给监督者->watcher

当监控的某个对象(znode)发生了变换,则触发了watcher事件

zk中的watcher是一次性的,触发后立即销毁

父节点,子节点 增删改都能够触发其watcher

针对不同类型的操作,触发的watcher事件也不同:

节点创建、删除、数据变化事件

注意watcher只可以使用一次,stat、get、ls、ls2后面均可以加watcher进行监听。ls为父节点设置watcher,创建、删除子节点触发:NodeChildrenChanged,修改不会触发

关于zookeeper更详细介绍可以看看这篇文章https://www.cnblogs.com/luxiaoxun/p/4887452.html

下面开始编码的环节

这里的格式也是像前一篇文章一样 {interface:{url:impl}},而在zookeeper上面的格式为 /interface/url:data(具体实现类的名字),

而心跳机制的实现,是通过服务提供者定时向注册中心发送本机地址(心跳数据包),而注册中心的监控则维持一个channelId和具体地址的map,并且通过IdleHandler监听空闲事件,到达一定的空闲次数则认为不活跃,当不活跃时(这里的不活跃条件是5分钟内3次以上没有发送心跳包),zookeeper删除相应的url节点,但后续的逻辑没有继续做,比如:服务提供方在网络稳定后尝试重新发送心跳包,注册中心通过一定的计算(比如在一定时间内的心跳发送率达到一定的值)认为该ip可用了,就尝试重新向zookeeper注册该ip,而且也可以在本地维持一个map存放接口信息,并添加监听事件去更新可用列表,可以优化的点还很多,这里暂时先接入zookeeper并简单地演示通过心跳来移除不稳定服务

这里采用curator作为zookeeper的客户端

首先编写关于zookeeper的业务操作


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
1
2/**
3 * @author lulu
4 * @Date 2019/11/18 21:17
5 * 负责实现注册中心具体的业务功能
6 */
7public class ZkRegister {
8    //{接口:{URL:实现类名}},这里可以为每个接口建立子节点,节点名为url地址,值为className
9    private static CuratorFramework client = null;
10
11
12    //通过静态代码块初始化
13    static{
14        init();
15    }
16
17    //初始化链接客户端
18    private static void init() {
19        RetryPolicy retryPolicy = new RetryNTimes(ZKConsts.RETRYTIME, ZKConsts.SLEEP_MS_BEWTEENR_RETRY);
20        client = CuratorFrameworkFactory.builder()
21                .connectString(ZKConsts.ZK_SERVER_PATH)
22                .sessionTimeoutMs(ZKConsts.SESSION_TIMEOUT_MS).retryPolicy(retryPolicy)
23                .namespace(ZKConsts.WORK_SPACE).build();
24        client.start();
25    }
26
27
28    //注册接口、对应服务ip及其实现类
29    public static void register(String interfaceName, URL url, Class implClass) {
30        try {
31            client.create()
32                    .creatingParentsIfNeeded()
33                    .withMode(CreateMode.EPHEMERAL)
34                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
35                    .forPath(getPath(interfaceName, url.toString()), implClass.getCanonicalName().getBytes());
36        } catch (Exception e) {
37            e.printStackTrace();
38        }
39    }
40
41    //hostname:port,遍历所有interface节点,把对应的url节点去掉
42    public static void remove(String url) {
43        try {
44            List<String> interfaces = client.getChildren().forPath("/");
45            for (String anInterface : interfaces) {
46                List<String> urlList = client.getChildren().forPath(getPath(anInterface));
47                for (String s : urlList) {
48                    if (s.equals(url)) {
49                        client.delete().forPath(getPath(anInterface, url));
50                    }
51                }
52            }
53        } catch (Exception e) {
54            e.printStackTrace();
55        }
56    }
57
58    //获取具体实现类的类名,这里还可以添加一个内部缓存,不用每次都去访问,
59    public static String get(String interfaceName, URL url) {
60        String res = null;
61        try {
62            byte[] bytes = client.getData().forPath(getPath(interfaceName, url.toString()));
63            res = new String(bytes);
64        } catch (Exception e) {
65            e.printStackTrace();
66        }
67        return res;
68
69    }
70
71    //通过接口名获取具体的实现类
72    public static URL random(String interfaceName) {
73        try {
74            List<String> urlList = client.getChildren().forPath(getPath(interfaceName));
75            String[] url = urlList.get(0).split(":");
76            return new URL(url[0], Integer.valueOf(url[1]));
77        } catch (Exception e) {
78            e.printStackTrace();
79        }
80        return null;
81    }
82
83    //生成节点路径
84    private static String getPath(String... args) {
85        StringBuilder builder = new StringBuilder();
86        for (String arg : args) {
87            builder.append("/").append(arg);
88        }
89        return builder.toString();
90    }
91
92
93    public static void closeZKClient() {
94        if (client != null) {
95            client.close();
96        }
97    }
98
99
100}
101
102

心跳监听,在提供服务方使用


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
1package com.gdut.rpcstudy.demo.register.zk.heartbeat;
2
3import io.netty.bootstrap.Bootstrap;
4import io.netty.channel.*;
5import io.netty.channel.nio.NioEventLoopGroup;
6import io.netty.channel.socket.SocketChannel;
7import io.netty.channel.socket.nio.NioSocketChannel;
8import io.netty.handler.codec.string.StringEncoder;
9
10import java.util.Random;
11import java.util.concurrent.*;
12
13
14/**
15 * @author lulu
16 * @Date 2019/11/18 23:30
17 */
18public class BeatDataSender {
19    private BeatDataSender() {
20
21    }
22    public static void send(String url, String hostName, Integer port) {
23        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
24        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
25        try {
26            Bootstrap bootstrap = new Bootstrap();
27            ChannelFuture connect = bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
28                    .handler(new ChannelInitializer<SocketChannel>() {
29                        @Override
30                        protected void initChannel(SocketChannel socketChannel) throws Exception {
31                            socketChannel.pipeline().addLast(new StringEncoder())
32                                    .addLast(new StringEncoder())
33                                    .addLast(new ChannelInboundHandlerAdapter(){
34                                        @Override
35                                        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
36                                            System.out.println("由于不活跃次数在5分钟内超过2次,链接被关闭");
37                                        }
38                                    });
39
40                        }
41                    })
42                    .connect(hostName, port).sync();
43            System.out.println("心跳客户端绑定" + "hostname:" + hostName + "port:" + port);
44            //这里只是演示心跳机制不活跃的情况下重连,普通的做法只需要定时发送本机地址即可
45            service.scheduleAtFixedRate(() -> {
46                if (connect.channel().isActive()) {
47                    int time = new Random().nextInt(5);
48                    System.out.println(time);
49                    if(time >3){
50                        System.out.println("发送本机地址:" + url);
51                        connect.channel().writeAndFlush(url);
52                    }
53                }
54            }, 60, 60, TimeUnit.SECONDS);
55
56        } catch (Exception e) {
57            e.printStackTrace();
58        }
59    }
60
61
62
63}
64
65

注册中心检查心跳


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
1/**
2 * @author lulu
3 * @Date 2019/11/18 22:17
4 * 注册中心心跳检查服务器,通过查看心跳来查看各server是否存活
5 */
6public class ZkServer {
7
8    public static void main(String[] args) {
9
10        NioEventLoopGroup boss = new NioEventLoopGroup();
11        NioEventLoopGroup worker = new NioEventLoopGroup();
12        ServerBootstrap bootstrap = new ServerBootstrap();
13        ConcurrentHashMap<String,String> ChannalIdUrlMap=new ConcurrentHashMap();
14        try {
15            bootstrap.group(boss, worker)
16                    .channel(NioServerSocketChannel.class)
17                    //存放已完成三次握手的请求的队列的最大长度
18                    .option(ChannelOption.SO_BACKLOG, 128)
19                    //启用心跳保活
20                    .childOption(ChannelOption.SO_KEEPALIVE, true)
21                    .childHandler(new ChannelInitializer<SocketChannel>() {
22                        @Override
23                        protected void initChannel(SocketChannel ch) throws Exception {
24                            //string编码器
25                            ch.pipeline().addLast(new StringEncoder())
26                            //string解码器
27                            .addLast(new StringDecoder())
28                                    //监听链接空闲时间
29                            .addLast(new IdleStateHandler(0,0,60))
30                           //hearbeat处理器
31                            .addLast(new HeartbeatHandler(ChannalIdUrlMap));
32                        }
33                    });
34            //bind初始化端口是异步的,但调用sync则会同步阻塞等待端口绑定成功
35            ChannelFuture future = bootstrap.bind("127.0.0.1",8888).sync();
36            future.channel().closeFuture().sync();
37        } catch (Exception e) {
38            e.printStackTrace();
39        }finally {
40            boss.shutdownGracefully();
41            worker.shutdownGracefully();
42        }
43    }
44
45}
46

在nettyserver的start方法添加发送心跳


1
2
3
1  BeatDataSender.send(hostName + ":" + port, "127.0.0.1", 8888);
2
3

心跳监听处理逻辑,用于监听心跳服务器的处理,需要和监听链接空闲时间的IdleHandler一起使用,复写eventTriger方法当链接符合给的空闲条件时,对其进行逻辑处理


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
1
2
3/**
4 * @author lulu
5 * @Date 2019/11/18 22:29
6 */
7public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
8
9    //维护channelId和具体地址的map,当发生变化时对其进行删除
10    private static ConcurrentHashMap<String, String> channelUrlMap;
11
12//活跃次数
13    private int inActiveCount = 0;
14//开始计数时间
15    private long start;
16
17
18    public HeartbeatHandler(ConcurrentHashMap<String, String> map) {
19       HeartbeatHandler.channelUrlMap = map;
20    }
21
22    @Override
23    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
24        String url = msg.toString();
25        String id = ctx.channel().id().asShortText();
26        System.out.println("收到channelId:" + id + "发来信息:" + url);
27        if (channelUrlMap.get(id) == null) {
28            channelUrlMap.put(id, url);
29        }
30
31
32    }
33
34    @Override
35    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
36
37        if (evt instanceof IdleStateEvent) {
38
39            IdleStateEvent state = (IdleStateEvent) evt;
40            if (state.state().equals(IdleState.READER_IDLE)) {
41                System.out.println("读空闲");
42            } else if (state.state().equals(IdleState.WRITER_IDLE)) {
43                System.out.println("写空闲");
44            }
45            //在一定时间内读写空闲才会关闭链接
46            else if (state.state().equals(IdleState.ALL_IDLE)) {
47                if (++inActiveCount == 1) {
48                    start = System.currentTimeMillis();
49                }
50                int minute = (int) ((System.currentTimeMillis() - start) / (60 * 1000))+1;
51                System.out.printf("第%d次读写都空闲,计时分钟数%d%n", inActiveCount,minute);
52                //5分钟内出现2次以上不活跃现象,有的话就把它去掉
53                if (inActiveCount > 2 && minute <= 5) {
54                    System.out.println("移除不活跃的ip");
55                    removeAndClose(ctx);
56                } else {
57                    //重新计算
58                    if (minute >= 5) {
59                        System.out.println("新周期开始");
60                        start = 0;
61                        inActiveCount = 0;
62                    }
63                }
64
65            }
66
67        }
68    }
69
70    //通过ID获取地址,并删除zk上相关的
71    private void removeAndClose(ChannelHandlerContext ctx) {
72        String id = ctx.channel().id().asShortText();
73        String url = channelUrlMap.get(id);
74        //移除不活跃的节点
75        ZkRegister.remove(url);
76        channelUrlMap.remove(id);
77        ctx.channel().close();
78    }
79
80    //当出现异常时关闭链接
81    @Override
82    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
83        removeAndClose(ctx);
84    }
85
86
87    @Override
88    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
89        System.out.println(ctx.channel().id().asShortText() + "注册");
90    }
91
92    @Override
93    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
94        System.out.println(ctx.channel().id().asShortText() + "注销");
95    }
96}
97
98

由于存放的是classImplName,所以要在handler处理逻辑里加载该类,后面接入spring后可以从一个服务注册的类上获取相应的实现类


1
2
3
4
1  String serviceImplName= ZkRegister.get(invocation.getInterfaceName(),new URL(hostAddress,8080));
2        Class<?> serviceImpl = Class.forName(serviceImplName);
3        Method method=serviceImpl.getMethod(invocation.getMethodName(),invocation.getParamsTypes());
4

客户端获取url


1
2
1  URL url= ZkRegister.random(interfaceClass.getName());
2

此致更改完毕,地址为https://github.com/97lele/rpcstudy/tree/withzk,接下来是把服务端和客户端整合到spring

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense 技巧提示100条

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索