Mina、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)

释放双眼,带上耳机,听听看~!

在上一篇博文中,有介绍到用换行符分割消息的方法。但是这种方法有个小问题,如果消息中本身就包含换行符,那将会将这条消息分割成两条,结果就不对了。

本文介绍另外一种消息分割方式,即上一篇博文中讲的第2条:use a fixed length header that indicates the length of the body,用一个固定字节数的Header前缀来指定Body的字节数,以此来分割消息。

Mina、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)

上面图中Header固定为4字节,Header中保存的是一个4字节(32位)的整数,例如12即为0x0000000C,这个整数用来指定Body的长度(字节数)。当读完这么多字节的Body之后,又是下一条消息的Header。

下面分别用MINA、Netty、Twisted来实现对这种消息的切合和解码。

MINA:

MINA提供了PrefixedStringCodecFactory来对这种类型的消息进行编码解码,PrefixedStringCodecFactory默认Header的大小是4字节,当然也可以指定成1或2。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
1public class TcpServer {  
2  
3    public static void main(String[] args) throws IOException {  
4        IoAcceptor acceptor = new NioSocketAcceptor();  
5          
6        // 4字节的Header指定Body的字节数,对这种消息的处理  
7        acceptor.getFilterChain().addLast("codec",  
8                new ProtocolCodecFilter(new PrefixedStringCodecFactory(Charset.forName("UTF-8"))));  
9          
10        acceptor.setHandler(new TcpServerHandle());  
11        acceptor.bind(new InetSocketAddress(8080));  
12    }  
13  
14}  
15  
16class TcpServerHandle extends IoHandlerAdapter {  
17  
18    @Override  
19    public void exceptionCaught(IoSession session, Throwable cause)  
20            throws Exception {  
21        cause.printStackTrace();  
22    }  
23  
24    // 接收到新的数据  
25    @Override  
26    public void messageReceived(IoSession session, Object message)  
27            throws Exception {  
28  
29        String msg = (String) message;  
30        System.out.println("messageReceived:" + msg);  
31          
32    }  
33  
34    @Override  
35    public void sessionCreated(IoSession session) throws Exception {  
36        System.out.println("sessionCreated");  
37    }  
38  
39    @Override  
40    public void sessionClosed(IoSession session) throws Exception {  
41        System.out.println("sessionClosed");  
42    }  
43}
44

Netty:

Netty使用LengthFieldBasedFrameDecoder来处理这种消息。下面代码中的new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4)中包含5个参数,分别是int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip。maxFrameLength为消息的最大长度,lengthFieldOffset为Header的位置,lengthFieldLength为Header的长度,lengthAdjustment为长度调整(默认Header中的值表示Body的长度,并不包含Header自己),initialBytesToStrip为去掉字节数(默认解码后返回Header+Body的全部内容,这里设为4表示去掉4字节的Header,只留下Body)。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
1public class TcpServer {  
2  
3    public static void main(String[] args) throws InterruptedException {  
4        EventLoopGroup bossGroup = new NioEventLoopGroup();  
5        EventLoopGroup workerGroup = new NioEventLoopGroup();  
6        try {  
7            ServerBootstrap b = new ServerBootstrap();  
8            b.group(bossGroup, workerGroup)  
9                    .channel(NioServerSocketChannel.class)  
10                    .childHandler(new ChannelInitializer<SocketChannel>() {  
11                        @Override  
12                        public void initChannel(SocketChannel ch)  
13                                throws Exception {  
14                            ChannelPipeline pipeline = ch.pipeline();  
15                              
16                            // LengthFieldBasedFrameDecoder按行分割消息,取出body  
17                            pipeline.addLast(new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4));  
18                            // 再按UTF-8编码转成字符串  
19                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));  
20                              
21                            pipeline.addLast(new TcpServerHandler());  
22                        }  
23                    });  
24            ChannelFuture f = b.bind(8080).sync();  
25            f.channel().closeFuture().sync();  
26        } finally {  
27            workerGroup.shutdownGracefully();  
28            bossGroup.shutdownGracefully();  
29        }  
30    }  
31  
32}  
33  
34class TcpServerHandler extends ChannelInboundHandlerAdapter {  
35  
36    // 接收到新的数据  
37    @Override  
38    public void channelRead(ChannelHandlerContext ctx, Object msg) {  
39          
40        String message = (String) msg;  
41        System.out.println("channelRead:" + message);  
42    }  
43  
44    @Override  
45    public void channelActive(ChannelHandlerContext ctx) {  
46        System.out.println("channelActive");  
47    }  
48  
49    @Override  
50    public void channelInactive(ChannelHandlerContext ctx) {  
51        System.out.println("channelInactive");  
52    }  
53  
54    @Override  
55    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
56        cause.printStackTrace();  
57        ctx.close();  
58    }  
59}  
60

