基于Netty和SpringBoot实现一个轻量级RPC框架-协议篇

释放双眼,带上耳机,听听看~!

前提

最近对网络编程方面比较有兴趣,在微服务实践上也用到了相对主流的RPC框架如Spring Cloud Gateway底层也切换为Reactor-Netty,像Redisson底层也是使用Netty封装通讯协议,最近调研和准备使用的SOFARpc也是基于Netty封装实现了多种协议的兼容。因此,基于Netty造一个轮子,在SpringBoot的加持下,实现一个轻量级的RPC框架。这篇博文介绍的是RPC框架协议的定义以及对应的编码解码处理的实现。

依赖引入

截止本文(2020-01-12)编写完成之时,Netty的最新版本为4.1.44.Final,而SpringBoot的最新版本为2.2.2.RELEASE,因此引入这两个版本的依赖,加上其他工具包和序列化等等的支持,pom文件的核心内容如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
1    <dependencyManagement>
2        <dependencies>
3            <dependency>
4                <groupId>org.springframework.boot</groupId>
5                <artifactId>spring-boot-dependencies</artifactId>
6                <version>${spring.boot.version}</version>
7                <type>pom</type>
8                <scope>import</scope>
9            </dependency>
10        </dependencies>
11    </dependencyManagement>
12    <dependencies>
13        <dependency>
14            <groupId>org.springframework.boot</groupId>
15            <artifactId>spring-boot-starter</artifactId>
16        </dependency>
17        <dependency>
18            <groupId>io.netty</groupId>
19            <artifactId>netty-all</artifactId>
20            <version>${netty.version}</version>
21        </dependency>
22        <dependency>
23            <groupId>org.projectlombok</groupId>
24            <artifactId>lombok</artifactId>
25            <version>1.18.10</version>
26            <scope>provided</scope>
27        </dependency>
28        <dependency>
29            <groupId>com.alibaba</groupId>
30            <artifactId>fastjson</artifactId>
31            <version>1.2.61</version>
32        </dependency>
33        <dependency>
34            <groupId>com.google.guava</groupId>
35            <artifactId>guava</artifactId>
36            <version>28.1-jre</version>
37        </dependency>
38    </dependencies>
39
40

部分参数的序列化会依赖到FastJson或者Jackson,具体看偏好而定。

自定义协议的定义

为了提高协议传输的效率,需要定制一套高效的RPC协议,设计协议所需的字段和类型。

基础Packet字段

magicNumber
int
魔数,类似于Java的字节码文件的魔数是0xcafebase

version
int
版本号
预留字段,默认为1
serialNumber
java.lang.String
请求流水号
十分重要,每个请求的唯一标识
messageType
MessageType
消息类型
自定义的枚举类型,见下面的MessageType类
attachments
Map<String, String>
附件
K-V形式,类似于HTTP协议中的Header


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
1// 消息枚举类型
2@RequiredArgsConstructor
3public enum MessageType {
4
5    /**
6     * 请求
7     */
8    REQUEST((byte) 1),
9
10    /**
11     * 响应
12     */
13    RESPONSE((byte) 2),
14
15    /**
16     * PING
17     */
18    PING((byte) 3),
19
20    /**
21     * PONG
22     */
23    PONG((byte) 4),
24
25    /**
26     * NULL
27     */
28    NULL((byte) 5),
29
30    ;
31
32    @Getter
33    private final Byte type;
34
35    public static MessageType fromValue(byte value) {
36        for (MessageType type : MessageType.values()) {
37            if (type.getType() == value) {
38                return type;
39            }
40        }
41        throw new IllegalArgumentException(String.format(&quot;value = %s&quot;, value));
42    }
43}
44
45// 基础Packet
46@Data
47public abstract class BaseMessagePacket implements Serializable {
48
49    /**
50     * 魔数
51     */
52    private int magicNumber;
53
54    /**
55     * 版本号
56     */
57    private int version;
58
59    /**
60     * 流水号
61     */
62    private String serialNumber;
63
64    /**
65     * 消息类型
66     */
67    private MessageType messageType;
68
69    /**
70     * 附件 - K-V形式
71     */
72    private Map&lt;String, String&gt; attachments = new HashMap&lt;&gt;();
73
74    /**
75     * 添加附件
76     */
77    public void addAttachment(String key, String value) {
78        attachments.put(key, value);
79    }
80}
81
82

请求Packet扩展字段

interfaceName
java.lang.String
接口全类名

methodName
java.lang.String
方法名

