1、Netty的心跳,不像Mina,Mina有个心跳基类,而Netty没有,Netty的心跳也是继承ChannelInboundHandlerAdapter重写channelRead;
以下代码实现:服务端30秒读空闲,则给客户端发送‘+’,客户端收到后,回‘-’,如果服务端连续发送3次还是未收到‘-’,则断开连接。
TcpServerInitializer 中主要看:

    .addLast("ping", new IdleStateHandler(NettyConstants.READERIDLE_TIMESECONDS, 0, 0, TimeUnit.SECONDS))
    .addLast("heartbeatHandler", new HeartbeatHandler_Read())

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| 1package com.sintech.initializer;
2
3import java.util.concurrent.TimeUnit;
4
5import com.sintech.handler.TcpInHandler;
6import com.sintech.handler.TcpOutHandler;
7import com.sintech.handler.keepalive.HeartbeatHandler_Read;
8import com.sintech.message.codec.MessageDecoder;
9import com.sintech.message.codec.MessageEncoder;
10import com.sintech.utils.NettyConstants;
11
12import io.netty.channel.ChannelInitializer;
13import io.netty.channel.socket.SocketChannel;
14import io.netty.handler.codec.string.StringDecoder;
15import io.netty.handler.codec.string.StringEncoder;
16import io.netty.handler.logging.LogLevel;
17import io.netty.handler.logging.LoggingHandler;
18import io.netty.handler.timeout.IdleStateHandler;
19import io.netty.util.CharsetUtil;
20
21public class TcpServerInitializer extends ChannelInitializer<SocketChannel> {
22
23 @Override
24 protected void initChannel(SocketChannel ch) throws Exception {
25 ch.pipeline()
26 //.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())))
27 .addLast(new LoggingHandler(LogLevel.DEBUG))
28 // 30秒idle,如果连续3次idle,则断开连接
29 .addLast("ping", new IdleStateHandler(NettyConstants.READERIDLE_TIMESECONDS, 0, 0, TimeUnit.SECONDS))
30 .addLast("heartbeatHandler", new HeartbeatHandler_Read())
31 // .addLast(new StringEncoder(CharsetUtil.UTF_8))
32 // .addLast(new StringDecoder(CharsetUtil.UTF_8))
33 .addLast(new MessageDecoder())
34 .addLast(new MessageEncoder())
35 .addLast("inHandler", new TcpInHandler())
36 .addLast("outHandler", new TcpOutHandler())
37 ;
38 }
39
40}
41
42 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| 1package com.sintech.handler.keepalive;
2
3import com.sintech.utils.NettyConstants;
4import com.sintech.utils.NettyLogger;
5
6import io.netty.buffer.ByteBuf;
7import io.netty.buffer.Unpooled;
8import io.netty.channel.ChannelHandlerContext;
9import io.netty.channel.ChannelInboundHandlerAdapter;
10import io.netty.channel.ChannelHandler.Sharable;
11import io.netty.channel.socket.DatagramPacket;
12import io.netty.handler.timeout.IdleState;
13import io.netty.handler.timeout.IdleStateEvent;
14
15
16@Sharable
17public class HeartbeatHandler_Read extends ChannelInboundHandlerAdapter {
18
19 // 心跳失败计数器:未收到client端发送的ping请求
20 private int unRecPingTimes = 0 ;
21
22 // 定义服务端没有收到心跳消息的最大次数
23 private static final int MAX_UN_REC_PING_TIMES = 3;
24
25 @Override
26 public void channelRead(ChannelHandlerContext ctx, Object msg)
27 throws Exception {
28 unRecPingTimes = 0;
29 if(msg instanceof ByteBuf) {
30 ByteBuf bb = (ByteBuf)msg;
31 if(bb.readableBytes() == 1 && bb.getByte(0) == NettyConstants.PING) {
32 ByteBuf b = ctx.alloc().buffer(1);
33 // b.writeBytes(NettyConstants.PONG);
34 b.writeByte(NettyConstants.PONG);
35 ctx.writeAndFlush(b);
36 bb.release();
37 } else {
38 super.channelRead(ctx, msg);
39 }
40 } else if (msg instanceof DatagramPacket) {
41 DatagramPacket dp = (DatagramPacket) msg;
42
43 ByteBuf bb = (ByteBuf) dp.copy().content();
44 if(bb.readableBytes() == 1 && bb.getByte(0) == NettyConstants.PING) {
45 ByteBuf b = ctx.alloc().buffer(1);
46 // b.writeBytes(PONG);
47 b.writeByte(NettyConstants.PONG);
48 DatagramPacket dp2 = new DatagramPacket(b, dp.sender());
49 ctx.writeAndFlush(dp2);
50 dp.release();
51 } else {
52 super.channelRead(ctx, msg);
53 }
54 } else {
55 super.channelRead(ctx, msg);
56 }
57
58 }
59
60
61 @Override
62 public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
63 throws Exception {
64 if (evt instanceof IdleStateEvent) {
65 IdleState state = ((IdleStateEvent) evt).state();
66 if (state == IdleState.READER_IDLE) {
67 unRecPingTimes++;
68 NettyLogger.getInstance().logInfo("第" + unRecPingTimes + "次IDLE", null);
69 if(unRecPingTimes >= MAX_UN_REC_PING_TIMES) {
70 ctx.close();
71 }
72
73 }
74 } else {
75 super.userEventTriggered(ctx, evt);
76 }
77 }
78}
79
80 |