网络数据的基本单位总是字节。Java NIO提供了ByteBuffer作为它的字节容器,但是这个类使用起来过于复杂,而且也有些繁琐。Netty的ByteBuffer替代品是ByteBuf,一个强大的实现,既解决了JDK API的局限性,又为网络应用程序的开发者提供了更好的API。
Netty的数据处理API通过两个组件暴露——abstract class ByteBuf和interface ByteBufHolder。
它的优点有:
- 它可以被用户自定义的缓冲区类型扩展;
- 通过内置的复合缓冲区类型实现了透明的零拷贝;
- 容量可以按需增长(类似于JDK的StringBuilder);
- 在读和写这两种模式之间切换不需要调用ByteBuffer的flip()方法;
- 读和写使用了不同的索引;
- 支持方法的链式调用;
- 支持引用计数;
- 支持池化。
ByteBuf类——Netty的数据容器
工作方式
ByteBuf维护了两个不同的索引:一个用于读取,一个用于写入。当你从ByteBuf读取时,它的readerIndex将会被递增已经被读取的字节数。同样地,当你写入ByteBuf时,它的writerIndex也会被递增。
如果打算读取字节直到readerIndex达到和writerIndex同样的值时,你将会得到“可以读取的”数据的末尾。就如同试图读取超出数组末尾的数据一样,试图读取超出该点的数据将会触发一个IndexOutOfBoundsException。
名称以read或者write开头的ByteBuf方法,将会推进其对应的索引,而名称以set或者get开头的操作则不会。后面的这些方法将在作为一个参数传入的一个相对索引上执行操作。可以指定ByteBuf的最大容量。试图移动写索引(即writerIndex)超过这个 值将会触发一个异常。(默认的限制是Integer.MAX_VALUE。)
ByteBuf的使用模式
堆缓冲区
最常用的ByteBuf模式是将数据存储在JVM的堆空间中。这种模式被称为支撑数组(backing array),它能 在没有使用池化的情况下提供快速的分配和释放。
适用于有遗留数据需要处理的情况:
1
2
3
4
5
6
7
8
9 1 ByteBuf buf = (ByteBuf) msg;
2 if(buf.hasArray()){
3 byte[] array=buf.array();//返回该缓冲区的备份字节数组。
4 int offset=buf.arrayOffset()+buf.readerIndex();//计算第一个字节的偏移量
5 int length=buf.readableBytes();//获取可读字节数
6 String s=new String(array,offset,length);
7 System.out.println("s="+s);
8 }
9
当hasArray()方法返回false时,尝试访问支撑数组会触发一个UnsupportedOperationException,这个模式类似于JDK的ByteBuffer的用法。
直接缓冲区
直接缓冲区是另外一种ByteBuf模式。我们期望用于对象创建的内存分配永远都来自于堆中,但这并不是必须的——NIO在JDK 1.4中引入ByteBuffer类允许JVM实现通过本地调用来分配内存。这主要是为了避免在每次调用本地I/O操作之前(或者之后)将缓冲区的内容复制到一个中间缓冲区(或者从中间缓冲区把内容复制到缓冲区)。
**直接缓冲区的内容将驻留在常规的会被垃圾回收的堆之外。**这也就解释了为何直接缓冲区对于网络数据传输是理想的选择。如果你的数据包含在一个在堆上分配的缓冲区中,那么事实上,在通过套接字发送它之前,JVM将会在内部把你的缓冲区复制到一个直接缓冲区中。
直接缓冲区的主要缺点是,相对于基于堆的缓冲区,它们的分配和释放都较为昂贵。如果你正在处理遗留代码,你也可能会遇到另外一个缺点:因为数据不是在堆上,所以你不得不进行一次复制。
1
2
3
4
5 1 byte[] array = new byte[buf.readableBytes()];//获取可读字节数并分配一个新的数组来保存
2 buf.getBytes(buf.readerIndex(),array);//将字节复制到该数组
3 String s=new String(array,0,buf.readableBytes());
4 System.out.println("直接缓冲区:"+s);
5
复合缓冲区
第三种也是最后一种模式使用的是复合缓冲区,它为多个ByteBuf提供一个聚合视图。在这里你可以根据需要添加或者删除ByteBuf实例,这是一个JDK的ByteBuffer实现完全缺失的特性。
Netty通过一个ByteBuf子类——CompositeByteBuf——实现了这个式,它提供了一个将多个缓冲区表示为单个合并缓冲区的虚拟表示。
警告
CompositeByteBuf中的ByteBuf实例可能同时包含直接内存分配和非直接内存分配。如果其中只有一个实例,那么对CompositeByteBuf上的hasArray()方法的调用将返回该组件上的hasArray()方法的值;否则它将返回false。
假如此时一个消息分为两部分,头部和主体,这两部分由应用程序的不同模块产生,将会在消息被发送的时候组装。该应用程序可以选择为多个消息重用相同的消息主体。当这种情况 发生时,对于每个消息都将会创建一个新的头部。因为我们不想为每个消息都重新分配这两个缓冲区,所以使用CompositeByteBuf是一个完美的选择。它在消除了没必要的复制的同时,暴露了通用的ByteBuf API。
使用CompositeByteBuf的复合缓冲区模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1 public void channelActive(ChannelHandlerContext ctx) throws Exception {
2 CompositeByteBuf messageBuf=Unpooled.compositeBuffer();
3 ByteBuf headerBuf=buf;
4 ByteBuf bodyBuf=buf;
5 messageBuf.addComponent(bodyBuf);//将ByteBuf实例追加到CompositeByteBuf
6 messageBuf.addComponent(headerBuf);
7 for (ByteBuf buf:messageBuf){//遍历所有ByteBuf
8 System.out.println(buf);
9 byte[] req = new byte[buf.readableBytes()];
10 buf.readBytes(req);
11 String body = new String(req, "UTF-8");
12 System.out.println("复合缓冲区:"+body);
13 }
14 ctx.writeAndFlush(buf);
15 }
16
字节级操作
ByteBuf提供了许多超出基本读、写操作 的方法用于修改它的数据
随机访问索引
ByteBuf的索引是从零开始的:第一个字节的索引是0,最后一个字节的索引总是capacity() – 1。
1
2
3
4
5
6
7 1 ByteBuf buf = (ByteBuf) msg;
2 for (int i = 0; i < buf.capacity(); i++) {
3 byte b=buf.getByte(i);
4 if((char)b>='a'&&(char)b<='z'||(char)b>='A'&&(char)b<='Z'||(char)b==',')
5 System.out.println("i="+(char)b);
6 }
7
如果有需要,也可以通过调用readerIndex(index)或者writerIndex(index)来手动移动这两者。
顺序访问索引
ByteBuf同时具有读索引和写索引,但是JDK的ByteBuffer却只有一个索引,这也就是为什么必须调用flip()方法来 在读模式和写模式之间进行切换的原因。
可丢弃字节
丢弃字节的分段包含了已经被读过的字节。通过调用discardReadBytes()方法, 可以丢弃它们并回收空间。这个分段的初始大小为0,存储在readerIndex中,会随着read操作的执行 而增加(get*操作不会移动readerIndex)。
注意,在调用discardReadBytes()之后,对可写分段的内容并没有任何的保证。
虽然你可能会倾向于频繁地调用discardReadBytes()方法以确保可写分段的最大化,但是请注意,这将极有可能会导致内存复制,因为可读字节(图中标记为CONTENT的部分)必须被移动到缓冲区的开始位置。所以建议只有在真正需要的时候才这样做,如内存十分宝贵的时候。
可读字节
ByteBuf的可读字节分段存储了实际数据。新分配的、包装的或者复制的缓冲区的默认的readerIndex值为0。任何名称以read或者skip开头的操作都将检索或者跳过位于当前readerIndex的数据,并且将它增加已读字节数。如果被调用的方法需要一个ByteBuf参数作为写入的目标,并且没有指定目标索引参数,那么该目标缓冲区的writerIndex也将被增加,例如:
readBytes(ByteBuf dest);
如果尝试在缓冲区的可读字节数已经耗尽时从中读取数据,那么将会引发一个IndexOutOfBoundsException。
可写字节
可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域。新分配的缓冲区的writerIndex的默认值为0。任何名称以write开头的操作都将从当前的writerIndex处开始写数据,并将它增加已经写入的字节数。如果写操作的目标也是ByteBuf,并且没有指定源索引的值,则源缓冲区的readerIndex也同样会被增加相同的大小。这个调用如下所示:
writeBytes(ByteBuf dest);
如果尝试往目标写入超过目标容量的数据,将会引发一个IndexOutOfBoundException。
索引管理
JDK的InputStream定义了mark(int readlimit)和reset()方法,这些方法分别被用来将流中的当前位置标记为指定的值,以及将流重置到该位置。
同样,可以通过调用markReaderIndex()、markWriterIndex()、resetWriterIndex()和resetReaderIndex()来标记和重置ByteBuf的readerIndex和writerIndex。这些和InputStream上的调用类似,只是没有readlimit参数来指定标记什么时候失效。
也可以通过调用readerIndex(int)或者writerIndex(int)来将索引移动到指定位置。试图将任何一个索引设置到一个无效的位置都将导致一个IndexOutOfBoundsException。可以通过调用clear()方法来将readerIndex和writerIndex都设置为0。注意,这并不会清除内存中的内容。
调用clear()比调用discardReadBytes()轻量得多,因为它将只是重置索引而不会复制任何的内存。
查找操作
在ByteBuf中有多 种可以用来确定指定值的索引的方法。最简单的是使用indexOf()方法。较复杂的查找可以通过那些需要一个ByteBufProcessor作为参数的方法达成。这个接口只定义了一个方法:
boolean process(byte value):它将检查输入值是否是正在查找的值。
1
2
3
4
5
6
7
8
9
10
11
12 1 int i=buf.forEachByte(new ByteProcessor() {
2 @Override
3 public boolean process(byte value) throws Exception {
4 byte[] b=",".getBytes();
5 if (b[0]!=value)
6 return true;
7 else
8 return false;
9 }
10 });
11 System.out.println("i="+i+" value="+(char) buf.getByte(i));
12
派生缓冲区
派生缓冲区为ByteBuf提供了以专门的方式来呈现其内容的视图。这类视图是通过以下方法被创建的:
- duplicate();
- slice();
- slice(int, int);
- Unpooled.unmodifiableBuffer(…);
- order(ByteOrder);
- readSlice(int)。
每个这些方法都将返回一个新的ByteBuf实例,它具有自己的读索引、写索引和标记索引。其内部存储和JDK的ByteBuffer一样也是共享的。这使得派生缓冲区的创建成本是很低廉的,但是这也意味着,如果你修改了它的内容,也同时修改了其对应的源实例,所以要小心。
ByteBuf复制
如果需要一个现有缓冲区的真实副本,请使用copy()或者copy(int, int)方法。不同于派生缓冲区,由这个调用所返回的ByteBuf拥有独立的数据副本。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 1 public void channelRead(ChannelHandlerContext ctx, Object msg)
2 throws Exception {
3 ByteBuf buf = (ByteBuf) msg;
4 for (int i = 0; i < buf.capacity(); i++) {
5 byte b=buf.getByte(i);
6 if((char)b>='a'&&(char)b<='z'||(char)b>='A'&&(char)b<='Z'||(char)b==',')
7 System.out.println("i="+(char)b);
8 }
9 int i=buf.forEachByte(new ByteProcessor() {
10 @Override
11 public boolean process(byte value) throws Exception {
12 byte[] b=",".getBytes();
13 if (b[0]!=value)
14 return true;
15 else
16 return false;
17 }
18 });
19 System.out.println("i="+i+" value="+(char) buf.getByte(i));
20 ByteBuf sliced = buf.slice(0,2);
21 sliced.setByte(0,(byte)'h');
22 byte[] req = new byte[buf.readableBytes()];
23 buf.readBytes(req);
24 String body = new String(req, "UTF-8");
25 System.out.println(body);
26 ctx.close();
27 }
28 });
29 }
30 }
31
读/ 写操作
有两种类别的读/写操作:
- get()和set()操作, 从给定的索引开始,并且保持索引不变;
- read()和write()操作, 从给定的索引开始,并且会根据已经访问过的字节数索引进行调整。
get() 操作
名称 | 描述 |
getBoolean(int) | 返回给定索引处的 Bollean值 |
getByte(int) | 返回给定索引处的 字节 |
getUnsignedByte(int) | 将给定索引处的 无符号字节值作为short返回 |
getMedium(int) | 返回给定索引处的 24位的中等int值 |
getUnsignedMedium(int) | 返回给定索引处的 无符号的24位的中等 int 值 |
getInt(int) | 返回给定索引处的 int 值 |
getUnsignedInt(int) | 将给定索引处的 无符号 int 值作为 long 返回 |
getLong(int) | 返回给定索引处的 long 值 |
getShort(int) | 返回给定索引处的 short 值 |
getUnsignedShrt(int) | 将给定索引处的无符号 short 值作为 int 返回 |
getByte(int, …) | 将该缓冲区中从给定索引开始的数据传送到指定的目的地 |
1 | 1 |
set() 操作
名称 | 描述 |
setBoolean(int, boolean) | 设定给定索引处的 Boolean 值 |
setByte(int index, int value) | 设定给定索引处的 字节值 |
setMedium(int index, int value) | 设定给定索引处的 24位的中等 int 值 |
setInt(int index, int value) | 设定给定索引处的 int 值 |
setLong(int index, long value) | 设定给定索引处的 long 值 |
setShort(int index, int value) | 设定给定索引处的 short 值 |
1 | 1 |
read() 操作
名称 | 描述 |
readBoolean() | 返回当前 readerIndex 处的 Boolean, 并将readerIndex 增加 1 |
readByte() | 返回当前readerIndex 处的字节,并将readerIndex 增加 1 |
readUnsignedByte() | 将当前 readerIndex 处的无符号字节值作为short 返回,并将readerIndex 增加 1 |
readMedium() | 返回当前readerIndex 处的24位的中等int 值,并将readerIndex 增加 3 |
readUnsignedMedium() | 返回当前 readerIndex 处的 24位的无符号的中等int 值, 并将readerIndex 增加 3 |
readInt() | 返回当前 readerIndex 处的int 值,并将readerIndex增加 4 |
readUnsignedInt() | 将当前readerIndex处的无符号的int值作为long值返回, 并将readerIndex 增加 4 |
readLong() | 返回当前readerIndex 处的long值,并将readerIndex 增加 8 |
readShort() | 返回当前readerIndex 处的short 值,并将readerIndex 增加 2 |
readUnsignedShort() | 将当前 readerIndex 处无符号 short 值 作为 int 值返回,并将readerIndex 增加 2 |
readBytes( ByteBuf | byte [] destinatuion, int dstIndex [ , int length] ) | 将当前 ByteBuf 中 从当前 readerIndex 处 开始的数据传送到一个目标 ByteBuf 或者 byte[],从目标的dstIndex开始的位置。本地的readerIndex将被增加已经传输的字节数。 |
1 | 1 |
write() 操作
名称 | 描述 |
writeBoolean( boolean ) | 在当前 writeIndex 处写入一个Boolean ,并将 writeIndex 增加 1 |
writeByte( int ) | 在当前 writeIndex 处写入一个字节值,并将writeIndex 增加 1 |
writeMedium( int ) | 在当前 writeIndex 处写入一个中等的 int 值, 并将writeIndex 增加 3 |
writeInt ( int ) | 在当前 writeIndex 处写入一个 int 值,并将writeIndex 增加 4 |
writeLong ( long ) | 在当前 writeIndex 处写入一个 long 值,并将 writeIndex 增加 8 |
writeShort ( int ) | 在当前writeIndex 处写入一个 short 值, 并将 writeIndex 增加 2 |
writeBytes ( source ByteBuf | byte [] [ , int srcIndex , int length ] ) | 从 当前writeIndex 开始,传输来自于指定源的数据,如果提供了srcIndex 和 length, 则从 srcIndex 开始读取,并且处理长度为 length 的字节,当前 writeIndex 将会被增加所写入的字节数。 |
1 | 1 |
更多操作
名称 | 描述 |
isReadable() | 如果至少有一个字节可读取,返回 true |
isWritable() | 如果至少有一个字节可写入,返回 true |
readableByte() | 返回可被读取的字节数 |
writableBytes() | 返回可被写入的字节数 |
capacity() | 返回ByteBuf 可容纳的字节数, 此后,会尝试再次扩展,直到达到maxCapacity() |
maxCapacity() | 返回ByteBuf 可以容纳的最大字节数 |
hasArray() | 如果ByteBuf 由一个字节数组支撑,则返回 true |
array() | 如果ByteBuf 由一个字节数组支撑则返回该数组,否则,它将抛出一个 UnsupportedOperationException 异常 |
1 | 1 |
ByteBufHolder接口
有时候除了实际的数据负载之外,我们还 需要存储各种属性值。HTTP响应便是一个很好的例子,除了表示为字节的内容,还包括状态码、cookie等。
为了处理这种常见的用例,Netty提供了ByteBufHolder。ByteBufHolder也为Netty的高级特性提供了支持,如缓冲区池化,其中可以从池中借用ByteBuf,并且在需要时自动释放。ByteBufHolder只有几种用于访问底层数据和引用计数的方法。
名称 | 描述 |
content() | 返回由这个ByteBufHolder所持有的ByteBuf |
copy() | 返回这个ByteBufHolder的一个深拷贝,包括一个其所包含的ByteBuf的非共享 拷贝 |
duplicate() | 返回这个ByteBufHolder的一个浅拷贝,包括一个其所包含的ByteBuf的共享拷贝 |
1 | 1 |
ByteBuf分配
按需分配:ByteBufAllocator接口
为了降低分配和释放内存的开销,Netty通过interface ByteBufAllocator实现了(ByteBuf的)池化,它可以用来分配我们所描述过的任意类型的ByteBuf实例。
可以通过Channel(每个都可以有一个不同的ByteBufAllocator实例)或者绑定到ChannelHandler的ChannelHandlerContext获取一个ByteBufAllocator的引用。
1
2
3 1 ByteBufAllocator allocator=ch.alloc();//ch是Channel的实例
2
3
Netty提供了两种ByteBufAllocator的实现:PooledByteBufAllocator和UnpooledByteBufAllocator。前者池化了ByteBuf的实例以提高性能并最大限度地减少内存碎片。后者的实现不池化ByteBuf实例,并且在每次它被调用时都会返回一个新的实例。
Netty默认使用了PooledByteBufAllocator。
Unpooled缓冲区
可能某些情况下,你未能获取一个到ByteBufAllocator的引用。对于这种情况,Netty提供了一个简单的称为Unpooled的工具类,它提供了静态的辅助方法来创建未池化的ByteBuf实例。
1
2 1final ByteBuf bufs= Unpooled.copiedBuffer("Hello,刘德华", Charset.forName("UTF-8"));
2
ByteBufUtil类
ByteBufUtil提供了用于操作ByteBuf的静态的辅助方法。因为这个API是通用的,并且和池化无关,所以这些方法已然在分配类的外部实现。这些静态方法中最有价值的可能就是hexdump()方法,它以十六进制的表示形式打印ByteBuf的内容。另一个有用的方法是boolean equals(ByteBuf, ByteBuf),它被用来判断两个ByteBuf实例的相等性。
引用计数
引用计数是一种通过在某个对象所持有的资源不再被其他对象引用时释放该对象所持有的资源来优化内存使用和性能的技术。引用计数对于池化实现(如PooledByteBufAllocator)来说是至关重要的,它降低了内存分配的开销。Netty在第4版中为ByteBuf和ByteBufHolder引入了引用计数技术,它们都实现了interface ReferenceCounted。
它主要涉及跟踪到某个特定对象的活动引用的数量。一个ReferenceCounted
实现的实例将通常以活动的引用计数为1作为开始。只要引用计数大于0,就能保证对象不会被释放。当活动引用的数量减少到0时,该实例就会被释放。
1
2
3 1 System.out.println(buf.refCnt());//返回此对象的引用计数。如果为0,则表示此对象已被释放。
2 buf.release();//释放引用计数对象
3
试图访问一个已经被释放的引用计数的对象,将会导致一个IllegalReferenceCountException。注意, 一个特定(ReferenceCounted的实现)类,可以用它自己的独特方式来定义它的引用计数规则。
参考《Netty实战》