Netty源码分析第4章(pipeline)—->第5节: 传播outbound事件

释放双眼,带上耳机,听听看~!

 

Netty源码分析第五章: pipeline

 

第五节: 传播outBound事件

了解了inbound事件的传播过程, 对于学习outbound事件传输的流程, 也不会太困难

在我们业务代码中, 有可能使用wirte方法往写数据:


1
2
3
4
1public void channelActive(ChannelHandlerContext ctx) throws Exception {
2    ctx.channel().write("test data");
3}
4

当然, 直接调用write方法是不能往对方channel中写入数据的, 因为这种方式只能写入到缓冲区, 还要调用flush方法才能将缓冲区数据刷到channel中, 或者直接调用writeAndFlush方法, 有关逻辑, 我们会在后面章节中详细讲解, 这里只是以wirte方法为例为了演示outbound事件的传播的流程

这里我们同样给出两种写法:


1
2
3
4
5
6
7
1public void channelActive(ChannelHandlerContext ctx) throws Exception {
2    //写法1
3    ctx.channel().write("test data");
4    //写法2
5    ctx.write("test data");
6}
7

这两种写法有什么区别, 我们首先跟到第一种写法中去:


1
2
1ctx.channel().write("test data");
2

这里获取ctx所绑定的channel

我们跟到AbstractChannel的write方法中:


1
2
3
4
1public ChannelFuture write(Object msg) {
2    return pipeline.write(msg);
3}
4

这里pipeline是DefaultChannelPipeline

跟到其write方法中:


1
2
3
4
5
1public final ChannelFuture write(Object msg) {
2    //从tail节点开始(从最后的节点往前写)
3    return tail.write(msg);
4}
5

这里调用
tail节点
write方法
, 这里我们应该能分析到
, outbound事件
, 是通过
tail节点开始往上传播的
, 带着这点猜想
, 我们继往下看

 

其实
tail节点并没有重写
write方法
, 最终会调用其父类
AbstractChannelHandlerContext的
write方法

AbstractChannelHandlerContext

write
方法:


1
2
3
4
1public ChannelFuture write(Object msg) {
2    return write(msg, newPromise());
3}
4

我们看到这里有个
newPromise()这个方法
, 这里是创建一个
Promise对象
, 有关
Promise的相关知识我们会在以后的章节剖析

我们继续跟
write:


1
2
3
4
5
6
1public ChannelFuture write(final Object msg, final ChannelPromise promise) {
2    //代码省略
3    write(msg, false, promise);
4    return promise;
5}
6

继续跟
write:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1private void write(Object msg, boolean flush, ChannelPromise promise) {
2    AbstractChannelHandlerContext next = findContextOutbound();
3    final Object m = pipeline.touch(msg, next);
4    EventExecutor executor = next.executor();
5    if (executor.inEventLoop()) {
6        if (flush) {
7            next.invokeWriteAndFlush(m, promise);
8        } else {
9            //没有调flush
10            next.invokeWrite(m, promise);
11        }
12    } else {
13        AbstractWriteTask task;
14        if (flush) {
15            task = WriteAndFlushTask.newInstance(next, m, promise);
16        }  else {
17            task = WriteTask.newInstance(next, m, promise);
18        }
19        safeExecute(executor, task, promise, m);
20    }
21}
22

这里跟我们上一小节剖析过
channelRead方法有点类似
, 但是事件传输的方向有所不同
, 这里
findContextOutbound()是获取上一个标注
outbound事件的
HandlerContext

跟到findContextOutbound中:


1
2
3
4
5
6
7
8
1private AbstractChannelHandlerContext findContextOutbound() {
2    AbstractChannelHandlerContext ctx = this;
3    do {
4        ctx = ctx.prev;
5    } while (!ctx.outbound);
6    return ctx;
7}
8

这里的逻辑我们似曾相识
, 跟我们上一小节的
findContextInbound()方法有点像
, 只是过程是反过来的

在这里
, 会找到当前
context的上一个节点
, 如果标注的事件不是
outbound事件
, 则继续往上找
, 意思就是找到上一个标注
outbound事件的节点

 

 

回到
write方法
:


1
2
1AbstractChannelHandlerContext next = findContextOutbound();
2

