一个轻量级分布式RPC框架–NettyRpc

释放双眼,带上耳机,听听看~!

1、背景

最近在搜索Netty和Zookeeper方面的文章时,看到了这篇文章《轻量级分布式 RPC 框架》,作者用Zookeeper、Netty和Spring写了一个轻量级的分布式RPC框架。花了一些时间看了下他的代码,写的干净简单,写的RPC框架可以算是一个简易版的dubbo。这个RPC框架虽小,但是麻雀虽小,五脏俱全,有兴趣的可以学习一下。

本人在这个简易版的RPC上添加了如下特性:

* 服务异步调用的支持,回调函数callback的支持

* 客户端使用长连接(在多次调用共享连接)

* 服务端异步多线程处理RPC请求

项目地址:https://github.com/luxiaoxun/NettyRpc

2、简介

RPC,即 Remote Procedure Call(远程过程调用),调用远程计算机上的服务,就像调用本地服务一样。RPC可以很好的解耦系统,如WebService就是一种基于Http协议的RPC。

这个RPC整体框架如下:

一个轻量级分布式RPC框架--NettyRpc

这个RPC框架使用的一些技术所解决的问题:

服务发布与订阅:服务端使用Zookeeper注册服务地址,客户端从Zookeeper获取可用的服务地址。

通信:使用Netty作为通信框架。

Spring:使用Spring配置服务,加载Bean,扫描注解。

动态代理:客户端使用代理模式透明化服务调用。

消息编解码:使用Protostuff序列化和反序列化消息。

3、服务端发布服务

使用注解标注要发布的服务

服务注解


1
2
3
4
5
6
7
1@Target({ElementType.TYPE})
2@Retention(RetentionPolicy.RUNTIME)
3@Component
4public @interface RpcService {
5    Class<?> value();
6}
7

一个服务接口:


1
2
3
4
5
6
7
1public interface HelloService {
2
3    String hello(String name);
4
5    String hello(Person person);
6}
7

一个服务实现:使用注解标注


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1@RpcService(HelloService.class)
2public class HelloServiceImpl implements HelloService {
3
4    @Override
5    public String hello(String name) {
6        return "Hello! " + name;
7    }
8
9    @Override
10    public String hello(Person person) {
11        return "Hello! " + person.getFirstName() + " " + person.getLastName();
12    }
13}
14

服务在启动的时候扫描得到所有的服务接口及其实现:


1
2
3
4
5
6
7
8
9
10
11
1@Override
2    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
3        Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
4        if (MapUtils.isNotEmpty(serviceBeanMap)) {
5            for (Object serviceBean : serviceBeanMap.values()) {
6                String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
7                handlerMap.put(interfaceName, serviceBean);
8            }
9        }
10    }
11

在Zookeeper集群上注册服务地址:

一个轻量级分布式RPC框架--NettyRpc一个轻量级分布式RPC框架--NettyRpc


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
1public class ServiceRegistry {
2
3    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);
4
5    private CountDownLatch latch = new CountDownLatch(1);
6
7    private String registryAddress;
8
9    public ServiceRegistry(String registryAddress) {
10        this.registryAddress = registryAddress;
11    }
12
13    public void register(String data) {
14        if (data != null) {
15            ZooKeeper zk = connectServer();
16            if (zk != null) {
17                AddRootNode(zk); // Add root node if not exist
18                createNode(zk, data);
19            }
20        }
21    }
22
23    private ZooKeeper connectServer() {
24        ZooKeeper zk = null;
25        try {
26            zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
27                @Override
28                public void process(WatchedEvent event) {
29                    if (event.getState() == Event.KeeperState.SyncConnected) {
30                        latch.countDown();
31                    }
32                }
33            });
34            latch.await();
35        } catch (IOException e) {
36            LOGGER.error("", e);
37        }
38        catch (InterruptedException ex){
39            LOGGER.error("", ex);
40        }
41        return zk;
42    }
43
44    private void AddRootNode(ZooKeeper zk){
45        try {
46            Stat s = zk.exists(Constant.ZK_REGISTRY_PATH, false);
47            if (s == null) {
48                zk.create(Constant.ZK_REGISTRY_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
49            }
50        } catch (KeeperException e) {
51            LOGGER.error(e.toString());
52        } catch (InterruptedException e) {
53            LOGGER.error(e.toString());
54        }
55    }
56
57    private void createNode(ZooKeeper zk, String data) {
58        try {
59            byte[] bytes = data.getBytes();
60            String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
61            LOGGER.debug("create zookeeper node ({} => {})", path, data);
62        } catch (KeeperException e) {
63            LOGGER.error("", e);
64        }
65        catch (InterruptedException ex){
66            LOGGER.error("", ex);
67        }
68    }
69}
70

