[编织消息框架][netty源码分析]7 Unsafe 实现类NioSocketChannelUnsafe职责与实现

释放双眼,带上耳机,听听看~!

Unsafe 是channel的内部接口, 负责跟socket底层打交道。从书写跟命名上看是不公开给开发者使用的,直到最后实现NioSocketChannelUnsafe也没有公开出去


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
2    interface Unsafe {
3        RecvByteBufAllocator.Handle recvBufAllocHandle();
4        SocketAddress localAddress();
5        SocketAddress remoteAddress();
6        void register(EventLoop eventLoop, ChannelPromise promise);
7        void bind(SocketAddress localAddress, ChannelPromise promise);
8        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
9        void disconnect(ChannelPromise promise);
10        void close(ChannelPromise promise);
11        void closeForcibly();
12        void deregister(ChannelPromise promise);
13        void beginRead();
14        void write(Object msg, ChannelPromise promise);
15        void flush();
16        ChannelPromise voidPromise();
17        ChannelOutboundBuffer outboundBuffer();
18    }
19    public interface NioUnsafe extends Unsafe {
20        SelectableChannel ch();
21        void finishConnect();
22        void read();
23        void forceFlush();
24    }
25}
26

 

NioSocketChannelUnsafe 继承关系为: NioSocketChannelUnsafe -> NioByteUnsafe -> AbstractNioUnsafe -> AbstractUnsafe

AbstractUnsafe:负责socket 链路绑定、接受、关闭,数据fush操作

每个操作大概分四个阶段处理


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1@Override
2        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
3            assertEventLoop();
4            //执行前检查
5            if (!promise.setUncancellable() || !ensureOpen(promise)) {
6                return;
7            }
8
9            boolean wasActive = isActive();
10            //调用实现
11            try {
12                doBind(localAddress);
13            } catch (Throwable t) {
14                safeSetFailure(promise, t);
15                closeIfClosed();
16                return;
17            }
18
19            //调用业务,通知pipeline
20            if (!wasActive && isActive()) {
21                invokeLater(()-> pipeline.fireChannelActive(););
22            }
23            //完成阶段处理
24            safeSetSuccess(promise);
25        }
26

 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
1@Override
2        public final void flush() {
3            assertEventLoop();
4
5            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
6            if (outboundBuffer == null) {
7                return;
8            }
9
10            outboundBuffer.addFlush();
11            flush0();
12        }
13
14        @SuppressWarnings("deprecation")
15        protected void flush0() {
16            //刚完成Flush操作
17            if (inFlush0) {
18                 return;
19            }
20
21            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
22            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
23                return;
24            }
25
26            inFlush0 = true;
27
28            //发送数据前链路检查
29            if (!isActive()) {
30                try {
31                    if (isOpen()) {
32                        //true 通知 handler channelWritabilityChanged方法
33                        outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
34                    } else {
35                        outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
36                    }
37                } finally {
38                    inFlush0 = false;
39                }
40                return;
41            }
42
43            try {
44                //调用channel实现
45                doWrite(outboundBuffer);
46            } catch (Throwable t) {
47                if (t instanceof IOException && config().isAutoClose()) {
48                    close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
49                } else {
50                    outboundBuffer.failFlushed(t, true);
51                }
52            } finally {
53                inFlush0 = false;
54            }
55        }
56

 

AbstractNioUnsafe:是NioUnsafe接口模板类,简单的包装

NioByteUnsafe:主要对NioUnsafe接口 read操作实现

NioSocketChannelUnsafe:只是简单的包装,最终公开给内部使用

NioByteUnsafe read方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
1      public final void read() {
2            final ChannelConfig config = config();
3            final ChannelPipeline pipeline = pipeline();
4            final ByteBufAllocator allocator = config.getAllocator();
5            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
6            allocHandle.reset(config);
7
8            ByteBuf byteBuf = null;
9            boolean close = false;
10            try {
11                do {
12                    byteBuf = allocHandle.allocate(allocator);
13                    //填充byteBuf 调用channel实现
14                    int size = doReadBytes(byteBuf);
15                    //记录最后读取长度
16                    allocHandle.lastBytesRead(size);
17                    //链路关闭,释放byteBuf
18                    if (allocHandle.lastBytesRead() <= 0) {
19                        byteBuf.release();
20                        byteBuf = null;
21                        close = allocHandle.lastBytesRead() < 0;
22                        break;
23                    }
24                    //自增消息读取处理次数
25                    allocHandle.incMessagesRead(1);
26                    //已完成填充byteBuf 调用业务pipeline
27                    readPending = false;
28                    pipeline.fireChannelRead(byteBuf);
29                    byteBuf = null;
30                } while (allocHandle.continueReading());
31
32                allocHandle.readComplete();
33                pipeline.fireChannelReadComplete();
34
35                if (close) {
36                    closeOnRead(pipeline);
37                }
38            } catch (Throwable t) {
39                handleReadException(pipeline, byteBuf, t, close, allocHandle);
40            } finally {
41                //如果不是主动read 要完成后要清理read op
42                if (!readPending && !config.isAutoRead()) {
43                    removeReadOp();
44                }
45            }
46        }
47    }
48

小结:可以看出没有任何的计算代码,Unsafe只实现边界检查、流程控制,具体实现交给上层处理

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全经验

ThinkCMF 5.0.181212 发布,包含安全更新

2018-12-12 11:12:22

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索