methodArgumentSignatures
java.lang.String[]
方法参数签名字符串数组
存放方法参数类型全类名字符串数组
methodArguments
java.lang.Object[]
方法参数数组
因为未知方法参数类型,所以用Object表示


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1@EqualsAndHashCode(callSuper = true)
2@Data
3public class RequestMessagePacket extends BaseMessagePacket {
4
5    /**
6     * 接口全类名
7     */
8    private String interfaceName;
9
10    /**
11     * 方法名
12     */
13    private String methodName;
14
15    /**
16     * 方法参数签名
17     */
18    private String[] methodArgumentSignatures;
19
20    /**
21     * 方法参数
22     */
23    private Object[] methodArguments;
24}
25
26

响应Packet扩展字段

errorCode
java.lang.Long
响应码

message
java.lang.String
响应消息
如果出现异常,message就是对应的异常信息
payload
java.lang.Object
消息载荷
业务处理返回的消息载荷,定义为Object类型


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1@EqualsAndHashCode(callSuper = true)
2@Data
3public class ResponseMessagePacket extends BaseMessagePacket {
4
5    /**
6     * error code
7     */
8    private Long errorCode;
9
10    /**
11     * 消息描述
12     */
13    private String message;
14
15    /**
16     * 消息载荷
17     */
18    private Object payload;
19}
20
21

需要注意以下几点

  • 非基本类型在序列化和反序列化的时候,一定注意要先写入或者先读取序列的长度,以java.lang.String类型为例:


1
2
3
4
5
6
7
8
9
1// 序列化 - 流水号
2out.writeInt(packet.getSerialNumber().length());
3out.writeCharSequence(packet.getSerialNumber(), ProtocolConstant.UTF_8);
4
5// 反序列化 - 流水号
6int serialNumberLength = in.readInt();
7packet.setSerialNumber(in.readCharSequence(serialNumberLength, ProtocolConstant.UTF_8).toString());
8
9
  • 特殊编码的字符串在序列化的时候,要注意字符串编码的长度,例如UTF-8编码下一个中文字符占3个字节,这一点可以抽取一个工具类专门处理字符串的序列化:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1public enum ByteBufferUtils {
2
3    // 单例
4    X;
5
6    public void encodeUtf8CharSequence(ByteBuf byteBuf, CharSequence charSequence) {
7        int writerIndex = byteBuf.writerIndex();
8        byteBuf.writeInt(0);
9        int length = ByteBufUtil.writeUtf8(byteBuf, charSequence);
10        byteBuf.setInt(writerIndex, length);
11    }
12}
13
14
  • 方法参数数组的序列化和反序列化方案需要定制,笔者为了简化自定义协议,定义了方法参数签名数组,长度和方法参数数组一致,这样做方便后面编写服务端代码的时候,简化对方法参数数组进行反序列化以及宿主类目标方法的查找。注意一下Object[]的序列化和反序列化相对特殊,因为ByteBuf无法处理自定义类型的写入和读取(这个很好理解,网络编程就是面向0和1的编程):


1
2
3
4
5
1write Object --&gt; ByteBuf#writeInt() &amp;&amp; ByteBuf#writeBytes()
2
3read Object --&gt; ByteBuf#readInt() &amp;&amp; ByteBuf#readBytes() [&lt;== 这个方法返回值是ByteBuf实例]
4
5
  • 最后注意释放ByteBuf的引用,否则有可能导致内存泄漏。

自定义协议编码解码实现

自定义协议编码解码主要包括四个部分的编码解码器:

  • 请求Packet编码器:RequestMessagePacketEncoder,主要用于客户端把RequestMessagePacket实例序列化为二进制序列。
  • 请求Packet解码器:RequestMessagePacketDecoder,主要用于服务端把二进制序列反序列化为RequestMessagePacket实例。
  • 响应Packet编码器:ResponseMessagePacketEncoder,主要用于服务端把ResponseMessagePacket实例序列化为二进制序列。
  • 响应Packet解码器:ResponseMessagePacketDecoder,主要用于客户端把二进制序列反序列化为ResponseMessagePacket实例。

画个图描述一下几个组件的交互流程(省略了部分入站和出站处理器):

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qv5Z49uJ-1578841000258)(https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202001/n-s-b-c-p-1.png)]