ServiceRegistry

这里在原文的基础上加了AddRootNode()判断服务父节点是否存在,如果不存在则添加一个PERSISTENT的服务父节点,这样虽然启动服务时多了点判断,但是不需要手动命令添加服务父节点了。

关于Zookeeper的使用原理,可以看这里《ZooKeeper基本原理》。

4、客户端调用服务

使用代理模式调用服务:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
1public class RpcProxy {
2
3    private String serverAddress;
4    private ServiceDiscovery serviceDiscovery;
5
6    public RpcProxy(String serverAddress) {
7        this.serverAddress = serverAddress;
8    }
9
10    public RpcProxy(ServiceDiscovery serviceDiscovery) {
11        this.serviceDiscovery = serviceDiscovery;
12    }
13
14    @SuppressWarnings("unchecked")
15    public <T> T create(Class<?> interfaceClass) {
16        return (T) Proxy.newProxyInstance(
17                interfaceClass.getClassLoader(),
18                new Class<?>[]{interfaceClass},
19                new InvocationHandler() {
20                    @Override
21                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
22                        RpcRequest request = new RpcRequest();
23                        request.setRequestId(UUID.randomUUID().toString());
24                        request.setClassName(method.getDeclaringClass().getName());
25                        request.setMethodName(method.getName());
26                        request.setParameterTypes(method.getParameterTypes());
27                        request.setParameters(args);
28
29                        if (serviceDiscovery != null) {
30                            serverAddress = serviceDiscovery.discover();
31                        }
32                        if(serverAddress != null){
33                            String[] array = serverAddress.split(":");
34                            String host = array[0];
35                            int port = Integer.parseInt(array[1]);
36
37                            RpcClient client = new RpcClient(host, port);
38                            RpcResponse response = client.send(request);
39
40                            if (response.isError()) {
41                                throw new RuntimeException("Response error.",new Throwable(response.getError()));
42                            } else {
43                                return response.getResult();
44                            }
45                        }
46                        else{
47                            throw new RuntimeException("No server address found!");
48                        }
49                    }
50                }
51        );
52    }
53}
54

这里每次使用代理远程调用服务,从Zookeeper上获取可用的服务地址,通过RpcClient send一个Request,等待该Request的Response返回。这里原文有个比较严重的bug,在原文给出的简单的Test中是很难测出来的,原文使用了obj的wait和notifyAll来等待Response返回,会出现“假死等待”的情况:一个Request发送出去后,在obj.wait()调用之前可能Response就返回了,这时候在channelRead0里已经拿到了Response并且obj.notifyAll()已经在obj.wait()之前调用了,这时候send后再obj.wait()就出现了假死等待,客户端就一直等待在这里。使用CountDownLatch可以解决这个问题。

注意:这里每次调用的send时候才去和服务端建立连接,使用的是短连接,这种短连接在高并发时会有连接数问题,也会影响性能。

从Zookeeper上获取服务地址:

