Netty源码分析第2章(NioEventLoop)—->第8节: 执行任务队列

释放双眼,带上耳机,听听看~!

** **

Netty源码分析第二章: NioEventLoop

** **

第八节: 执行任务队列

继续回到NioEventLoop的run()方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
1protected void run() {
2    for (;;) {
3        try {
4            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
5                case SelectStrategy.CONTINUE:
6                    continue;
7                case SelectStrategy.SELECT:
8                    //轮询io事件(1)
9                    select(wakenUp.getAndSet(false));
10                    if (wakenUp.get()) {
11                        selector.wakeup();
12                    }
13                default:
14            }
15            cancelledKeys = 0;
16            needsToSelectAgain = false;
17            //默认是50
18            final int ioRatio = this.ioRatio;
19            if (ioRatio == 100) {
20                try {
21                    processSelectedKeys();
22                } finally {
23                    runAllTasks();
24                }
25            } else {
26                //记录下开始时间
27                final long ioStartTime = System.nanoTime();
28                try {
29                    //处理轮询到的key(2)
30                    processSelectedKeys();
31                } finally {
32                    //计算耗时
33                    final long ioTime = System.nanoTime() - ioStartTime;
34                    //执行task(3)
35                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
36                }
37            }
38        } catch (Throwable t) {
39            handleLoopException(t);
40        }
41        //代码省略
42    }
43}
44

我们看到处理完轮询到的
key之后
, 首先记录下耗时
, 然后通过
runAllTasks(ioTime * (100 – ioRatio) / ioRatio)执行
taskQueue中的任务

我们知道
ioRatio默认是
50, 所以执行完
ioTime * (100 – ioRatio) / ioRatio后
, 方法传入的值为
ioTime, 也就是
processSelectedKeys()的执行时间
:

 

跟进runAllTasks方法
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
1protected boolean runAllTasks(long timeoutNanos) {
2    //定时任务队列中聚合任务
3    fetchFromScheduledTaskQueue();
4    //从普通taskQ里面拿一个任务
5    Runnable task = pollTask();
6    //task为空, 则直接返回
7    if (task == null) {
8        //跑完所有的任务执行收尾的操作
9        afterRunningAllTasks();
10        return false;
11    }
12    //如果队列不为空
13    //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
14    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
15    long runTasks = 0;
16    long lastExecutionTime;
17    //执行每一个任务
18    for (;;) {
19        safeExecute(task);
20        //标记当前跑完的任务
21        runTasks ++;
22        //当跑完64个任务的时候, 会计算一下当前时间
23        if ((runTasks & 0x3F) == 0) {
24            //定时任务初始化到当前的时间
25            lastExecutionTime = ScheduledFutureTask.nanoTime();
26            //如果超过截止时间则不执行(nanoTime()是耗时的)
27            if (lastExecutionTime >= deadline) {
28                break;
29            }
30        }
31        //如果没有超过这个时间, 则继续从普通任务队列拿任务
32        task = pollTask();
33        //直到没有任务执行
34        if (task == null) {
35            //记录下最后执行时间
36            lastExecutionTime = ScheduledFutureTask.nanoTime();
37            break;
38        }
39    }
40    //收尾工作
41    afterRunningAllTasks();
42    this.lastExecutionTime = lastExecutionTime;
43    return true;
44}
45

首先会执行
fetchFromScheduledTaskQueue()这个方法
, 这个方法的意思是从定时任务队列中聚合任务
, 也就是将定时任务中找到可以执行的任务添加到
taskQueue中

