Netty游戏服务器实战开发(3):自定义私有协议栈

释放双眼,带上耳机,听听看~!

Netty自定义私有协议栈
自定义私有协议栈开发,其实就是自己封装一套符合自定义数据包结构的编码器和解码器,从而满足我们的业务需求。

通常我们数据包拆分,一部分为包头,一部分为包体,一个数据包就有两部分构成。

如图所示

对于数据包,我们进行细化,每个部分都有很多基本元素组成,利用这些基本元素,我们能够实现通过解析数据包和封装数据包,能轻松的实现
自定义协议栈的开发。

在包头中我们用

一个short类型来表示魔法头,
一个byte类型的来表示版本号
一个int类型来表示数据包体的长度,
一个short类型的来表示消息的commid
一个int类型的来表示序列号

在包体中我们用:

一个byte数组来表示存储的数据,

具体如图所示

在代码上我们通过编码两个实体类来描述定义的代码,为了提高代码的复用性。我们将代码定义为顶级包结构,可扩展为tcp协议数据包和udp协议数据包以及http协议数据包

如下代码所示;
消息头

NettyNetMessageHead.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
1public class NettyNetMessageHead {
2
3    public static final short MESSAGE_HEADER_FLAG = 0x2425;
4
5    /**
6     * 魔法头
7     */
8    private short head;
9    /**
10     * 版本号
11     */
12    private byte version;
13    /**
14     * 长度
15     */
16    private int length;
17    /**
18     * 命令
19     */
20    private short cmd;
21    /**
22     * 序列号
23     */
24    private int serial;
25
26    public NettyNetMessageHead() {
27        this.head = MESSAGE_HEADER_FLAG;
28    }
29
30    public short getHead() {
31        return head;
32    }
33
34    public void setHead(short head) {
35        this.head = head;
36    }
37
38    public byte getVersion() {
39        return version;
40    }
41
42    public void setVersion(byte version) {
43        this.version = version;
44    }
45
46    public int getLength() {
47        return length;
48    }
49
50    public void setLength(int length) {
51        this.length = length;
52    }
53
54    public short getCmd() {
55        return cmd;
56    }
57
58    public void setCmd(short cmd) {
59        this.cmd = cmd;
60    }
61
62    public int getSerial() {
63        return serial;
64    }
65
66    public void setSerial(int serial) {
67        this.serial = serial;
68    }
69
70

消息体

NettyNetMessageBody.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1public class NettyNetMessageBody {
2    /**
3     * 存储数据
4     */
5    private byte[] bytes;
6
7    public byte[] getBytes() {
8        return bytes;
9    }
10
11    public void setBytes(byte[] bytes) {
12        this.bytes = bytes;
13    }
14}
15
16

为了提高消息传输的效率,我们采用protocolbuf来作为序列化编码方式,所以我们将消息抽象成一个抽象类,
为了提高代码复用性,我们创建一个接口。来实现消息的基本操作。

INettyMessage.java


1
2
3
4
5
6
7
1public interface INettyMessage {
2    public NettyNetMessageHead getNetMessageHead();
3    public NettyNetMessageBody getNetMessageBody();
4}
5
6
7

我们可以看到,消息接口中我们有连个方法,一个是getNetMessageHead()用来获取消息头。
一个是getNetMessageBody()用来获取消息体,
然后我们通过封装一个抽象类,AbstractNettyNetMessage.java.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
1public abstract class AbstractNettyNetMessage implements INettyMessage {
2    public NettyNetMessageHead nettyNetMessageHead;
3    public NettyNetMessageBody nettyNetMessageBody;
4    /**
5     * 增加默认属性(附带逻辑调用需要的属性)
6     */
7    private final ConcurrentHashMap<Object, Object> attributes = new ConcurrentHashMap<Object, Object>(3);
8
9
10    public NettyNetMessageHead getNettyNetMessageHead() {
11        return nettyNetMessageHead;
12    }
13
14    public void setNettyNetMessageHead(NettyNetMessageHead nettyNetMessageHead) {
15        this.nettyNetMessageHead = nettyNetMessageHead;
16    }
17
18    public NettyNetMessageBody getNettyNetMessageBody() {
19        return nettyNetMessageBody;
20    }
21
22    public void setNettyNetMessageBody(NettyNetMessageBody nettyNetMessageBody) {
23        this.nettyNetMessageBody = nettyNetMessageBody;
24    }
25
26    public ConcurrentHashMap<Object, Object> getAttributes() {
27        return attributes;
28    }
29    public  Object getAttribute(Object key){
30        return attributes.get(key);
31    }
32    public  void remove(Object key){
33       attributes.remove(key);
34    }
35
36}
37
38

AbstractNettyNetMessage.java.来实现消息的基本操作。这样,我们的代码复用性得到大大的提高,不管是tcp消息还是其他的消息.
我们都可以继承这个类来进行扩展当然,我们最终的目的是为了进行google 的protobuf消息进行编解码。所以我们封装了另一个抽象类

AbstractNettyNetProtoBufMessage.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
1**
2 * Created by twjitm on 2017/11/15.
3 * 基础protobuf协议消息
4 */
5public abstract class AbstractNettyNetProtoBufMessage extends AbstractNettyNetMessage {
6
7    public AbstractNettyNetProtoBufMessage() {
8        setNettyNetMessageHead(new NettyNetMessageHead());
9        setNettyNetMessageBody(new NettyNetMessageBody());
10    }
11    @Override
12    public NettyNetMessageHead getNetMessageHead() {
13        return getNettyNetMessageHead();
14    }
15
16    @Override
17    public NettyNetMessageBody getNetMessageBody() {
18        return getNettyNetMessageBody();
19    }
20
21    protected void initHeadCommId() {
22        MessageCommandAnntation messageCommandAnntation = this.getClass().getAnnotation(MessageCommandAnntation.class);
23         if(messageCommandAnntation!=null){
24             getNetMessageHead().setCmd((short) messageCommandAnntation.messagecmd().commId);
25         }
26    }
27    /*释放message的body*/
28    public  void releaseMessageBody() throws CodecException, Exception{
29        getNetMessageBody().setBytes(null);
30    }
31
32    public abstract void release() throws CodecException;
33
34    public abstract  void encodeNetProtoBufMessageBody() throws CodecException, Exception;
35    public  abstract  void  decoderNetProtoBufMessageBody() throws  CodecException, Exception;
36
37    public void setSerial(int serial){
38        getNetMessageHead().setSerial(serial);
39    }
40
41
42
43
44
45}
46
47
48

这个抽象类 继承了上面的AbstractNettyNetMessage,所以有父类的方法。
关系图如图所示

最后我们封装成一个抽象的tcp消息或者udp消息等,如下代码就封装成一个tcp抽象消息。


1
2
3
4
5
6
7
8
9
10
11
1public abstract class AbstractNettyNetProtoBufTcpMessage extends AbstractNettyNetProtoBufMessage {
2    public AbstractNettyNetProtoBufTcpMessage() {
3        super();
4        setNettyNetMessageHead(new NettyNetMessageHead());
5        setNettyNetMessageBody(new NettyNetProtoBufMessageBody());
6        initHeadCommId();
7    }
8}
9
10
11

为了能够通过消息解耦的方式,我们通常采用自定义注解的形式来实现代理。在这就不介绍代理相关的知识了,还有反射等,
从代码中我们可以看到

一个tcp消息被创建的时候,就会执行initHeadCommId()方法,通过initHeadCommId()我们能够标记某个具体的消息上的注解的消息id,

t抽象消息类图结构.
我们通过自定义注解

MessageCommandAnntation.java 来注解一个消息id的枚举类


1
2
3
4
5
6
7
8
9
10
1/**
2 * 消息分离注解
3 */
4@Retention(RetentionPolicy.RUNTIME)
5@Documented
6public @interface MessageCommandAnntation {
7    MessageComm messagecmd();
8}
9
10

MessageComm.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1public enum MessageComm {
2    MESSAGE_TRUE_RETURN(0),
3    PUBLIC_CHART_MESSAGE(1),
4    PRIVATE_CHAT_MESSAGE(2),
5    PLAYER_LOGIN_MESSAGE(3),
6    PLAYER_LOGOUT_MESSAGE(4),
7    DELETE_CHAT_MESSAGE(5),
8    HEART_MESSAGE(6);
9
10    public int commId;
11
12    MessageComm(int commId) {
13        this.commId = commId;
14    }
15
16    public static int getVaule(MessageComm messageComm) {
17        return messageComm.commId;
18    }
19
20}
21
22

目前为止,我们只是为了实现编解码做好前提准备,下来我们就来看如何实现自定义编码器和解码器。

解码

同样,为了提高代码的复用,我们定义个一个抽象的解码工厂接口
INettyNetProtoBuffToMessageDecoderFactory.java


1
2
3
4
5
6
1public interface INettyNetProtoBuffToMessageDecoderFactory {
2    public AbstractNettyNetProtoBufMessage praseMessage(ByteBuf byteBuf);
3}
4
5
6

接口中就一个方法,将数据流解码成一个具体的消息实体。
然后我们将这个工厂类扩展为一个tcp协议解码工厂,
INettyNetProtoBuffTCPToMessageDecoderFactory.java ,


1
2
3
4
5
6
1@Service
2public interface INettyNetProtoBuffTCPToMessageDecoderFactory extends INettyNetProtoBuffToMessageDecoderFactory {
3}
4
5
6

虽然目前没有任何方法,为了方便后来的扩充做好了前提准备。最后我们真正来编写一个tcp消息解码器工厂的具体实现。NettyNetProtoBuffTCPToMessageDecoderFactory.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
1@Service
2public class NettyNetProtoBuffTCPToMessageDecoderFactory implements INettyNetProtoBuffTCPToMessageDecoderFactory {
3
4    @Override
5    public AbstractNettyNetProtoBufMessage praseMessage(ByteBuf byteBuf) {
6        NettyNetMessageHead netMessageHead=new NettyUDPMessageHead();
7        //跳过2个字节
8        byteBuf.skipBytes(2);
9        //消息长度
10        netMessageHead.setLength(byteBuf.readInt());
11        //版本号
12        netMessageHead.setVersion(byteBuf.readByte());
13        //read message context
14        //读取内容
15        short cmd = byteBuf.readShort();
16        //消息id
17        netMessageHead.setCmd(cmd);
18        //序列号
19        netMessageHead.setSerial(byteBuf.readInt());
20        //通过spring管理容器
21        MessageRegistryFactory registryFactory =SpringServiceManager.springLoadService.getMessageRegistryFactory();
22        AbstractNettyNetProtoBufMessage nettyMessage = registryFactory.get(cmd);
23        nettyMessage.setNettyNetMessageHead(netMessageHead);
24        NettyNetMessageBody nettyNetMessageBody=new NettyNetMessageBody();
25       //数据域大小
26        int byteLength = byteBuf.readableBytes();
27        ByteBuf bodyByteBuffer = Unpooled.buffer(256);
28        byte[] bytes = new byte[byteLength];
29        bodyByteBuffer = byteBuf.getBytes(byteBuf.readerIndex(), bytes);
30        //保存数据到数据域
31        nettyNetMessageBody.setBytes(bytes);
32        nettyMessage.setNettyNetMessageBody(nettyNetMessageBody);
33        try {
34            //提交给具体的消息解码
35            nettyMessage.decoderNetProtoBufMessageBody();
36        } catch (Exception e) {
37            e.printStackTrace();
38            nettyMessage.release();
39        }
40        return nettyMessage;
41    }
42
43
44}
45
46

入上述代码所示,在解码过程中我们利用到了spring整合Netty作为容器去管理一些bean对象,在这就不进行扩展描述了,

编码器

同样,为了提高代码的通用行,我们定义一个抽象的消息编码工厂接口INettyNetProtoBufMessageEncoderFactory.java


1
2
3
4
5
6
1public interface INettyNetProtoBufMessageEncoderFactory {
2    public ByteBuf createByteBuf(AbstractNettyNetProtoBufMessage netMessage) throws Exception;
3}
4
5
6

然后进行扩展,入进行tcp协议进行扩展,则编写一个INettyNetProtoBufTcpMessageEncoderFactory.java 的接口来继承这个接口。


1
2
3
4
5
6
7
8
1/**
2 * Created by twjitm on
3 */
4public interface INettyNetProtoBufTcpMessageEncoderFactory extends INettyNetProtoBufMessageEncoderFactory {
5
6}
7
8

同理,我们编译一个NettyNetProtoBufTcpMessageEncoderFactory.java
的实现类来实现消息编码。如下代码所示。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1@Service
2public class NettyNetProtoBufTcpMessageEncoderFactory implements INettyNetProtoBufTcpMessageEncoderFactory {
3
4    @Override
5    public ByteBuf createByteBuf(AbstractNettyNetProtoBufMessage netMessage) throws Exception {
6        ByteBuf byteBuf = Unpooled.buffer(256);
7        //编写head
8        NettyNetMessageHead netMessageHead = netMessage.getNetMessageHead();
9        byteBuf.writeShort(netMessageHead.getHead());
10        //长度
11        byteBuf.writeInt(netMessageHead.getLength());
12        //设置内容
13        byteBuf.writeByte(netMessageHead.getVersion());
14        //设置消息id
15        byteBuf.writeShort(netMessageHead.getCmd());
16        //设置系列号
17        byteBuf.writeInt(netMessageHead.getSerial());
18        //编写body
19        netMessage.encodeNetProtoBufMessageBody();
20        NettyNetMessageBody netMessageBody = netMessage.getNetMessageBody();
21        byteBuf.writeBytes(netMessageBody.getBytes());
22
23        //重新设置长度
24        int skip = 6;
25        int length = byteBuf.readableBytes() - skip;
26        byteBuf.setInt(2, length);
27        byteBuf.slice();
28        return byteBuf;
29    }
30}
31
32

到此编码器和解码器的对应工厂类编写完毕。接下来我们就要实现一个编码器和一个解码器了

不管是编码器还是解码器,我们都继承netty提供的消息转消息编解码器MessageToMessageDecoder

如下代码所示我们自定义的一个解码器。

NettyNetProtoBufMessageTCPDecoder.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
1/**
2 *
3 * @author tjwitm
4 * @date 2017/11/16
5 */
6public class NettyNetProtoBufMessageTCPDecoder extends MessageToMessageDecoder<ByteBuf> {
7    Logger logger=LoggerUtils.getLogger(NettyNetProtoBufMessageTCPDecoder.class);
8    private final Charset charset;
9    private INettyNetProtoBuffTCPToMessageDecoderFactory iNettyNetProtoBuffTCPToMessageDecoderFactory;
10
11    public NettyNetProtoBufMessageTCPDecoder() {
12        this(CharsetUtil.UTF_8);
13    }
14
15    public NettyNetProtoBufMessageTCPDecoder(Class<? extends ByteBuf> inboundMessageType, Charset charset, INettyNetProtoBuffTCPToMessageDecoderFactory iNettyNetProtoBuffTCPToMessageDecoerFactory) {
16        super(inboundMessageType);
17        this.charset = charset;
18        this.iNettyNetProtoBuffTCPToMessageDecoderFactory = iNettyNetProtoBuffTCPToMessageDecoerFactory;
19    }
20
21    public NettyNetProtoBufMessageTCPDecoder(Charset charset) {
22        if (charset == null) {
23            throw new NullPointerException("charset");
24        }
25        this.charset = charset;
26        iNettyNetProtoBuffTCPToMessageDecoderFactory = SpringServiceManager.springLoadService.getNettyNetProtoBuffTCPToMessageDecoderFactory();
27    }
28
29    @Override
30    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
31        if (iNettyNetProtoBuffTCPToMessageDecoderFactory == null) {
32            logger.error("iNettyNetProtoBuffTCPToMessageDecoderFactory  is null ");
33        } else {
34            out.add(iNettyNetProtoBuffTCPToMessageDecoderFactory.praseMessage(msg));
35
36        }
37    }
38}
39
40