序列化器Serializer的代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1public interface Serializer {
2
3    byte[] encode(Object target);
4
5    Object decode(byte[] bytes, Class&lt;?&gt; targetClass);
6}
7
8// FastJson实现
9public enum FastJsonSerializer implements Serializer {
10
11    // 单例
12    X;
13
14    @Override
15    public byte[] encode(Object target) {
16        return JSON.toJSONBytes(target);
17    }
18
19    @Override
20    public Object decode(byte[] bytes, Class&lt;?&gt; targetClass) {
21        return JSON.parseObject(bytes, targetClass);
22    }
23}
24
25

请求Packet编码器RequestMessagePacketEncoder的代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
1@RequiredArgsConstructor
2public class RequestMessagePacketEncoder extends MessageToByteEncoder&lt;RequestMessagePacket&gt; {
3
4    private final Serializer serializer;
5
6    @Override
7    protected void encode(ChannelHandlerContext context, RequestMessagePacket packet, ByteBuf out) throws Exception {
8        // 魔数
9        out.writeInt(packet.getMagicNumber());
10        // 版本
11        out.writeInt(packet.getVersion());
12        // 流水号
13        out.writeInt(packet.getSerialNumber().length());
14        out.writeCharSequence(packet.getSerialNumber(), ProtocolConstant.UTF_8);
15        // 消息类型
16        out.writeByte(packet.getMessageType().getType());
17        // 附件size
18        Map&lt;String, String&gt; attachments = packet.getAttachments();
19        out.writeInt(attachments.size());
20        // 附件内容
21        attachments.forEach((k, v) -&gt; {
22            out.writeInt(k.length());
23            out.writeCharSequence(k, ProtocolConstant.UTF_8);
24            out.writeInt(v.length());
25            out.writeCharSequence(v, ProtocolConstant.UTF_8);
26        });
27        // 接口全类名
28        out.writeInt(packet.getInterfaceName().length());
29        out.writeCharSequence(packet.getInterfaceName(), ProtocolConstant.UTF_8);
30        // 方法名
31        out.writeInt(packet.getMethodName().length());
32        out.writeCharSequence(packet.getMethodName(), ProtocolConstant.UTF_8);
33        // 方法参数签名(String[]类型) - 非必须
34        if (null != packet.getMethodArgumentSignatures()) {
35            int len = packet.getMethodArgumentSignatures().length;
36            // 方法参数签名数组长度
37            out.writeInt(len);
38            for (int i = 0; i &lt; len; i++) {
39                String methodArgumentSignature = packet.getMethodArgumentSignatures()[i];
40                out.writeInt(methodArgumentSignature.length());
41                out.writeCharSequence(methodArgumentSignature, ProtocolConstant.UTF_8);
42            }
43        } else {
44            out.writeInt(0);
45        }
46        // 方法参数(Object[]类型) - 非必须
47        if (null != packet.getMethodArguments()) {
48            int len = packet.getMethodArguments().length;
49            // 方法参数数组长度
50            out.writeInt(len);
51            for (int i = 0; i &lt; len; i++) {
52                byte[] bytes = serializer.encode(packet.getMethodArguments()[i]);
53                out.writeInt(bytes.length);
54                out.writeBytes(bytes);
55            }
56        } else {
57            out.writeInt(0);
58        }
59    }
60}
61
62