这里将找到节点赋值到
next属性中

因为我们之前分析的
write事件是从
tail节点传播的
, 所以上一个节点就有可能是用户自定的
handler所属的
context

 

然后判断是否为当前
eventLoop线程
, 如果是不是
, 则封装成
task异步执行
, 如果不是
, 则继续判断是否调用了
flush方法
, 因为我们这里没有调用
, 所以会执行到
next.invokeWrite(m, promise),

我们继续跟
invokeWrite:


1
2
3
4
5
6
7
8
1private void invokeWrite(Object msg, ChannelPromise promise) {
2    if (invokeHandler()) {
3        invokeWrite0(msg, promise);
4    } else {
5        write(msg, promise);
6    }
7}
8

这里会判断当前
handler的状态是否是添加状态
, 这里返回的是
true, 将会走到
invokeWrite0(msg, promise)这一步

继续跟
invokeWrite0:


1
2
3
4
5
6
7
8
9
1private void invokeWrite0(Object msg, ChannelPromise promise) {
2    try {
3        //调用当前handler的wirte()方法
4        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
5    } catch (Throwable t) {
6        notifyOutboundHandlerException(t, promise);
7    }
8}
9

这里的逻辑也似曾相识
, 调用了当前节点包装的
handler的
write方法
, 如果用户没有重写
write方法
, 则会交给其父类处理

我们跟到
ChannelOutboundHandlerAdapter的
write方法中看

:


1
2
3
4
1public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
2    ctx.write(msg, promise);
3}
4

这里调用了当前
ctx的
write方法
, 这种写法和我们小节开始的写法是相同的
, 我们回顾一下
:


1
2
3
4
5
6
7
1public void channelActive(ChannelHandlerContext ctx) throws Exception {
2    //写法1
3    ctx.channel().write("test data");
4    //写法2
5    ctx.write("test data");
6}
7

我们跟到其
write方法中
, 这里走到的是
AbstractChannelHandlerContext类的
write方法
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1private void write(Object msg, boolean flush, ChannelPromise promise) {
2    AbstractChannelHandlerContext next = findContextOutbound();
3    final Object m = pipeline.touch(msg, next);
4    EventExecutor executor = next.executor();
5    if (executor.inEventLoop()) {
6        if (flush) {
7            next.invokeWriteAndFlush(m, promise);
8        } else {
9            //没有调flush
10            next.invokeWrite(m, promise);
11        }
12    } else {
13        AbstractWriteTask task;
14        if (flush) {
15            task = WriteAndFlushTask.newInstance(next, m, promise);
16        }  else {
17            task = WriteTask.newInstance(next, m, promise);
18        }
19        safeExecute(executor, task, promise, m);
20    }
21}
22

又是我们所熟悉逻辑
, 找到当前节点的上一个标注事件为
outbound事件的节点
, 继续执行
invokeWrite方法
, 根据之前的剖析
, 我们知道最终会执行到上一个
handler的
write方法中

走到这里已经不难理解
, ctx.channel().write("test data")其实是从
tail节点开始传播写事件
, 而
ctx.write("test data")是从自身开始传播写事件

 

所以
, 在
handler中如果重写了
write方法要传递
write事件
, 一定采用
ctx.write("test data")这种方式或者交给其父类处理处理
, 而不能采用
ctx.channel().write("test data")这种方式
, 因为会造成每次事件传输到这里都会从
tail节点重新传输
, 导致不可预知的错误

 

 

如果用代码中没有重写
handler的
write方法
, 则事件会一直往上传输
, 当传输完所有的
outbound节点之后
, 最后会走到
head节点的
wirte方法中

我们跟到
HeadContext的
write方法中

:


1
2
3
4
1public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
2    unsafe.write(msg, promise);
3}
4

我们看到
write事件最终会流向这里
, 通过
unsafe对象进行最终的写操作

 

有关
inbound事件和
outbound事件的传输
, 可通过下图进行说明
:

Netty源码分析第4章(pipeline)---->第5节: 传播outbound事件

4-5-1

 

上一节: 传播inbound事件

下一节: 传播异常事件

 

给TA打赏
共{{data.count}}人
人已打赏
安全技术

用node.js做cluster,监听异常的邮件提醒服务

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索