netty 自定义协议

释放双眼,带上耳机,听听看~!

netty 自定义协议

netty 是什么呢?

相信很多人都被人问过这个问题。如果快速准确的回复这个问题呢?网络编程框架,netty可以让你快速和简单的开发出一个高性能的网络应用。netty是一个网络编程框架。那netty又有什么框框呢?主要有二个框。

框1:客户和服务的启动

一切通讯都有收与发,所有的服务端和客户端都是这样的姿势启动。具体的参数可以看文档。

服务端


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1 public void bind() throws Exception {
2        // 配置服务端的NIO线程组
3        EventLoopGroup bossGroup = new NioEventLoopGroup();
4        EventLoopGroup workerGroup = new NioEventLoopGroup();
5
6        ServerBootstrap b = new ServerBootstrap();
7
8        
9        //需要两个线程组
10        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
11            .option(ChannelOption.SO_BACKLOG, 1024)
12            .childHandler(new ServerInit());
13
14        // 绑定端口,同步等待成功
15        b.bind(NettyConstant.REMOTE_PORT).sync();
16            LOG.info("Netty server start : "
17                + (NettyConstant.REMOTE_IP + " : " + NettyConstant.REMOTE_PORT));
18    }
19

客户端


1
2
3
4
5
6
7
8
9
10
11
12
1          Bootstrap b = new Bootstrap();
2            b.group(group).channel(NioSocketChannel.class)
3                    .option(ChannelOption.TCP_NODELAY, true)
4                    .handler(new ClientInit());
5            // 发起异步连接操作
6            ChannelFuture future = b.connect(
7                    new InetSocketAddress(host, port)).sync();
8            channel = future.sync().channel();
9            
10            future.channel().closeFuture().sync();
11
12

框2:处理器ChannelHandler

netty将所有接收到或发出去的数据都交给处理器处理,hannelInboundHandler入站处理器,ChannelOutboundHandler出站处理器。入站既接收数据时触发,出站是发送时触发。包括编码器和解码器分别实现的也是这两个接口。所以我们要为服务端或客户端扩展功能的话!只要对应创建 对个类去实现这两个接口添加到通道上就行了。netty作为一个框架,当然也帮我们实现了很多经常的用的处理器了,如:StringDecoder(字串解码器),StringEncoder(字串编码器),LengthFieldPrepender(给数据添加长度解决黏包半包问题),ReadTimeoutHandler(超时检测)

服务端出入站处理器添加


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1
2**
3 * @author hdk
4 * 类说明:服务端入站处理器初始化类
5 */
6public class ServerInit extends ChannelInitializer<SocketChannel> {
7    @Override
8    protected void initChannel(SocketChannel ch) throws Exception {
9        /*Netty提供的日志打印Handler,可以展示发送接收出去的字节*/
10        //ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
11        /*剥离接收到的消息的长度字段,拿到实际的消息报文的字节数组*/
12        ch.pipeline().addLast("frameDecoder",
13                new LengthFieldBasedFrameDecoder(65535,
14                        0,2,0,
15                        2));
16        /*给发送出去的消息增加长度字段*/
17        ch.pipeline().addLast("frameEncoder",
18                new LengthFieldPrepender(2));
19        /*反序列化,将字节数组转换为消息实体*/
20        ch.pipeline().addLast(new KryoDecoder());
21        /*序列化,将消息实体转换为字节数组准备进行网络传输*/
22        ch.pipeline().addLast("MessageEncoder",
23                new KryoEncoder());
24        /*超时检测*/
25        ch.pipeline().addLast("readTimeoutHandler",
26                new ReadTimeoutHandler(10));
27        /*登录应答*/
28        ch.pipeline().addLast(new LoginAuthRespHandler());
29
30        /*心跳应答*/
31        ch.pipeline().addLast("HeartBeatHandler",
32                new HeartBeatRespHandler());
33
34        /*服务端业务处理*/
35        ch.pipeline().addLast("ServerBusiHandler",
36                new ServerBusiHandler());
37
38    }
39}
40
41

客户端出入站处理器添加


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
1/**
2 * @author hdk
3 * 类说明:客户端Handler的初始化
4 */
5public class ClientInit extends ChannelInitializer<SocketChannel> {
6    @Override
7    protected void initChannel(SocketChannel ch) throws Exception {
8        /*剥离接收到的消息的长度字段,拿到实际的消息报文的字节数组*/
9        ch.pipeline().addLast("frameDecoder",
10                new LengthFieldBasedFrameDecoder(65535,
11                        0,2,0,
12                        2));
13
14        /*给发送出去的消息增加长度字段*/
15        ch.pipeline().addLast("frameEncoder",
16                new LengthFieldPrepender(2));
17
18        /*反序列化,将字节数组转换为消息实体*/
19        ch.pipeline().addLast(new KryoDecoder());
20        /*序列化,将消息实体转换为字节数组准备进行网络传输*/
21        ch.pipeline().addLast("MessageEncoder",
22                new KryoEncoder());
23
24        /*超时检测*/
25        ch.pipeline().addLast("readTimeoutHandler",
26                new ReadTimeoutHandler(10));
27
28        /*发出登录请求*/
29        ch.pipeline().addLast("LoginAuthHandler",
30                new LoginAuthReqHandler());
31
32       /*发出心跳请求*/
33        ch.pipeline().addLast("HeartBeatHandler",
34                new HeartBeatReqHandler());
35    }
36}
37

