Netty源码分析第4章(pipeline)—->第7节: 前章节内容回顾

释放双眼,带上耳机,听听看~!

Netty源码分析第4章(pipeline)—->第7节: 前章节内容回顾

 

Netty源码分析第四章: pipeline

 

第七节: 前章节内容回顾

 

我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾

首先看两个问题:

1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法

2.客户端handler是什么时候被添加的?

首先看第一个问题:

1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法?

我们首先看这段代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
1public void read() {
2    //必须是NioEventLoop方法调用的, 不能通过外部线程调用
3    assert eventLoop().inEventLoop();
4    //服务端channel的config
5    final ChannelConfig config = config();
6    //服务端channel的pipeline
7    final ChannelPipeline pipeline = pipeline();
8    //处理服务端接入的速率
9    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
10    //设置配置
11    allocHandle.reset(config);
12    boolean closed = false;
13    Throwable exception = null;
14    try {
15        try {
16            do {
17                //创建jdk底层的channel
18                //readBuf用于临时承载读到链接
19                int localRead = doReadMessages(readBuf);
20                if (localRead == 0) {
21                    break;
22                }
23                if (localRead < 0) {
24                    closed = true;
25                    break;
26                }
27                //分配器将读到的链接进行计数
28                allocHandle.incMessagesRead(localRead);
29                //连接数是否超过最大值
30            } while (allocHandle.continueReading());
31        } catch (Throwable t) {
32            exception = t;
33        }
34        int size = readBuf.size();
35        //遍历每一条客户端连接
36        for (int i = 0; i < size; i ++) {
37            readPending = false;
38            //传递事件, 将创建NioSokectChannel进行传递
39            //最终会调用ServerBootstrap的内部类ServerBootstrapAcceptor的channelRead()方法
40pipeline.fireChannelRead(readBuf.get(i));
41        }
42        readBuf.clear();
43        allocHandle.readComplete();
44        pipeline.fireChannelReadComplete();
45        //代码省略
46    } finally {
47        //代码省略
48    }
49}
50

重点看pipeline.fireChannelRead(readBuf.get(i))

首先, 这里pipeline是服务端channel的pipeline, 也就是NioServerSocketChannel的pipeline

我们学习过pipeline之后, 对这种写法并不陌生, 就是传递channelRead事件, 这里通过传递channelRead事件走到了ServerBootstrapAcceptor的channelRead()方法, 说明在这步之前, ServerBootstrapAcceptor作为一个handler添加到了服务端channel的pipeline中, 那么这个handler什么时候添加的呢?

我们回顾下第一章, 初始化NioServerSocketChannel的时候, 调用了ServerBootstrap的init方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
1void init(Channel channel) throws Exception {
2    //获取用户定义的选项(1)
3    final Map<ChannelOption<?>, Object> options = options0();
4    synchronized (options) {
5        channel.config().setOptions(options);
6    }
7
8    //获取用户定义的属性(2)
9    final Map<AttributeKey<?>, Object> attrs = attrs0();
10    synchronized (attrs) {
11        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
12            @SuppressWarnings("unchecked")
13            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
14            channel.attr(key).set(e.getValue());
15        }
16    }
17    //获取channel的pipline(3)
18    ChannelPipeline p = channel.pipeline();
19    //work线程组(4)
20    final EventLoopGroup currentChildGroup = childGroup;
21    //用户设置的Handler(5)
22    final ChannelHandler currentChildHandler = childHandler;
23    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
24    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
25    //选项转化为Entry对象(6)
26    synchronized (childOptions) {
27        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
28    }
29    //属性转化为Entry对象(7)
30    synchronized (childAttrs) {
31        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
32    }
33    //添加服务端handler(8)
34    p.addLast(new ChannelInitializer<Channel>() {
35        //初始化channel
36        @Override
37        public void initChannel(Channel ch) throws Exception {
38            final ChannelPipeline pipeline = ch.pipeline();
39            ChannelHandler handler = config.handler();
40            if (handler != null) {
41                pipeline.addLast(handler);
42            }
43            ch.eventLoop().execute(new Runnable() {
44                @Override
45                public void run() {
46                    pipeline.addLast(new ServerBootstrapAcceptor(
47                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
48                }
49            });
50        }
51    });
52}
53

这个方法比较长, 我们重点关注第8步, 添加服务端channel, 这里的pipeline, 是服务服务端channel的pipeline, 也就是NioServerSocketChannel绑定的pipeline, 这里添加了一个ChannelInitializer类型的handler

我们看一下ChannelInitializer这个类的继承关系:


1
2
3
4
1public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
2    //省略类体
3}
4

