【RPC】一步一步实现基于netty+zookeeper的RPC框架(一)

释放双眼,带上耳机,听听看~!

随着分布式架构运用的越来越多,RPC框架成为了我们不得不掌握的知识,这里一步一步来手写一个简单的RPC框架,以博文作为记录及自我监督。

首先是技术选型,这边我选用的是当前比较流行的Netty+Zookeeper来实现,通过zookeeper的特性来实现服务注册与发现,通信则使用netty框架。

这里贴出github代码地址,想直接看代码的可以直接下载运行:https://github.com/whiteBX/wrpc

这里先来讲服务注册发现原理: 利用zookeeper的创建临时节点和watcher机制,可以做到在一个服务下注册多个服务器地址,并且在节点发生变动时通过watcher动态更新服务器列表,来达到在新增/修改/删除时自动注册发现/删除/更新服务器连接信息.这里说一点,zookeeper的增删改操作会交由leader去处理,所以这里不用担心并发问题.

zookeeper相关代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
1public class ZKClient {
2
3    /**
4     * 获取zookeeper连接
5     *
6     * @param connectString
7     * @param sessionTimeout
8     * @return
9     */
10    public ZooKeeper newConnection(String connectString, int sessionTimeout) {
11        ZooKeeper zooKeeper = null;
12        try {
13            final CountDownLatch countDownLatch = new CountDownLatch(1);
14            zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
15                public void process(WatchedEvent watchedEvent) {
16                    if (Event.KeeperState.SyncConnected.equals(watchedEvent.getState())) {
17                        countDownLatch.countDown();
18                    }
19                }
20            });
21            countDownLatch.await();
22        } catch (IOException e) {
23            System.out.println("获取zookeeper连接失败:连接不上zookeeper" + e.getMessage());
24        } catch (InterruptedException e) {
25            System.out.println("获取zookeeper连接失败:本地线程原因" + e.getMessage());
26        }
27        System.out.println("zookeeper连接成功");
28        return zooKeeper;
29    }
30
31    /**
32     * 创建临时节点
33     *
34     * @param zk
35     * @param appCode
36     * @param data
37     */
38    public void createEphemeralNode(ZooKeeper zk, String appCode, byte[] data) {
39        try {
40            initAppPath(zk, appCode);
41            String path = zk.create(MessageFormat.format("{0}/{1}/", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode), data,
42                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
43            System.out.println("临时节点创建成功:" + path);
44        } catch (Exception e) {
45            System.out.println("创建临时节点失败:" + e.getMessage());
46        }
47    }
48
49    /**
50     * 初始化appPath
51     *
52     * @param zk
53     * @param appCode
54     */
55    private void initAppPath(ZooKeeper zk, String appCode) {
56        initRootPath(zk);
57        try {
58            if (zk.exists(MessageFormat.format("{0}/{1}", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode), false) == null) {
59                zk.create(MessageFormat.format("{0}/{1}", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode), null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
60                        CreateMode.PERSISTENT);
61            }
62        } catch (Exception e) {
63            System.out.println("zookeeper创建跟节点失败" + e);
64        }
65    }
66
67    /**
68     * 初始化根节点
69     *
70     * @param zk
71     */
72    private void initRootPath(ZooKeeper zk) {
73        try {
74            if (zk.exists(CommonConstant.ZK_REGISTORY_ROOT_PATH, false) == null) {
75                zk.create(CommonConstant.ZK_REGISTORY_ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
76                        CreateMode.PERSISTENT);
77            }
78        } catch (Exception e) {
79            System.out.println("zookeeper创建跟节点失败" + e);
80        }
81    }
82
83}
84
85

接下来看provider相关类,提供服务:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1/**
2 * <p>
3 * 1.创建netty监听,等待客户端连接
4 * 2.连接zk,创建临时节点,记录服务器连接信息 </p >
5 *
6 * @author baixiong
7 * @version $Id: RPCProvider.java, v 0.1 2018年10月15日 17:42:00 baixiong Exp$
8 */
9public class RPCProvider {
10    /**
11     * netty客户端
12     */
13    private static NettyClient nettyClient = new NettyClient();
14    /**
15     * zookeeper客户端
16     */
17    private static ZKClient    zkClient    = new ZKClient();
18
19    public void registry(String server, int port) {
20        // 开启netty监听客户端连接
21        nettyClient.startServer(port);
22        // 创建zk连接并创建临时节点
23        ZooKeeper zooKeeper = zkClient.newConnection(ProviderConstant.ZK_CONNECTION_STRING,
24            ProviderConstant.ZK_SESSION_TIME_OUT);
25        String serverIp = server + CommonConstant.COMMOA + port;
26        zkClient.createEphemeralNode(zooKeeper, ProviderConstant.APP_CODE, serverIp.getBytes());
27    }
28}
29
30