请求Packet解码器RequestMessagePacketDecoder的代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
1@RequiredArgsConstructor
2public class RequestMessagePacketDecoder extends ByteToMessageDecoder {
3
4    @Override
5    protected void decode(ChannelHandlerContext context, ByteBuf in, List&lt;Object&gt; list) throws Exception {
6        RequestMessagePacket packet = new RequestMessagePacket();
7        // 魔数
8        packet.setMagicNumber(in.readInt());
9        // 版本
10        packet.setVersion(in.readInt());
11        // 流水号
12        int serialNumberLength = in.readInt();
13        packet.setSerialNumber(in.readCharSequence(serialNumberLength, ProtocolConstant.UTF_8).toString());
14        // 消息类型
15        byte messageTypeByte = in.readByte();
16        packet.setMessageType(MessageType.fromValue(messageTypeByte));
17        // 附件
18        Map&lt;String, String&gt; attachments = Maps.newHashMap();
19        packet.setAttachments(attachments);
20        int attachmentSize = in.readInt();
21        if (attachmentSize &gt; 0) {
22            for (int i = 0; i &lt; attachmentSize; i++) {
23                int keyLength = in.readInt();
24                String key = in.readCharSequence(keyLength, ProtocolConstant.UTF_8).toString();
25                int valueLength = in.readInt();
26                String value = in.readCharSequence(valueLength, ProtocolConstant.UTF_8).toString();
27                attachments.put(key, value);
28            }
29        }
30        // 接口全类名
31        int interfaceNameLength = in.readInt();
32        packet.setInterfaceName(in.readCharSequence(interfaceNameLength, ProtocolConstant.UTF_8).toString());
33        // 方法名
34        int methodNameLength = in.readInt();
35        packet.setMethodName(in.readCharSequence(methodNameLength, ProtocolConstant.UTF_8).toString());
36        // 方法参数签名
37        int methodArgumentSignatureArrayLength = in.readInt();
38        if (methodArgumentSignatureArrayLength &gt; 0) {
39            String[] methodArgumentSignatures = new String[methodArgumentSignatureArrayLength];
40            for (int i = 0; i &lt; methodArgumentSignatureArrayLength; i++) {
41                int methodArgumentSignatureLength = in.readInt();
42                methodArgumentSignatures[i] = in.readCharSequence(methodArgumentSignatureLength, ProtocolConstant.UTF_8).toString();
43            }
44            packet.setMethodArgumentSignatures(methodArgumentSignatures);
45        }
46        // 方法参数
47        int methodArgumentArrayLength = in.readInt();
48        if (methodArgumentArrayLength &gt; 0) {
49            // 这里的Object[]实际上是ByteBuf[] - 后面需要二次加工为对应类型的实例
50            Object[] methodArguments = new Object[methodArgumentArrayLength];
51            for (int i = 0; i &lt; methodArgumentArrayLength; i++) {
52                int byteLength = in.readInt();
53                methodArguments[i] = in.readBytes(byteLength);
54            }
55            packet.setMethodArguments(methodArguments);
56        }
57        list.add(packet);
58    }
59}
60
61

响应Packet编码器ResponseMessagePacketEncoder的代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
1@RequiredArgsConstructor
2public class ResponseMessagePacketEncoder extends MessageToByteEncoder&lt;ResponseMessagePacket&gt; {
3
4    private final Serializer serializer;
5
6    @Override
7    protected void encode(ChannelHandlerContext ctx, ResponseMessagePacket packet, ByteBuf out) throws Exception {
8        // 魔数
9        out.writeInt(packet.getMagicNumber());
10        // 版本
11        out.writeInt(packet.getVersion());
12        // 流水号
13        out.writeInt(packet.getSerialNumber().length());
14        out.writeCharSequence(packet.getSerialNumber(), ProtocolConstant.UTF_8);
15        // 消息类型
16        out.writeByte(packet.getMessageType().getType());
17        // 附件size
18        Map&lt;String, String&gt; attachments = packet.getAttachments();
19        out.writeInt(attachments.size());
20        // 附件内容
21        attachments.forEach((k, v) -&gt; {
22            out.writeInt(k.length());
23            out.writeCharSequence(k, ProtocolConstant.UTF_8);
24            out.writeInt(v.length());
25            out.writeCharSequence(v, ProtocolConstant.UTF_8);
26        });
27        // error code
28        out.writeLong(packet.getErrorCode());
29        // message
30        String message = packet.getMessage();
31        ByteBufferUtils.X.encodeUtf8CharSequence(out, message);
32        // payload
33        byte[] bytes = serializer.encode(packet.getPayload());
34        out.writeInt(bytes.length);
35        out.writeBytes(bytes);
36    }
37}
38
39

响应Packet解码器ResponseMessagePacketDecoder的代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1public class ResponseMessagePacketDecoder extends ByteToMessageDecoder {
2
3    @Override
4    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List&lt;Object&gt; out) throws Exception {
5        ResponseMessagePacket packet = new ResponseMessagePacket();
6        // 魔数
7        packet.setMagicNumber(in.readInt());
8        // 版本
9        packet.setVersion(in.readInt());
10        // 流水号
11        int serialNumberLength = in.readInt();
12        packet.setSerialNumber(in.readCharSequence(serialNumberLength, ProtocolConstant.UTF_8).toString());
13        // 消息类型
14        byte messageTypeByte = in.readByte();
15        packet.setMessageType(MessageType.fromValue(messageTypeByte));
16        // 附件
17        Map&lt;String, String&gt; attachments = Maps.newHashMap();
18        packet.setAttachments(attachments);
19        int attachmentSize = in.readInt();
20        if (attachmentSize &gt; 0) {
21            for (int i = 0; i &lt; attachmentSize; i++) {
22                int keyLength = in.readInt();
23                String key = in.readCharSequence(keyLength, ProtocolConstant.UTF_8).toString();
24                int valueLength = in.readInt();
25                String value = in.readCharSequence(valueLength, ProtocolConstant.UTF_8).toString();
26                attachments.put(key, value);
27            }
28        }
29        // error code
30        packet.setErrorCode(in.readLong());
31        // message
32        int messageLength = in.readInt();
33        packet.setMessage(in.readCharSequence(messageLength, ProtocolConstant.UTF_8).toString());
34        // payload - ByteBuf实例
35        int payloadLength = in.readInt();
36        packet.setPayload(in.readBytes(payloadLength));
37        out.add(packet);
38    }
39}
40
41

