Netty源码分析第五章: ByteBuf
第四节: PooledByteBufAllocator简述
上一小节简单介绍了ByteBufAllocator以及其子类UnPooledByteBufAllocator的缓冲区分类的逻辑, 这一小节开始带大家剖析更为复杂的PooledByteBufAllocator, 我们知道PooledByteBufAllocator是通过自己取一块连续的内存进行ByteBuf的封装, 所以这里更为复杂, 在这一小节简单讲解有关PooledByteBufAllocator分配逻辑
友情提示: 从这一节开始难度开始加大, 请各位战友做好心理准备
PooledByteBufAllocator同样也重写了AbstractByteBuf的newDirectBuffer和newHeapBuffer两个抽象方法, 我们这一小节以newDirectBuffer为例, 先简述一下其逻辑
首先看UnPooledByteBufAllocator中newDirectBuffer这个方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
2 PoolThreadCache cache = threadCache.get();
3 PoolArena<ByteBuffer> directArena = cache.directArena;
4 ByteBuf buf;
5 if (directArena != null) {
6 buf = directArena.allocate(cache, initialCapacity, maxCapacity);
7 } else {
8 if (PlatformDependent.hasUnsafe()) {
9 buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
10 } else {
11 buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
12 }
13 }
14 return toLeakAwareBuffer(buf);
15}
16
首先
PoolThreadCache cache = threadCache.get() 这一步是拿到一个线程局部缓存对象, 线程局部缓存, 顾明思议, 就是同一个线程共享的一个缓存
threadCache是PooledByteBufAllocator类的一个成员变量, 类型是PoolThreadLocalCache(这两个非常容易混淆, 切记):
1
2 1private final PoolThreadLocalCache threadCache;
2
再看其类型PoolThreadLocalCache的定义:
1
2
3
4
5
6
7
8
9
10
11
12 1final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
2 @Override
3 protected synchronized PoolThreadCache initialValue() {
4 final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
5 final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
6 return new PoolThreadCache(
7 heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
8 DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
9 }
10 //代码省略
11}
12
这里继承了一个FastThreadLocal类, 这个类相当于jdk的ThreadLocal, 只是性能更快, 有关FastThreadLocal, 我们在后面的章节会详细剖析, 这里我们只要知道, 继承FastThreadLocal类并且重写了initialValue方法, 则通过其get方法就能获得initialValue返回的对象, 并且这个对象是线程共享的
在这里我们看到, 在重写的initialValue方法中, 初始化了heapArena和directArena两个属性之后, 通过new PoolThreadCache()这种方式创建了PoolThreadCache对象
这里注意, PoolThreadLocalCache是一个FastThreadLocal, 而PoolThreadCache才是线程局部缓存, 这两个类名非常非常像, 千万别搞混了(我当初读这段代码时因为搞混所以懵逼了)
其中heapArena和directArena是分别是用来分配堆和堆外内存用的两个对象, 以directArena为例, 我们看到是通过leastUsedArena(directArenas)这种方式获得的, directArenas是一个directArena类型的数组, leastUsedArena(directArenas)这个方法是用来获取数组中一个使用最少的directArena对象
directArenas是PooledByteBufAllocator的成员变量, 是在其构造方法中初始化的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
2 int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
3
4 //代码省略
5 if (nDirectArena > 0) {
6 directArenas =newArenaArray(nDirectArena);
7 List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
8 for (int i = 0; i < directArenas.length; i ++) {
9 PoolArena.DirectArena arena = new PoolArena.DirectArena(
10 this, pageSize, maxOrder, pageShifts, chunkSize);
11 directArenas[i] = arena;
12 metrics.add(arena);
13 }
14 directArenaMetrics = Collections.unmodifiableList(metrics);
15 } else {
16 directArenas = null;
17 directArenaMetrics = Collections.emptyList();
18 }
19}
20
我们看到这里通过directArenas = newArenaArray(nDirectArena)初始化了directArenas, 其中nDirectArena, 默认是cpu核心数的2倍, 这点我们可以跟踪构造方法的调用链可以分析到
这样保证了每一个线程会有一个独享的arena
我们看newArenaArray(nDirectArena)这个方法:
1
2
3
4 1private static <T> PoolArena<T>[] newArenaArray(int size) {
2 return new PoolArena[size];
3}
4
这里只是创建了一个数组, 默认长度为nDirectArena
继续跟PooledByteBufAllocator的构造方法, 创建完了数组, 后面在for循环中为数组赋值:
首先通过new PoolArena.DirectArena创建一个DirectArena实例, 然后再为新创建的directArenas数组赋值
再回到PoolThreadLocalCache的构造方法中:
1
2
3
4
5
6
7
8
9
10
11
12 1final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
2 @Override
3 protected synchronized PoolThreadCache initialValue() {
4 final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
5 final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
6 return new PoolThreadCache(
7 heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
8 DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
9 }
10 //代码省略
11}
12
方法最后, 创建PoolThreadCache的一个对象, 我们跟进构造方法中:
1
2
3
4
5
6
7
8
9
10 1PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
2 int tinyCacheSize, int smallCacheSize, int normalCacheSize,
3 int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
4 //代码省略
5 //保存成两个成员变量
6 this.heapArena = heapArena;
7 this.directArena = directArena;
8 //代码省略
9}
10
这里省略了大段代码, 只需要关注这里将两个值保存在PoolThreadCache的成员变量中
我们回到newDirectBuffer中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
2 PoolThreadCache cache = threadCache.get();
3 PoolArena<ByteBuffer> directArena = cache.directArena;
4 ByteBuf buf;
5 if (directArena != null) {
6 buf = directArena.allocate(cache, initialCapacity, maxCapacity);
7 } else {
8 if (PlatformDependent.hasUnsafe()) {
9 buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
10 } else {
11 buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
12 }
13 }
14 return toLeakAwareBuffer(buf);
15}
16
简单分析的线程局部缓存初始化相关逻辑, 我们再往下跟:
1
2 1PoolArena<ByteBuffer> directArena = cache.directArena;
2
通过上面的分析, 这步我们应该不陌生, 在PoolThreadCache构造方法中将directArena和heapArena中保存在成员变量中, 这样就可以直接通过cache.directArena这种方式拿到其成员变量的内容
从以上逻辑, 我们可以大概的分析一下流程, 通常会创建和线程数量相等的arena, 并以数组的形式存储在PooledByteBufAllocator的成员变量中, 每一个PoolThreadCache创建的时候, 都会在当前线程拿到一个arena, 并保存在自身的成员变量中
5-4-1
PoolThreadCache除了维护了一个arena之外, 还维护了一个缓存列表, 我们在重复分配ByteBuf的时候, 并不需要每次都通过arena进行分配, 可以直接从缓存列表中拿一个ByteBuf
有关缓存列表, 我们循序渐进的往下看:
在PooledByteBufAllocator中维护了三个值:
1. tinyCacheSize
2. smallCacheSize
3. normalCacheSize
tinyCacheSize代表tiny类型的ByteBuf能缓存多少个
smallCacheSize代表small类型的ByteBuf能缓存多少个
normalCacheSize代表normal类型的ByteBuf能缓存多少个
具体tiny类型, small类型, normal是什么意思, 我们会在后面讲解
我们回到PoolThreadLocalCache类中看其构造方法:
1
2
3
4
5
6
7
8
9
10
11
12 1final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
2 @Override
3 protected synchronized PoolThreadCache initialValue() {
4 final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
5 final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
6 return new PoolThreadCache(
7 heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
8 DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
9 }
10 //代码省略
11}
12
我们看到这三个属性是在PoolThreadCache的构造方法中传入的
这三个属性是通过PooledByteBufAllocator的构造方法中初始化的, 跟随构造方法的调用链会走到这个构造方法:
1
2
3
4
5 1public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
2 this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
3 DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
4}
5
这里仍然调用了一个重载的构造方法, 这里我们关注这几个参数:
DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE
这里对应着几个静态的成员变量:
1
2
3
4 1private static final int DEFAULT_TINY_CACHE_SIZE;
2private static final int DEFAULT_SMALL_CACHE_SIZE;
3private static final int DEFAULT_NORMAL_CACHE_SIZE;
4
我们在static块中看其初始化过程:
1
2
3
4
5
6
7
8 1static{
2 //代码省略
3 DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
4 DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
5 DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
6 //代码省略
7}
8
在这里我们看到, 这三个属性分别初始化的大小是512, 256, 64, 这三个属性就对应了PooledByteBufAllocator另外的几个成员变量, tinyCacheSize, smallCacheSize, normalCacheSize
也就是说, tiny类型的ByteBuf在每个缓存中默认缓存的数量是512个, small类型的ByteBuf在每个缓存中默认缓存的数量是256个, normal类型的ByteBuf在每个缓存中默认缓存的数量是64个
我们再到PooledByteBufAllocator中重载的构造方法中:
1
2
3
4
5
6
7
8
9
10
11 1public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
2 int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
3 super(preferDirect);
4 threadCache = new PoolThreadLocalCache();
5 this.tinyCacheSize = tinyCacheSize;
6 this.smallCacheSize = smallCacheSize;
7 this.normalCacheSize = normalCacheSize;
8
9 //代码省略
10}
11
篇幅原因, 这里也省略了大段代码, 大家可以通过构造方法参数找到源码中相对的位置进行阅读
我们关注这段代码:
1
2
3
4 1this.tinyCacheSize = tinyCacheSize;
2this.smallCacheSize = smallCacheSize;
3this.normalCacheSize = normalCacheSize;
4
在这里将将参数的DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE的三个值保存到了成员变量tinyCacheSize, smallCacheSize, normalCacheSize
PooledByteBufAllocator中将这三个成员变量初始化之后, 在PoolThreadLocalCache的initialValue方法中就可以使用这三个成员变量的值了
我们再次跟到initialValue方法中:
1
2
3
4
5
6
7
8
9
10
11
12 1final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
2 @Override
3 protected synchronized PoolThreadCache initialValue() {
4 final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
5 final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
6 return new PoolThreadCache(
7 heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
8 DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
9 }
10 //代码省略
11}
12
这里就可以在创建PoolThreadCache对象的的构造方法中传入tinyCacheSize, smallCacheSize, normalCacheSize这三个成员变量了
我们再跟到PoolThreadCache的构造方法中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 1PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
2 int tinyCacheSize, int smallCacheSize, int normalCacheSize,
3 int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
4 //代码省略
5 this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
6 this.heapArena = heapArena;
7 this.directArena = directArena;
8 if (directArena != null) {
9 tinySubPageDirectCaches = createSubPageCaches(
10 tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
11 smallSubPageDirectCaches =createSubPageCaches(
12 smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
13
14 numShiftsNormalDirect = log2(directArena.pageSize);
15 normalDirectCaches =createNormalCaches(
16 normalCacheSize, maxCachedBufferCapacity, directArena);
17
18 directArena.numThreadCaches.getAndIncrement();
19 } else {
20 //代码省略
21 }
22 //代码省略
23 ThreadDeathWatcher.watch(thread, freeTask);
24}
25
其中tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches就代表了三种类型的缓存数组, 数组元素是MemoryRegionCache类型的对象, MemoryRegionCache就代表一个ByeBuf的缓存
以tinySubPageDirectCaches为例, 我们看到tiny类型的缓存是通过createSubPageCaches这个方法创建的
这里传入了三个参数tinyCacheSize我们之前分析过是512, PoolArena.numTinySubpagePools这里是32(这里不同类型的缓存大小不一样, small类型是4, normal类型是3) , SizeClass.Tiny代表其类型是tiny类型
我们跟到createSubPageCaches这个方法中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1private static <T> MemoryRegionCache<T>[] createSubPageCaches(
2 int cacheSize, int numCaches, SizeClass sizeClass) {
3 if (cacheSize > 0) {
4 //创建数组, 长度为32
5 @SuppressWarnings("unchecked")
6 MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
7 for (int i = 0; i < cache.length; i++) {
8 //每一个节点是ubPageMemoryRegionCache对象
9 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
10 }
11 return cache;
12 } else {
13 return null;
14 }
15}
16
这里首先创建了MemoryRegionCache, 长度是我们刚才分析过的32
然后通过for循环, 为数组赋值, 赋值的对象是SubPageMemoryRegionCache类型的, SubPageMemoryRegionCache就是MemoryRegionCache类型的子类, 同样也是一个缓存对象, 构造方法中, cacheSize, 就是其中缓存对象的数量, 如果是tiny类型就是512, sizeClass, 代表其类型, 比如tiny, small或者normal
再简单跟到其构造方法:
1
2
3
4 1SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
2 super(size, sizeClass);
3}
4
这里调用了父类的构造方法, 我们继续跟进去:
1
2
3
4
5
6
7
8 1MemoryRegionCache(int size, SizeClass sizeClass) {
2 //size会进行规格化
3 this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
4 //队列大小
5 queue = PlatformDependent.newFixedMpscQueue(this.size);
6 this.sizeClass = sizeClass;
7 }
8
首先会对其进行规格化, 其实就是查找大于等于当前size的2的幂次方的数, 这里如果是512那么规格化之后还是512, 然后初始化一个队列, 队列大小就是传入的大小, 如果是tiny, 这里大小就是512
最后并保存其类型
这里我们不难看出, 其实每个缓存的对象, 里面是通过一个队列保存的, 有关缓存队列和ByteBuf之间的逻辑, 后面的小节会进行剖析
从上面剖析我们不难看出, PoolThreadCache中维护了三种类型的缓存数组, 每个缓存数组中的每个值中, 又通过一个队列进行对象的存储
5-4-2
当然这里只举了Direct类型的对象关系, heap类型其实都是一样的, 这里不再赘述
这一小节逻辑较为复杂, 同学们可以自己在源码中跟踪一遍加深印象
上一节: 缓冲区分配器
下一节: directArena分配缓冲区概述