eventLoop是基于事件系统机制,主要技术由线程池同队列组成,是由生产/消费者模型设计,那么先搞清楚谁是生产者,消费者内容
SingleThreadEventLoop 实现
1
2
3
4
5
6
7
8
9 1public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
2 private final Queue<Runnable> tailTasks;
3
4 @Override
5 protected void afterRunningAllTasks() {
6 runAllTasksFrom(tailTasks);
7 }
8}
9
SingleThreadEventLoop是个抽象类,从实现代码上看出很简单的逻辑边界判断
SingleThreadEventExecutor也是个抽象类,代码量比较大,我们先看重要的成员属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 1public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
2 //事件队列
3 private final Queue<Runnable> taskQueue;
4 //执行事件线程,可以看出只有一个线程只要用来记录executor的当前线程
5 private volatile Thread thread;
6 //主要负责监控该线程的生命周期,提取出当前线程然后用thread记录
7 private final Executor executor;
8 //用Atomic*技术记录当前线程状态
9 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
10 AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
11}
12
13//启动线程做了比较判断
14private void startThread() {
15 if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
16 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
17 doStartThread();
18 }
19 }
20}
21
22private void doStartThread() {
23 executor.execute(new Runnable() {
24 @Override
25 public void run() {
26 //记录当前执行线程
27 thread = Thread.currentThread();
28 if (interrupted) {
29 thread.interrupt();
30 }
31
32 boolean success = false;
33 updateLastExecutionTime();
34 try {
35 //这里调用的是子类,注意子类是死循环不停的执行任务
36 SingleThreadEventExecutor.this.run();
37 success = true;
38 } catch (Throwable t) {
39 logger.warn("Unexpected exception from an event executor: ", t);
40 } finally {
41 //更改线程结束状态 省略部分代码
42 for (;;) {
43 int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
44 if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
45 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
46 break;
47 }
48 }
49 try {
50 // 执行未完成任务同 shutdown hooks.
51 for (;;) {
52 if (confirmShutdown()) {
53 break;
54 }
55 }
56 } finally {
57 try {
58 //最后清理操作,如 NioEventLoop实现 selector.close();
59 cleanup();
60 } finally {
61 //省略部分代码
62 }
63 }
64 }
65 }
66 });
67}
68
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 1protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
2 Runnable task = pollTaskFrom(taskQueue);
3 if (task == null) {
4 return false;
5 }
6 for (;;) {
7 //安全执行任务
8 safeExecute(task);
9 //继续执行剩余任务
10 task = pollTaskFrom(taskQueue);
11 if (task == null) {
12 return true;
13 }
14 }
15}
16
17protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
18 for (;;) {
19 Runnable task = taskQueue.poll();
20 //忽略WAKEUP_TASK类型任务
21 if (task == WAKEUP_TASK) {
22 continue;
23 }
24 return task;
25 }
26}
27
28protected boolean runAllTasks(long timeoutNanos) {
29 //先执行周期任务
30 fetchFromScheduledTaskQueue();
31 //从taskQueue提一个任务,如果为空执行所有tailTasks
32 Runnable task = pollTask();
33 //如果taskQueue没有任务,立即执行子类的tailTasks
34 if (task == null) {
35 afterRunningAllTasks();
36 return false;
37 }
38 //计算出超时时间 = 当前 nanoTime + timeoutNanos
39 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
40 long runTasks = 0;
41 long lastExecutionTime;
42 for (;;) {
43 safeExecute(task);
44
45 runTasks ++;
46 //当执行任务次数大于64判断是否超时,防止长时间独占CPU
47 if ((runTasks & 0x3F) == 0) {
48 lastExecutionTime = ScheduledFutureTask.nanoTime();
49 if (lastExecutionTime >= deadline) {
50 break;
51 }
52 }
53
54 task = pollTask();
55 if (task == null) {
56 lastExecutionTime = ScheduledFutureTask.nanoTime();
57 break;
58 }
59 }
60
61 afterRunningAllTasks();
62 this.lastExecutionTime = lastExecutionTime;
63 return true;
64}
65
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1//SingleThreadEventLoop run 实现
2public class DefaultEventLoop extends SingleThreadEventLoop {
3
4 @Override
5 protected void run() {
6 for (;;) {
7 Runnable task = takeTask();
8 if (task != null) {
9 task.run();
10 updateLastExecutionTime();
11 }
12
13 if (confirmShutdown()) {
14 break;
15 }
16 }
17 }
18}
19
我们可以在SingleThreadEventExecutor 两个runAllTasks 方法打上断点,看执行任务时调用逻辑
本人为了搞清楚 taskQueue 同tailTasks 类型任务,在任务入队时打断点,分别为 SingleThreadEventLoop executeAfterEventLoopIteration方法同 SingleThreadEventExecutor offerTask方法
ServerBootstrap[bind address] ->
NioEventLoopGroup [register Channel] -> [ChannelPromise] ->
NioEventLoop [build and push register task]
从调用链可以清晰看出,启动 netty server 绑定生成抽象 Channel 然后l转换成ChannelPromise,再调用注册实现register0
这里用了判断是否为当前线程,如果是不用加入队列马上执行,目前减少上下文切换开削
1
2
3
4
5
6
7
8
9
10
11 1if (eventLoop.inEventLoop()) {
2 register0(promise);
3} else {
4 eventLoop.execute(new Runnable() {
5 @Override
6 public void run() {
7 register0(promise);
8 }
9 });
10}
11
总结:
1.SingleThreadEventLoop 任务执行加了超时限制,目的防止当前线程长时间执行任务独占cpu
2.提交任务时做了减少上下文开削优化
3.执行任务优先级 1.周期任务 2.taskQueue 3.tailTasks
目前没有看到任何调用 SingleThreadEventLoop executeAfterEventLoopIteration 方法,估计是扩展处理。
4.用到Atomic*技术解决并发问题,从Executor提取当前线程,把单一线程维护交给Executor