[编织消息框架][netty源码分析]3 EventLoop 实现类SingleThreadEventLoop职责与实现

释放双眼,带上耳机,听听看~!

eventLoop是基于事件系统机制,主要技术由线程池同队列组成,是由生产/消费者模型设计,那么先搞清楚谁是生产者,消费者内容

SingleThreadEventLoop 实现


1
2
3
4
5
6
7
8
9
1public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
2    private final Queue<Runnable> tailTasks;
3
4    @Override
5    protected void afterRunningAllTasks() {
6        runAllTasksFrom(tailTasks);
7    }
8}
9

SingleThreadEventLoop是个抽象类,从实现代码上看出很简单的逻辑边界判断

SingleThreadEventExecutor也是个抽象类,代码量比较大,我们先看重要的成员属性


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
1public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
2    //事件队列
3    private final Queue<Runnable> taskQueue;
4    //执行事件线程,可以看出只有一个线程只要用来记录executor的当前线程
5    private volatile Thread thread;
6    //主要负责监控该线程的生命周期,提取出当前线程然后用thread记录
7    private final Executor executor;
8    //用Atomic*技术记录当前线程状态
9    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
10                AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
11}
12
13//启动线程做了比较判断
14private void startThread() {
15    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
16        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
17            doStartThread();
18        }
19    }
20}
21
22private void doStartThread() {
23    executor.execute(new Runnable() {
24        @Override
25        public void run() {
26            //记录当前执行线程
27            thread = Thread.currentThread();
28            if (interrupted) {
29                thread.interrupt();
30            }
31
32            boolean success = false;
33            updateLastExecutionTime();
34            try {
35                //这里调用的是子类,注意子类是死循环不停的执行任务
36                SingleThreadEventExecutor.this.run();
37                success = true;
38            } catch (Throwable t) {
39                logger.warn("Unexpected exception from an event executor: ", t);
40            } finally {
41                //更改线程结束状态 省略部分代码
42                for (;;) {
43                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
44                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
45                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
46                        break;
47                    }
48                }
49                try {
50                    // 执行未完成任务同 shutdown hooks.
51                    for (;;) {
52                        if (confirmShutdown()) {
53                            break;
54                        }
55                    }
56                } finally {
57                    try {
58                        //最后清理操作,如 NioEventLoop实现 selector.close();
59                        cleanup();
60                    } finally {
61                        //省略部分代码
62                    }
63                }
64            }
65        }
66    });
67}
68

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
1protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
2    Runnable task = pollTaskFrom(taskQueue);
3    if (task == null) {
4        return false;
5    }
6    for (;;) {
7        //安全执行任务
8        safeExecute(task);
9        //继续执行剩余任务
10        task = pollTaskFrom(taskQueue);
11        if (task == null) {
12            return true;
13        }
14    }
15}
16
17protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
18    for (;;) {
19        Runnable task = taskQueue.poll();
20        //忽略WAKEUP_TASK类型任务
21        if (task == WAKEUP_TASK) {
22            continue;
23        }
24        return task;
25    }
26}
27
28protected boolean runAllTasks(long timeoutNanos) {
29    //先执行周期任务
30    fetchFromScheduledTaskQueue();
31    //从taskQueue提一个任务,如果为空执行所有tailTasks
32    Runnable task = pollTask();
33    //如果taskQueue没有任务,立即执行子类的tailTasks
34    if (task == null) {
35         afterRunningAllTasks();
36        return false;
37    }
38    //计算出超时时间 = 当前 nanoTime + timeoutNanos
39    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
40    long runTasks = 0;
41    long lastExecutionTime;
42    for (;;) {
43        safeExecute(task);
44
45        runTasks ++;
46        //当执行任务次数大于64判断是否超时,防止长时间独占CPU
47        if ((runTasks & 0x3F) == 0) {
48            lastExecutionTime = ScheduledFutureTask.nanoTime();
49            if (lastExecutionTime >= deadline) {
50                break;
51            }
52        }
53
54        task = pollTask();
55        if (task == null) {
56            lastExecutionTime = ScheduledFutureTask.nanoTime();
57            break;
58        }
59    }
60
61    afterRunningAllTasks();
62    this.lastExecutionTime = lastExecutionTime;
63    return true;
64}
65

 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1//SingleThreadEventLoop run 实现
2public class DefaultEventLoop extends SingleThreadEventLoop {
3
4    @Override
5    protected void run() {
6        for (;;) {
7            Runnable task = takeTask();
8            if (task != null) {
9                task.run();
10                updateLastExecutionTime();
11            }
12
13            if (confirmShutdown()) {
14                break;
15            }
16        }
17    }
18}
19

 

我们可以在SingleThreadEventExecutor  两个runAllTasks 方法打上断点,看执行任务时调用逻辑

[编织消息框架][netty源码分析]3 EventLoop 实现类SingleThreadEventLoop职责与实现

 

 本人为了搞清楚 taskQueue 同tailTasks 类型任务,在任务入队时打断点,分别为 SingleThreadEventLoop executeAfterEventLoopIteration方法同 SingleThreadEventExecutor offerTask方法

[编织消息框架][netty源码分析]3 EventLoop 实现类SingleThreadEventLoop职责与实现

 

 

ServerBootstrap[bind address] ->

NioEventLoopGroup [register Channel] ->  [ChannelPromise] ->

NioEventLoop [build and push register task]

从调用链可以清晰看出,启动 netty server 绑定生成抽象 Channel 然后l转换成ChannelPromise,再调用注册实现register0

这里用了判断是否为当前线程,如果是不用加入队列马上执行,目前减少上下文切换开削


1
2
3
4
5
6
7
8
9
10
11
1if (eventLoop.inEventLoop()) {
2    register0(promise);
3} else {
4    eventLoop.execute(new Runnable() {
5        @Override
6        public void run() {
7            register0(promise);
8        }
9    });
10}
11

 

总结:

1.SingleThreadEventLoop 任务执行加了超时限制,目的防止当前线程长时间执行任务独占cpu

2.提交任务时做了减少上下文开削优化

3.执行任务优先级 1.周期任务 2.taskQueue 3.tailTasks

目前没有看到任何调用 SingleThreadEventLoop executeAfterEventLoopIteration 方法,估计是扩展处理。

4.用到Atomic*技术解决并发问题,从Executor提取当前线程,把单一线程维护交给Executor
 

给TA打赏
共{{data.count}}人
人已打赏
安全技术

用node.js从零开始去写一个简单的爬虫

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索