代码中注释已经很完整了,文中用到的几个常量自己定就行,appCode作为服务的唯一标识,接下来看代码中用到的NettyClient代码


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1    /**
2     * 开启新的netty连接
3     *
4     * @param port
5     */
6    public void startServer(int port) {
7        try {
8            ServerBootstrap bootstrap = new ServerBootstrap();
9            bootstrap.group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class)
10                    .childHandler(new ChannelInitializer<SocketChannel>() {
11                        @Override
12                        protected void initChannel(SocketChannel socketChannel) {
13                            ChannelPipeline pipeline = socketChannel.pipeline();
14                            pipeline.addLast(new StringDecoder());
15                            pipeline.addLast(new StringEncoder());
16                            pipeline.addLast(new RpcServerNettyHandler());
17                        }
18                    });
19            bootstrap.bind(port).sync();
20        } catch (InterruptedException e) {
21            System.out.println("netty创建服务端channel失败:" + e.getMessage());
22        }
23    }
24
25

这里来看下RpcServerNettyHandler类,他继承自ChannelInboundHandlerAdapter,负责通信数据。


1
2
3
4
5
6
7
8
9
10
1public class RpcServerNettyHandler extends ChannelInboundHandlerAdapter {
2
3    @Override
4    public void channelRead(ChannelHandlerContext ctx, Object msg) {
5        System.out.println("服务端收到请求:" + msg);
6        ctx.writeAndFlush("success");
7    }
8}
9
10

到这里provider相关代码就完了,接下来看consumer相关代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1public class RPCConsumer {
2
3    /**
4     * url处理器
5     */
6    private UrlHolder urlHolder = new UrlHolder();
7    /**
8     * netty客户端
9     */
10    private NettyClient nettyClient = new NettyClient();
11
12    /**
13     * 远程调用
14     *
15     * @param appCode
16     * @param param
17     * @return
18     */
19    public String call(String appCode, String param) {
20        try {
21            // 从zookeeper获取服务地址
22            String serverIp = urlHolder.getUrl(appCode);
23            if (serverIp == null) {
24                System.out.println("远程调用错误:当前无服务提供者");
25                return "connect error";
26            }
27            // 连接netty,请求并接收响应
28            RpcClientNettyHandler clientHandler = new RpcClientNettyHandler();
29            clientHandler.setParam(param);
30            nettyClient.initClient(serverIp, clientHandler);
31            String result = clientHandler.process();
32            System.out.println(MessageFormat.format("调用服务器:{0},请求参数:{1},响应参数:{2}", serverIp, param, result));
33            return result;
34        } catch (Exception e) {
35            System.out.println("远程服务调用失败:" + e);
36            return "error";
37        }
38    }
39}
40
41

