** **
Netty源码分析第二章: NioEventLoop
** **
第八节: 执行任务队列
继续回到NioEventLoop的run()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 1protected void run() {
2 for (;;) {
3 try {
4 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
5 case SelectStrategy.CONTINUE:
6 continue;
7 case SelectStrategy.SELECT:
8 //轮询io事件(1)
9 select(wakenUp.getAndSet(false));
10 if (wakenUp.get()) {
11 selector.wakeup();
12 }
13 default:
14 }
15 cancelledKeys = 0;
16 needsToSelectAgain = false;
17 //默认是50
18 final int ioRatio = this.ioRatio;
19 if (ioRatio == 100) {
20 try {
21 processSelectedKeys();
22 } finally {
23 runAllTasks();
24 }
25 } else {
26 //记录下开始时间
27 final long ioStartTime = System.nanoTime();
28 try {
29 //处理轮询到的key(2)
30 processSelectedKeys();
31 } finally {
32 //计算耗时
33 final long ioTime = System.nanoTime() - ioStartTime;
34 //执行task(3)
35 runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
36 }
37 }
38 } catch (Throwable t) {
39 handleLoopException(t);
40 }
41 //代码省略
42 }
43}
44
我们看到处理完轮询到的
key之后
, 首先记录下耗时
, 然后通过
runAllTasks(ioTime * (100 – ioRatio) / ioRatio)执行
taskQueue中的任务
我们知道
ioRatio默认是
50, 所以执行完
ioTime * (100 – ioRatio) / ioRatio后
, 方法传入的值为
ioTime, 也就是
processSelectedKeys()的执行时间
:
跟进runAllTasks方法
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 1protected boolean runAllTasks(long timeoutNanos) {
2 //定时任务队列中聚合任务
3 fetchFromScheduledTaskQueue();
4 //从普通taskQ里面拿一个任务
5 Runnable task = pollTask();
6 //task为空, 则直接返回
7 if (task == null) {
8 //跑完所有的任务执行收尾的操作
9 afterRunningAllTasks();
10 return false;
11 }
12 //如果队列不为空
13 //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
14 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
15 long runTasks = 0;
16 long lastExecutionTime;
17 //执行每一个任务
18 for (;;) {
19 safeExecute(task);
20 //标记当前跑完的任务
21 runTasks ++;
22 //当跑完64个任务的时候, 会计算一下当前时间
23 if ((runTasks & 0x3F) == 0) {
24 //定时任务初始化到当前的时间
25 lastExecutionTime = ScheduledFutureTask.nanoTime();
26 //如果超过截止时间则不执行(nanoTime()是耗时的)
27 if (lastExecutionTime >= deadline) {
28 break;
29 }
30 }
31 //如果没有超过这个时间, 则继续从普通任务队列拿任务
32 task = pollTask();
33 //直到没有任务执行
34 if (task == null) {
35 //记录下最后执行时间
36 lastExecutionTime = ScheduledFutureTask.nanoTime();
37 break;
38 }
39 }
40 //收尾工作
41 afterRunningAllTasks();
42 this.lastExecutionTime = lastExecutionTime;
43 return true;
44}
45
首先会执行
fetchFromScheduledTaskQueue()这个方法
, 这个方法的意思是从定时任务队列中聚合任务
, 也就是将定时任务中找到可以执行的任务添加到
taskQueue中
我们跟进fetchFromScheduledTaskQueue()方法
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1private boolean fetchFromScheduledTaskQueue() {
2 long nanoTime = AbstractScheduledEventExecutor.nanoTime();
3 //从定时任务队列中抓取第一个定时任务
4 //寻找截止时间为nanoTime的任务
5 Runnable scheduledTask = pollScheduledTask(nanoTime);
6 //如果该定时任务队列不为空, 则塞到普通任务队列里面
7 while (scheduledTask != null) {
8 //如果添加到普通任务队列过程中失败
9 if (!taskQueue.offer(scheduledTask)) {
10 //则重新添加到定时任务队列中
11 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
12 return false;
13 }
14 //继续从定时任务队列中拉取任务
15 //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中
16 scheduledTask = pollScheduledTask(nanoTime);
17 }
18 return true;
19}
20
long nanoTime = AbstractScheduledEventExecutor.nanoTime() 代表从定时任务初始化到现在过去了多长时间
Runnable scheduledTask= pollScheduledTask(nanoTime) 代表从定时任务队列中拿到小于
nanoTime时间的任务
, 因为小于初始化到现在的时间
, 说明该任务需要执行了
跟到其父类
AbstractScheduledEventExecutor的
pollScheduledTask(nanoTime)方法中
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1protected final Runnable pollScheduledTask(long nanoTime) {
2 assert inEventLoop();
3 //拿到定时任务队列
4 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
5 //peek()方法拿到第一个任务
6 ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
7 if (scheduledTask == null) {
8 return null;
9 }
10
11 if (scheduledTask.deadlineNanos() <= nanoTime) {
12 //从队列中删除
13 scheduledTaskQueue.remove();
14 //返回该任务
15 return scheduledTask;
16 }
17 return null;
18}
19
我们看到首先获得当前类绑定的定时任务队列的成员变量
如果不为空
, 则通过
scheduledTaskQueue.peek()弹出第一个任务
如果当前任务小于传来的时间
, 说明该任务需要执行
, 则从定时任务队列中删除
我们继续回到
fetchFromScheduledTaskQueue()方法中
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1private boolean fetchFromScheduledTaskQueue() {
2 long nanoTime = AbstractScheduledEventExecutor.nanoTime();
3 //从定时任务队列中抓取第一个定时任务
4 //寻找截止时间为nanoTime的任务
5 Runnable scheduledTask = pollScheduledTask(nanoTime);
6 //如果该定时任务队列不为空, 则塞到普通任务队列里面
7 while (scheduledTask != null) {
8 //如果添加到普通任务队列过程中失败
9 if (!taskQueue.offer(scheduledTask)) {
10 //则重新添加到定时任务队列中
11 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
12 return false;
13 }
14 //继续从定时任务队列中拉取任务
15 //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中
16 scheduledTask = pollScheduledTask(nanoTime);
17 }
18 return true;
19}
20
弹出需要执行的定时任务之后
, 我们通过
taskQueue.offer(scheduledTask)添加到
taskQueue中
, 如果添加失败
, 则通过
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)重新添加到定时任务队列中
如果添加成功
, 则通过
pollScheduledTask(nanoTime)方法继续添加
, 直到没有需要执行的任务
这样就将定时任务队列需要执行的任务添加到了
taskQueue中
回到
runAllTasks(long timeoutNanos)方法中
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 1protected boolean runAllTasks(long timeoutNanos) {
2 //定时任务队列中聚合任务
3 fetchFromScheduledTaskQueue();
4 //从普通taskQ里面拿一个任务
5 Runnable task = pollTask();
6 //task为空, 则直接返回
7 if (task == null) {
8 //跑完所有的任务执行收尾的操作
9 afterRunningAllTasks();
10 return false;
11 }
12 //如果队列不为空
13 //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
14 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
15 long runTasks = 0;
16 long lastExecutionTime;
17 //执行每一个任务
18 for (;;) {
19 safeExecute(task);
20 //标记当前跑完的任务
21 runTasks ++;
22 //当跑完64个任务的时候, 会计算一下当前时间
23 if ((runTasks & 0x3F) == 0) {
24 //定时任务初始化到当前的时间
25 lastExecutionTime = ScheduledFutureTask.nanoTime();
26 //如果超过截止时间则不执行(nanoTime()是耗时的)
27 if (lastExecutionTime >= deadline) {
28 break;
29 }
30 }
31 //如果没有超过这个时间, 则继续从普通任务队列拿任务
32 task = pollTask();
33 //直到没有任务执行
34 if (task == null) {
35 //记录下最后执行时间
36 lastExecutionTime = ScheduledFutureTask.nanoTime();
37 break;
38 }
39 }
40 //收尾工作
41 afterRunningAllTasks();
42 this.lastExecutionTime = lastExecutionTime;
43 return true;
44}
45
首先通过
Runnable task = pollTask() 从
taskQueue中拿一个任务
任务不为空
, 则通过
final
long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 计算一个截止时间
, 任务的执行时间不能超过这个时间
然后在
for循环中通过
safeExecute(task)执行
task
我们跟到
safeExecute(task)中
:
1
2
3
4
5
6
7
8
9
10 1protected static void safeExecute(Runnable task) {
2 try {
3 //直接调用run()方法执行
4 task.run();
5 } catch (Throwable t) {
6 //发生异常不终止
7 logger.warn("A task raised an exception. Task: {}", task, t);
8 }
9}
10
这里直接调用
task的
run()方法进行执行
, 其中发生异常
, 只打印一条日志
, 代表发生异常不终止
, 继续往下执行
回到
runAllTasks(long timeoutNanos)方法
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 1protected boolean runAllTasks(long timeoutNanos) {
2 //定时任务队列中聚合任务
3 fetchFromScheduledTaskQueue();
4 //从普通taskQ里面拿一个任务
5 Runnable task = pollTask();
6 //task为空, 则直接返回
7 if (task == null) {
8 //跑完所有的任务执行收尾的操作
9 afterRunningAllTasks();
10 return false;
11 }
12 //如果队列不为空
13 //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
14 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
15 long runTasks = 0;
16 long lastExecutionTime;
17 //执行每一个任务
18 for (;;) {
19 safeExecute(task);
20 //标记当前跑完的任务
21 runTasks ++;
22 //当跑完64个任务的时候, 会计算一下当前时间
23 if ((runTasks & 0x3F) == 0) {
24 //定时任务初始化到当前的时间
25 lastExecutionTime = ScheduledFutureTask.nanoTime();
26 //如果超过截止时间则不执行(nanoTime()是耗时的)
27 if (lastExecutionTime >= deadline) {
28 break;
29 }
30 }
31 //如果没有超过这个时间, 则继续从普通任务队列拿任务
32 task = pollTask();
33 //直到没有任务执行
34 if (task == null) {
35 //记录下最后执行时间
36 lastExecutionTime = ScheduledFutureTask.nanoTime();
37 break;
38 }
39 }
40 //收尾工作
41 afterRunningAllTasks();
42 this.lastExecutionTime = lastExecutionTime;
43 return true;
44}
45
每次执行完
task, runTasks自增
这里
if ((runTasks & 0x3F) == 0) 代表是否执行了
64个任务
, 如果执行了
64个任务
, 则会通过
lastExecutionTime = ScheduledFutureTask.nanoTime() 记录定时任务初始化到现在的时间
, 如果这个时间超过了截止时间
, 则退出循环
如果没有超过截止时间
, 则通过
task = pollTask() 继续弹出任务执行
这里执行
64个任务统计一次时间
, 而不是每次执行任务都统计
, 主要原因是因为获取系统时间是个比较耗时的操作
, 这里是
netty的一种优化方式
如果没有
task需要执行
, 则通过
afterRunningAllTasks()做收尾工作
, 最后记录下最后的执行时间
以上就是有关执行任务队列的相关逻辑
第二章总结
本章学习了有关
NioEventLoopGroup的创建
, NioEventLoop的创建和启动
, 以及多路复用器的轮询处理和
task执行的相关逻辑
, 通过本章学习
, 我们应该掌握如下内容
:
1. NioEventLoopGroup
如何选择分配
NioEventLoop
2. NioEventLoop
如何开启
3. NioEventLoop
如何进行
select操作
4. NioEventLoop
如何执行
task
上一节: 处理IO事件
下一节: 初始化NioSocketChannelConfig