Netty源码分析第1章(Netty启动流程)—->第4节: 注册多路复用

释放双眼,带上耳机,听听看~!

 

Netty源码分析第一章:Netty启动流程

** **

第四节:注册多路复用

 

回顾下以上的小节
,
我们知道了
channel
的的创建和初始化过程
,
那么
channel
是如何注册到
selector
中的呢
?
我们继续分析

回到上一小节的代码
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1final ChannelFuture initAndRegister() {
2    Channel channel = null;
3    try {
4        //创建channel
5        channel = channelFactory.newChannel();
6        //初始化channel
7        init(channel);
8    } catch (Throwable t) {
9        //忽略非关键代码
10    }
11    //注册channel
12    ChannelFuture regFuture = config().group().register(channel);
13    //忽略非关键代码
14    return regFuture;
15}
16

我们讲完创建
channel和初始化
channel的关键步骤
, 我们继续跟注册
channel的步骤:


1
2
1ChannelFuture regFuture = config().group().register(channel);
2

其中
, 重点关注下
register(channel)这个方法
, 这个方法最终会调用到
AbstractChannel中内部类
AbstractUnsafe的
register()方法
, 具体如何调用到这个方法
, 可以简单带大家捋一下

首先看下
config()方法
, 由于是
ServerBootstrap调用的
, 所以我们跟进去
:


1
2
3
4
1public final ServerBootstrapConfig config() {
2    return config;
3}
4

返回的
config是
ServerBootrap的成员变量
config:


1
2
1private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
2

 

 

跟到
ServerBootstrapConfig的构造方法
:


1
2
3
4
1ServerBootstrapConfig(ServerBootstrap bootstrap) {
2    super(bootstrap);
3}
4

继续跟到其父类
AbstractBootstrapConfig的构造方法
:


1
2
3
4
1protected AbstractBootstrapConfig(B bootstrap) {
2    this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap");
3}
4

我们发现我们创建的
ServerBootstrap作为参数初始化了其成员变量
bootstrap

 

回到initAndRegister()方法:

config()返回的是ServerBootstrapConfig对象

再继续跟到

group()方法
:


1
2
3
4
1public final EventLoopGroup group() {
2    return bootstrap.group();
3}
4

这里调用
Bootstrap的
group()方法:


1
2
3
4
1public final EventLoopGroup group() {
2    return group;
3}
4

这里返回了
AbstractBootstrap的成员变量
group, 我们回顾下第一小节
, 还记得
AbstractBootstrap的
group(EventLoopGroup group)方法吗
?


1
2
3
4
5
1public B group(EventLoopGroup group) {
2    this.group = group;
3    return (B) this;
4}
5

group(EventLoopGroup group)
方法初始化了我们
boss线程
, 而
group()返回了
boss线程
, 也就是说 
config().group().register(channel) 中的
register()方法是
boss线程对象调用的
, 由于我们当初初始化的是
NioEventLoopGroup, 因此走的是
NioEventLoopGroup的父类的
MultithreadEventLoopGroup的
register()方法

跟到MultithreadEventLoopGroup的
register()方法:


1
2
3
4
1public ChannelFuture register(Channel channel) {
2    return next().register(channel);
3}
4

这里的代码看起来有点晕
, 没关系
, 以后会讲到
, 现在可以大概做个了解
, NioEventLoopGroup是个线程组
, 而
next()方法就是从线程组中选出一个线程
, 也就是
NioEventLoop线程
, 所以这里的
next()方法返回的是
NioEventLoop对象
, 其中
register(channel)最终会调用
NioEventLoop的父类
SingleThreadEventLoop的
register(channel)方法

跟到
SingleThreadEventLoop的
register(channel)方法:


1
2
3
4
1public ChannelFuture register(Channel channel) {
2    return register(new DefaultChannelPromise(channel, this));
3}
4

其中
DefaultChannelPromise类我们之后也会讲到

 

我们先跟到
register(new DefaultChannelPromise(channel, this)):


1
2
3
4
5
6
1public ChannelFuture register(final ChannelPromise promise) {
2    ObjectUtil.checkNotNull(promise, "promise");
3    promise.channel().unsafe().register(this, promise);
4    return promise;
5}
6

channel()
会返回我们初始化的
NioServerSocketChannel, unsafe()会返回我们创建
channel的时候初始化的
unsafe对象

跟进去看
AbstractChannel的
unsafe()的实现:


1
2
3
4
1public Unsafe unsafe() {
2    return unsafe;
3}
4

这里返回的
unsafe, 就是我们初始化
channel创建的
unsafe

回顾下第二小节
channel初始化的步骤
:


1
2
3
4
5
6
7
1protected AbstractChannel(Channel parent) {
2    this.parent = parent;
3    id = newId();
4    unsafe = newUnsafe();
5    pipeline = newChannelPipeline();
6}
7

我们看
unsafe的初始化
:unsafe=newUnsafe()

 

跟到
newUnsafe()中
, 我们之前讲过
NioServerSokectChannel的父类是
AbstractNioMessageChannel, 所以会调用到到
AbstractNioMessageChannel类中的
newUnsafe()

跟到AbstractNioMessageChannel类中的
newUnsafe():


1
2
3
4
1protected AbstractNioUnsafe newUnsafe() {
2    return new NioMessageUnsafe();
3}
4

