Netty零拷贝机制
-
Netty自己的ByteBuf
-
JDK ByteBuffer的缺点:
- ByteBuf做了那些增强
-
ByteBuf操作
-
ByteBuf动态扩容原理
-
选择合适的ByteBuf实现
-
Unsafe的实现
-
PooledByteBuf对象、内存复用
-
零拷贝机制
Netty自己的ByteBuf
ByteBuf是为解决ByteBuffer的问题和满足网络应用程序开发人员的日常需求儿设计的。
JDK ByteBuffer的缺点:
1、无法动态扩容
长度是固定的,不能动态扩展和收缩,当数据大于ByteBuffer容量时,会发生索引越界异常。
2、API使用复杂
读写的时候需要手工调用flip()和rewind()等方法,使用时,需要非常谨慎的使用这些api,否则很容易出现错误。
ByteBuf做了那些增强
- API操作便捷性
- 动态扩容
- 多种ByteBuf实现
- 高效的零拷贝机制
ByteBuf操作
ByteBuf三个重要属性:capacity容量、readerIndex读取位置、writerIndex写入位置。
提供了两个指针变量来支持顺序读和写操作,分别是readerIndex和写错误writerIndex。
代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 1public void apiTest() {
2 // +-------------------+------------------+------------------+
3 // | discardable bytes | readable bytes | writable bytes |
4 // | | (CONTENT) | |
5 // +-------------------+------------------+------------------+
6 // | | | |
7 // 0 <= readerIndex <= writerIndex <= capacity
8
9 // 1.创建一个非池化的ByteBuf,大小为10个字节(源码中不建议new一个,建议使用Unpooled创建ByteBuf)
10 ByteBuf buf = Unpooled.buffer(10);
11 System.out.println("原始ByteBuf为====================>" + buf.toString());
12 System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
13
14 // 2.写入一段内容
15 byte[] bytes = {1, 2, 3, 4, 5};
16 buf.writeBytes(bytes);
17 System.out.println("写入的bytes为====================>" + Arrays.toString(bytes));
18 System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
19 System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
20
21 // 3.读取一段内容
22 byte b1 = buf.readByte();
23 byte b2 = buf.readByte();
24 System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
25 System.out.println("读取一段内容后ByteBuf为===========>" + buf.toString());
26 System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
27
28 // 4.将读取的内容丢弃
29 buf.discardReadBytes();
30 System.out.println("将读取的内容丢弃后ByteBuf为========>" + buf.toString());
31 System.out.println("4.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
32
33 // 5.清空读写指针
34 buf.clear();
35 System.out.println("将读写指针清空后ByteBuf为==========>" + buf.toString());
36 System.out.println("5.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
37
38 // 6.再次写入一段内容,比第一段内容少
39 byte[] bytes2 = {1, 2, 3};
40 buf.writeBytes(bytes2);
41 System.out.println("写入的bytes为====================>" + Arrays.toString(bytes2));
42 System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
43 System.out.println("6.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
44
45 // 7.将ByteBuf清零
46 buf.setZero(0, buf.capacity());
47 System.out.println("将内容清零后ByteBuf为==============>" + buf.toString());
48 System.out.println("7.ByteBuf中的内容为================>" + Arrays.toString(buf.array()) + "\n");
49
50 // 8.再次写入一段超过容量的内容
51 byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
52 buf.writeBytes(bytes3);
53 System.out.println("写入的bytes为====================>" + Arrays.toString(bytes3));
54 System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
55 System.out.println("8.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
56 // 随机访问索引 getByte
57 // 顺序读 read*
58 // 顺序写 write*
59 // 清除已读内容 discardReadBytes
60 // 清除缓冲区 clear
61 // 搜索操作
62 // 标记和重置
63 // 完整代码示例:参考
64 // 搜索操作 读取指定位置 buf.getByte(1);
65 //
66 }
67
68
69
执行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 1原始ByteBuf为====================>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
21.ByteBuf中的内容为===============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
3
4写入的bytes为====================>[1, 2, 3, 4, 5]
5写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 10)
62.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
7
8读取的bytes为====================>[1, 2]
9读取一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 2, widx: 5, cap: 10)
103.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
11
12将读取的内容丢弃后ByteBuf为========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
134.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
14
15将读写指针清空后ByteBuf为==========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
165.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
17
18写入的bytes为====================>[1, 2, 3]
19写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
206.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
21
22将内容清零后ByteBuf为==============>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
237.ByteBuf中的内容为================>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
24
25写入的bytes为====================>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
26写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 14, cap: 64)
278.ByteBuf中的内容为===============>[0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
28
29
30
ByteBuf动态扩容原理
capacity默认值:256字节、最大值:Integer.MAX_VALUE(2GB)
write 方法调用,通过AbstractByteBuf.ensureWritable()进行检查。
容量计算发放:AbstractByteBufAllocator.calculateNewCapacity(新capacity的最小要求,capacity最大值)。
根据新capacity的最小值要求,对应有两套计算方法:
没超过4M:从64字节开始,每次增加一倍,直到计算出来的newCapacity满足新容量最小要求。
示例:当前大小256,已经250,继续写10字节数据,需要的容量最小要求是261,则新容量是6422*2=512
超过4M:新容量 = 新容量最小要求/4M * 4M + 4M
示例:当前大小3M,已写3M,继续写2M数据,需要的容量最小要求是5M,则新容量是9M(不能超过最大值)。
4M的来源:一个固定的阈值:AbstractByteBufAllocator.CALCULATE_THRESHOLD
选择合适的ByteBuf实现
3个维度划分,8中具体实现
在使用中,都是通过ByteBufAllocator分配器进行申请,同时分配器具备有内存管理的功能。
Unsafe的实现
unsafe意味着不安全的操作。但是更底层的操作会带来性能提升和特殊功能,Netty中会尽力使用unsafe。Java语言很重要的特性是“
一次编写到处运行”,所以他针对底层的内存或者其他操作,做了很多封装。而unsafe提供了一系列我们操作底层的方法,可能会导致不兼容活着不可知的异常。
PooledByteBuf对象、内存复用
释放对象:
1
2
3
4 1((ByteBuf) msg).release();//引用计数 减1
2ctx.fireChannelRead(msg);//在调用链最后一个tailContent,会对ByteBuf释放
3
4
PoolThreadCache:PooledByteBufAllocator实例维护的一个线程变量。
多种分类的MemoryRegionCache数组用作内存缓存,MemoryRegionCache内部是链表,队列里存Chunk。
PoolChunk里面维护了内存引用,内存复用的做法就是把buf的memory指向chunk的memory。
零拷贝机制
Netty的零拷贝机制,是一种应用层的实现。和底层JVM、操作系统内存机制并无过多关联。