这里来看一下UrlHolder类,它负责连接zookeeper,并获取其中服务地址列表,随机返回其中一条:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
1public class UrlHolder {
2    /**
3     * url列表
4     */
5    private List<String> urlList  = new ArrayList<String>();
6    /**
7     * zk客户端
8     */
9    private ZKClient     zkClient = new ZKClient();
10
11    /**
12     * 获取URL
13     * @param appCode
14     * @return
15     */
16    public String getUrl(String appCode) {
17        // 初始化url
18        if (urlList.size() == 0) {
19            initUrlList(appCode);
20        }
21        // 随机返回一条,此处以后优化为负载均衡策略
22        if (urlList.size() > 0) {
23            return urlList.get(new Random(urlList.size()).nextInt());
24        } else {
25            System.out.println("目前没有服务提供者");
26            return null;
27        }
28    }
29
30    /**
31     * 初始化urlList
32     * @param appCode
33     */
34    private void initUrlList(final String appCode) {
35        try {
36            // 获取zookeeper连接
37            ZooKeeper zk = zkClient.newConnection(ConsumerConstant.ZK_CONNECTION_STRING,
38                ConsumerConstant.ZK_SESSION_TIME_OUT);
39            // 获取目录下所有子节点
40            List<String> urlNodeList = zk.getChildren(
41                MessageFormat.format("{0}/{1}", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode), new Watcher() {
42                    public void process(WatchedEvent watchedEvent) {
43                        initUrlList(appCode);
44                    }
45                });
46            if (CollectionUtils.isEmpty(urlNodeList)) {
47                return;
48            }
49            // 从子节点数据中解析出所有url
50            List<String> urlList = new ArrayList<String>();
51            for (String path : urlNodeList) {
52                byte[] url = zk.getData(path, new Watcher() {
53                    public void process(WatchedEvent watchedEvent) {
54                        initUrlList(appCode);
55                    }
56                }, null);
57                if (url != null) {
58                    urlList.add(new String(url));
59                }
60            }
61            this.urlList = urlList;
62        } catch (Exception e) {
63            System.out.println("初始化url异常" + e.getMessage());
64        }
65    }
66
67}public class UrlHolder {
68    /**
69     * url列表
70     */
71    private List<String> urlList = new ArrayList<String>();
72    /**
73     * zk客户端
74     */
75    private ZKClient zkClient = new ZKClient();
76
77    /**
78     * 获取URL
79     *
80     * @param appCode
81     * @return
82     */
83    public String getUrl(String appCode) {
84        // 初始化url
85        if (urlList.size() == 0) {
86            initUrlList(appCode);
87        }
88        // 随机返回一条,此处以后优化为负载均衡策略
89        if (urlList.size() > 0) {
90            return urlList.get(new Random().nextInt(urlList.size()));
91        } else {
92            System.out.println("目前没有服务提供者");
93            return null;
94        }
95    }
96
97    /**
98     * 初始化urlList
99     *
100     * @param appCode
101     */
102    private void initUrlList(final String appCode) {
103        try {
104            // 获取zookeeper连接
105            ZooKeeper zk = zkClient.newConnection(ConsumerConstant.ZK_CONNECTION_STRING,
106                    ConsumerConstant.ZK_SESSION_TIME_OUT);
107            // 获取目录下所有子节点
108            String appPath = MessageFormat.format("{0}/{1}", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode);
109            List<String> urlNodeList = zk.getChildren(appPath
110                    , new Watcher() {
111                        public void process(WatchedEvent watchedEvent) {
112                            initUrlList(appCode);
113                        }
114                    });
115            if (CollectionUtils.isEmpty(urlNodeList)) {
116                return;
117            }
118            // 从子节点数据中解析出所有url
119            List<String> urlList = new ArrayList<String>();
120            for (String path : urlNodeList) {
121                byte[] url = zk.getData(appPath + "/" + path, new Watcher() {
122                    public void process(WatchedEvent watchedEvent) {
123                        initUrlList(appCode);
124                    }
125                }, null);
126                if (url != null) {
127                    urlList.add(new String(url));
128                }
129            }
130            this.urlList = urlList;
131        } catch (Exception e) {
132            System.out.println("初始化url异常" + e.getMessage());
133        }
134    }
135
136}
137
138

接下来是netty的initClient方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1    /**
2     * 开始一个新的客户端连接
3     *
4     * @param server
5     */
6    public void initClient(String server, final RpcClientNettyHandler clientHandler) {
7        try {
8            String[] urlArray = server.split(CommonConstant.COMMOA);
9            String ip = urlArray[0];
10            int port = Integer.parseInt(urlArray[1]);
11            Bootstrap bootstrap = new Bootstrap();
12            bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
13                    .handler(new ChannelInitializer<SocketChannel>() {
14                        @Override
15                        protected void initChannel(SocketChannel socketChannel) {
16                            ChannelPipeline pipeline = socketChannel.pipeline();
17                            pipeline.addLast(new StringDecoder());
18                            pipeline.addLast(new StringEncoder());
19                            pipeline.addLast(clientHandler);
20                        }
21                    });
22            bootstrap.connect(ip, port).sync();
23        } catch (InterruptedException e) {
24            System.out.println("netty创建客户端channel失败:" + e);
25        }
26    }
27
28

