Netty源码分析第2章(NioEventLoop)—->第4节: NioEventLoop线程的启动

释放双眼,带上耳机,听听看~!

 

Netty源码分析第二章: NioEventLoop

** **

第四节: NioEventLoop线程的启动

 

之前的小节我们学习了
NioEventLoop
的创建以及线程分配器的初始化
,
那么
NioEventLoop
是如何开启的呢
,
我们这一小节继续学习

NioEventLoop
的开启方法在其父类
SingleThreadEventExecutor中的
execute(Runnable task)方法中
, 我们跟到这个方法
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1@Override
2public void execute(Runnable task) {
3    if (task == null) {
4        throw new NullPointerException("task");
5    }
6    //判断当前线程是不是eventLoop线程
7    boolean inEventLoop = inEventLoop();
8    //如果是eventLoop线程
9    if (inEventLoop) {
10        addTask(task);
11    } else {
12        //不是eventLoop线程启动线程
13        startThread();
14        //添加task
15        addTask(task);
16        if (isShutdown() && removeTask(task)) {
17            reject();
18        }
19    }
20    if (!addTaskWakesUp && wakesUpForTask(task)) {
21        wakeup(inEventLoop);
22    }
23}
24

这个方法传入一个
Runnble对象
, 也就是一个任务

首先
boolean inEventLoop = inEventLoop()方法会判断是不是
NioEventLoop线程

跟进
 inEventLoop()方法:


1
2
3
4
5
1@Override
2public boolean inEventLoop() {
3    return inEventLoop(Thread.currentThread());
4}
5

这里
inEventLoop(Thread.currentThread())方法传入了当前线程对象
, 这个方法会调用当前类的
inEventLoop(Thread thread)方法

跟进
inEventLoop(Thread thread)方法:


1
2
3
4
5
1@Override
2public boolean inEventLoop(Thread thread) {
3    return thread == this.thread;
4}
5

我们看到判断的依据是当前线程对象是不是
NioEventLoop绑定的线程对象
, 这里我们会想到开启线程肯定会为
NioEventLoop绑定一个线程对象
, 如果判断当前线程对象不是当前
NioEventLoop绑定的线程对象
, 说明执行此方法的线程不是当前
NioEventLoop线程
, 那么这个线程如何初始化的
, 后面我们会讲到
, 我们继续看
execute(Runnable task)方法
:

如果是
NioEventLoop线程
, 则会通过
addTask(task)添加任务
, 通过
NioEventLoop异步执行
, 那么这个
task是什么时候执行的
, 同样后面会讲到

跟一下
addTask(task):


1
2
3
4
5
6
7
8
9
10
1protected void addTask(Runnable task) {
2    if (task == null) {
3        throw new NullPointerException("task");
4    }
5    //如果添加不成功
6    if (!offerTask(task)) {
7        reject(task);
8    }
9}
10

这里
offerTask(task)代表添加一个
task, 跟进去
:


1
2
3
4
5
6
7
8
1final boolean offerTask(Runnable task) {
2    if (isShutdown()) {
3        reject();
4    }
5    //往taskQ中添加一个task
6    return taskQueue.offer(task);
7}
8

我们看到
taskQueue.offer(task)将一个
task添加到任务队列
, 而这个任务队列
taskQueue就是我们
NioEventLoop初始化的时候与
NioEventLoop唯一绑定的任务队列

回顾一下初始构造方法
:


1
2
3
4
5
6
7
8
9
10
11
1protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
2                                    boolean addTaskWakesUp, int maxPendingTasks,
3                                    RejectedExecutionHandler rejectedHandler) {
4    super(parent);
5    this.addTaskWakesUp = addTaskWakesUp;
6    this.maxPendingTasks = Math.max(16, maxPendingTasks);
7    this.executor = ObjectUtil.checkNotNull(executor, "executor");
8    taskQueue = newTaskQueue(this.maxPendingTasks);
9    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
10}
11

在这里通过 
taskQueue = newTaskQueue(
this.maxPendingTasks) 创建了
taskQueue

回到
execute(Runnable task)方法中
, 我们继续往下看
:

如果不是
NioEventLoop线程我们通过
startThread()开启一个
NioEventLoop线程

跟到
startThread()之前
, 我们先继续往下走
:

开启
NioEventLoop线程之后
, 又通过
addTask(task)往
taskQueue添加任务

 

最后我们注意有这么一段代码
:


