Netty源码分析第2章(NioEventLoop)—->第1节: NioEventLoopGroup之创建线程执行器

释放双眼,带上耳机,听听看~!

 

Netty源码分析第二章: NioEventLoop

 

概述:

     
  通过上一章的学习, 我们了解了Server启动的大致流程, 有很多组件与模块并没有细讲, 从这个章开始, 我们开始详细剖析netty的各个组件, 并结合启动流程, 将这些组件的使用场景及流程进行一个详细的说明

        这一章主要学习NioEventLoop相关的知识, 何为NioEventLoop? NioEventLoop是netty的一个线程, 在上一节我们创建两个NioEventLoopGroup:


1
2
3
1EventLoopGroup bossGroup = new NioEventLoopGroup(1);
2EventLoopGroup workerGroup = new NioEventLoopGroup();
3

        这里创建了两个group, 我们提过这是boss线程组和worker线程组, 其实这两个线程组就相当于两个NioEventLoop的集合, 默认每个NioEventLoopGroup创建时, 如果不传入线程数, 会创建cpu核数*2个NioEventLoop线程, 其中boss线程通过轮询处理Server的accept事件, 而完成accept事件之后, 就会创建客户端channel, 通过一定的策略, 分发到worker线程进行处理, 而worker线程, 则主要用于处理客户端的读写事件

        除了轮询事件, EventLoop线程还维护了两个队列, 一个是延迟任务队列, 另一个是普通任务队列, 在进行事件轮询的同时, 如果队列中有任务需要执行则会去执行队列中的任务

        一个NioEventLoop绑定一个selector用于处理多个客户端channel, 但是一个客户端channel只能被一个NioEventLoop处理, 具体关系如图2-0-1所示:

 

Netty源码分析第2章(NioEventLoop)---->第1节: NioEventLoopGroup之创建线程执行器

2-0-1

        图中我们看到, 一个NioEventLoopGroup下有多个NioEventLoop线程, 而一个线程可以处理多个channel, 其中有个叫pipeline和handler的东西, 同学们可能比较陌生, 这是netty的事件传输机制, 每个pipeline和channel唯一绑定, 这里只需要稍作了解, 之后章节会带大家详细剖析

 

        了解了这些概念, 我们继续以小节的形式对NioEventLoop进行剖析

 

第一节:  NioEventLoopGroup之创建线程执行器

 

首先回到第一章最开始的
demo, 我们最初创建了两个线程组
:


1
2
3
1EventLoopGroup bossGroup = new NioEventLoopGroup(1);
2EventLoopGroup workerGroup = new NioEventLoopGroup();
3

这里
, 我们跟随创建
EventLoopGroup
的构造方法
, 来继续学习
NioEventLoopGroup的创建过程


workerGroup为例我们跟进其构造方法
:


1
2
3
4
1public NioEventLoopGroup() {
2    this(0);
3}
4

继续跟进
this(0):


1
2
3
4
1public NioEventLoopGroup(int nThreads) {
2    this(nThreads, (Executor) null);
3}
4

这里的
nThreads就是刚传入的
0, 继续跟进:


1
2
3
4
1public NioEventLoopGroup(int nThreads, Executor executor) {
2    this(nThreads, executor, SelectorProvider.provider());
3}
4

这里
nThreads仍然为
0, executor为
null, 这个
execute是用于开启
NioEventLoop线程所需要的线程执行器
, SelectorProvider.provider()是用于创建
selector, 这个之后会讲到

我们一直跟到构造方法最后
:


1
2
3
4
5
6
7
1public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
2                         final SelectorProvider selectorProvider,
3                         final SelectStrategyFactory selectStrategyFactory,
4                         final RejectedExecutionHandler rejectedExecutionHandler) {
5    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
6}
7

这里调用了父类的构造方法

跟进
super, 进入了其父类
MultithreadEventExecutorGroup的构造方法中:


1
2
3
4
5
1protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
2                                 Object... args) {
3    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
4}
5

这里我们看到
, 如果传入的线程数量参数为
0, 则会给一个默认值
, 这个默认值就是两倍的
CPU核数
, chooserFactory是用于创建线程选择器
, 之后会讲到
, 继续跟代码之后
, 我们就看到了创建
NioEventLoop的真正逻辑
,

MultithreadEventExecutorGroup
类的构造方法中