核心的编码解码器已经编写完,接着要注意一下TCP协议二进制包发送的时候只保证了包的发送顺序、确认发送以及重传,无法保证二进制包是否完整(有些博客也称此类场景为粘包、半包等等,其实网络协议里面并没有定义这些术语,估计是有人杜撰出来),因此这里采取了定长帧编码和解码器LengthFieldPrepender和LengthFieldBasedFrameDecoder,简单来说就是在消息帧的开头几位定义了整个帧的长度,读取到整个长度的消息帧才认为是一个完整的二进制报文。举个几个例子:


1
2
3
4
1|&lt;--------packet frame---------&gt;|
2| Length Field | Actual Content |
3
4

0
4
abcd
1
9
throwable
2
14
{“name”:“doge”}

编写测试客户端和服务端

客户端代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
1@Slf4j
2public class TestProtocolClient {
3
4    public static void main(String[] args) throws Exception {
5        int port = 9092;
6        EventLoopGroup workerGroup = new NioEventLoopGroup();
7        Bootstrap bootstrap = new Bootstrap();
8        try {
9            bootstrap.group(workerGroup);
10            bootstrap.channel(NioSocketChannel.class);
11            bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
12            bootstrap.option(ChannelOption.TCP_NODELAY, Boolean.TRUE);
13            bootstrap.handler(new ChannelInitializer&lt;SocketChannel&gt;() {
14
15                @Override
16                protected void initChannel(SocketChannel ch) throws Exception {
17                    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
18                    ch.pipeline().addLast(new LengthFieldPrepender(4));
19                    ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X));
20                    ch.pipeline().addLast(new ResponseMessagePacketDecoder());
21                    ch.pipeline().addLast(new SimpleChannelInboundHandler&lt;ResponseMessagePacket&gt;() {
22                        @Override
23                        protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception {
24                            Object targetPayload = packet.getPayload();
25                            if (targetPayload instanceof ByteBuf) {
26                                ByteBuf byteBuf = (ByteBuf) targetPayload;
27                                int readableByteLength = byteBuf.readableBytes();
28                                byte[] bytes = new byte[readableByteLength];
29                                byteBuf.readBytes(bytes);
30                                targetPayload = FastJsonSerializer.X.decode(bytes, String.class);
31                                byteBuf.release();
32                            }
33                            packet.setPayload(targetPayload);
34                            log.info(&quot;接收到来自服务端的响应消息,消息内容:{}&quot;, JSON.toJSONString(packet));
35                        }
36                    });
37                }
38            });
39            ChannelFuture future = bootstrap.connect(&quot;localhost&quot;, port).sync();
40            log.info(&quot;启动NettyClient[{}]成功...&quot;, port);
41            Channel channel = future.channel();
42            RequestMessagePacket packet = new RequestMessagePacket();
43            packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER);
44            packet.setVersion(ProtocolConstant.VERSION);
45            packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber());
46            packet.setMessageType(MessageType.REQUEST);
47            packet.setInterfaceName(&quot;club.throwable.contract.HelloService&quot;);
48            packet.setMethodName(&quot;sayHello&quot;);
49            packet.setMethodArgumentSignatures(new String[]{&quot;java.lang.String&quot;});
50            packet.setMethodArguments(new Object[]{&quot;doge&quot;});
51            channel.writeAndFlush(packet);
52            future.channel().closeFuture().sync();
53        } finally {
54            workerGroup.shutdownGracefully();
55        }
56    }
57}
58
59