1
2
3
4
1if (!addTaskWakesUp && wakesUpForTask(task)) {
2    wakeup(inEventLoop);
3}
4

addTaskWakesUp
代表添加
task之后
, NioEventLoop的
select()操作是不是要唤醒
, 这个属性是在初始化
NioEventLoop的时候传入的
, 大家可以回顾下
, 默认是
false, 这里
!addTaskWakesUp就是需要唤醒
, wakesUpForTask(task)与
addTaskWakesUp意义相同
, 默认是
true, 可以看代码
:


1
2
3
4
1protected boolean wakesUpForTask(Runnable task) {
2    return true;
3}
4

这里恒为
true, 所以这段代码就是添加
task时需要通过
wakeup(inEventLoop)唤醒
, 这样
NioEventLoop在做
select()操作时如果正在阻塞则立刻唤醒
, 然后执行任务队列的
task

回到
execute(Runnable task)方法中我们跟进开启线程的
startThread()方法中
:


1
2
3
4
5
6
7
8
9
10
1private void startThread() {
2    //判断线程是否启动, 未启动则启动
3    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
4        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
5            //当前线程未启动, 则启动
6            doStartThread();
7        }
8    }
9}
10

前面的判断是判断当前
NioEventLoop线程是否启动
, 如果未启动
, 则通过
doStartThread()方法启动
, 我们第一次执行
execute(Runnable task)线程是未启动的
, 所以会执行
doStartThread(), 后续该线程则不会再执行
doStartThread()方法

我们跟进
doStartThread()方法中
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1private void doStartThread() {
2    assert thread == null;
3    //线程执行器执行线程(所有的eventLoop共用一个线程执行器)
4    executor.execute(new Runnable() {
5        @Override
6        public void run() {
7            //将当前线程复制给属性
8            thread = Thread.currentThread();
9            if (interrupted) {
10                thread.interrupt();
11            }
12            boolean success = false;
13            updateLastExecutionTime();
14            try {
15                //开始轮询
16                SingleThreadEventExecutor.this.run();
17                success = true;
18            } catch (Throwable t) {
19                logger.warn("Unexpected exception from an event executor: ", t);
20            } finally {
21               //代码省略
22            }
23        }
24    });
25}
26

我们重点关注
executor.execute()这个方法
, 其中
executor就是我们创建
NioEventLoop的线程器
, execute()就是开启一个线程

回顾下
execute()方法
:


1
2
3
4
5
1public void execute(Runnable command) {
2    //起一个线程
3    threadFactory.newThread(command).start();
4}
5

我们看到通过线程工厂开启一个线程
, 由于前面的小节已经剖析
, 这里不再赘述

 

开启线程则执行
Runnble类中的
run()方法
, 我们看到在
run()方法里通过 
thread = Thread.currentThread() 将新开启的线程对象赋值
NioEventLoop的
thread的属性
, 这样就可以通过线程对象的判断
, 来确定是不是
NioEventLoop线程了

 

后面我们看到 
SingleThreadEventExecutor.
this.run() 
, 这里
this, 就是当前
NioEventLoop对象
, 而这里的
run()方法
, 就是
NioEventLoop中的
run()方法
, 在这个
run()方法中
, 真正开始了
selector的轮询工作
, 对于
run()方法的详细剖析
, 我们会在之后的小节中进行

 

刚才我们剖析了
NioEventLoop的启动方法
, 那么根据我们的分析
, 就是第一次调用
NioEventLoop的
execute(Runnable task)方法的时候
, 则会开启
NioEventLoop线程
, 之后的调用只是往
taskQueue中添加任务
, 那么第一次是什么时候开启的呢
?这里我们要回顾上一章讲过的内容

 

上一章中我们讲过在
AbstractServerBootstrap中有个
initAndRegister()方法
, 这个方法主要用于
channel的初始化和注册
, 其中注册的代码为
:


1
2
1ChannelFuture regFuture = config().group().register(channel);
2

其中
group()我们剖析过是
Boss线程的
group, 我们剖析过其中的
register(channel)方法
:


1
2
3
4
1public ChannelFuture register(Channel channel) {
2    return next().register(channel);
3}
4

首先跟到
next()方法
:


1
2
3
4
1public EventLoop next() {
2    return (EventLoop) super.next();
3}
4

首先调用了其父类
MultithreadEventExecutorGroup的
next方法
, 跟进去
:


1
2
3
4
1public EventExecutor next() {
2    return chooser.next();
3}
4