跟到MultithreadEventExecutorGroup类的构造方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
2                                        EventExecutorChooserFactory chooserFactory, Object... args) {
3    //代码省略
4    if (executor == null) {
5        //创建一个新的线程执行器(1)
6        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
7    }
8    //构造NioEventLoop(2)
9    children = new EventExecutor[nThreads];
10    for (int i = 0; i < nThreads; i ++) {
11        boolean success = false;
12        try {
13            children[i] = newChild(executor, args);
14            success = true;
15        } catch (Exception e) {
16            throw new IllegalStateException("failed to create a child event loop", e);
17        } finally {
18           //代码省略
19        }
20    }
21    //创建线程选择器(3)
22    chooser = chooserFactory.newChooser(children);
23    //代码省略
24}
25

这边将代码主要分为三个步骤
:

创建线程执行器

创建
EventLoop

创建线程选择器

这一小节我们主要剖析第
1步
, 创建线程执行器
:

这里有个
new DefaultThreadFactory()创建一个
DefaultThreadFactory对象
, 这个对象作为参数传入
ThreadPerTaskExecutor的构造函数
,  DefaultThreadFactory顾名思义
, 是一个线程工厂
, 用于创建线程的
, 简单看下这个类的继承关系
:


1
2
1public class DefaultThreadFactory implements ThreadFactory{//类体}
2

这里继承了
jdk底层
ThreadFactory类
, 用于创建线程

我们继续跟进该类的构造方法
:


1
2
3
4
1protected ThreadFactory newDefaultThreadFactory() {
2    return new DefaultThreadFactory(getClass());
3}
4

其中
getClass()就是当前类的
class对象
, 而当前类是
NioEventLoopGroup

继续跟进到
DefaultThreadFactory的构造方法中:


1
2
3
4
1public DefaultThreadFactory(Class<?> poolType) {
2    this(poolType, false, Thread.NORM_PRIORITY);
3}
4

poolType

NioEventLoop的
class对象
, Thread.NORM_PRIORITY是设置默认的优先级为
5

继续跟构造方法
:


1
2
3
4
1public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
2    this(toPoolName(poolType), daemon, priority);
3}
4

这里的
toPoolName(poolType)是将线程组命名
, 这里返回后结果是
"nioEventLoopGroup"(开
n头小写
), daemon为
false, priority为
5

继续跟构造方法
:


1
2
3
4
5
1public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
2    this(poolName, daemon, priority, System.getSecurityManager() == null ?
3            Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
4}
5

 
System.getSecurityManager() ==
null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup() 这段代码是通过三目运算创建
jdk底层的线程组

继续跟
this():


1
2
3
4
5
6
7
8
9
10
11
1public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
2    //省略验证代码
3    //线程名字前缀
4    prefix = poolName + '-' + poolId.incrementAndGet() + '-';
5    this.daemon = daemon;
6    //优先级
7    this.priority = priority;
8    //初始化线程组
9    this.threadGroup = threadGroup;
10}
11

这里初始化了
DefaultThreadFactory的属性
, prefix为
poolName(也就是
nioEventLoopGroup)+'-'+线程组
id(原子自增
)+'-'

以及初始化了优先级和
jdk底层的线程组等属性

 

 

回到最初
MultithreadEventExecutorGroup类的构造方法中
, 我们看继续看第一步
:


1
2
3
1//创建一个新的线程执行器(1)
2executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
3

我们继续跟进
ThreadPerTaskExecutor的类中
:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1public final class ThreadPerTaskExecutor implements Executor {
2
3    private final ThreadFactory threadFactory;
4
5    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
6        if (threadFactory == null) {
7            throw new NullPointerException("threadFactory");
8        }
9        this.threadFactory = threadFactory;
10    }
11
12    @Override
13    public void execute(Runnable command) {
14        //起一个线程
15        threadFactory.newThread(command).start();
16    }
17}
18

我们发现这个类非常简单
, 继承了
jdk的
Executor类
, 从继承关系中我就能猜想到
, 而这个类就是用于开启线程的线程执行器

 

构造方法传入
ThreadFactory类型的参数
, 这个
ThreadFactory就是我们刚才剖析的
DefaultThreadFactory, 这个类继承了
ThreadFactory, 所以在构造方法中初始化了
ThreadFactory类型的属性

 

我们再看重写的 
execute(Runnable command) 方法
, 传入一个任务
, 然后由
threadFactory对象创建一个线程执行该任务

这个
execute(Runnable command)方法
, 其实就是用开开启
NioEventLoop线程用的
, 那么
NioEventLoop什么时候开启的
, 后面章节会进行剖析

这样
, 通过 
executor =
new ThreadPerTaskExecutor(newDefaultThreadFactory()) 这种方式就返回了一个线程执行器
Executor, 用于开启
NioEventLoop线程

 

 

上一节: 绑定端口

下一节: NioEventLoopGroup之NioEventLoop的创建

 

 

给TA打赏
共{{data.count}}人
人已打赏
安全技术

bootstrap栅格系统自定义列

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索