Netty零拷贝机制

释放双眼,带上耳机,听听看~!

Netty零拷贝机制

  • Netty自己的ByteBuf

  • JDK ByteBuffer的缺点:

    • ByteBuf做了那些增强
  • ByteBuf操作

  • ByteBuf动态扩容原理

  • 选择合适的ByteBuf实现

  • Unsafe的实现

  • PooledByteBuf对象、内存复用

  • 零拷贝机制

Netty自己的ByteBuf

ByteBuf是为解决ByteBuffer的问题和满足网络应用程序开发人员的日常需求儿设计的。

JDK ByteBuffer的缺点:

1、无法动态扩容
长度是固定的,不能动态扩展和收缩,当数据大于ByteBuffer容量时,会发生索引越界异常。
2、API使用复杂
读写的时候需要手工调用flip()和rewind()等方法,使用时,需要非常谨慎的使用这些api,否则很容易出现错误。

ByteBuf做了那些增强

  • API操作便捷性
  • 动态扩容
  • 多种ByteBuf实现
  • 高效的零拷贝机制

ByteBuf操作

ByteBuf三个重要属性:capacity容量、readerIndex读取位置、writerIndex写入位置。
提供了两个指针变量来支持顺序读和写操作,分别是readerIndex和写错误writerIndex。
Netty零拷贝机制

Netty零拷贝机制
代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
1public void apiTest() {
2        //  +-------------------+------------------+------------------+
3        //  | discardable bytes |  readable bytes  |  writable bytes  |
4        //  |                   |     (CONTENT)    |                  |
5        //  +-------------------+------------------+------------------+
6        //  |                   |                  |                  |
7        //  0      <=       readerIndex   <=   writerIndex    <=    capacity
8
9        // 1.创建一个非池化的ByteBuf,大小为10个字节(源码中不建议new一个,建议使用Unpooled创建ByteBuf)
10        ByteBuf buf = Unpooled.buffer(10);
11        System.out.println("原始ByteBuf为====================>" + buf.toString());
12        System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
13
14        // 2.写入一段内容
15        byte[] bytes = {1, 2, 3, 4, 5};
16        buf.writeBytes(bytes);
17        System.out.println("写入的bytes为====================>" + Arrays.toString(bytes));
18        System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
19        System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
20
21        // 3.读取一段内容
22        byte b1 = buf.readByte();
23        byte b2 = buf.readByte();
24        System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
25        System.out.println("读取一段内容后ByteBuf为===========>" + buf.toString());
26        System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
27
28        // 4.将读取的内容丢弃
29        buf.discardReadBytes();
30        System.out.println("将读取的内容丢弃后ByteBuf为========>" + buf.toString());
31        System.out.println("4.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
32
33        // 5.清空读写指针
34        buf.clear();
35        System.out.println("将读写指针清空后ByteBuf为==========>" + buf.toString());
36        System.out.println("5.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
37
38        // 6.再次写入一段内容,比第一段内容少
39        byte[] bytes2 = {1, 2, 3};
40        buf.writeBytes(bytes2);
41        System.out.println("写入的bytes为====================>" + Arrays.toString(bytes2));
42        System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
43        System.out.println("6.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
44
45        // 7.将ByteBuf清零
46        buf.setZero(0, buf.capacity());
47        System.out.println("将内容清零后ByteBuf为==============>" + buf.toString());
48        System.out.println("7.ByteBuf中的内容为================>" + Arrays.toString(buf.array()) + "\n");
49
50        // 8.再次写入一段超过容量的内容
51        byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
52        buf.writeBytes(bytes3);
53        System.out.println("写入的bytes为====================>" + Arrays.toString(bytes3));
54        System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
55        System.out.println("8.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
56        //  随机访问索引 getByte
57        //  顺序读 read*
58        //  顺序写 write*
59        //  清除已读内容 discardReadBytes
60        //  清除缓冲区 clear
61        //  搜索操作
62        //  标记和重置
63        //  完整代码示例:参考
64        // 搜索操作 读取指定位置 buf.getByte(1);
65        //
66    }
67
68
69

执行结果:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1原始ByteBuf为====================>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
21.ByteBuf中的内容为===============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
3
4写入的bytes为====================>[1, 2, 3, 4, 5]
5写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 10)
62.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
7
8读取的bytes为====================>[1, 2]
9读取一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 2, widx: 5, cap: 10)
103.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
11
12将读取的内容丢弃后ByteBuf为========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
134.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
14
15将读写指针清空后ByteBuf为==========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
165.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
17
18写入的bytes为====================>[1, 2, 3]
19写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
206.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
21
22将内容清零后ByteBuf为==============>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
237.ByteBuf中的内容为================>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
24
25写入的bytes为====================>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
26写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 14, cap: 64)
278.ByteBuf中的内容为===============>[0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
28
29
30

ByteBuf动态扩容原理

capacity默认值:256字节、最大值:Integer.MAX_VALUE(2GB)
write 方法调用,通过AbstractByteBuf.ensureWritable()进行检查。
容量计算发放:AbstractByteBufAllocator.calculateNewCapacity(新capacity的最小要求,capacity最大值)。

根据新capacity的最小值要求,对应有两套计算方法:
没超过4M:从64字节开始,每次增加一倍,直到计算出来的newCapacity满足新容量最小要求。
示例:当前大小256,已经250,继续写10字节数据,需要的容量最小要求是261,则新容量是6422*2=512

超过4M:新容量 = 新容量最小要求/4M * 4M + 4M
示例:当前大小3M,已写3M,继续写2M数据,需要的容量最小要求是5M,则新容量是9M(不能超过最大值)。

4M的来源:一个固定的阈值:AbstractByteBufAllocator.CALCULATE_THRESHOLD

选择合适的ByteBuf实现

3个维度划分,8中具体实现
Netty零拷贝机制
在使用中,都是通过ByteBufAllocator分配器进行申请,同时分配器具备有内存管理的功能。

Unsafe的实现

unsafe意味着不安全的操作。但是更底层的操作会带来性能提升和特殊功能,Netty中会尽力使用unsafe。Java语言很重要的特性是“
一次编写到处运行”,所以他针对底层的内存或者其他操作,做了很多封装。而unsafe提供了一系列我们操作底层的方法,可能会导致不兼容活着不可知的异常。
Netty零拷贝机制

PooledByteBuf对象、内存复用

释放对象:


1
2
3
4
1((ByteBuf) msg).release();//引用计数  减1
2ctx.fireChannelRead(msg);//在调用链最后一个tailContent,会对ByteBuf释放
3
4

PoolThreadCache:PooledByteBufAllocator实例维护的一个线程变量。
多种分类的MemoryRegionCache数组用作内存缓存,MemoryRegionCache内部是链表,队列里存Chunk。
PoolChunk里面维护了内存引用,内存复用的做法就是把buf的memory指向chunk的memory。
Netty零拷贝机制

零拷贝机制

Netty的零拷贝机制,是一种应用层的实现。和底层JVM、操作系统内存机制并无过多关联。
Netty零拷贝机制

给TA打赏
共{{data.count}}人
人已打赏
安全技术

用node.js做cluster,监听异常的邮件提醒服务

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索