这里
chooser, 就是初始化
NioEventLoopGroup的线程选择器
, 为此分配了不同的策略
, 这里不再赘述
, 通过这个方法
, 返回一个
NioEventLoop线程

 

回到
MultithreadEventLoopGroup类的
register()方法中
, next().register(channel)代表分配后的
NioEventLoop的
register()方法
, 这里会调用
NioEventLoop的父类
SingleThreadEventLoop类中的
register()方法

跟到
SingleThreadEventLoop类中的
register()方法:


1
2
3
4
1public ChannelFuture register(Channel channel) {
2    return register(new DefaultChannelPromise(channel, this));
3}
4

DefaultChannelPromise
是一个监听器
, 它会跟随
channel的读写进行监听
, 绑定传入的
channel和
NioEventLoop, 有关
Promise后面的章节会讲到

这里我们继续跟进
register(new DefaultChannelPromise(channel, this))


1
2
3
4
5
6
1public ChannelFuture register(final ChannelPromise promise) {
2    ObjectUtil.checkNotNull(promise, "promise");
3    promise.channel().unsafe().register(this, promise);
4    return promise;
5}
6

unsafe()
方法返回创建
channel初始化的
unsafe()对象
, 如果是
NioSeverSocketChannel, 则绑定
NioMessageUnsafe
对象
, 上一小节进行剖析过这里不再赘述

 

最终这个
unsafe对象会调用到
AbstractChannel的内部类
AbstractUnsafe中的
register()方法
, 这里
register(), 无论是客户端
channel和服务器
channel都会通过这个一个
register注册
, 在以后的客户端接入章节中我们会看到

这里我们继续看
register方法
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1public final void register(EventLoop eventLoop, final ChannelPromise promise) {
2    //代码省略
3    //所有的复制操作, 都交给eventLoop处理(1)
4    AbstractChannel.this.eventLoop = eventLoop;
5    if (eventLoop.inEventLoop()) {
6        register0(promise);
7    } else {
8        try {
9            eventLoop.execute(new Runnable() {
10                @Override
11                public void run() {
12                    //做实际主注册(2)
13                    register0(promise);
14                }
15            });
16        } catch (Throwable t) {
17            //代码省略
18        }
19    }
20}
21

这里我们上一小节分析过
, 不再陌生
, 这里只分析有关
NioEventLoop相关的内容

我们首先看到 
AbstractChannel.
this.eventLoop = eventLoop 
, 获取当前
channel的
NioEventLoop, 通过上一章的学习
, 我们知道每个
channel创建的时候会绑定一个
NioEventLoop

这里通过
eventLoop.inEventLoop()判断当前线程是否是
NioEventLoop线程
, inEventLoop()方法在前面的小节剖析过
, 这里不再赘述

 

如果是
NioEventLoop线程则通过
register0(promise)
方法做实际的注册
, 但是我们第一次执行注册方法的时候
, 如果是服务器
channel是则是由
server的用户线程执行的
, 如果是客户端
channel, 则是由
Boss线程执行的
, 所以走到这里均不是当前
channel的
NioEventLoop的线程
, 于是会走到下面的
eventLoop.execute()方法中

 

eventLoop.execute()
上一小节剖析过
, 就是将
task添加到
taskQueue中并且开启器
NioEventLoop线程
, 所以
, 在这里就开启了
NioEventLoop线程
, 有关开启步骤
, 可以通过上一小节内容进行回顾

 

这里注意一点
, 有的资料会讲第一次开启
NioEventLoop线程是在
AbstractBootstrap的
doBind0(regFuture, channel, localAddress, promise)方法中开启的
, 个人经过
debug和分析
, 实际上并不是那样的
, 希望大家不要被误导

 

 

简单看下
doBind0(regFuture, channel, localAddress, promise)方法
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
2    channel.eventLoop().execute(new Runnable() {
3        @Override
4        public void run() {
5            if (regFuture.isSuccess()) {
6                //绑定端口
7                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
8            } else {
9                promise.setFailure(regFuture.cause());
10            }
11        }
12    });
13}
14

这里虽然调用了
eventLoop的
execute()方法
, 但是
eventLoop线程在注册期间已经启动
, 所以这里不会重复启动
, 只会将任务添加到
taskQueue中

 

其实这里我们也能够看出
, 其实绑定端口的相关操作
, 同样是也是
eventLoop线程中执行的

 

上一节: 初始化线程选择器

下一节: 优化selector

 

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全技术

Spring+Log4j+ActiveMQ实现远程记录日志——实战+分析

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索