Netty源码分析第3章(客户端接入流程)—->第2节: 处理接入事件之handle的创建

释放双眼,带上耳机,听听看~!

 

Netty源码分析第三章: 客户端接入流程

 

第二节: 处理接入事件之handle的创建

 

上一小节我们剖析完成了与channel绑定的ChannelConfig初始化相关的流程, 这一小节继续剖析客户端连接事件的处理

回到上一章NioEventLoop的processSelectedKey ()方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
2    //获取到channel中的unsafe
3    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
4    //如果这个key不是合法的, 说明这个channel可能有问题
5    if (!k.isValid()) {
6        //代码省略
7    }
8    try {
9        //如果是合法的, 拿到key的io事件
10        int readyOps = k.readyOps();
11        //链接事件
12        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
13            int ops = k.interestOps();
14            ops &= ~SelectionKey.OP_CONNECT;
15            k.interestOps(ops);
16            unsafe.finishConnect();
17        }
18        //写事件
19        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
20            ch.unsafe().forceFlush();
21        }
22        //读事件和接受链接事件
23        //如果当前NioEventLoop是work线程的话, 这里就是op_read事件
24        //如果是当前NioEventLoop是boss线程的话, 这里就是op_accept事件
25        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
26            unsafe.read();
27            if (!ch.isOpen()) {
28                return;
29            }
30        }
31    } catch (CancelledKeyException ignored) {
32        unsafe.close(unsafe.voidPromise());
33    }
34}
35

我们看其中的if判断:


1
2
1if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
2

上一小节我们分析过, 如果当前NioEventLoop是work线程的话, 这里就是op_read事件, 如果是当前NioEventLoop是boss线程的话, 这里就是op_accept事件, 这里我们以boss线程为例进行分析

 

之前我们讲过, 无论处理op_read事件还是op_accept事件, 都走的unsafe的read()方法, 这里unsafe是通过channel拿到, 我们知道如果是处理accept事件, 这里的channel是NioServerSocketChannel, 这里与之绑定的unsafe是NioMessageUnsafe

我们跟到NioMessageUnsafe的read()方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
1public void read() {
2    //必须是NioEventLoop方法调用的, 不能通过外部线程调用
3    assert eventLoop().inEventLoop();
4    //服务端channel的config
5    final ChannelConfig config = config();
6    //服务端channel的pipeline
7    final ChannelPipeline pipeline = pipeline();
8    //处理服务端接入的速率
9    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
10    //设置配置
11    allocHandle.reset(config);
12    boolean closed = false;
13    Throwable exception = null;
14    try {
15        try {
16            do {
17                //创建jdk底层的channel
18                //readBuf用于临时承载读到链接
19                int localRead = doReadMessages(readBuf);
20                if (localRead == 0) {
21                    break;
22                }
23                if (localRead < 0) {
24                    closed = true;
25                    break;
26                }
27                //分配器将读到的链接进行计数
28                allocHandle.incMessagesRead(localRead);
29                //连接数是否超过最大值
30            } while (allocHandle.continueReading());
31        } catch (Throwable t) {
32            exception = t;
33        }
34        int size = readBuf.size();
35        //遍历每一条客户端连接
36        for (int i = 0; i < size; i ++) {
37            readPending = false;
38            //传递事件, 将创建NioSokectChannel进行传递
39            //最终会调用ServerBootstrap的内部类ServerBootstrapAcceptor的channelRead()方法
40            pipeline.fireChannelRead(readBuf.get(i));
41        }
42        readBuf.clear();
43        allocHandle.readComplete();
44        pipeline.fireChannelReadComplete();
45        //代码省略
46    } finally {
47        //代码省略
48    }
49}
50

首先获取与NioServerSocketChannel绑定config和pipeline, config我们上一小节进行分析过, pipeline我们将在下一章进行剖析

我们看这一句:


1
2
1final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
2

这里通过RecvByteBufAllocator接口调用了其内部接口Handler

