Netty源码分析第二章: NioEventLoop
** **
第四节: NioEventLoop线程的启动
之前的小节我们学习了
NioEventLoop
的创建以及线程分配器的初始化
,
那么
NioEventLoop
是如何开启的呢
,
我们这一小节继续学习
NioEventLoop
的开启方法在其父类
SingleThreadEventExecutor中的
execute(Runnable task)方法中
, 我们跟到这个方法
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 1@Override
2public void execute(Runnable task) {
3 if (task == null) {
4 throw new NullPointerException("task");
5 }
6 //判断当前线程是不是eventLoop线程
7 boolean inEventLoop = inEventLoop();
8 //如果是eventLoop线程
9 if (inEventLoop) {
10 addTask(task);
11 } else {
12 //不是eventLoop线程启动线程
13 startThread();
14 //添加task
15 addTask(task);
16 if (isShutdown() && removeTask(task)) {
17 reject();
18 }
19 }
20 if (!addTaskWakesUp && wakesUpForTask(task)) {
21 wakeup(inEventLoop);
22 }
23}
24
这个方法传入一个
Runnble对象
, 也就是一个任务
首先
boolean inEventLoop = inEventLoop()方法会判断是不是
NioEventLoop线程
跟进
inEventLoop()方法:
1
2
3
4
5 1@Override
2public boolean inEventLoop() {
3 return inEventLoop(Thread.currentThread());
4}
5
这里
inEventLoop(Thread.currentThread())方法传入了当前线程对象
, 这个方法会调用当前类的
inEventLoop(Thread thread)方法
跟进
inEventLoop(Thread thread)方法:
1
2
3
4
5 1@Override
2public boolean inEventLoop(Thread thread) {
3 return thread == this.thread;
4}
5
我们看到判断的依据是当前线程对象是不是
NioEventLoop绑定的线程对象
, 这里我们会想到开启线程肯定会为
NioEventLoop绑定一个线程对象
, 如果判断当前线程对象不是当前
NioEventLoop绑定的线程对象
, 说明执行此方法的线程不是当前
NioEventLoop线程
, 那么这个线程如何初始化的
, 后面我们会讲到
, 我们继续看
execute(Runnable task)方法
:
如果是
NioEventLoop线程
, 则会通过
addTask(task)添加任务
, 通过
NioEventLoop异步执行
, 那么这个
task是什么时候执行的
, 同样后面会讲到
跟一下
addTask(task):
1
2
3
4
5
6
7
8
9
10 1protected void addTask(Runnable task) {
2 if (task == null) {
3 throw new NullPointerException("task");
4 }
5 //如果添加不成功
6 if (!offerTask(task)) {
7 reject(task);
8 }
9}
10
这里
offerTask(task)代表添加一个
task, 跟进去
:
1
2
3
4
5
6
7
8 1final boolean offerTask(Runnable task) {
2 if (isShutdown()) {
3 reject();
4 }
5 //往taskQ中添加一个task
6 return taskQueue.offer(task);
7}
8
我们看到
taskQueue.offer(task)将一个
task添加到任务队列
, 而这个任务队列
taskQueue就是我们
NioEventLoop初始化的时候与
NioEventLoop唯一绑定的任务队列
回顾一下初始构造方法
:
1
2
3
4
5
6
7
8
9
10
11 1protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
2 boolean addTaskWakesUp, int maxPendingTasks,
3 RejectedExecutionHandler rejectedHandler) {
4 super(parent);
5 this.addTaskWakesUp = addTaskWakesUp;
6 this.maxPendingTasks = Math.max(16, maxPendingTasks);
7 this.executor = ObjectUtil.checkNotNull(executor, "executor");
8 taskQueue = newTaskQueue(this.maxPendingTasks);
9 rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
10}
11
在这里通过
taskQueue = newTaskQueue(
this.maxPendingTasks) 创建了
taskQueue
回到
execute(Runnable task)方法中
, 我们继续往下看
:
如果不是
NioEventLoop线程我们通过
startThread()开启一个
NioEventLoop线程
跟到
startThread()之前
, 我们先继续往下走
:
开启
NioEventLoop线程之后
, 又通过
addTask(task)往
taskQueue添加任务
最后我们注意有这么一段代码
:
1
2
3
4 1if (!addTaskWakesUp && wakesUpForTask(task)) {
2 wakeup(inEventLoop);
3}
4
addTaskWakesUp
代表添加
task之后
, NioEventLoop的
select()操作是不是要唤醒
, 这个属性是在初始化
NioEventLoop的时候传入的
, 大家可以回顾下
, 默认是
false, 这里
!addTaskWakesUp就是需要唤醒
, wakesUpForTask(task)与
addTaskWakesUp意义相同
, 默认是
true, 可以看代码
:
1
2
3
4 1protected boolean wakesUpForTask(Runnable task) {
2 return true;
3}
4
这里恒为
true, 所以这段代码就是添加
task时需要通过
wakeup(inEventLoop)唤醒
, 这样
NioEventLoop在做
select()操作时如果正在阻塞则立刻唤醒
, 然后执行任务队列的
task
回到
execute(Runnable task)方法中我们跟进开启线程的
startThread()方法中
:
1
2
3
4
5
6
7
8
9
10 1private void startThread() {
2 //判断线程是否启动, 未启动则启动
3 if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
4 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
5 //当前线程未启动, 则启动
6 doStartThread();
7 }
8 }
9}
10
前面的判断是判断当前
NioEventLoop线程是否启动
, 如果未启动
, 则通过
doStartThread()方法启动
, 我们第一次执行
execute(Runnable task)线程是未启动的
, 所以会执行
doStartThread(), 后续该线程则不会再执行
doStartThread()方法
我们跟进
doStartThread()方法中
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 1private void doStartThread() {
2 assert thread == null;
3 //线程执行器执行线程(所有的eventLoop共用一个线程执行器)
4 executor.execute(new Runnable() {
5 @Override
6 public void run() {
7 //将当前线程复制给属性
8 thread = Thread.currentThread();
9 if (interrupted) {
10 thread.interrupt();
11 }
12 boolean success = false;
13 updateLastExecutionTime();
14 try {
15 //开始轮询
16 SingleThreadEventExecutor.this.run();
17 success = true;
18 } catch (Throwable t) {
19 logger.warn("Unexpected exception from an event executor: ", t);
20 } finally {
21 //代码省略
22 }
23 }
24 });
25}
26
我们重点关注
executor.execute()这个方法
, 其中
executor就是我们创建
NioEventLoop的线程器
, execute()就是开启一个线程
回顾下
execute()方法
:
1
2
3
4
5 1public void execute(Runnable command) {
2 //起一个线程
3 threadFactory.newThread(command).start();
4}
5
我们看到通过线程工厂开启一个线程
, 由于前面的小节已经剖析
, 这里不再赘述
开启线程则执行
Runnble类中的
run()方法
, 我们看到在
run()方法里通过
thread = Thread.currentThread() 将新开启的线程对象赋值
NioEventLoop的
thread的属性
, 这样就可以通过线程对象的判断
, 来确定是不是
NioEventLoop线程了
后面我们看到
SingleThreadEventExecutor.
this.run()
, 这里
this, 就是当前
NioEventLoop对象
, 而这里的
run()方法
, 就是
NioEventLoop中的
run()方法
, 在这个
run()方法中
, 真正开始了
selector的轮询工作
, 对于
run()方法的详细剖析
, 我们会在之后的小节中进行
刚才我们剖析了
NioEventLoop的启动方法
, 那么根据我们的分析
, 就是第一次调用
NioEventLoop的
execute(Runnable task)方法的时候
, 则会开启
NioEventLoop线程
, 之后的调用只是往
taskQueue中添加任务
, 那么第一次是什么时候开启的呢
?这里我们要回顾上一章讲过的内容
上一章中我们讲过在
AbstractServerBootstrap中有个
initAndRegister()方法
, 这个方法主要用于
channel的初始化和注册
, 其中注册的代码为
:
1
2 1ChannelFuture regFuture = config().group().register(channel);
2
其中
group()我们剖析过是
Boss线程的
group, 我们剖析过其中的
register(channel)方法
:
1
2
3
4 1public ChannelFuture register(Channel channel) {
2 return next().register(channel);
3}
4
首先跟到
next()方法
:
1
2
3
4 1public EventLoop next() {
2 return (EventLoop) super.next();
3}
4
首先调用了其父类
MultithreadEventExecutorGroup的
next方法
, 跟进去
:
1
2
3
4 1public EventExecutor next() {
2 return chooser.next();
3}
4
这里
chooser, 就是初始化
NioEventLoopGroup的线程选择器
, 为此分配了不同的策略
, 这里不再赘述
, 通过这个方法
, 返回一个
NioEventLoop线程
回到
MultithreadEventLoopGroup类的
register()方法中
, next().register(channel)代表分配后的
NioEventLoop的
register()方法
, 这里会调用
NioEventLoop的父类
SingleThreadEventLoop类中的
register()方法
跟到
SingleThreadEventLoop类中的
register()方法:
1
2
3
4 1public ChannelFuture register(Channel channel) {
2 return register(new DefaultChannelPromise(channel, this));
3}
4
DefaultChannelPromise
是一个监听器
, 它会跟随
channel的读写进行监听
, 绑定传入的
channel和
NioEventLoop, 有关
Promise后面的章节会讲到
这里我们继续跟进
register(new DefaultChannelPromise(channel, this))
1
2
3
4
5
6 1public ChannelFuture register(final ChannelPromise promise) {
2 ObjectUtil.checkNotNull(promise, "promise");
3 promise.channel().unsafe().register(this, promise);
4 return promise;
5}
6
unsafe()
方法返回创建
channel初始化的
unsafe()对象
, 如果是
NioSeverSocketChannel, 则绑定
NioMessageUnsafe
对象
, 上一小节进行剖析过这里不再赘述
最终这个
unsafe对象会调用到
AbstractChannel的内部类
AbstractUnsafe中的
register()方法
, 这里
register(), 无论是客户端
channel和服务器
channel都会通过这个一个
register注册
, 在以后的客户端接入章节中我们会看到
这里我们继续看
register方法
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 1public final void register(EventLoop eventLoop, final ChannelPromise promise) {
2 //代码省略
3 //所有的复制操作, 都交给eventLoop处理(1)
4 AbstractChannel.this.eventLoop = eventLoop;
5 if (eventLoop.inEventLoop()) {
6 register0(promise);
7 } else {
8 try {
9 eventLoop.execute(new Runnable() {
10 @Override
11 public void run() {
12 //做实际主注册(2)
13 register0(promise);
14 }
15 });
16 } catch (Throwable t) {
17 //代码省略
18 }
19 }
20}
21
这里我们上一小节分析过
, 不再陌生
, 这里只分析有关
NioEventLoop相关的内容
我们首先看到
AbstractChannel.
this.eventLoop = eventLoop
, 获取当前
channel的
NioEventLoop, 通过上一章的学习
, 我们知道每个
channel创建的时候会绑定一个
NioEventLoop
这里通过
eventLoop.inEventLoop()判断当前线程是否是
NioEventLoop线程
, inEventLoop()方法在前面的小节剖析过
, 这里不再赘述
如果是
NioEventLoop线程则通过
register0(promise)
方法做实际的注册
, 但是我们第一次执行注册方法的时候
, 如果是服务器
channel是则是由
server的用户线程执行的
, 如果是客户端
channel, 则是由
Boss线程执行的
, 所以走到这里均不是当前
channel的
NioEventLoop的线程
, 于是会走到下面的
eventLoop.execute()方法中
eventLoop.execute()
上一小节剖析过
, 就是将
task添加到
taskQueue中并且开启器
NioEventLoop线程
, 所以
, 在这里就开启了
NioEventLoop线程
, 有关开启步骤
, 可以通过上一小节内容进行回顾
这里注意一点
, 有的资料会讲第一次开启
NioEventLoop线程是在
AbstractBootstrap的
doBind0(regFuture, channel, localAddress, promise)方法中开启的
, 个人经过
debug和分析
, 实际上并不是那样的
, 希望大家不要被误导
简单看下
doBind0(regFuture, channel, localAddress, promise)方法
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
2 channel.eventLoop().execute(new Runnable() {
3 @Override
4 public void run() {
5 if (regFuture.isSuccess()) {
6 //绑定端口
7 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
8 } else {
9 promise.setFailure(regFuture.cause());
10 }
11 }
12 });
13}
14
这里虽然调用了
eventLoop的
execute()方法
, 但是
eventLoop线程在注册期间已经启动
, 所以这里不会重复启动
, 只会将任务添加到
taskQueue中
其实这里我们也能够看出
, 其实绑定端口的相关操作
, 同样是也是
eventLoop线程中执行的
上一节: 初始化线程选择器
下一节: 优化selector