Netty.心跳

释放双眼,带上耳机,听听看~!

1、Netty的心跳,不像Mina,Mina有个心跳基类,而Netty没有,Netty的心跳也是继承ChannelInboundHandlerAdapter重写channelRead;

以下代码实现:服务端30秒读空闲,则给客户端发送'+',客户端收到后,回'-';如果服务端连续发送3次还是未收到'-',则断开连接。


1
2
TcpServerInitializer 中主要看:
2

1
2
3
.addLast("ping", new IdleStateHandler(NettyConstants.READERIDLE_TIMESECONDS, 0, 0, TimeUnit.SECONDS))
.addLast("heartbeatHandler", new HeartbeatHandler_Read())
3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
1package com.sintech.initializer;
2
3import java.util.concurrent.TimeUnit;
4
5import com.sintech.handler.TcpInHandler;
6import com.sintech.handler.TcpOutHandler;
7import com.sintech.handler.keepalive.HeartbeatHandler_Read;
8import com.sintech.message.codec.MessageDecoder;
9import com.sintech.message.codec.MessageEncoder;
10import com.sintech.utils.NettyConstants;
11
12import io.netty.channel.ChannelInitializer;
13import io.netty.channel.socket.SocketChannel;
14import io.netty.handler.codec.string.StringDecoder;
15import io.netty.handler.codec.string.StringEncoder;
16import io.netty.handler.logging.LogLevel;
17import io.netty.handler.logging.LoggingHandler;
18import io.netty.handler.timeout.IdleStateHandler;
19import io.netty.util.CharsetUtil;
20
21public class TcpServerInitializer extends ChannelInitializer<SocketChannel> {
22 
23  @Override
24  protected void initChannel(SocketChannel ch) throws Exception {
25      ch.pipeline()
26      //.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())))
27      .addLast(new LoggingHandler(LogLevel.DEBUG))
28      // 30秒idle,如果连续3次idle,则断开连接
29      .addLast("ping", new IdleStateHandler(NettyConstants.READERIDLE_TIMESECONDS, 0, 0, TimeUnit.SECONDS))
30      .addLast("heartbeatHandler", new HeartbeatHandler_Read())
31      // .addLast(new StringEncoder(CharsetUtil.UTF_8))
32      // .addLast(new StringDecoder(CharsetUtil.UTF_8))
33      .addLast(new MessageDecoder())
34      .addLast(new MessageEncoder())
35      .addLast("inHandler", new TcpInHandler())
36      .addLast("outHandler", new TcpOutHandler())
37      ;
38  }
39
40}
41
42

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
1package com.sintech.handler.keepalive;
2
import com.sintech.utils.NettyConstants;
import com.sintech.utils.NettyLogger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
14
15
16@Sharable
17public class HeartbeatHandler_Read extends ChannelInboundHandlerAdapter {
18 
19  // 心跳失败计数器:未收到client端发送的ping请求
20    private int unRecPingTimes = 0 ;
21    
22    // 定义服务端没有收到心跳消息的最大次数
23    private static final int MAX_UN_REC_PING_TIMES = 3;
24    
25  @Override
26  public void channelRead(ChannelHandlerContext ctx, Object msg)
27          throws Exception {
28      unRecPingTimes = 0;
29      if(msg instanceof ByteBuf) {
30          ByteBuf bb = (ByteBuf)msg;
31          if(bb.readableBytes() == 1 && bb.getByte(0) == NettyConstants.PING) {
32              ByteBuf b = ctx.alloc().buffer(1);
33              // b.writeBytes(NettyConstants.PONG);
34              b.writeByte(NettyConstants.PONG);
35              ctx.writeAndFlush(b);
36              bb.release();
37          } else {
38              super.channelRead(ctx, msg);
39          }
40      } else if (msg instanceof DatagramPacket) {
41          DatagramPacket dp = (DatagramPacket) msg;
42         
43          ByteBuf bb = (ByteBuf) dp.copy().content();
44            if(bb.readableBytes() == 1 && bb.getByte(0) == NettyConstants.PING) {
45              ByteBuf b = ctx.alloc().buffer(1);
46              // b.writeBytes(PONG);
47              b.writeByte(NettyConstants.PONG);
48              DatagramPacket dp2 = new DatagramPacket(b, dp.sender());
49              ctx.writeAndFlush(dp2);
50              dp.release();
51          } else {
52              super.channelRead(ctx, msg);
53          }
54      } else {
55          super.channelRead(ctx, msg);
56      }
57     
58  }
59 
60 
61  @Override
62  public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
63          throws Exception {
64      if (evt instanceof IdleStateEvent) {  
65            IdleState state = ((IdleStateEvent) evt).state();  
66            if (state == IdleState.READER_IDLE) {
67              unRecPingTimes++;
68              NettyLogger.getInstance().logInfo("第" + unRecPingTimes + "次IDLE", null);
69              if(unRecPingTimes >= MAX_UN_REC_PING_TIMES) {
70                  ctx.close();
71              }
72                
73            }  
74        } else {  
75            super.userEventTriggered(ctx, evt);  
76        }  
77  }
78}
79
80

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

气候事件

1510号台风“莲花”继续向台湾逼近

2015-7-7 8:56:33

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索