Netty源码分析第五章: pipeline
第五节: 传播outBound事件
了解了inbound事件的传播过程, 对于学习outbound事件传输的流程, 也不会太困难
在我们业务代码中, 有可能使用wirte方法往写数据:
1
2
3
4 1public void channelActive(ChannelHandlerContext ctx) throws Exception {
2 ctx.channel().write("test data");
3}
4
当然, 直接调用write方法是不能往对方channel中写入数据的, 因为这种方式只能写入到缓冲区, 还要调用flush方法才能将缓冲区数据刷到channel中, 或者直接调用writeAndFlush方法, 有关逻辑, 我们会在后面章节中详细讲解, 这里只是以wirte方法为例为了演示outbound事件的传播的流程
这里我们同样给出两种写法:
1
2
3
4
5
6
7 1public void channelActive(ChannelHandlerContext ctx) throws Exception {
2 //写法1
3 ctx.channel().write("test data");
4 //写法2
5 ctx.write("test data");
6}
7
这两种写法有什么区别, 我们首先跟到第一种写法中去:
1
2 1ctx.channel().write("test data");
2
这里获取ctx所绑定的channel
我们跟到AbstractChannel的write方法中:
1
2
3
4 1public ChannelFuture write(Object msg) {
2 return pipeline.write(msg);
3}
4
这里pipeline是DefaultChannelPipeline
跟到其write方法中:
1
2
3
4
5 1public final ChannelFuture write(Object msg) {
2 //从tail节点开始(从最后的节点往前写)
3 return tail.write(msg);
4}
5
这里调用
tail节点
write方法
, 这里我们应该能分析到
, outbound事件
, 是通过
tail节点开始往上传播的
, 带着这点猜想
, 我们继往下看
其实
tail节点并没有重写
write方法
, 最终会调用其父类
AbstractChannelHandlerContext的
write方法
AbstractChannelHandlerContext
的
write
方法:
1
2
3
4 1public ChannelFuture write(Object msg) {
2 return write(msg, newPromise());
3}
4
我们看到这里有个
newPromise()这个方法
, 这里是创建一个
Promise对象
, 有关
Promise的相关知识我们会在以后的章节剖析
我们继续跟
write:
1
2
3
4
5
6 1public ChannelFuture write(final Object msg, final ChannelPromise promise) {
2 //代码省略
3 write(msg, false, promise);
4 return promise;
5}
6
继续跟
write:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 1private void write(Object msg, boolean flush, ChannelPromise promise) {
2 AbstractChannelHandlerContext next = findContextOutbound();
3 final Object m = pipeline.touch(msg, next);
4 EventExecutor executor = next.executor();
5 if (executor.inEventLoop()) {
6 if (flush) {
7 next.invokeWriteAndFlush(m, promise);
8 } else {
9 //没有调flush
10 next.invokeWrite(m, promise);
11 }
12 } else {
13 AbstractWriteTask task;
14 if (flush) {
15 task = WriteAndFlushTask.newInstance(next, m, promise);
16 } else {
17 task = WriteTask.newInstance(next, m, promise);
18 }
19 safeExecute(executor, task, promise, m);
20 }
21}
22
这里跟我们上一小节剖析过
channelRead方法有点类似
, 但是事件传输的方向有所不同
, 这里
findContextOutbound()是获取上一个标注
outbound事件的
HandlerContext
跟到findContextOutbound中:
1
2
3
4
5
6
7
8 1private AbstractChannelHandlerContext findContextOutbound() {
2 AbstractChannelHandlerContext ctx = this;
3 do {
4 ctx = ctx.prev;
5 } while (!ctx.outbound);
6 return ctx;
7}
8
这里的逻辑我们似曾相识
, 跟我们上一小节的
findContextInbound()方法有点像
, 只是过程是反过来的
在这里
, 会找到当前
context的上一个节点
, 如果标注的事件不是
outbound事件
, 则继续往上找
, 意思就是找到上一个标注
outbound事件的节点
回到
write方法
:
1
2 1AbstractChannelHandlerContext next = findContextOutbound();
2
这里将找到节点赋值到
next属性中
因为我们之前分析的
write事件是从
tail节点传播的
, 所以上一个节点就有可能是用户自定的
handler所属的
context
然后判断是否为当前
eventLoop线程
, 如果是不是
, 则封装成
task异步执行
, 如果不是
, 则继续判断是否调用了
flush方法
, 因为我们这里没有调用
, 所以会执行到
next.invokeWrite(m, promise),
我们继续跟
invokeWrite:
1
2
3
4
5
6
7
8 1private void invokeWrite(Object msg, ChannelPromise promise) {
2 if (invokeHandler()) {
3 invokeWrite0(msg, promise);
4 } else {
5 write(msg, promise);
6 }
7}
8
这里会判断当前
handler的状态是否是添加状态
, 这里返回的是
true, 将会走到
invokeWrite0(msg, promise)这一步
继续跟
invokeWrite0:
1
2
3
4
5
6
7
8
9 1private void invokeWrite0(Object msg, ChannelPromise promise) {
2 try {
3 //调用当前handler的wirte()方法
4 ((ChannelOutboundHandler) handler()).write(this, msg, promise);
5 } catch (Throwable t) {
6 notifyOutboundHandlerException(t, promise);
7 }
8}
9
这里的逻辑也似曾相识
, 调用了当前节点包装的
handler的
write方法
, 如果用户没有重写
write方法
, 则会交给其父类处理
我们跟到
ChannelOutboundHandlerAdapter的
write方法中看
:
1
2
3
4 1public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
2 ctx.write(msg, promise);
3}
4
这里调用了当前
ctx的
write方法
, 这种写法和我们小节开始的写法是相同的
, 我们回顾一下
:
1
2
3
4
5
6
7 1public void channelActive(ChannelHandlerContext ctx) throws Exception {
2 //写法1
3 ctx.channel().write("test data");
4 //写法2
5 ctx.write("test data");
6}
7
我们跟到其
write方法中
, 这里走到的是
AbstractChannelHandlerContext类的
write方法
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 1private void write(Object msg, boolean flush, ChannelPromise promise) {
2 AbstractChannelHandlerContext next = findContextOutbound();
3 final Object m = pipeline.touch(msg, next);
4 EventExecutor executor = next.executor();
5 if (executor.inEventLoop()) {
6 if (flush) {
7 next.invokeWriteAndFlush(m, promise);
8 } else {
9 //没有调flush
10 next.invokeWrite(m, promise);
11 }
12 } else {
13 AbstractWriteTask task;
14 if (flush) {
15 task = WriteAndFlushTask.newInstance(next, m, promise);
16 } else {
17 task = WriteTask.newInstance(next, m, promise);
18 }
19 safeExecute(executor, task, promise, m);
20 }
21}
22
又是我们所熟悉逻辑
, 找到当前节点的上一个标注事件为
outbound事件的节点
, 继续执行
invokeWrite方法
, 根据之前的剖析
, 我们知道最终会执行到上一个
handler的
write方法中
走到这里已经不难理解
, ctx.channel().write("test data")其实是从
tail节点开始传播写事件
, 而
ctx.write("test data")是从自身开始传播写事件
所以
, 在
handler中如果重写了
write方法要传递
write事件
, 一定采用
ctx.write("test data")这种方式或者交给其父类处理处理
, 而不能采用
ctx.channel().write("test data")这种方式
, 因为会造成每次事件传输到这里都会从
tail节点重新传输
, 导致不可预知的错误
如果用代码中没有重写
handler的
write方法
, 则事件会一直往上传输
, 当传输完所有的
outbound节点之后
, 最后会走到
head节点的
wirte方法中
我们跟到
HeadContext的
write方法中
:
1
2
3
4 1public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
2 unsafe.write(msg, promise);
3}
4
我们看到
write事件最终会流向这里
, 通过
unsafe对象进行最终的写操作
有关
inbound事件和
outbound事件的传输
, 可通过下图进行说明
:
4-5-1
上一节: 传播inbound事件
下一节: 传播异常事件