我们看到其继承了ChannelInboundHandlerAdapter, 说明是一个inbound类型的handler

这里我们可能会想到, 添加完handler会执行handlerAdded, 然后再handlerAdded方法中做了添加ServerBootstrapAcceptor这个handler

但是, 实际上并不是这样的, 当程序执行到这里, 并没有马上执行handlerAdded, 我们紧跟addLast方法

最后会跟到DefualtChannelPipeline的一个addLast方法中去:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
2    final AbstractChannelHandlerContext newCtx;
3    synchronized (this) {
4        //判断handler是否被重复添加(1)
5        checkMultiplicity(handler);
6        //创建一个HandlerContext并添加到列表(2)
7        newCtx = newContext(group, filterName(name, handler), handler);
8
9        //添加HandlerContext(3)
10        addLast0(newCtx);
11
12        //是否已注册
13        if (!registered) {
14            newCtx.setAddPending();
15            callHandlerCallbackLater(newCtx, true);
16            return this;
17        }
18
19        EventExecutor executor = newCtx.executor();
20        if (!executor.inEventLoop()) {
21            newCtx.setAddPending();
22            //回调用户事件
23            executor.execute(new Runnable() {
24                @Override
25                public void run() {
26                    callHandlerAdded0(newCtx);
27                }
28            });
29            return this;
30        }
31    }
32    //回调添加事件(4)
33    callHandlerAdded0(newCtx);
34    return this;
35}
36

首先完成了handler的添加, 但是并没有马上执行回调

这里我们重点关注if (!registered)这个条件判断, 其实在注册完成, registered会变成true, 但是走到这一步的时候NioServerSockeChannel并没有完成注册(可以回顾第一章看注册在哪一步), 所以会进到if里并返回自身

我们重点关注callHandlerCallbackLater这个方法, 我们跟进去:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
2    assert !registered;
3    //判断是否已添加, 未添加, 进行添加, 已添加进行删除
4    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
5    //获取第一个Callback任务
6    PendingHandlerCallback pending = pendingHandlerCallbackHead;
7    //如果第一个Callback任务为空
8    if (pending == null) {
9        //将第一个任务设置为刚创建的任务
10        pendingHandlerCallbackHead = task;
11    } else {
12        while (pending.next != null) {
13            pending = pending.next;
14        }
15        pending.next = task;
16    }
17}
18

因我们调用这个方法的时候added传的true, 所以PendingHandlerCallback task赋值为new PendingHandlerAddedTask(ctx)

PendingHandlerAddedTask这个类, 我们从名字可以看出, 这是一个handler添加的延迟任务, 用于执行handler延迟添加的操作, 同样也对应一个名字为PendingHandlerRemovedTask的类, 用于执行延迟删除handler的操作, 这两个类都继承抽象类PendingHandlerCallback

 

我们看PendingHandlerAddedTask类构造方法:


1
2
3
4
1PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
2    super(ctx);
3}
4

这里调用了父类的构造方法, 再跟进去:


1
2
3
4
1PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
2    this.ctx = ctx;
3}
4

在父类中, 保存了要添加的context, 也就是ChannelInitializer类型的包装类

回到callHandlerCallbackLater方法中:


1
2
1PendingHandlerCallback pending = pendingHandlerCallbackHead;
2

这表示获取第一个PendingHandlerCallback的任务, 其实PendingHandlerCallback是一个单向链表, 自身维护一个PendingHandlerCallback类型的next, 指向下一个任务, 在DefaultChannelPipeline这个类中, 定义了个PendingHandlerCallback类型的引用pendingHandlerCallbackHead, 用来指向延迟回调任务的中的第一个任务

 

之后判断这个任务是为空, 如果是第一次添加handler, 那么这里就是空, 所以将第一个任务赋值为我们刚创建的添加任务

如果不是第一次添加handler, 则将我们新创建的任务添加到链表的尾部, 因为这里我们是第一次添加, 所以第一个回调任务就指向了我们创建的添加handler的任务

完成这一系列操作之后, addLast方法返归, 此时并没有完成添加操作

而什么时候完成添加操作的呢?