然后是这里我们自己主要写的一个通信处理类RpcClientNettyHandler,这个类负责远程调用和处理返回的消息:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
1public interface MethodProcessor {
2
3    String process() throws Exception;
4}
5public class RpcClientNettyHandler extends ChannelInboundHandlerAdapter implements MethodProcessor {
6
7    private ChannelHandlerContext context;
8    private CountDownLatch        contextCountDownLatch = new CountDownLatch(1);
9    private CountDownLatch        countDownLatch = new CountDownLatch(1);
10    /**
11     * 入参
12     */
13    private String                param;
14    /**
15     * 响应
16     */
17    private String                response;
18
19    /**
20     * 连上时触发
21     * @param ctx
22     * @throws Exception
23     */
24    @Override
25    public void channelActive(ChannelHandlerContext ctx) throws Exception {
26        this.context = ctx;
27        contextCountDownLatch.countDown();
28    }
29
30    /**
31     * 获取服务器返回信息
32     * @param ctx
33     * @param msg
34     * @throws Exception
35     */
36    @Override
37    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
38        response = msg.toString();
39        countDownLatch.countDown();
40    }
41
42    /**
43     * 远程调用并返回结果
44     * @return
45     * @throws InterruptedException
46     */
47    public String process() throws InterruptedException {
48        contextCountDownLatch.await();
49        context.writeAndFlush(param);
50        countDownLatch.await();
51        return response;
52    }
53
54    public void setParam(String param) {
55        this.param = param;
56    }
57
58}
59
60

到这里代码实现就写完了,下面是测试:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1    // 服务端
2    public static void main(String[] args) throws InterruptedException {
3        RPCProvider provider = new RPCProvider();
4        provider.registry("127.0.0.1", 8091);
5        provider.registry("127.0.0.1", 8092);
6        provider.registry("127.0.0.1", 8093);
7        provider.registry("127.0.0.1", 8094);
8        provider.registry("127.0.0.1", 8095);
9
10        Thread.sleep(Long.MAX_VALUE);
11    }
12    // 客户端
13    public static void main(String[] args) throws InterruptedException {
14        RPCConsumer consumer = new RPCConsumer();
15        int i = 0;
16        while (true) {
17            consumer.call(ConsumerConstant.APP_CODE, "aaa" + i++);
18            Thread.sleep(2000L);
19        }
20    }
21
22

日志输出如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1// 服务端日志
2zookeeper连接成功
3临时节点创建成功:/registry/100000/0000000001
4zookeeper连接成功
5临时节点创建成功:/registry/100000/0000000002
6zookeeper连接成功
7临时节点创建成功:/registry/100000/0000000003
8zookeeper连接成功
9临时节点创建成功:/registry/100000/0000000004
10zookeeper连接成功
11临时节点创建成功:/registry/100000/0000000005
12服务端收到请求:aaa0
13服务端收到请求:aaa1
14服务端收到请求:aaa2
15服务端收到请求:aaa3
16
17// 客户端日志
18zookeeper连接成功
19调用服务器:127.0.0.1,8091,请求参数:aaa0,响应参数:success
20调用服务器:127.0.0.1,8093,请求参数:aaa1,响应参数:success
21调用服务器:127.0.0.1,8092,请求参数:aaa2,响应参数:success
22调用服务器:127.0.0.1,8094,请求参数:aaa3,响应参数:success
23调用服务器:127.0.0.1,8091,请求参数:aaa4,响应参数:success
24调用服务器:127.0.0.1,8092,请求参数:aaa5,响应参数:success
25
26

这边可以修改一下服务端里面注册服务的端口,然后再启多个服务,会发现服务启动起来之后,客户端会自动随机分配到新启动的服务请求,并成功返回。再关掉几个端口的服务,会发现不会在请求那几个端口,这就是zookeeper的watcher机制实现的功能。

到这里就实现了RPC框架的服务注册发现+基本通信功能.

这里可能跟大家平时用的RPC框架差别还挺大:我怎么把我自己的Service注册到provider里,然后客户端通过接口来调用呢?.

关于这个问题,下一篇见!

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense老手经验

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索