netty拆包粘包

释放双眼,带上耳机,听听看~!

tcp拆包粘包的方案主要有三种:

拆包粘包方案:

1、消息定长,不足用空格补齐(手工补齐,不会自动补齐)

2、在包尾增加特殊字符进行分割,强调是在包尾加。一旦采用此种拆解接收到的消息,那么必须按照这种方式发送,否则消息是接收不到的。

3、将消息分为消息头和消息体,在消息头中包含表示消息长度的字段,然后进行业务逻辑处理。

**package **com.aowin.netty.chaibaoAndNianbao;

**import **io.netty.bootstrap.Bootstrap;

**import **io.netty.buffer.ByteBuf;

**import **io.netty.buffer.Unpooled;

**import **io.netty.channel.ChannelFuture;

**import **io.netty.channel.ChannelInitializer;

**import **io.netty.channel.EventLoopGroup;

**import **io.netty.channel.nio.NioEventLoopGroup;

**import **io.netty.channel.socket.SocketChannel;

**import **io.netty.channel.socket.nio.NioServerSocketChannel;

**import **io.netty.channel.socket.nio.NioSocketChannel;

**import **io.netty.handler.codec.DelimiterBasedFrameDecoder;

**import **io.netty.handler.codec.FixedLengthFrameDecoder;

**import **io.netty.handler.codec.string.StringDecoder;

