Netty自定义私有协议栈
自定义私有协议栈开发,其实就是自己封装一套符合自定义数据包结构的编码器和解码器,从而满足我们的业务需求。
通常我们数据包拆分,一部分为包头,一部分为包体,一个数据包就有两部分构成。
如图所示
对于数据包,我们进行细化,每个部分都有很多基本元素组成,利用这些基本元素,我们能够实现通过解析数据包和封装数据包,能轻松的实现
自定义协议栈的开发。
在包头中我们用
一个short类型来表示魔法头,
一个byte类型的来表示版本号
一个int类型来表示数据包体的长度,
一个short类型的来表示消息的commid
一个int类型的来表示序列号
在包体中我们用:
一个byte数组来表示存储的数据,
具体如图所示
在代码上我们通过编码两个实体类来描述定义的代码,为了提高代码的复用性。我们将代码定义为顶级包结构,可扩展为tcp协议数据包和udp协议数据包以及http协议数据包
如下代码所示;
消息头
NettyNetMessageHead.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 1public class NettyNetMessageHead {
2
3 public static final short MESSAGE_HEADER_FLAG = 0x2425;
4
5 /**
6 * 魔法头
7 */
8 private short head;
9 /**
10 * 版本号
11 */
12 private byte version;
13 /**
14 * 长度
15 */
16 private int length;
17 /**
18 * 命令
19 */
20 private short cmd;
21 /**
22 * 序列号
23 */
24 private int serial;
25
26 public NettyNetMessageHead() {
27 this.head = MESSAGE_HEADER_FLAG;
28 }
29
30 public short getHead() {
31 return head;
32 }
33
34 public void setHead(short head) {
35 this.head = head;
36 }
37
38 public byte getVersion() {
39 return version;
40 }
41
42 public void setVersion(byte version) {
43 this.version = version;
44 }
45
46 public int getLength() {
47 return length;
48 }
49
50 public void setLength(int length) {
51 this.length = length;
52 }
53
54 public short getCmd() {
55 return cmd;
56 }
57
58 public void setCmd(short cmd) {
59 this.cmd = cmd;
60 }
61
62 public int getSerial() {
63 return serial;
64 }
65
66 public void setSerial(int serial) {
67 this.serial = serial;
68 }
69
70
消息体
NettyNetMessageBody.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1public class NettyNetMessageBody {
2 /**
3 * 存储数据
4 */
5 private byte[] bytes;
6
7 public byte[] getBytes() {
8 return bytes;
9 }
10
11 public void setBytes(byte[] bytes) {
12 this.bytes = bytes;
13 }
14}
15
16
为了提高消息传输的效率,我们采用protocolbuf来作为序列化编码方式,所以我们将消息抽象成一个抽象类,
为了提高代码复用性,我们创建一个接口。来实现消息的基本操作。
INettyMessage.java
1
2
3
4
5
6
7 1public interface INettyMessage {
2 public NettyNetMessageHead getNetMessageHead();
3 public NettyNetMessageBody getNetMessageBody();
4}
5
6
7
我们可以看到,消息接口中我们有连个方法,一个是getNetMessageHead()用来获取消息头。
一个是getNetMessageBody()用来获取消息体,
然后我们通过封装一个抽象类,AbstractNettyNetMessage.java.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 1public abstract class AbstractNettyNetMessage implements INettyMessage {
2 public NettyNetMessageHead nettyNetMessageHead;
3 public NettyNetMessageBody nettyNetMessageBody;
4 /**
5 * 增加默认属性(附带逻辑调用需要的属性)
6 */
7 private final ConcurrentHashMap<Object, Object> attributes = new ConcurrentHashMap<Object, Object>(3);
8
9
10 public NettyNetMessageHead getNettyNetMessageHead() {
11 return nettyNetMessageHead;
12 }
13
14 public void setNettyNetMessageHead(NettyNetMessageHead nettyNetMessageHead) {
15 this.nettyNetMessageHead = nettyNetMessageHead;
16 }
17
18 public NettyNetMessageBody getNettyNetMessageBody() {
19 return nettyNetMessageBody;
20 }
21
22 public void setNettyNetMessageBody(NettyNetMessageBody nettyNetMessageBody) {
23 this.nettyNetMessageBody = nettyNetMessageBody;
24 }
25
26 public ConcurrentHashMap<Object, Object> getAttributes() {
27 return attributes;
28 }
29 public Object getAttribute(Object key){
30 return attributes.get(key);
31 }
32 public void remove(Object key){
33 attributes.remove(key);
34 }
35
36}
37
38
AbstractNettyNetMessage.java.来实现消息的基本操作。这样,我们的代码复用性得到大大的提高,不管是tcp消息还是其他的消息.
我们都可以继承这个类来进行扩展当然,我们最终的目的是为了进行google 的protobuf消息进行编解码。所以我们封装了另一个抽象类
AbstractNettyNetProtoBufMessage.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 1**
2 * Created by twjitm on 2017/11/15.
3 * 基础protobuf协议消息
4 */
5public abstract class AbstractNettyNetProtoBufMessage extends AbstractNettyNetMessage {
6
7 public AbstractNettyNetProtoBufMessage() {
8 setNettyNetMessageHead(new NettyNetMessageHead());
9 setNettyNetMessageBody(new NettyNetMessageBody());
10 }
11 @Override
12 public NettyNetMessageHead getNetMessageHead() {
13 return getNettyNetMessageHead();
14 }
15
16 @Override
17 public NettyNetMessageBody getNetMessageBody() {
18 return getNettyNetMessageBody();
19 }
20
21 protected void initHeadCommId() {
22 MessageCommandAnntation messageCommandAnntation = this.getClass().getAnnotation(MessageCommandAnntation.class);
23 if(messageCommandAnntation!=null){
24 getNetMessageHead().setCmd((short) messageCommandAnntation.messagecmd().commId);
25 }
26 }
27 /*释放message的body*/
28 public void releaseMessageBody() throws CodecException, Exception{
29 getNetMessageBody().setBytes(null);
30 }
31
32 public abstract void release() throws CodecException;
33
34 public abstract void encodeNetProtoBufMessageBody() throws CodecException, Exception;
35 public abstract void decoderNetProtoBufMessageBody() throws CodecException, Exception;
36
37 public void setSerial(int serial){
38 getNetMessageHead().setSerial(serial);
39 }
40
41
42
43
44
45}
46
47
48
这个抽象类 继承了上面的AbstractNettyNetMessage,所以有父类的方法。
关系图如图所示
最后我们封装成一个抽象的tcp消息或者udp消息等,如下代码就封装成一个tcp抽象消息。
1
2
3
4
5
6
7
8
9
10
11 1public abstract class AbstractNettyNetProtoBufTcpMessage extends AbstractNettyNetProtoBufMessage {
2 public AbstractNettyNetProtoBufTcpMessage() {
3 super();
4 setNettyNetMessageHead(new NettyNetMessageHead());
5 setNettyNetMessageBody(new NettyNetProtoBufMessageBody());
6 initHeadCommId();
7 }
8}
9
10
11
为了能够通过消息解耦的方式,我们通常采用自定义注解的形式来实现代理。在这就不介绍代理相关的知识了,还有反射等,
从代码中我们可以看到
一个tcp消息被创建的时候,就会执行initHeadCommId()方法,通过initHeadCommId()我们能够标记某个具体的消息上的注解的消息id,
t抽象消息类图结构.
我们通过自定义注解
MessageCommandAnntation.java 来注解一个消息id的枚举类
1
2
3
4
5
6
7
8
9
10 1/**
2 * 消息分离注解
3 */
4@Retention(RetentionPolicy.RUNTIME)
5@Documented
6public @interface MessageCommandAnntation {
7 MessageComm messagecmd();
8}
9
10
MessageComm.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 1public enum MessageComm {
2 MESSAGE_TRUE_RETURN(0),
3 PUBLIC_CHART_MESSAGE(1),
4 PRIVATE_CHAT_MESSAGE(2),
5 PLAYER_LOGIN_MESSAGE(3),
6 PLAYER_LOGOUT_MESSAGE(4),
7 DELETE_CHAT_MESSAGE(5),
8 HEART_MESSAGE(6);
9
10 public int commId;
11
12 MessageComm(int commId) {
13 this.commId = commId;
14 }
15
16 public static int getVaule(MessageComm messageComm) {
17 return messageComm.commId;
18 }
19
20}
21
22
目前为止,我们只是为了实现编解码做好前提准备,下来我们就来看如何实现自定义编码器和解码器。
解码
同样,为了提高代码的复用,我们定义个一个抽象的解码工厂接口
INettyNetProtoBuffToMessageDecoderFactory.java
1
2
3
4
5
6 1public interface INettyNetProtoBuffToMessageDecoderFactory {
2 public AbstractNettyNetProtoBufMessage praseMessage(ByteBuf byteBuf);
3}
4
5
6
接口中就一个方法,将数据流解码成一个具体的消息实体。
然后我们将这个工厂类扩展为一个tcp协议解码工厂,
INettyNetProtoBuffTCPToMessageDecoderFactory.java ,
1
2
3
4
5
6 1@Service
2public interface INettyNetProtoBuffTCPToMessageDecoderFactory extends INettyNetProtoBuffToMessageDecoderFactory {
3}
4
5
6
虽然目前没有任何方法,为了方便后来的扩充做好了前提准备。最后我们真正来编写一个tcp消息解码器工厂的具体实现。NettyNetProtoBuffTCPToMessageDecoderFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 1@Service
2public class NettyNetProtoBuffTCPToMessageDecoderFactory implements INettyNetProtoBuffTCPToMessageDecoderFactory {
3
4 @Override
5 public AbstractNettyNetProtoBufMessage praseMessage(ByteBuf byteBuf) {
6 NettyNetMessageHead netMessageHead=new NettyUDPMessageHead();
7 //跳过2个字节
8 byteBuf.skipBytes(2);
9 //消息长度
10 netMessageHead.setLength(byteBuf.readInt());
11 //版本号
12 netMessageHead.setVersion(byteBuf.readByte());
13 //read message context
14 //读取内容
15 short cmd = byteBuf.readShort();
16 //消息id
17 netMessageHead.setCmd(cmd);
18 //序列号
19 netMessageHead.setSerial(byteBuf.readInt());
20 //通过spring管理容器
21 MessageRegistryFactory registryFactory =SpringServiceManager.springLoadService.getMessageRegistryFactory();
22 AbstractNettyNetProtoBufMessage nettyMessage = registryFactory.get(cmd);
23 nettyMessage.setNettyNetMessageHead(netMessageHead);
24 NettyNetMessageBody nettyNetMessageBody=new NettyNetMessageBody();
25 //数据域大小
26 int byteLength = byteBuf.readableBytes();
27 ByteBuf bodyByteBuffer = Unpooled.buffer(256);
28 byte[] bytes = new byte[byteLength];
29 bodyByteBuffer = byteBuf.getBytes(byteBuf.readerIndex(), bytes);
30 //保存数据到数据域
31 nettyNetMessageBody.setBytes(bytes);
32 nettyMessage.setNettyNetMessageBody(nettyNetMessageBody);
33 try {
34 //提交给具体的消息解码
35 nettyMessage.decoderNetProtoBufMessageBody();
36 } catch (Exception e) {
37 e.printStackTrace();
38 nettyMessage.release();
39 }
40 return nettyMessage;
41 }
42
43
44}
45
46
入上述代码所示,在解码过程中我们利用到了spring整合Netty作为容器去管理一些bean对象,在这就不进行扩展描述了,
编码器
同样,为了提高代码的通用行,我们定义一个抽象的消息编码工厂接口INettyNetProtoBufMessageEncoderFactory.java
1
2
3
4
5
6 1public interface INettyNetProtoBufMessageEncoderFactory {
2 public ByteBuf createByteBuf(AbstractNettyNetProtoBufMessage netMessage) throws Exception;
3}
4
5
6
然后进行扩展,入进行tcp协议进行扩展,则编写一个INettyNetProtoBufTcpMessageEncoderFactory.java 的接口来继承这个接口。
1
2
3
4
5
6
7
8 1/**
2 * Created by twjitm on
3 */
4public interface INettyNetProtoBufTcpMessageEncoderFactory extends INettyNetProtoBufMessageEncoderFactory {
5
6}
7
8
同理,我们编译一个NettyNetProtoBufTcpMessageEncoderFactory.java
的实现类来实现消息编码。如下代码所示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 1@Service
2public class NettyNetProtoBufTcpMessageEncoderFactory implements INettyNetProtoBufTcpMessageEncoderFactory {
3
4 @Override
5 public ByteBuf createByteBuf(AbstractNettyNetProtoBufMessage netMessage) throws Exception {
6 ByteBuf byteBuf = Unpooled.buffer(256);
7 //编写head
8 NettyNetMessageHead netMessageHead = netMessage.getNetMessageHead();
9 byteBuf.writeShort(netMessageHead.getHead());
10 //长度
11 byteBuf.writeInt(netMessageHead.getLength());
12 //设置内容
13 byteBuf.writeByte(netMessageHead.getVersion());
14 //设置消息id
15 byteBuf.writeShort(netMessageHead.getCmd());
16 //设置系列号
17 byteBuf.writeInt(netMessageHead.getSerial());
18 //编写body
19 netMessage.encodeNetProtoBufMessageBody();
20 NettyNetMessageBody netMessageBody = netMessage.getNetMessageBody();
21 byteBuf.writeBytes(netMessageBody.getBytes());
22
23 //重新设置长度
24 int skip = 6;
25 int length = byteBuf.readableBytes() - skip;
26 byteBuf.setInt(2, length);
27 byteBuf.slice();
28 return byteBuf;
29 }
30}
31
32
到此编码器和解码器的对应工厂类编写完毕。接下来我们就要实现一个编码器和一个解码器了
不管是编码器还是解码器,我们都继承netty提供的消息转消息编解码器MessageToMessageDecoder
如下代码所示我们自定义的一个解码器。
NettyNetProtoBufMessageTCPDecoder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 1/**
2 *
3 * @author tjwitm
4 * @date 2017/11/16
5 */
6public class NettyNetProtoBufMessageTCPDecoder extends MessageToMessageDecoder<ByteBuf> {
7 Logger logger=LoggerUtils.getLogger(NettyNetProtoBufMessageTCPDecoder.class);
8 private final Charset charset;
9 private INettyNetProtoBuffTCPToMessageDecoderFactory iNettyNetProtoBuffTCPToMessageDecoderFactory;
10
11 public NettyNetProtoBufMessageTCPDecoder() {
12 this(CharsetUtil.UTF_8);
13 }
14
15 public NettyNetProtoBufMessageTCPDecoder(Class<? extends ByteBuf> inboundMessageType, Charset charset, INettyNetProtoBuffTCPToMessageDecoderFactory iNettyNetProtoBuffTCPToMessageDecoerFactory) {
16 super(inboundMessageType);
17 this.charset = charset;
18 this.iNettyNetProtoBuffTCPToMessageDecoderFactory = iNettyNetProtoBuffTCPToMessageDecoerFactory;
19 }
20
21 public NettyNetProtoBufMessageTCPDecoder(Charset charset) {
22 if (charset == null) {
23 throw new NullPointerException("charset");
24 }
25 this.charset = charset;
26 iNettyNetProtoBuffTCPToMessageDecoderFactory = SpringServiceManager.springLoadService.getNettyNetProtoBuffTCPToMessageDecoderFactory();
27 }
28
29 @Override
30 protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
31 if (iNettyNetProtoBuffTCPToMessageDecoderFactory == null) {
32 logger.error("iNettyNetProtoBuffTCPToMessageDecoderFactory is null ");
33 } else {
34 out.add(iNettyNetProtoBuffTCPToMessageDecoderFactory.praseMessage(msg));
35
36 }
37 }
38}
39
40
重写了decode方法,利用我们自定义的解码工厂来进行解码。。
同样的道理,我们来看编码器的编写
NettyNetProtoBufMessageTCPEncoder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 1public class NettyNetProtoBufMessageTCPEncoder extends MessageToMessageEncoder<AbstractNettyNetProtoBufMessage> {
2
3 private final Charset charset;
4
5 private INettyNetProtoBufTcpMessageEncoderFactory iNetMessageEncoderFactory;
6
7 public NettyNetProtoBufMessageTCPEncoder() {
8 this(CharsetUtil.UTF_8);
9 this.iNetMessageEncoderFactory = SpringServiceManager.springLoadService.getNettyNetProtoBufTcpMessageEncoderFactory();
10 }
11
12 public NettyNetProtoBufMessageTCPEncoder(Charset charset) {
13 if(charset == null) {
14 throw new NullPointerException("charset");
15 } else {
16 this.charset = charset;
17 }
18 }
19
20 @Override
21 protected void encode(ChannelHandlerContext ctx, AbstractNettyNetProtoBufMessage msg, List<Object> out) throws Exception {
22 ByteBuf netMessageBuf = iNetMessageEncoderFactory.createByteBuf(msg);
23 out.add(netMessageBuf);
24 }
25}
26
27
28
细心的小伙伴可能会发现这一次我们的编码器是继承了MessageToMessageEncoder。当然,这也是netty提供的
编码器。重写encode方法,利用我们自定义的编码器工厂类来进行编码。
到此,netty的自定义编码器和解码器的编写介绍完毕,介于代码比较多,我们利用tcp协议来进行描述,下面我们就来看看
编写实例进行测试吧。
我们编写一个服务器引导程序StartTcpService和编写一个客户端引导实例。启动服务器,启动客户端就可以进行传输
想要看效果的小伙伴自己下载源码。
中间我们还有很多没有介绍,由于篇幅限定,我们将进行下一篇描述《Netty实战开发(4):Netty整合spring来管理bean对象》