在上一篇博文中,有介绍到用换行符分割消息的方法。但是这种方法有个小问题,如果消息中本身就包含换行符,那将会将这条消息分割成两条,结果就不对了。
本文介绍另外一种消息分割方式,即上一篇博文中讲的第2条:use a fixed length header that indicates the length of the body,用一个固定字节数的Header前缀来指定Body的字节数,以此来分割消息。
上面图中Header固定为4字节,Header中保存的是一个4字节(32位)的整数,例如12即为0x0000000C,这个整数用来指定Body的长度(字节数)。当读完这么多字节的Body之后,又是下一条消息的Header。
下面分别用MINA、Netty、Twisted来实现对这种消息的切合和解码。
MINA:
MINA提供了PrefixedStringCodecFactory来对这种类型的消息进行编码解码,PrefixedStringCodecFactory默认Header的大小是4字节,当然也可以指定成1或2。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 1public class TcpServer {
2
3 public static void main(String[] args) throws IOException {
4 IoAcceptor acceptor = new NioSocketAcceptor();
5
6 // 4字节的Header指定Body的字节数,对这种消息的处理
7 acceptor.getFilterChain().addLast("codec",
8 new ProtocolCodecFilter(new PrefixedStringCodecFactory(Charset.forName("UTF-8"))));
9
10 acceptor.setHandler(new TcpServerHandle());
11 acceptor.bind(new InetSocketAddress(8080));
12 }
13
14}
15
16class TcpServerHandle extends IoHandlerAdapter {
17
18 @Override
19 public void exceptionCaught(IoSession session, Throwable cause)
20 throws Exception {
21 cause.printStackTrace();
22 }
23
24 // 接收到新的数据
25 @Override
26 public void messageReceived(IoSession session, Object message)
27 throws Exception {
28
29 String msg = (String) message;
30 System.out.println("messageReceived:" + msg);
31
32 }
33
34 @Override
35 public void sessionCreated(IoSession session) throws Exception {
36 System.out.println("sessionCreated");
37 }
38
39 @Override
40 public void sessionClosed(IoSession session) throws Exception {
41 System.out.println("sessionClosed");
42 }
43}
44
Netty:
Netty使用LengthFieldBasedFrameDecoder来处理这种消息。下面代码中的new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4)中包含5个参数,分别是int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip。maxFrameLength为消息的最大长度,lengthFieldOffset为Header的位置,lengthFieldLength为Header的长度,lengthAdjustment为长度调整(默认Header中的值表示Body的长度,并不包含Header自己),initialBytesToStrip为去掉字节数(默认解码后返回Header+Body的全部内容,这里设为4表示去掉4字节的Header,只留下Body)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 1public class TcpServer {
2
3 public static void main(String[] args) throws InterruptedException {
4 EventLoopGroup bossGroup = new NioEventLoopGroup();
5 EventLoopGroup workerGroup = new NioEventLoopGroup();
6 try {
7 ServerBootstrap b = new ServerBootstrap();
8 b.group(bossGroup, workerGroup)
9 .channel(NioServerSocketChannel.class)
10 .childHandler(new ChannelInitializer<SocketChannel>() {
11 @Override
12 public void initChannel(SocketChannel ch)
13 throws Exception {
14 ChannelPipeline pipeline = ch.pipeline();
15
16 // LengthFieldBasedFrameDecoder按行分割消息,取出body
17 pipeline.addLast(new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4));
18 // 再按UTF-8编码转成字符串
19 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
20
21 pipeline.addLast(new TcpServerHandler());
22 }
23 });
24 ChannelFuture f = b.bind(8080).sync();
25 f.channel().closeFuture().sync();
26 } finally {
27 workerGroup.shutdownGracefully();
28 bossGroup.shutdownGracefully();
29 }
30 }
31
32}
33
34class TcpServerHandler extends ChannelInboundHandlerAdapter {
35
36 // 接收到新的数据
37 @Override
38 public void channelRead(ChannelHandlerContext ctx, Object msg) {
39
40 String message = (String) msg;
41 System.out.println("channelRead:" + message);
42 }
43
44 @Override
45 public void channelActive(ChannelHandlerContext ctx) {
46 System.out.println("channelActive");
47 }
48
49 @Override
50 public void channelInactive(ChannelHandlerContext ctx) {
51 System.out.println("channelInactive");
52 }
53
54 @Override
55 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
56 cause.printStackTrace();
57 ctx.close();
58 }
59}
60
Twisted:
在Twisted中需要继承Int32StringReceiver,不再继承Protocol。Int32StringReceiver表示固定32位(4字节)的Header,另外还有Int16StringReceiver、Int8StringReceiver等。而需要实现的接受数据事件的方法不再是dataReceived,也不是lineReceived,而是stringReceived。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 1# -*- coding:utf-8 –*-
2
3from twisted.protocols.basic import Int32StringReceiver
4from twisted.internet.protocol import Factory
5from twisted.internet import reactor
6
7class TcpServerHandle(Int32StringReceiver):
8
9 # 新的连接建立
10 def connectionMade(self):
11 print 'connectionMade'
12
13 # 连接断开
14 def connectionLost(self, reason):
15 print 'connectionLost'
16
17 # 接收到新的数据
18 def stringReceived(self, data):
19 print 'stringReceived:' + data
20
21factory = Factory()
22factory.protocol = TcpServerHandle
23reactor.listenTCP(8080, factory)
24reactor.run()
25
下面是Java编写的一个客户端测试程序:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 1public class TcpClient {
2
3 public static void main(String[] args) throws IOException {
4
5 Socket socket = null;
6 DataOutputStream out = null;
7
8 try {
9
10 socket = new Socket("localhost", 8080);
11 out = new DataOutputStream(socket.getOutputStream());
12
13 // 请求服务器
14 String data1 = "牛顿";
15 byte[] outputBytes1 = data1.getBytes("UTF-8");
16 out.writeInt(outputBytes1.length); // write header
17 out.write(outputBytes1); // write body
18
19 String data2 = "爱因斯坦";
20 byte[] outputBytes2 = data2.getBytes("UTF-8");
21 out.writeInt(outputBytes2.length); // write header
22 out.write(outputBytes2); // write body
23
24 out.flush();
25
26 } finally {
27 // 关闭连接
28 out.close();
29 socket.close();
30 }
31 }
32}
33
MINA服务器输出结果:
sessionCreated
messageReceived:牛顿
messageReceived:爱因斯坦
sessionClosed
Netty服务器输出结果:
channelActive
channelRead:牛顿
channelRead:爱因斯坦
channelInactive
Twisted服务器输出结果:
connectionMade
stringReceived:牛顿
stringReceived:爱因斯坦
connectionLost
MINA、Netty、Twisted一起学系列
MINA、Netty、Twisted一起学(一):实现简单的TCP服务器
MINA、Netty、Twisted一起学(二):TCP消息边界问题及按行分割消息
MINA、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)
MINA、Netty、Twisted一起学(四):定制自己的协议
MINA、Netty、Twisted一起学(五):整合protobuf
MINA、Netty、Twisted一起学(六):session
MINA、Netty、Twisted一起学(七):发布/订阅(Publish/Subscribe)
MINA、Netty、Twisted一起学(八):HTTP服务器
MINA、Netty、Twisted一起学(九):异步IO和回调函数
MINA、Netty、Twisted一起学(十):线程模型
MINA、Netty、Twisted一起学(十一):SSL/TLS
MINA、Netty、Twisted一起学(十二):HTTPS