我们看其RecvByteBufAllocator接口:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1public interface RecvByteBufAllocator {
2    Handle newHandle();
3    interface Handle {
4        int guess();
5        void reset(ChannelConfig config);
6        void incMessagesRead(int numMessages);
7        void lastBytesRead(int bytes);
8        int lastBytesRead();
9        void attemptedBytesRead(int bytes);
10        int attemptedBytesRead();
11        boolean continueReading();
12        void readComplete();    
13    }
14}
15

我们看到RecvByteBufAllocator接口只有一个方法newHandle(), 顾名思义就是用于创建Handle对象的方法, 而Handle中的方法, 才是实际用于操作的方法

在RecvByteBufAllocator实现类中包含Handle的子类, 具体实现关系如下:

Netty源码分析第3章(客户端接入流程)---->第2节: 处理接入事件之handle的创建

3-2-1

回到read()方法中再看这段代码:


1
2
1final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
2

unsafe()返回当前channel绑定的unsafe对象, recvBufAllocHandle()最终会调用AbstractChannel内部类AbstractUnsafe的recvBufAllocHandle()方法

跟进AbstractUnsafe的recvBufAllocHandle()方法:


1
2
3
4
5
6
7
8
1public RecvByteBufAllocator.Handle recvBufAllocHandle() {
2    //如果不存在, 则创建一个recvHandle的实例
3    if (recvHandle == null) {
4        recvHandle = config().getRecvByteBufAllocator().newHandle();
5    }
6    return recvHandle;
7}
8

如果如果是第一次执行到这里, 自身属性recvHandle为空, 会创建一个recvHandle实例, config()返回NioServerSocketChannel绑定的ChannelConfig, getRecvByteBufAllocator()获取其RecvByteBufAllocator对象, 这两部分上一小节剖析过了, 这里通过newHandle()创建一个Handle, 这里会走到AdaptiveRecvByteBufAllocator类中的newHandle()方法中

跟进newHandle()方法中:


1
2
3
4
1public Handle newHandle() {
2    return new HandleImpl(minIndex, maxIndex, initial);
3}
4

这里创建HandleImpl传入了三个参数, 这三个参数我们上一小节剖析过, minIndex为最小内存在SIZE_TABLE中的下标, maxIndex为最大内存在SEIZE_TABEL中的下标, initial是初始内存, 我们跟到HandleImpl的构造方法中:


1
2
3
4
5
6
7
1public HandleImpl(int minIndex, int maxIndex, int initial) {
2    this.minIndex = minIndex;
3    this.maxIndex = maxIndex;
4    index = getSizeTableIndex(initial);
5    nextReceiveBufferSize = SIZE_TABLE[index];
6}
7

初始化minIndex和maxIndex, 根据initial找到当前的下标, nextReceiveBufferSize是根据当前的下标找到对应的内存

这样, 我们就创建了个Handle对象

在这里我们需要知道, 这个handle, 是和channel唯一绑定的属性, 而AdaptiveRecvByteBufAllocator对象是和ChannelConfig对象唯一绑定的, 间接也是和channel进行唯一绑定

 

继续回到read()方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
1public void read() {
2    //必须是NioEventLoop方法调用的, 不能通过外部线程调用
3    assert eventLoop().inEventLoop();
4    //服务端channel的config
5    final ChannelConfig config = config();
6    //服务端channel的pipeline
7    final ChannelPipeline pipeline = pipeline();
8    //处理服务端接入的速率
9    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
10    //设置配置
11    allocHandle.reset(config);
12    boolean closed = false;
13    Throwable exception = null;
14    try {
15        try {
16            do {
17                //创建jdk底层的channel
18                //readBuf用于临时承载读到链接
19                int localRead = doReadMessages(readBuf);
20                if (localRead == 0) {
21                    break;
22                }
23                if (localRead < 0) {
24                    closed = true;
25                    break;
26                }
27                //分配器将读到的链接进行计数
28                allocHandle.incMessagesRead(localRead);
29                //连接数是否超过最大值
30            } while (allocHandle.continueReading());
31        } catch (Throwable t) {
32            exception = t;
33        }
34        int size = readBuf.size();
35        //遍历每一条客户端连接
36        for (int i = 0; i < size; i ++) {
37            readPending = false;
38            //传递事件, 将创建NioSokectChannel进行传递
39            //最终会调用ServerBootstrap的内部类ServerBootstrapAcceptor的channelRead()方法
40            pipeline.fireChannelRead(readBuf.get(i));
41        }
42        readBuf.clear();
43        allocHandle.readComplete();
44        pipeline.fireChannelReadComplete();
45        //代码省略
46    } finally {
47        //代码省略
48    }
49}
50