在服务端channel注册时候的会走到AbstractChannel的register0方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1private void register0(ChannelPromise promise) {
2    try {
3        //做实际的注册(1)
4        doRegister();
5        neverRegistered = false;
6        registered = true;
7        //触发事件(2)
8        pipeline.invokeHandlerAddedIfNeeded();
9        safeSetSuccess(promise);
10        //触发注册成功事件(3)
11        pipeline.fireChannelRegistered();
12        if (isActive()) {
13            if (firstRegistration) {
14                //传播active事件(4)
15                pipeline.fireChannelActive();
16            } else if (config().isAutoRead()) {
17                beginRead();
18            }
19        }
20    } catch (Throwable t) {
21        //省略代码
22    }
23}
24

重点关注第二步pipeline.invokeHandlerAddedIfNeeded(), 这里已经通过doRegister()方法完成了实际的注册, 我们跟到该方法中:


1
2
3
4
5
6
7
8
1final void invokeHandlerAddedIfNeeded() {
2    assert channel.eventLoop().inEventLoop();
3    if (firstRegistration) {
4        firstRegistration = false;
5        callHandlerAddedForAllHandlers();
6    }
7}
8

这里会判断是否第一次注册, 这里返回true, 然后会执行callHandlerAddedForAllHandlers()方法, 我们跟进去:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1private void callHandlerAddedForAllHandlers() {
2    final PendingHandlerCallback pendingHandlerCallbackHead;
3    synchronized (this) {
4        assert !registered;
5        registered = true;
6        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
7        this.pendingHandlerCallbackHead = null;
8    }
9    //获取task
10    PendingHandlerCallback task = pendingHandlerCallbackHead;
11    while (task != null) {
12        //执行添加handler方法
13        task.execute();
14        task = task.next;
15    }
16}
17

这里拿到第一个延迟执行handler添加的task其实就是我们之前剖析过的, 延迟执行handler添加的task, 就是PendingHandlerAddedTask对象

在while循环中, 通过执行execute()方法将handler添加

我们跟到PendingHandlerAddedTask的execute()方法中:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1void execute() {
2    //获取当前eventLoop线程
3    EventExecutor executor = ctx.executor();
4    //是当前执行的线程
5    if (executor.inEventLoop()) {
6        callHandlerAdded0(ctx);
7    } else {
8        try {
9            //添加到队列
10            executor.execute(this);
11        } catch (RejectedExecutionException e) {
12            //代码省略
13        }
14    }
15}
16

终于在这里, 我们看到了执行回调的方法

再回到init方法中:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
1void init(Channel channel) throws Exception {
2    //获取用户定义的选项(1)
3    final Map<ChannelOption<?>, Object> options = options0();
4    synchronized (options) {
5        channel.config().setOptions(options);
6    }
7
8    //获取用户定义的属性(2)
9    final Map<AttributeKey<?>, Object> attrs = attrs0();
10    synchronized (attrs) {
11        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
12            @SuppressWarnings("unchecked")
13            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
14            channel.attr(key).set(e.getValue());
15        }
16    }
17    //获取channel的pipline(3)
18    ChannelPipeline p = channel.pipeline();
19    //work线程组(4)
20    final EventLoopGroup currentChildGroup = childGroup;
21    //用户设置的Handler(5)
22    final ChannelHandler currentChildHandler = childHandler;
23    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
24    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
25    //选项转化为Entry对象(6)
26    synchronized (childOptions) {
27        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
28    }
29    //属性转化为Entry对象(7)
30    synchronized (childAttrs) {
31        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
32    }
33    //添加服务端handler(8)
34    p.addLast(new ChannelInitializer<Channel>() {
35        //初始化channel
36        @Override
37        public void initChannel(Channel ch) throws Exception {
38            final ChannelPipeline pipeline = ch.pipeline();
39            ChannelHandler handler = config.handler();
40            if (handler != null) {
41                pipeline.addLast(handler);
42            }
43            ch.eventLoop().execute(new Runnable() {
44                @Override
45                public void run() {
46                    pipeline.addLast(new ServerBootstrapAcceptor(
47                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
48                }
49            });
50        }
51    });
52}
53

我们继续看第8步添加服务端handler

因为这里的handler是ChannelInitializer, 所以完成添加之后会调用ChannelInitializer的handlerAdded方法

跟到handlerAdded方法:


1
2
3
4
5
6
7
1public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
2    //默认情况下, 会返回true
3    if (ctx.channel().isRegistered()) {
4        initChannel(ctx);
5    }
6}
7