处理器注意事项

入站:在入站实现方法中ChannelInboundHandler.channelRead必需release下一个处理器会接着触发。释放netty的ByteBuf给其他处理器读取: ReferenceCountUtil.release(msg);如果不想下个处理器处理就调用ctx.fireChannelRead(msg);

出站:在入站实现方法中ChannelOutboundHandler.write同样要调用ReferenceCountUtil.release(msg),在其他的处理器中才可以读取本处理器处理过的数据

处理器的扩展编码器与解码器

在网络编程中定义一种高效编码规则是很重要的,节省带宽提交传输效率。提高传输速度。所以netty封装了两种特殊的处理器叫:ByteToMessageDecoder解码器和MessageToByteEncoder编码器。使用方法也很简单,继承重写一下decode和encode就可以了.。

解码器 ByteToMessageDecoder


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1/**
2 * @author hdk
3 * 类说明:反序列化的Handler
4 */
5public class KryoDecoder extends ByteToMessageDecoder {
6
7    @Override
8    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
9                          List<Object> out) throws Exception {
10        Object obj = KryoSerializer.deserialize(in);
11        //数据信息处理后添加目标编码格式的数据到ByteBuf中就供后面的处理器处理。
12        out.add(obj);
13    }
14}
15

编码器 MessageToByteEncoder


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1
2/**
3 * @author hdk
4 * 类说明:序列化的Handler
5 */
6public class KryoEncoder  extends MessageToByteEncoder<MyMessage> {
7
8    @Override
9    protected void encode(ChannelHandlerContext ctx, MyMessage message,
10                          ByteBuf out) throws Exception {
11        KryoSerializer.serialize(message, out);
12        //发送ByteBuf到目标IP
13        ctx.flush();
14    }
15}
16
17

总结

netty为什么那么流行,个人认为有几点:

  • 1事件编程让代码变得很简洁。
  • 2提供很多协议如mqtt,http,websocket,smtp,redis等等,所有可以很快速的就可以开发出这些协议的服务端程序,不用自己去实现了。
  • netty 自定义协议

netty扩展-自定义协议

Netty协议栈消息定义包含两部分:消息头、消息体

netty 自定义协议

Header

crcCode
Int
32
Netty消息校验码
Length
Int
32
整个消息长度
sessionID
Long
64
会话ID
Type
Byte
8
0:业务请求消息1:业务响应消息2:业务one way消息3握手请求消息4握手应答消息5:心跳请求消息6:心跳应答消息
Priority
Byte
8
消息优先级:0~255
Attachment
Map<String,Object>
变长
可选字段,由于推展消息头


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
1/**
2 * @author hdk
3 * 类说明:消息头
4 */
5public final class MyHeader {
6
7    private int crcCode = 0xabef0101;
8
9    private int length;// 消息长度
10
11    private long sessionID;// 会话ID
12
13    private byte type;// 消息类型
14
15    private byte priority;// 消息优先级
16
17    private Map&lt;String, Object&gt; attachment = new HashMap&lt;String, Object&gt;(); // 附件
18
19    public final int getCrcCode() {
20        return crcCode;
21    }
22
23    public final void setCrcCode(int crcCode) {
24        this.crcCode = crcCode;
25    }
26
27    public final int getLength() {
28        return length;
29    }
30
31    public final void setLength(int length) {
32        this.length = length;
33    }
34
35    public final long getSessionID() {
36        return sessionID;
37    }
38
39    public final void setSessionID(long sessionID) {
40        this.sessionID = sessionID;
41    }
42
43    public final byte getType() {
44        return type;
45    }
46
47    public final void setType(byte type) {
48        this.type = type;
49    }
50
51    public final byte getPriority() {
52        return priority;
53    }
54
55    public final void setPriority(byte priority) {
56        this.priority = priority;
57    }
58
59    public final Map&lt;String, Object&gt; getAttachment() {
60        return attachment;
61    }
62
63    public final void setAttachment(Map&lt;String, Object&gt; attachment) {
64        this.attachment = attachment;
65    }
66
67    @Override
68    public String toString() {
69        return &quot;MyHeader [crcCode=&quot; + crcCode + &quot;, length=&quot; + length
70            + &quot;, sessionID=&quot; + sessionID + &quot;, type=&quot; + type + &quot;, priority=&quot;
71            + priority + &quot;, attachment=&quot; + attachment + &quot;]&quot;;
72    }
73
74}
75
76

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
1/**
2 * @author hdk
3 * 类说明:消息实体类
4 */
5public final class MyMessage {
6
7    private MyHeader myHeader;
8
9    private Object body;
10
11    public final MyHeader getMyHeader() {
12        return myHeader;
13    }
14
15    public final void setMyHeader(MyHeader myHeader) {
16        this.myHeader = myHeader;
17    }
18
19    public final Object getBody() {
20        return body;
21    }
22
23    public final void setBody(Object body) {
24        this.body = body;
25    }
26
27    @Override
28    public String toString() {
29        return &quot;MyMessage [myHeader=&quot; + myHeader + &quot;][body=&quot;+body+&quot;]&quot;;
30    }
31}
32
33

源码

https://github.com/huangdongkui/netty_project

给TA打赏
共{{data.count}}人
人已打赏
安全技术

使用bootstrap的栅栏实现五列布局

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索