一个轻量级分布式RPC框架--NettyRpc一个轻量级分布式RPC框架--NettyRpc


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
1public class ServiceDiscovery {
2
3    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);
4
5    private CountDownLatch latch = new CountDownLatch(1);
6
7    private volatile List<String> dataList = new ArrayList<>();
8
9    private String registryAddress;
10
11    public ServiceDiscovery(String registryAddress) {
12        this.registryAddress = registryAddress;
13        ZooKeeper zk = connectServer();
14        if (zk != null) {
15            watchNode(zk);
16        }
17    }
18
19    public String discover() {
20        String data = null;
21        int size = dataList.size();
22        if (size > 0) {
23            if (size == 1) {
24                data = dataList.get(0);
25                LOGGER.debug("using only data: {}", data);
26            } else {
27                data = dataList.get(ThreadLocalRandom.current().nextInt(size));
28                LOGGER.debug("using random data: {}", data);
29            }
30        }
31        return data;
32    }
33
34    private ZooKeeper connectServer() {
35        ZooKeeper zk = null;
36        try {
37            zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
38                @Override
39                public void process(WatchedEvent event) {
40                    if (event.getState() == Event.KeeperState.SyncConnected) {
41                        latch.countDown();
42                    }
43                }
44            });
45            latch.await();
46        } catch (IOException | InterruptedException e) {
47            LOGGER.error("", e);
48        }
49        return zk;
50    }
51
52    private void watchNode(final ZooKeeper zk) {
53        try {
54            List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
55                @Override
56                public void process(WatchedEvent event) {
57                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
58                        watchNode(zk);
59                    }
60                }
61            });
62            List<String> dataList = new ArrayList<>();
63            for (String node : nodeList) {
64                byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
65                dataList.add(new String(bytes));
66            }
67            LOGGER.debug("node data: {}", dataList);
68            this.dataList = dataList;
69        } catch (KeeperException | InterruptedException e) {
70            LOGGER.error("", e);
71        }
72    }
73}
74

ServiceDiscovery

每次服务地址节点发生变化,都需要再次watchNode,获取新的服务地址列表。

5、消息编码

请求消息:

一个轻量级分布式RPC框架--NettyRpc一个轻量级分布式RPC框架--NettyRpc


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
1public class RpcRequest {
2
3    private String requestId;
4    private String className;
5    private String methodName;
6    private Class<?>[] parameterTypes;
7    private Object[] parameters;
8
9    public String getRequestId() {
10        return requestId;
11    }
12
13    public void setRequestId(String requestId) {
14        this.requestId = requestId;
15    }
16
17    public String getClassName() {
18        return className;
19    }
20
21    public void setClassName(String className) {
22        this.className = className;
23    }
24
25    public String getMethodName() {
26        return methodName;
27    }
28
29    public void setMethodName(String methodName) {
30        this.methodName = methodName;
31    }
32
33    public Class<?>[] getParameterTypes() {
34        return parameterTypes;
35    }
36
37    public void setParameterTypes(Class<?>[] parameterTypes) {
38        this.parameterTypes = parameterTypes;
39    }
40
41    public Object[] getParameters() {
42        return parameters;
43    }
44
45    public void setParameters(Object[] parameters) {
46        this.parameters = parameters;
47    }
48}
49

RpcRequest

响应消息:

一个轻量级分布式RPC框架--NettyRpc一个轻量级分布式RPC框架--NettyRpc


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1public class RpcResponse {
2
3    private String requestId;
4    private String error;
5    private Object result;
6
7    public boolean isError() {
8        return error != null;
9    }
10
11    public String getRequestId() {
12        return requestId;
13    }
14
15    public void setRequestId(String requestId) {
16        this.requestId = requestId;
17    }
18
19    public String getError() {
20        return error;
21    }
22
23    public void setError(String error) {
24        this.error = error;
25    }
26
27    public Object getResult() {
28        return result;
29    }
30
31    public void setResult(Object result) {
32        this.result = result;
33    }
34}
35

RpcResponse

消息序列化和反序列化工具:(基于 Protostuff 实现)