服务端代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
1@Slf4j
2public class TestProtocolServer {
3
4    public static void main(String[] args) throws Exception {
5        int port = 9092;
6        ServerBootstrap bootstrap = new ServerBootstrap();
7        EventLoopGroup bossGroup = new NioEventLoopGroup();
8        EventLoopGroup workerGroup = new NioEventLoopGroup();
9        try {
10            bootstrap.group(bossGroup, workerGroup)
11                    .channel(NioServerSocketChannel.class)
12                    .childHandler(new ChannelInitializer&lt;SocketChannel&gt;() {
13
14                        @Override
15                        protected void initChannel(SocketChannel ch) throws Exception {
16                            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
17                            ch.pipeline().addLast(new LengthFieldPrepender(4));
18                            ch.pipeline().addLast(new RequestMessagePacketDecoder());
19                            ch.pipeline().addLast(new ResponseMessagePacketEncoder(FastJsonSerializer.X));
20                            ch.pipeline().addLast(new SimpleChannelInboundHandler&lt;RequestMessagePacket&gt;() {
21
22                                @Override
23                                protected void channelRead0(ChannelHandlerContext ctx, RequestMessagePacket packet) throws Exception {
24                                    log.info(&quot;接收到来自客户端的请求消息,消息内容:{}&quot;, JSON.toJSONString(packet));
25                                    ResponseMessagePacket response = new ResponseMessagePacket();
26                                    response.setMagicNumber(packet.getMagicNumber());
27                                    response.setVersion(packet.getVersion());
28                                    response.setSerialNumber(packet.getSerialNumber());
29                                    response.setAttachments(packet.getAttachments());
30                                    response.setMessageType(MessageType.RESPONSE);
31                                    response.setErrorCode(200L);
32                                    response.setMessage(&quot;Success&quot;);
33                                    response.setPayload(&quot;{\&quot;name\&quot;:\&quot;throwable\&quot;}&quot;);
34                                    ctx.writeAndFlush(response);
35                                }
36                            });
37                        }
38                    });
39            ChannelFuture future = bootstrap.bind(port).sync();
40            log.info(&quot;启动NettyServer[{}]成功...&quot;, port);
41            future.channel().closeFuture().sync();
42        } finally {
43            workerGroup.shutdownGracefully();
44            bossGroup.shutdownGracefully();
45        }
46    }
47}
48
49

这里在测试的环境中,最大的消息帧长度暂时定义为1024。先启动服务端,再启动客户端,见控制台输出如下:


1
2
3
4
5
6
7
8
9
10
11
1// 服务端
222:29:32.596 [main] INFO club.throwable.protocol.TestProtocolServer - 启动NettyServer[9092]成功...
3...省略其他日志...
422:29:53.538 [nioEventLoopGroup-3-1] INFO club.throwable.protocol.TestProtocolServer - 接收到来自客户端的请求消息,消息内容:{&quot;attachments&quot;:{},&quot;interfaceName&quot;:&quot;club.throwable.contract.HelloService&quot;,&quot;magicNumber&quot;:10086,&quot;messageType&quot;:&quot;REQUEST&quot;,&quot;methodArgumentSignatures&quot;:[&quot;java.lang.String&quot;],&quot;methodArguments&quot;:[{&quot;contiguous&quot;:true,&quot;direct&quot;:true,&quot;readOnly&quot;:false,&quot;readable&quot;:true,&quot;writable&quot;:false}],&quot;methodName&quot;:&quot;sayHello&quot;,&quot;serialNumber&quot;:&quot;7f992c7cf9f445258601def1cac9bec0&quot;,&quot;version&quot;:1}
5
6// 客户端
722:31:28.360 [main] INFO club.throwable.protocol.TestProtocolClient - 启动NettyClient[9092]成功...
8...省略其他日志...
922:31:39.320 [nioEventLoopGroup-2-1] INFO club.throwable.protocol.TestProtocolClient - 接收到来自服务端的响应消息,消息内容:{&quot;attachments&quot;:{},&quot;errorCode&quot;:200,&quot;magicNumber&quot;:10086,&quot;message&quot;:&quot;Success&quot;,&quot;messageType&quot;:&quot;RESPONSE&quot;,&quot;payload&quot;:&quot;{\&quot;name\&quot;:\&quot;throwable\&quot;}&quot;,&quot;serialNumber&quot;:&quot;320808e709b34edbb91ba557780b58ad&quot;,&quot;version&quot;:1}
10
11

小结

一个基于Netty实现的简单的自定义协议基本完成,但是要编写一个优秀的RPC框架,还需要做服务端的宿主类和目标方法查询、调用,客户端的动态代理,Netty的NIO模式下的同步调用改造,心跳处理,异常处理等等。后面会使用多篇文章逐个问题解决,网络编程其实挺好玩了,就是编码量会比较大(゜-゜)つロ。

Demo项目:

  • ch0-custom-rpc-protocol

(e-a-20200112 c-1-d)

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense老手经验

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索