Netty源码分析第7章(编码器和写数据)—->第3节: 写buffer队列
Netty源码分析七章: 编码器和写数据
第三节: 写buffer队列
之前的小节我们介绍过, writeAndFlush方法其实最终会调用write和flush方法
write方法最终会传递到head节点, 调用HeadContext的write方法:
1
2
3
4 1public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
2 unsafe.write(msg, promise);
3}
4
这里通过unsafe对象的write方法, 将消息写入到缓存中, 具体的执行逻辑, 我们在这个小节进行剖析
我们跟到AbstractUnsafe的write方法中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 1public final void write(Object msg, ChannelPromise promise) {
2 assertEventLoop();
3 //负责缓冲写进来的byteBuf
4 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
5 if (outboundBuffer == null) {
6 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
7 ReferenceCountUtil.release(msg);
8 return;
9 }
10 int size;
11 try {
12 //非堆外内存转化为堆外内存
13 msg = filterOutboundMessage(msg);
14 size = pipeline.estimatorHandle().size(msg);
15 if (size < 0) {
16 size = 0;
17 }
18 } catch (Throwable t) {
19 safeSetFailure(promise, t);
20 ReferenceCountUtil.release(msg);
21 return;
22 }
23 //插入写队列
24 outboundBuffer.addMessage(msg, size, promise);
25}
26
首先看
ChannelOutboundBuffer outboundBuffer =
this.outboundBuffer
ChannelOutboundBuffer的功能就是缓存写入的ByteBuf
我们继续看try块中的
msg = filterOutboundMessage(msg)
这步的意义就是将非对外内存转化为堆外内存
filterOutboundMessage方法方法最终会调用AbstractNioByteChannel中的filterOutboundMessage方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1protected final Object filterOutboundMessage(Object msg) {
2 if (msg instanceof ByteBuf) {
3 ByteBuf buf = (ByteBuf) msg;
4 //是堆外内存, 直接返回
5 if (buf.isDirect()) {
6 return msg;
7 }
8 return newDirectBuffer(buf);
9 }
10 if (msg instanceof FileRegion) {
11 return msg;
12 }
13 throw new UnsupportedOperationException(
14 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
15}
16
首先判断msg是否byteBuf对象, 如果是, 判断是否堆外内存, 如果是堆外内存, 则直接返回, 否则, 通过newDirectBuffer(buf)这种方式转化为堆外内存
回到write方法中:
outboundBuffer.addMessage(msg, size, promise)将已经转化为堆外内存的msg插入到写队列
我们跟到addMessage方法当中, 这是ChannelOutboundBuffer中的方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1public void addMessage(Object msg, int size, ChannelPromise promise) {
2 Entry entry = Entry.newInstance(msg, size, total(msg), promise);
3 if (tailEntry == null) {
4 flushedEntry = null;
5 tailEntry = entry;
6 } else {
7 Entry tail = tailEntry;
8 tail.next = entry;
9 tailEntry = entry;
10 }
11 if (unflushedEntry == null) {
12 unflushedEntry = entry;
13 }
14 incrementPendingOutboundBytes(size, false);
15}
16
首先通过
Entry.newInstance(msg, size, total(msg), promise) 的方式将msg封装成entry
然后通过调整tailEntry, flushedEntry, unflushedEntry三个指针, 完成entry的添加
这三个指针均是ChannelOutboundBuffer的成员变量
flushedEntry指向第一个被flush的entry
unflushedEntry指向第一个未被flush的entry
也就是说, 从flushedEntry到unflushedEntry之间的entry, 都是被已经被flush的entry
tailEntry指向最后一个entry, 也就是从unflushedEntry到tailEntry之间的entry都是没flush的entry
我们回到代码中:
创建了entry之后首先判断尾指针是否为空, 在第一次添加的时候, 均是空, 所以会将flushedEntry设置为null, 并且将尾指针设置为当前创建的entry
最后判断unflushedEntry是否为空, 如果第一次添加这里也是空, 所以这里将unflushedEntry设置为新创建的entry
第一次添加如下图所示
7-3-1
如果不是第一次调用write方法, 则会进入
if (tailEntry ==
null) 中else块:
Entry tail = tailEntry 这里tail就是当前尾节点
tail.next = entry 代表尾节点的下一个节点指向新创建的entry
tailEntry = entry 将尾节点也指向entry
这样就完成了添加操作, 其实就是将新创建的节点追加到原来尾节点之后
第二次添加
if (unflushedEntry ==
null) 会返回false, 所以不会进入if块
第二次添加之后指针的指向情况如下图所示:
7-3-4
以后每次调用write, 如果没有调用flush的话都会在尾节点之后进行追加
回到代码中, 看这一步incrementPendingOutboundBytes(size, false)
这步时统计当前有多少字节需要被写出, 我们跟到这个方法中:
1
2
3
4
5
6
7
8
9
10
11
12 1private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
2 if (size == 0) {
3 return;
4 }
5 //TOTAL_PENDING_SIZE_UPDATER当前缓冲区里面有多少待写的字节
6 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
7 //getWriteBufferHighWaterMark() 最高不能超过64k
8 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
9 setUnwritable(invokeLater);
10 }
11}
12
看这一步:
1
2 1long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size)
2
TOTAL_PENDING_SIZE_UPDATER表示当前缓冲区还有多少待写的字节, addAndGet就是将当前的ByteBuf的长度进行累加, 累加到newWriteBufferSize中
在继续看判断
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark())
channel.config().getWriteBufferHighWaterMark() 表示写buffer的高水位值, 默认是64k, 也就是说写buffer的最大长度不能超过64k
如果超过了64k, 则会调用setUnwritable(invokeLater)方法设置写状态
我们跟到setUnwritable(invokeLater)方法中:
1
2
3
4
5
6
7
8
9
10
11
12
13 1private void setUnwritable(boolean invokeLater) {
2 for (;;) {
3 final int oldValue = unwritable;
4 final int newValue = oldValue | 1;
5 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
6 if (oldValue == 0 && newValue != 0) {
7 fireChannelWritabilityChanged(invokeLater);
8 }
9 break;
10 }
11 }
12}
13
这里通过自旋和cas操作, 传播一个ChannelWritabilityChanged事件, 最终会调用handler的channelWritabilityChanged方法进行处理
以上就是写buffer的相关逻辑
上一节: MessageToByteEncoder
下一节: 刷新buffer队列
posted on
2019-01-02 13:57 向南是个万人迷 阅读(
…) 评论(
…) 编辑 收藏