我们看到这里创建了
NioMessageUnsafe()对象
, 所以在 
promise.channel().unsafe().register(
this, promise) 
代码中
, unsafe()是返回的
NioMessageUnsafe()对象
, 最后调用其父类
AbstractUnsafe(也就是
AbstractChannel的内部类
)的
register()方法
,

 

简单介绍下
unsafe接口
, unsafe顾名思义就是不安全的
, 因为很多对
channel的
io方法都定义在
unsafe中
, 所以
netty将其作为内部类进行封装
, 防止被外部直接调用
, unsafe接口是
Channel接口的内部接口
, unsafe的子类也分别封装在
Channel的子类中
, 比如我们现在剖析的
register()方法
, 就是封装在
AbstractChannel类的内部类
AbstractUnsafe中的方法
, 有关
Unsafe和
Channel的继承关系如下
:

Netty源码分析第1章(Netty启动流程)---->第4节: 注册多路复用

1-4-1

以上内容如果不明白没有关系
, 有关
NioEventLoop相关会在后面的章节讲到
, 目前我们只是了解是如何走到
AbstractUnsafe类的
register()即可

 

我们继续看看
register()方法
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1public final void register(EventLoop eventLoop, final ChannelPromise promise) {
2    //代码省略
3    //所有的复制操作, 都交给eventLoop处理(1)
4    AbstractChannel.this.eventLoop = eventLoop;
5    if (eventLoop.inEventLoop()) {
6        register0(promise);
7    } else {
8        try {
9            eventLoop.execute(new Runnable() {
10                @Override
11                public void run() {
12                    //做实际主注册(2)
13                    register0(promise);
14                }
15            });
16        } catch (Throwable t) {
17            //代码省略
18        }
19    }
20}
21

我们跟着注释的步骤继续走
, 第一步
, 绑定
eventLoop线程:


1
2
1AbstractChannel.this.eventLoop = eventLoop;
2

eventLoop

AbstractChannel的成员变量
, 有关
eventLoop, 我们会在绪章节讲到
, 这里我们只需要知道
, 每个
channel绑定唯一的
eventLoop线程
, eventLoop线程和
channel的绑定关系就是在这里展现的

 

再看第二步
, 做实际注册
:

我们先看
if判断
, if(eventLoop.inEventLoop())

 

这里是判断是不是
eventLoop线程
, 显然我们现在是
main()方法所在的线程
, 所以走的
else, eventLoop.execute()是开启一个
eventLoop线程
, 而
register0(promise)就是再开启线程之后
, 通过
eventLoop线程执行的
, 这里大家暂时作为了解

 

我们重点关注
register0(promise), 跟进去:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1private void register0(ChannelPromise promise) {
2    try {
3        //做实际的注册(1)
4        doRegister();
5        neverRegistered = false;
6        registered = true;
7        //触发事件(2)
8        pipeline.invokeHandlerAddedIfNeeded();
9        safeSetSuccess(promise);
10        //触发注册成功事件(3)
11        pipeline.fireChannelRegistered();
12        if (isActive()) {
13            if (firstRegistration) {
14                //传播active事件(4)
15                pipeline.fireChannelActive();
16            } else if (config().isAutoRead()) {
17                beginRead();
18            }
19        }
20    } catch (Throwable t) {
21        //省略代码
22    }
23}
24

我们重点关注
doRegister()这个方法

 

doRegister()
最终会调用
AbstractNioChannel的
doRegister()方法
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1protected void doRegister() throws Exception {
2    boolean selected = false;
3    for (;;) {
4        try {
5            //jdk底层的注册方法
6            //第一个参数为selector, 第二个参数表示不关心任何事件
7            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
8            return;
9        } catch (CancelledKeyException e) {
10            //省略代码
11        }
12    }
13}
14

我们终于看到和
java底层相关的方法了

跟到
javaChannel()的方法中
:


1
2
3
4
1protected SelectableChannel javaChannel() {
2    return ch;
3}
4

这个
ch, 就是本章第二小节创建
NioServerSocketChannel中初始化的
jdk底层
ServerSocketChannel

这里
register(eventLoop().selector, 0, this)方法中
eventLoop().selector, 是获得每一个
eventLoop绑定的唯一的
selector, 0代表这次只是注册
, 并不监听任何事件
, this是代表将自身
(NioEventLoopChannel)作为属性绑定在返回的
selectionKey当中
, 这个
selectionKey就是与每个
channel绑定的
jdk底层的
SelectionKey对象
, 熟悉
nio的小伙伴应该不会陌生
, 这里不再赘述

 

回到
register0(ChannelPromise promise)方法
, 我们看后续步骤

:

步骤
(2)是触发
handler的需要添加事件
, 事件传递的内容我们将在后续课程详细介绍
, 这里不必深究

步骤
(3)是触发注册成功事件
(3), 同上

步骤
(4)是传播
active事件
(4), 这里简单强调一下
, 这里的方法
pipeline.fireChannelActive()第一个注册是执行不到的
, 因为
isActive()会返回
false, 因为链路没完成

本小节梳理了有注册多路复用的相关逻辑
, 同学们可以跟着代码自己走一遍以加深印象

 

上一节: 服务端Channel的初始化

下一节: 绑定端口

 

给TA打赏
共{{data.count}}人
人已打赏
安全技术

用node.js做cluster,监听异常的邮件提醒服务

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索