重写了decode方法,利用我们自定义的解码工厂来进行解码。。

同样的道理,我们来看编码器的编写

NettyNetProtoBufMessageTCPEncoder.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1public class NettyNetProtoBufMessageTCPEncoder extends MessageToMessageEncoder<AbstractNettyNetProtoBufMessage> {
2
3    private final Charset charset;
4
5    private INettyNetProtoBufTcpMessageEncoderFactory iNetMessageEncoderFactory;
6
7    public NettyNetProtoBufMessageTCPEncoder() {
8        this(CharsetUtil.UTF_8);
9        this.iNetMessageEncoderFactory = SpringServiceManager.springLoadService.getNettyNetProtoBufTcpMessageEncoderFactory();
10    }
11
12    public NettyNetProtoBufMessageTCPEncoder(Charset charset) {
13        if(charset == null) {
14            throw new NullPointerException("charset");
15        } else {
16            this.charset = charset;
17        }
18    }
19
20    @Override
21    protected void encode(ChannelHandlerContext ctx, AbstractNettyNetProtoBufMessage msg, List<Object> out) throws Exception {
22        ByteBuf netMessageBuf = iNetMessageEncoderFactory.createByteBuf(msg);
23        out.add(netMessageBuf);
24    }
25}
26
27
28

细心的小伙伴可能会发现这一次我们的编码器是继承了MessageToMessageEncoder。当然,这也是netty提供的
编码器。重写encode方法,利用我们自定义的编码器工厂类来进行编码。

到此,netty的自定义编码器和解码器的编写介绍完毕,介于代码比较多,我们利用tcp协议来进行描述,下面我们就来看看
编写实例进行测试吧。

我们编写一个服务器引导程序StartTcpService和编写一个客户端引导实例。启动服务器,启动客户端就可以进行传输
想要看效果的小伙伴自己下载源码。

中间我们还有很多没有介绍,由于篇幅限定,我们将进行下一篇描述《Netty实战开发(4):Netty整合spring来管理bean对象》

给TA打赏
共{{data.count}}人
人已打赏
安全技术

用node.js做cluster,监听异常的邮件提醒服务

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索