因为执行到这步服务端channel已经完成注册, 所以会执行到initChannel方法

跟到initChannel方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
2    //这段代码是否被执行过
3    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
4        try {
5            initChannel((C) ctx.channel());
6        } catch (Throwable cause) {
7            exceptionCaught(ctx, cause);
8        } finally {
9            //调用之后会删除当前节点
10            remove(ctx);
11        }
12        return true;
13    }
14    return false;
15}
16

我们关注initChannel这个方法, 这个方法是在ChannelInitializer的匿名内部来实现的, 这里我们注意, 在initChannel方法执行完毕之后会调用remove(ctx)删除当前节点

我们继续跟进initChannel方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1@Override
2public void initChannel(Channel ch) throws Exception {
3    final ChannelPipeline pipeline = ch.pipeline();
4    ChannelHandler handler = config.handler();
5    if (handler != null) {
6        pipeline.addLast(handler);
7    }
8    ch.eventLoop().execute(new Runnable() {
9        @Override
10        public void run() {
11            pipeline.addLast(new ServerBootstrapAcceptor(
12                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
13        }
14    });
15}
16

这里首先添加用户自定义的handler, 这里如果用户没有定义, 则添加不成功, 然后, 会调用addLast将ServerBootstrapAcceptor这个handler添加了进去, 同样这个handler也继承了ChannelInboundHandlerAdapter, 在这个handler中, 重写了channelRead方法, 所以, 这就是第一个问题的答案

紧接着我们看第二个问题:

2.客户端handler是什么时候被添加的?

我们这里看ServerBootstrapAcceptor的channelRead方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1public void channelRead(ChannelHandlerContext ctx, Object msg) {
2    final Channel child = (Channel) msg;
3    //添加channelHadler, 这个channelHandler, 就是用户代码添加的ChannelInitializer
4    child.pipeline().addLast(childHandler);
5
6    //代码省略
7
8    try {
9        //work线程注册channel
10        childGroup.register(child).addListener(new ChannelFutureListener() {
11            //代码省略
12        });
13    } catch (Throwable t) {
14        forceClose(child, t);
15    }
16}
17

这里真相可以大白了, 服务端再创建完客户端channel之后, 将新创建的NioSocketChannel作为参数触发channelRead事件(可以回顾NioMessageUnsafe的read方法, 代码这里就不贴了), 所以这里的参数msg就是NioSocketChannel

拿到channel时候再将客户端的handler添加进去, 我们回顾客户端handler的添加过程:


1
2
3
4
5
6
7
8
9
1.childHandler(new ChannelInitializer<SocketChannel>() {
2    @Override
3    public void initChannel(SocketChannel ch) {
4        ch.pipeline().addLast(new StringDecoder());
5        ch.pipeline().addLast(new StringEncoder());
6        ch.pipeline().addLast(new ServerHandler());
7    }
8});
9

和服务端channel的逻辑一样, 首先会添加ChannelInitializer这个handler但是没有注册所以没有执行添加handler的回调, 将任务保存到一个延迟回调的task中

等客户端channel注册完毕, 会将执行添加handler的回调, 也就是handlerAdded方法, 在回调中执行initChannel方法将客户端handler添加进去, 然后删除ChannelInitializer这个handler

因为在服务端channel中这块逻辑已经进行了详细的剖析, 所以这边就不在赘述, 同学们可以自己跟进去走一遍流程

这里注意, 因为每创建一个NioSoeketChannel都会调用服务端ServerBootstrapAcceptor的channelRead方法, 所以这里会将每一个NioSocketChannel的handler进行添加

 

第四章总结

        本章剖析了事件传输的相关逻辑, 包括handler的添加, 删除, inbound和outbound以及异常事件的传输, 最后结合第一章和第三章, 剖析了服务端channel和客户端channel的添加过程, 同学们可以课后跟进源码, 将这些功能自己再走一遍以加深印象.其他的有关事件传输的逻辑, 可以结合这一章的知识点进行自行剖析

 

上一节: 传播异常事件

下一节: AbstractByteBuf

 

posted on
2019-01-01 12:36 向南是个万人迷 阅读(
…) 评论(
…) 编辑 收藏

转载于:https://www.cnblogs.com/xiangnan6122/p/10204523.html

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全经验

Lettuce 5.0.5 和 4.4.6 发布,线程安全的 Redis 客户端

2018-8-1 11:12:22

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索