tcp拆包粘包的方案主要有三种:
拆包粘包方案:
1、消息定长,不足用空格补齐(手工补齐,不会自动补齐)
2、在包尾增加特殊字符进行分割,强调是在包尾加。一旦采用此种拆解接收到的消息,那么必须按照这种方式发送,否则消息是接收不到的。
3、将消息分为消息头和消息体,在消息头中包含表示消息长度的字段,然后进行业务逻辑处理。
**package **com.aowin.netty.chaibaoAndNianbao;
**import **io.netty.bootstrap.Bootstrap;
**import **io.netty.buffer.ByteBuf;
**import **io.netty.buffer.Unpooled;
**import **io.netty.channel.ChannelFuture;
**import **io.netty.channel.ChannelInitializer;
**import **io.netty.channel.EventLoopGroup;
**import **io.netty.channel.nio.NioEventLoopGroup;
**import **io.netty.channel.socket.SocketChannel;
**import **io.netty.channel.socket.nio.NioServerSocketChannel;
**import **io.netty.channel.socket.nio.NioSocketChannel;
**import **io.netty.handler.codec.DelimiterBasedFrameDecoder;
**import **io.netty.handler.codec.FixedLengthFrameDecoder;
**import **io.netty.handler.codec.string.StringDecoder;
**
**public class **Client {
**public void **connect() {
EventLoopGroup workGroup =
**new **NioEventLoopGroup();
Bootstrap bootstrap =
**new **Bootstrap();
bootstrap.group(workGroup).channel(NioSocketChannel.
class)
.handler(
**new **ChannelInitializer<SocketChannel>() {
**protected void **initChannel(SocketChannel socketChannel)
**throws *Exception {
/*// *
方式一:包尾添加特殊字符
**
* //
设置特殊分割符
**
* ByteBuf f=Unpooled.copiedBuffer("$_".getBytes());
*
* socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,f) );*/
*
* //
方式二 固定长度 关键类
*FixedLengthFrameDecoder *
*每一个固定长度算作一段消息
*
* * socketChannel.pipeline().addLast(
**new *FixedLengthFrameDecoder(
5));
//5
代表长度
* 5
*个字节
*
* * socketChannel.pipeline().addLast(
**new **StringDecoder());
socketChannel.pipeline().addLast(
**new **ClientHandler());
}
});
**try **{
ChannelFuture f = bootstrap.connect(
"127.0.0.1",
8765).sync();
// f.channel().writeAndFlush(Unpooled.copiedBuffer("adbcefg".getBytes()));//
超过
5
个,其实只能收到
*adbce *
,后面的是收不到的,因为不足
5
*个长度
*
* *
//f.channel().writeAndFlush(Unpooled.copiedBuffer("adb".getBytes()));//
不足
5
*个长度,这个是收不到的,
*
* * f.channel().writeAndFlush(Unpooled.
copiedBuffer(
"adb ".getBytes()));
//
补足
5
*个长度,能收到
*
* *f.channel().closeFuture().sync();
workGroup.shutdownGracefully();
}
**catch **(InterruptedException e) {
e.printStackTrace();
}
}
**public static void **main(String[] args) {
Client c1=
**new **Client();
c1.connect();
}
}
ClientHandler
**package **com.aowin.netty.chaibaoAndNianbao;
**import **com.aowin.netty.hello.ClientHanler;
**import **io.netty.bootstrap.Bootstrap;
**import **io.netty.buffer.Unpooled;
**import **io.netty.channel.ChannelHandlerAdapter;
**import **io.netty.channel.ChannelHandlerContext;
**import **io.netty.channel.ChannelInitializer;
**import **io.netty.channel.EventLoopGroup;
**import **io.netty.channel.nio.NioEventLoopGroup;
**import **io.netty.channel.socket.SocketChannel;
**import **io.netty.channel.socket.nio.NioServerSocketChannel;
**import **io.netty.handler.codec.string.StringDecoder;
**import **org.apache.commons.logging.Log;
**import **org.apache.commons.logging.LogFactory;
**
**public class **ClientHandler
**extends **ChannelHandlerAdapter {
**private **Log
logger= LogFactory.
getLog(ClientHanler.
class);
@Override
**public void **exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
**throws **Exception {
ctx.close();
}
@Override
**public void **channelActive(ChannelHandlerContext ctx)
**throws **Exception {
}
@Override
**public void **channelRead(ChannelHandlerContext ctx, Object msg)
**throws **Exception {
String rev_msg= (String) msg;
logger.info(
"
客户端收到的消息
///"+rev_msg);
}
@Override
**public void **channelReadComplete(ChannelHandlerContext ctx)
**throws **Exception {
}
}
Server
**package **com.aowin.netty.chaibaoAndNianbao;
**import **io.netty.bootstrap.ServerBootstrap;
**import **io.netty.buffer.ByteBuf;
**import **io.netty.buffer.PooledByteBufAllocator;
**import **io.netty.buffer.Unpooled;
**import **io.netty.channel.ChannelFuture;
**import **io.netty.channel.ChannelInitializer;
**import **io.netty.channel.ChannelOption;
**import **io.netty.channel.EventLoopGroup;
**import **io.netty.channel.nio.NioEventLoopGroup;
**import **io.netty.channel.socket.SocketChannel;
**import **io.netty.channel.socket.nio.NioServerSocketChannel;
**import **io.netty.handler.codec.DelimiterBasedFrameDecoder;
**import **io.netty.handler.codec.FixedLengthFrameDecoder;
**import **io.netty.handler.codec.string.StringDecoder;
**import **java.nio.Buffer;
**
**public class **Server {
**public void **bind() {
EventLoopGroup boosGroup =
**new **NioEventLoopGroup();
EventLoopGroup workGroup =
**new **NioEventLoopGroup();
ServerBootstrap bootstrap =
**new **ServerBootstrap();
bootstrap.group(boosGroup, workGroup)
.channel(NioServerSocketChannel.
class)
.option(ChannelOption.
SO_BACKLOG,
10)
.option(ChannelOption.
CONNECT_TIMEOUT_MILLIS,
2000)
.childHandler(
**new **ChannelInitializer<SocketChannel>() {
**protected void **initChannel(SocketChannel socketChannel)
*throws Exception {
/*
*
* //
方式一:包尾添加特殊字符
**
* //
设置拆包粘包特殊字符 关键类
* DelimiterBasedFrameDecoder
*
* //
当客户端发来的消息每出现一个
“$_”
就算是一段消息
**
* ByteBuf f = Unpooled.copiedBuffer("$_".getBytes());
*
* socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, f));//1024
指的是分隔符的长度上限
*/
*
* //
*设置字符串形式的编解码
*
**
* *
//
*方式二:固定长度
*
* * socketChannel.pipeline().addLast(
**new **FixedLengthFrameDecoder(
5));
socketChannel.pipeline().addLast(
**new **StringDecoder());
socketChannel.pipeline().addLast(
**new **com.aowin.netty.chaibaoAndNianbao.ServerHandler());
}
});
**try **{
ChannelFuture future = bootstrap.bind(
"127.0.0.1",
8765).sync();
future.channel().closeFuture().sync();
boosGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
**catch **(InterruptedException e) {
e.printStackTrace();
}
}
**public static void **main(String[] args) {
Server s1 =
**new **Server();
s1.bind();
}
}
ServerHandler
**package **com.aowin.netty.chaibaoAndNianbao;
**import **io.netty.buffer.Unpooled;
**import **io.netty.channel.ChannelFuture;
**import **io.netty.channel.ChannelFutureListener;
**import **io.netty.channel.ChannelHandlerAdapter;
**import **io.netty.channel.ChannelHandlerContext;
**import **org.apache.commons.logging.Log;
**import **org.apache.commons.logging.LogFactory;
**public class **ServerHandler
**extends **ChannelHandlerAdapter {
**private **Log
**logger **= LogFactory.
getLog(ServerHandler.
class);
@Override
**public void **exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
**throws **Exception {
logger.error(
"
服务端异常
", cause);
ctx.close();
}
@Override
**public void **channelActive(ChannelHandlerContext ctx)
**throws **Exception {
logger.info(
"
服务端激活
");
}
@Override
**public void **channelRead(ChannelHandlerContext ctx, Object msg)
**throws **Exception {
logger.info(
"
服务端开始读数据
");
String rev_msg = (String) msg;
logger.info(rev_msg);
}
@Override
**public void **channelReadComplete(
**final **ChannelHandlerContext ctx)
**throws **Exception {
logger.info(
"
服务端读完数据
");
ChannelFuture cf = ctx.writeAndFlush(Unpooled.
copiedBuffer(
"adbcefg".getBytes()));
cf.addListener(
**new **ChannelFutureListener() {
**public void **operationComplete(ChannelFuture channelFuture)
**throws **Exception {
logger.info(
"
客户端收到消息
");
ctx.close();
}
});
}
}