一个轻量级分布式RPC框架--NettyRpc一个轻量级分布式RPC框架--NettyRpc


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
1public class SerializationUtil {
2
3    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();
4
5    private static Objenesis objenesis = new ObjenesisStd(true);
6
7    private SerializationUtil() {
8    }
9
10    @SuppressWarnings("unchecked")
11    private static <T> Schema<T> getSchema(Class<T> cls) {
12        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
13        if (schema == null) {
14            schema = RuntimeSchema.createFrom(cls);
15            if (schema != null) {
16                cachedSchema.put(cls, schema);
17            }
18        }
19        return schema;
20    }
21
22    /**
23     * 序列化(对象 -> 字节数组)
24     */
25    @SuppressWarnings("unchecked")
26    public static <T> byte[] serialize(T obj) {
27        Class<T> cls = (Class<T>) obj.getClass();
28        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
29        try {
30            Schema<T> schema = getSchema(cls);
31            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
32        } catch (Exception e) {
33            throw new IllegalStateException(e.getMessage(), e);
34        } finally {
35            buffer.clear();
36        }
37    }
38
39    /**
40     * 反序列化(字节数组 -> 对象)
41     */
42    public static <T> T deserialize(byte[] data, Class<T> cls) {
43        try {
44            T message = (T) objenesis.newInstance(cls);
45            Schema<T> schema = getSchema(cls);
46            ProtostuffIOUtil.mergeFrom(data, message, schema);
47            return message;
48        } catch (Exception e) {
49            throw new IllegalStateException(e.getMessage(), e);
50        }
51    }
52}
53

SerializationUtil

由于处理的是TCP消息,本人加了TCP的粘包处理Handler


1
2
1channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,0))
2

消息编解码时开始4个字节表示消息的长度,也就是消息编码的时候,先写消息的长度,再写消息。

6、性能改进

1)服务端请求异步处理

Netty本身就是一个高性能的网络框架,从网络IO方面来说并没有太大的问题。

从这个RPC框架本身来说,在原文的基础上把Server端处理请求的过程改成了多线程异步:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1 public void channelRead0(final ChannelHandlerContext ctx,final RpcRequest request) throws Exception {
2        RpcServer.submit(new Runnable() {
3            @Override
4            public void run() {
5                LOGGER.debug("Receive request " + request.getRequestId());
6                RpcResponse response = new RpcResponse();
7                response.setRequestId(request.getRequestId());
8                try {
9                    Object result = handle(request);
10                    response.setResult(result);
11                } catch (Throwable t) {
12                    response.setError(t.toString());
13                    LOGGER.error("RPC Server handle request error",t);
14                }
15                ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE).addListener(new ChannelFutureListener() {
16                    @Override
17                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
18                        LOGGER.debug("Send response for request " + request.getRequestId());
19                    }
20                });
21            }
22        });
23    }
24

Netty 4中的Handler处理在IO线程中,如果Handler处理中有耗时的操作(如数据库相关),会让IO线程等待,影响性能。

2)服务端长连接的管理

 客户端保持和服务进行长连接,不需要每次调用服务的时候进行连接,长连接的管理(通过Zookeeper获取有效的地址)。

通过监听Zookeeper服务节点值的变化,动态更新客户端和服务端保持的长连接。这个事情现在放在客户端在做,客户端保持了和所有可用服务的长连接,给客户端和服务端都造成了压力,需要解耦这个实现。

3)客户端请求异步处理

客户端请求异步处理的支持,不需要同步等待:发送一个异步请求,返回Feature,通过Feature的callback机制获取结果。


1
2
3
4
1IAsyncObjectProxy client = rpcClient.createAsync(HelloService.class);
2RPCFuture helloFuture = client.call("hello", Integer.toString(i));
3String result = (String) helloFuture.get(3000, TimeUnit.MILLISECONDS);
4

 

个人觉得该RPC的待改进项:

* 编码序列化的多协议支持。

 

项目持续更新中。

项目地址:https://github.com/luxiaoxun/NettyRpc

 

参考:

轻量级分布式 RPC 框架:http://my.oschina.net/huangyong/blog/361751

你应该知道的
RPC原理:http://www.cnblogs.com/LBSer/p/4853234.html

 

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全网络

Netty源码分析第8章(高性能工具类FastThreadLocal和Recycler)---->第6节: 异线程回收对象...

2021-8-18 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索