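Unsafe is an internal interface of Channel that deals with the underlying socket. Its name and the way it is declared signal that it is not meant for application developers, and even the final implementation, NioSocketChannelUnsafe, is never exposed publicly.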
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
    interface Unsafe {
        RecvByteBufAllocator.Handle recvBufAllocHandle();
        SocketAddress localAddress();
        SocketAddress remoteAddress();
        void register(EventLoop eventLoop, ChannelPromise promise);
        void bind(SocketAddress localAddress, ChannelPromise promise);
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
        void disconnect(ChannelPromise promise);
        void close(ChannelPromise promise);
        void closeForcibly();
        void deregister(ChannelPromise promise);
        void beginRead();
        void write(Object msg, ChannelPromise promise);
        void flush();
        ChannelPromise voidPromise();
        ChannelOutboundBuffer outboundBuffer();
    }

    // Note: in the Netty source, NioUnsafe is declared inside AbstractNioChannel;
    // it is listed next to Unsafe here only for comparison.
    public interface NioUnsafe extends Unsafe {
        SelectableChannel ch();
        void finishConnect();
        void read();
        void forceFlush();
    }
}
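The inheritance chain of NioSocketChannelUnsafe is: NioSocketChannelUnsafe -> NioByteUnsafe -> AbstractNioUnsafe -> AbstractUnsafe
AbstractUnsafe: handles binding, accepting, and closing the socket link, plus flushing data.
Each operation is handled in roughly four stages: a pre-execution check, a call into the implementation, a notification to the pipeline, and completion of the promise.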
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();
    // Stage 1: pre-execution checks
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    boolean wasActive = isActive();
    // Stage 2: call the channel implementation
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    // Stage 3: notify the pipeline (business handlers)
    if (!wasActive && isActive()) {
        invokeLater(() -> pipeline.fireChannelActive());
    }
    // Stage 4: complete the promise
    safeSetSuccess(promise);
}
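The four stages in bind() form a template-method shape that can be reproduced without Netty. The following is a minimal sketch, not Netty's code: `SketchUnsafe`, `SketchSocketUnsafe`, and the `events` list are hypothetical names, a `String` replaces `SocketAddress`, and `CompletableFuture` stands in for `ChannelPromise` (so the `isDone()` check only approximates `setUncancellable()`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for AbstractUnsafe: it owns the flow, not the I/O.
abstract class SketchUnsafe {
    final List<String> events = new ArrayList<>(); // records pipeline notifications
    boolean open = true;
    boolean active = false;

    // Transport-specific step, supplied by the concrete channel.
    protected abstract void doBind(String localAddress) throws Exception;

    public final void bind(String localAddress, CompletableFuture<Void> promise) {
        // Stage 1: pre-checks (promise already completed or cancelled, channel closed).
        if (promise.isDone()) {
            return;
        }
        if (!open) {
            promise.completeExceptionally(new IllegalStateException("channel closed"));
            return;
        }
        boolean wasActive = active;
        // Stage 2: delegate the real work to the implementation.
        try {
            doBind(localAddress);
        } catch (Throwable t) {
            promise.completeExceptionally(t);
            return;
        }
        // Stage 3: notify only on an inactive -> active transition.
        if (!wasActive && active) {
            events.add("channelActive");
        }
        // Stage 4: complete the promise.
        promise.complete(null);
    }
}

// Hypothetical concrete implementation: "binding" just flips the active flag.
class SketchSocketUnsafe extends SketchUnsafe {
    @Override
    protected void doBind(String localAddress) throws Exception {
        if (localAddress.isEmpty()) {
            throw new IllegalArgumentException("empty address");
        }
        active = true;
    }
}
```

Calling `bind` on a `SketchSocketUnsafe` with a non-empty address completes the future normally and records one `channelActive` event; an empty address fails the future and records nothing. The base class never touches a socket, which mirrors the division of labour between AbstractUnsafe and doBind.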
@Override
public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();
    flush0();
}

@SuppressWarnings("deprecation")
protected void flush0() {
    // A flush is already in progress: avoid re-entrance
    if (inFlush0) {
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Check the link before sending data
    if (!isActive()) {
        try {
            if (isOpen()) {
                // true: notify handlers through channelWritabilityChanged
                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        // Call the channel implementation
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
        } else {
            outboundBuffer.failFlushed(t, true);
        }
    } finally {
        inFlush0 = false;
    }
}
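The inFlush0 flag in flush0() acts as a re-entrancy guard: a callback triggered while writing may call flush() again on the same thread, and the guard turns that nested call into a no-op. The following is a minimal sketch of that idea only. `GuardedFlusher`, `onWrite`, and `drainPasses` are hypothetical names; a `Deque<String>` takes the place of `ChannelOutboundBuffer`, and the callback takes the place of a pipeline handler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of the inFlush0 guard; not Netty's classes.
class GuardedFlusher {
    private final Deque<String> pending = new ArrayDeque<>(); // stands in for ChannelOutboundBuffer
    final List<String> written = new ArrayList<>();
    private final Consumer<GuardedFlusher> onWrite; // callback that may re-enter flush()
    private boolean inFlush;
    int drainPasses; // how many times the drain loop actually started

    GuardedFlusher(Consumer<GuardedFlusher> onWrite) {
        this.onWrite = onWrite;
    }

    void write(String msg) {
        pending.add(msg);
    }

    void flush() {
        // A flush is already running further up the call stack: do nothing.
        if (inFlush) {
            return;
        }
        if (pending.isEmpty()) {
            return;
        }
        inFlush = true;
        drainPasses++;
        try {
            String msg;
            while ((msg = pending.poll()) != null) {
                written.add(msg);
                onWrite.accept(this); // may call write() and flush() again
            }
        } finally {
            inFlush = false; // always reset, even if the callback throws
        }
    }
}
```

If the callback writes another message and calls `flush()` from inside the drain loop, the nested call returns immediately and the outer loop picks up the new message, so the data is still sent in order and the loop runs only once.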
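AbstractNioUnsafe: a template class for the NioUnsafe interface; a thin wrapper.
NioByteUnsafe: mainly implements the read operation of the NioUnsafe interface.
NioSocketChannelUnsafe: just a thin wrapper; it is the class that is finally exposed, and only for internal use.
The read method of NioByteUnsafe: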
public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);
            // Fill byteBuf by calling the channel implementation
            int size = doReadBytes(byteBuf);
            // Record the length of the last read
            allocHandle.lastBytesRead(size);
            // Nothing was read (0) or the link was closed (< 0): release byteBuf
            if (allocHandle.lastBytesRead() <= 0) {
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                break;
            }
            // Increment the count of messages read
            allocHandle.incMessagesRead(1);
            // byteBuf is filled: hand it to the business pipeline
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        // If no read is pending and autoRead is off, remove the read op once done
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
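The control flow of the read loop (read, record the count, stop on 0 or end of stream, cap the number of reads in one pass) can be tried out independently of Netty. This is a minimal sketch under stated assumptions: `ReadLoopSketch`, `ReadResult`, and the `maxReadsPerPass` parameter are hypothetical, `maxReadsPerPass` plays the role that `allocHandle.continueReading()` plays in the listing, and a `java.nio.channels.ReadableByteChannel` stands in for `doReadBytes`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the NioByteUnsafe.read() loop; not Netty's code.
class ReadLoopSketch {
    // Outcome of one read pass: the chunks delivered and whether EOF was seen.
    static final class ReadResult {
        final List<byte[]> chunks = new ArrayList<>();
        boolean closed;
    }

    static ReadResult readPass(ReadableByteChannel source, int bufferSize, int maxReadsPerPass)
            throws IOException {
        ReadResult result = new ReadResult();
        int reads = 0;
        do {
            ByteBuffer buf = ByteBuffer.allocate(bufferSize);
            int n = source.read(buf); // counterpart of doReadBytes(byteBuf)
            if (n <= 0) {
                // 0: nothing available right now; -1: the peer closed the stream.
                result.closed = n < 0;
                break;
            }
            reads++; // counterpart of allocHandle.incMessagesRead(1)
            buf.flip();
            byte[] chunk = new byte[buf.remaining()];
            buf.get(chunk);
            result.chunks.add(chunk); // counterpart of pipeline.fireChannelRead(byteBuf)
        } while (reads < maxReadsPerPass);
        return result;
    }
}
```

The cap matters for fairness: with a small `maxReadsPerPass`, one busy connection yields after a few chunks instead of monopolising the loop, and the remaining bytes are picked up on the next pass.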
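Summary: none of this code performs any actual computation. Unsafe only implements boundary checks and flow control, and delegates the concrete work to the channel implementation (doBind, doWrite, doReadBytes).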