**
**public class **Client {
**public void **connect() {
EventLoopGroup workGroup = 
**new **NioEventLoopGroup();
Bootstrap bootstrap = 
**new **Bootstrap();
bootstrap.group(workGroup).channel(NioSocketChannel.
class)
.handler(
**new **ChannelInitializer<SocketChannel>() {
**protected void **initChannel(SocketChannel socketChannel) 
**throws *Exception {
/*// *
方式一:包尾添加特殊字符
**
*                        //

设置特殊分割符
**
*                        ByteBuf f=Unpooled.copiedBuffer("$_".getBytes());
*
*                        socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,f) );*/
*
*                       //

方式二 固定长度 关键类
*FixedLengthFrameDecoder *
*每一个固定长度算作一段消息
*
*                       * socketChannel.pipeline().addLast(
**new *FixedLengthFrameDecoder(
5));
//5
代表长度
* 5

*个字节
*
*                       * socketChannel.pipeline().addLast(
**new **StringDecoder());
socketChannel.pipeline().addLast(
**new **ClientHandler());
}
});
**try **{
ChannelFuture f = bootstrap.connect(
"127.0.0.1"
8765).sync();
// f.channel().writeAndFlush(Unpooled.copiedBuffer("adbcefg".getBytes()));//
超过
5
个,其实只能收到
*adbce *
,后面的是收不到的,因为不足
5
*个长度
*
*               *
//f.channel().writeAndFlush(Unpooled.copiedBuffer("adb".getBytes()));//
不足
5
*个长度,这个是收不到的,
*
*               * f.channel().writeAndFlush(Unpooled.
copiedBuffer(
"adb   ".getBytes()));
//
补足
5
*个长度,能收到
*
*                 *f.channel().closeFuture().sync();
workGroup.shutdownGracefully();

**catch **(InterruptedException e) {
e.printStackTrace();
}
}
**public static void **main(String[] args) {
Client c1=
**new **Client();
c1.connect();

    }

}

ClientHandler

**package **com.aowin.netty.chaibaoAndNianbao;

**import **com.aowin.netty.hello.ClientHanler;

**import **io.netty.bootstrap.Bootstrap;

**import **io.netty.buffer.Unpooled;

**import **io.netty.channel.ChannelHandlerAdapter;

**import **io.netty.channel.ChannelHandlerContext;

**import **io.netty.channel.ChannelInitializer;

**import **io.netty.channel.EventLoopGroup;

**import **io.netty.channel.nio.NioEventLoopGroup;

**import **io.netty.channel.socket.SocketChannel;

**import **io.netty.channel.socket.nio.NioServerSocketChannel;

**import **io.netty.handler.codec.string.StringDecoder;

**import **org.apache.commons.logging.Log;

**import **org.apache.commons.logging.LogFactory;

**
**public class **ClientHandler 
**extends **ChannelHandlerAdapter {
**private **Log 
logger= LogFactory.
getLog(ClientHanler.
class);
@Override

   
**public void **exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
**throws **Exception {
ctx.close();
}
@Override

   
**public void **channelActive(ChannelHandlerContext ctx) 
**throws **Exception {
}
@Override

   
**public void **channelRead(ChannelHandlerContext ctx, Object msg) 
**throws **Exception {
String rev_msg= (String) msg;
logger.info(
"
客户端收到的消息
///"+rev_msg);
}
@Override

   
**public void **channelReadComplete(ChannelHandlerContext ctx) 
**throws **Exception {
}
}

Server

**package **com.aowin.netty.chaibaoAndNianbao;

**import **io.netty.bootstrap.ServerBootstrap;

**import **io.netty.buffer.ByteBuf;

**import **io.netty.buffer.PooledByteBufAllocator;

**import **io.netty.buffer.Unpooled;

**import **io.netty.channel.ChannelFuture;

**import **io.netty.channel.ChannelInitializer;

**import **io.netty.channel.ChannelOption;

**import **io.netty.channel.EventLoopGroup;

**import **io.netty.channel.nio.NioEventLoopGroup;

**import **io.netty.channel.socket.SocketChannel;

**import **io.netty.channel.socket.nio.NioServerSocketChannel;

**import **io.netty.handler.codec.DelimiterBasedFrameDecoder;

**import **io.netty.handler.codec.FixedLengthFrameDecoder;

**import **io.netty.handler.codec.string.StringDecoder;

**import **java.nio.Buffer;

**
**public class **Server {
**public void **bind() {
EventLoopGroup boosGroup = 
**new **NioEventLoopGroup();
EventLoopGroup workGroup = 
**new **NioEventLoopGroup();
ServerBootstrap bootstrap = 
**new **ServerBootstrap();
bootstrap.group(boosGroup, workGroup)
.channel(NioServerSocketChannel.
class)
.option(ChannelOption.
SO_BACKLOG
10)
.option(ChannelOption.
CONNECT_TIMEOUT_MILLIS,
2000)
.childHandler(
**new **ChannelInitializer<SocketChannel>() {
**protected void **initChannel(SocketChannel socketChannel) 
*throws Exception {
/*
*
*                       //

方式一:包尾添加特殊字符
**
*                        //

设置拆包粘包特殊字符 关键类
* DelimiterBasedFrameDecoder
*
*                        //

当客户端发来的消息每出现一个
“$_”
就算是一段消息
**
*                        ByteBuf f = Unpooled.copiedBuffer("$_".getBytes());
*
*                        socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, f));//1024

指的是分隔符的长度上限
*/
*
*                        //

*设置字符串形式的编解码
*
**
*                       *
//
*方式二:固定长度
*
*                       * socketChannel.pipeline().addLast(
**new **FixedLengthFrameDecoder(
5));
socketChannel.pipeline().addLast(
**new **StringDecoder());
socketChannel.pipeline().addLast(
**new **com.aowin.netty.chaibaoAndNianbao.ServerHandler());
}
});
**try **{
ChannelFuture future = bootstrap.bind(
"127.0.0.1"
8765).sync();
future.channel().closeFuture().sync();
boosGroup.shutdownGracefully();
workGroup.shutdownGracefully();

**catch **(InterruptedException e) {
e.printStackTrace();
}
}
**public static void **main(String[] args) {
Server s1 = 
**new **Server();
s1.bind();
}

}

ServerHandler

**package **com.aowin.netty.chaibaoAndNianbao;

**import **io.netty.buffer.Unpooled;

**import **io.netty.channel.ChannelFuture;

**import **io.netty.channel.ChannelFutureListener;

**import **io.netty.channel.ChannelHandlerAdapter;

**import **io.netty.channel.ChannelHandlerContext;

**import **org.apache.commons.logging.Log;

**import **org.apache.commons.logging.LogFactory;

**public class **ServerHandler 
**extends **ChannelHandlerAdapter {
**private **Log 
**logger **= LogFactory.
getLog(ServerHandler.
class);
@Override

   
**public void **exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
**throws **Exception {
logger.error(
"
服务端异常
", cause);
ctx.close();
}
@Override

   
**public void **channelActive(ChannelHandlerContext ctx) 
**throws **Exception {
logger.info(
"
服务端激活
");
}
@Override

   
**public void **channelRead(ChannelHandlerContext ctx, Object msg) 
**throws **Exception {
logger.info(
"
服务端开始读数据
");
String rev_msg = (String) msg;
logger.info(rev_msg);
}
@Override

   
**public void **channelReadComplete(
**final **ChannelHandlerContext ctx) 
**throws **Exception {
logger.info(
"
服务端读完数据
");
ChannelFuture cf = ctx.writeAndFlush(Unpooled.
copiedBuffer(
"adbcefg".getBytes()));
cf.addListener(
**new **ChannelFutureListener() {
**public void **operationComplete(ChannelFuture channelFuture) 
**throws **Exception {
logger.info(
"
客户端收到消息
");
ctx.close();
}
});
}
}

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

气候知识

台风来了怎么办?

2011-7-16 22:36:33

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索