Twisted:

在Twisted中需要继承Int32StringReceiver,不再继承Protocol。Int32StringReceiver表示固定32位(4字节)的Header,另外还有Int16StringReceiver、Int8StringReceiver等。而需要实现的接受数据事件的方法不再是dataReceived,也不是lineReceived,而是stringReceived。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1# -*- coding:utf-8 –*-  
2  
3from twisted.protocols.basic import Int32StringReceiver  
4from twisted.internet.protocol import Factory  
5from twisted.internet import reactor  
6  
7class TcpServerHandle(Int32StringReceiver):  
8  
9    # 新的连接建立  
10    def connectionMade(self):  
11        print 'connectionMade'  
12  
13    # 连接断开  
14    def connectionLost(self, reason):  
15        print 'connectionLost'  
16  
17    # 接收到新的数据  
18    def stringReceived(self, data):  
19        print 'stringReceived:' + data  
20  
21factory = Factory()  
22factory.protocol = TcpServerHandle  
23reactor.listenTCP(8080, factory)  
24reactor.run()  
25

下面是Java编写的一个客户端测试程序:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
1public class TcpClient {  
2  
3    public static void main(String[] args) throws IOException {  
4  
5        Socket socket = null;  
6        DataOutputStream out = null;  
7  
8        try {  
9  
10            socket = new Socket("localhost", 8080);  
11            out = new DataOutputStream(socket.getOutputStream());  
12  
13            // 请求服务器  
14            String data1 = "牛顿";  
15            byte[] outputBytes1 = data1.getBytes("UTF-8");  
16            out.writeInt(outputBytes1.length); // write header  
17            out.write(outputBytes1); // write body  
18              
19            String data2 = "爱因斯坦";  
20            byte[] outputBytes2 = data2.getBytes("UTF-8");  
21            out.writeInt(outputBytes2.length); // write header  
22            out.write(outputBytes2); // write body  
23              
24            out.flush();  
25  
26        } finally {  
27            // 关闭连接  
28            out.close();  
29            socket.close();  
30        }  
31    }  
32}
33

MINA服务器输出结果:

sessionCreated
messageReceived:牛顿
messageReceived:爱因斯坦
sessionClosed

Netty服务器输出结果:

channelActive
channelRead:牛顿
channelRead:爱因斯坦
channelInactive

Twisted服务器输出结果:

connectionMade
stringReceived:牛顿
stringReceived:爱因斯坦
connectionLost

MINA、Netty、Twisted一起学系列

MINA、Netty、Twisted一起学(一):实现简单的TCP服务器

MINA、Netty、Twisted一起学(二):TCP消息边界问题及按行分割消息

MINA、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)

MINA、Netty、Twisted一起学(四):定制自己的协议

MINA、Netty、Twisted一起学(五):整合protobuf

MINA、Netty、Twisted一起学(六):session

MINA、Netty、Twisted一起学(七):发布/订阅(Publish/Subscribe)

MINA、Netty、Twisted一起学(八):HTTP服务器

MINA、Netty、Twisted一起学(九):异步IO和回调函数

MINA、Netty、Twisted一起学(十):线程模型

MINA、Netty、Twisted一起学(十一):SSL/TLS

MINA、Netty、Twisted一起学(十二):HTTPS

源码

https://github.com/wucao/mina-netty-twisted

给TA打赏
共{{data.count}}人
人已打赏
安全技术

详解Node.js API系列 Http模块(1) 构造一个简单的静态页服务器

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索