我们跟进fetchFromScheduledTaskQueue()方法
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1private boolean fetchFromScheduledTaskQueue() {
2    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
3    //从定时任务队列中抓取第一个定时任务
4    //寻找截止时间为nanoTime的任务
5    Runnable scheduledTask  = pollScheduledTask(nanoTime);
6    //如果该定时任务队列不为空, 则塞到普通任务队列里面
7    while (scheduledTask != null) {
8        //如果添加到普通任务队列过程中失败
9        if (!taskQueue.offer(scheduledTask)) {
10            //则重新添加到定时任务队列中
11            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
12            return false;
13        }
14        //继续从定时任务队列中拉取任务
15        //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中
16        scheduledTask = pollScheduledTask(nanoTime);
17    }
18    return true;
19}
20

 
long nanoTime = AbstractScheduledEventExecutor.nanoTime() 代表从定时任务初始化到现在过去了多长时间

 
Runnable scheduledTask= pollScheduledTask(nanoTime) 代表从定时任务队列中拿到小于
nanoTime时间的任务
, 因为小于初始化到现在的时间
, 说明该任务需要执行了

 

跟到其父类
AbstractScheduledEventExecutor的
pollScheduledTask(nanoTime)方法中
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1protected final Runnable pollScheduledTask(long nanoTime) {
2    assert inEventLoop();
3    //拿到定时任务队列
4    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
5    //peek()方法拿到第一个任务
6    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
7    if (scheduledTask == null) {
8        return null;
9    }
10
11    if (scheduledTask.deadlineNanos() <= nanoTime) {
12        //从队列中删除
13        scheduledTaskQueue.remove();
14        //返回该任务
15        return scheduledTask;
16    }
17    return null;
18}
19

我们看到首先获得当前类绑定的定时任务队列的成员变量

如果不为空
, 则通过
scheduledTaskQueue.peek()弹出第一个任务

如果当前任务小于传来的时间
, 说明该任务需要执行
, 则从定时任务队列中删除

 

我们继续回到
fetchFromScheduledTaskQueue()方法中
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1private boolean fetchFromScheduledTaskQueue() {
2    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
3    //从定时任务队列中抓取第一个定时任务
4    //寻找截止时间为nanoTime的任务
5    Runnable scheduledTask  = pollScheduledTask(nanoTime);
6    //如果该定时任务队列不为空, 则塞到普通任务队列里面
7    while (scheduledTask != null) {
8        //如果添加到普通任务队列过程中失败
9        if (!taskQueue.offer(scheduledTask)) {
10            //则重新添加到定时任务队列中
11            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
12            return false;
13        }
14        //继续从定时任务队列中拉取任务
15        //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中
16        scheduledTask = pollScheduledTask(nanoTime);
17    }
18    return true;
19}
20

弹出需要执行的定时任务之后
, 我们通过
taskQueue.offer(scheduledTask)添加到
taskQueue中
, 如果添加失败
, 则通过
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)重新添加到定时任务队列中

 

如果添加成功
, 则通过
pollScheduledTask(nanoTime)方法继续添加
, 直到没有需要执行的任务

这样就将定时任务队列需要执行的任务添加到了
taskQueue中

 

回到
runAllTasks(long timeoutNanos)方法中
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
1protected boolean runAllTasks(long timeoutNanos) {
2    //定时任务队列中聚合任务
3    fetchFromScheduledTaskQueue();
4    //从普通taskQ里面拿一个任务
5    Runnable task = pollTask();
6    //task为空, 则直接返回
7    if (task == null) {
8        //跑完所有的任务执行收尾的操作
9        afterRunningAllTasks();
10        return false;
11    }
12    //如果队列不为空
13    //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
14    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
15    long runTasks = 0;
16    long lastExecutionTime;
17    //执行每一个任务
18    for (;;) {
19        safeExecute(task);
20        //标记当前跑完的任务
21        runTasks ++;
22        //当跑完64个任务的时候, 会计算一下当前时间
23        if ((runTasks &amp; 0x3F) == 0) {
24            //定时任务初始化到当前的时间
25            lastExecutionTime = ScheduledFutureTask.nanoTime();
26            //如果超过截止时间则不执行(nanoTime()是耗时的)
27            if (lastExecutionTime &gt;= deadline) {
28                break;
29            }
30        }
31        //如果没有超过这个时间, 则继续从普通任务队列拿任务
32        task = pollTask();
33        //直到没有任务执行
34        if (task == null) {
35            //记录下最后执行时间
36            lastExecutionTime = ScheduledFutureTask.nanoTime();
37            break;
38        }
39    }
40    //收尾工作
41    afterRunningAllTasks();
42    this.lastExecutionTime = lastExecutionTime;
43    return true;
44}
45