继续往下跟:


1
2
1allocHandle.reset(config);
2

这个段代码是重新设置配置, 也就是将之前的配置信息进行初始化, 最终会走到, DefaultMaxMessagesRecvByteBufAllocator中的内部类MaxMessageHandle的reet中

我们跟进reset中:


1
2
3
4
5
6
1public void reset(ChannelConfig config) {
2    this.config = config;
3    maxMessagePerRead = maxMessagesPerRead();
4    totalMessages = totalBytesRead = 0;
5}
6

这里仅仅对几个属性做了赋值, 简单介绍下这几个属性:

config:当前channelConfig对象

maxMessagePerRead:表示读取消息的时候可以读取几次(循环次数), maxMessagesPerRead()返回的是RecvByteBufAllocator的maxMessagesPerRead属性, 上一小节已经做过剖析

totalMessages:代表目前读循环已经读取的消息个数, 在NIO传输模式下也就是已经执行的循环次数, 这里初始化为0

totalBytesRead:代表目前已经读取到的消息字节总数, 这里同样也初始化为0

我们继续往下走, 这里首先是一个do-while循环, 循环体里通过int localRead = doReadMessages(readBuf)这种方式将读取到的连接数放入到一个List集合中, 这一步我们下一小节再分析, 我们继续往下走:

 

我们首先看allocHandle.incMessagesRead(localRead)这一步, 这里的localRead表示这次循环往readBuf中放入的连接数, 在Nio模式下这, 如果读取到一条连接会返回1

跟到中的MaxMessageHandle的incMessagesRead(int amt)方法中:


1
2
3
4
1public final void incMessagesRead(int amt) {
2    totalMessages += amt;
3}
4

这里将totalMessages增加amt, 也就是+1

这里totalMessage, 刚才已经剖析过, 在NIO传输模式下也就是已经执行的循环次数, 这里每次执行一次循环都会加一

再去看循环终止条件allocHandle.continueReading()

跟到MaxMessageHandle的continueReading()方法中:


1
2
3
4
5
6
7
8
9
10
11
1public boolean continueReading() {
2    //config.isAutoRead()默认返回true
3    // totalMessages < maxMessagePerRead
4    //totalMessages代表当前读到的链接, 默认是1
5    //maxMessagePerRead每一次最大读多少链接(默认16)
6    return config.isAutoRead() &&
7           attemptedBytesRead == lastBytesRead &&
8           totalMessages < maxMessagePerRead &&
9           totalBytesRead < Integer.MAX_VALUE;
10}
11

我们逐个分析判断条件:

config.isAutoRead(): 这里默认为true

attemptedBytesRead == lastBytesRead: 表示本次读取的字节数和最后一次读取的字节数相等, 因为到这里都没有进行字节数组的读取操作, 所以默认都为0, 这里也返回true

totalMessages < maxMessagePerRead: 表示当前读取的次数是否小于最大读取次数, 我们知道totalMessages每次循环都会自增, 而maxMessagePerRead默认值为16, 所以这里会限制循环不能超过16次, 也就是最多一次只能读取16条连接

totalBytesRead < Integer.MAX_VALUE: 表示读取的字节数不能超过int类型的最大值

这里就剖析完了Handle的创建和初始化过程, 并且剖析了循环终止条件等相关的逻辑

 

上一节: 初始化NioSocketChannelConfig

下一节: NioSocketChannel的创建

 

给TA打赏
共{{data.count}}人
人已打赏
安全技术

Bootstrap 4 Flex(弹性)布局

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索