首先通过 
Runnable task = pollTask() 从
taskQueue中拿一个任务

任务不为空
, 则通过 
final
long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 计算一个截止时间
, 任务的执行时间不能超过这个时间

然后在
for循环中通过
safeExecute(task)执行
task

 

我们跟到
safeExecute(task)中
:


1
2
3
4
5
6
7
8
9
10
1protected static void safeExecute(Runnable task) {
2    try {
3        //直接调用run()方法执行
4        task.run();
5    } catch (Throwable t) {
6        //发生异常不终止
7        logger.warn(&quot;A task raised an exception. Task: {}&quot;, task, t);
8    }
9}
10

这里直接调用
task的
run()方法进行执行
, 其中发生异常
, 只打印一条日志
, 代表发生异常不终止
, 继续往下执行

 

回到
runAllTasks(long timeoutNanos)方法
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
1protected boolean runAllTasks(long timeoutNanos) {
2    //定时任务队列中聚合任务
3    fetchFromScheduledTaskQueue();
4    //从普通taskQ里面拿一个任务
5    Runnable task = pollTask();
6    //task为空, 则直接返回
7    if (task == null) {
8        //跑完所有的任务执行收尾的操作
9        afterRunningAllTasks();
10        return false;
11    }
12    //如果队列不为空
13    //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
14    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
15    long runTasks = 0;
16    long lastExecutionTime;
17    //执行每一个任务
18    for (;;) {
19        safeExecute(task);
20        //标记当前跑完的任务
21        runTasks ++;
22        //当跑完64个任务的时候, 会计算一下当前时间
23        if ((runTasks &amp; 0x3F) == 0) {
24            //定时任务初始化到当前的时间
25            lastExecutionTime = ScheduledFutureTask.nanoTime();
26            //如果超过截止时间则不执行(nanoTime()是耗时的)
27            if (lastExecutionTime &gt;= deadline) {
28                break;
29            }
30        }
31        //如果没有超过这个时间, 则继续从普通任务队列拿任务
32        task = pollTask();
33        //直到没有任务执行
34        if (task == null) {
35            //记录下最后执行时间
36            lastExecutionTime = ScheduledFutureTask.nanoTime();
37            break;
38        }
39    }
40    //收尾工作
41    afterRunningAllTasks();
42    this.lastExecutionTime = lastExecutionTime;
43    return true;
44}
45

每次执行完
task, runTasks自增

 

这里 
if ((runTasks & 0x3F) == 0) 代表是否执行了
64个任务
, 如果执行了
64个任务
, 则会通过 
lastExecutionTime = ScheduledFutureTask.nanoTime() 记录定时任务初始化到现在的时间
, 如果这个时间超过了截止时间
, 则退出循环

 

如果没有超过截止时间
, 则通过 
task = pollTask() 继续弹出任务执行

这里执行
64个任务统计一次时间
, 而不是每次执行任务都统计
, 主要原因是因为获取系统时间是个比较耗时的操作
, 这里是
netty的一种优化方式

如果没有
task需要执行
, 则通过
afterRunningAllTasks()做收尾工作
, 最后记录下最后的执行时间

以上就是有关执行任务队列的相关逻辑

 

 

第二章总结

 

        本章学习了有关
NioEventLoopGroup的创建
, NioEventLoop的创建和启动
, 以及多路复用器的轮询处理和
task执行的相关逻辑
, 通过本章学习
, 我们应该掌握如下内容
:

        1.  NioEventLoopGroup
如何选择分配
NioEventLoop

        2.  NioEventLoop
如何开启

        3.  NioEventLoop
如何进行
select操作

        4.  NioEventLoop
如何执行
task

 

 

上一节: 处理IO事件

下一节: 初始化NioSocketChannelConfig

 

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全技术

php 使用kafka

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索