Unsafe是托委访问socket,那么Channel是直接提供给开发者使用的
Channel 主要有两个实现 NioServerSocketChannel同NioSocketChannel 致于其它不常用不在研究范围内
NioServerSocketChannel 是给server用的,程序由始至终只有一个NioServerSocketChannel
NioSocketChannel 是给客户端用的,每个连接生成一个NioSocketChannel 对象
NioSocketChannel同NioSocketChannel的继承关系
NioSocketChannel -> AbstractNioByteChannel ->** AbstractNioChannel -> AbstractChannel**
NioServerSocketChannel -> AbstractNioMessageChannel->** AbstractNioChannel -> AbstractChannel**
小提示:如果看文字不够直观可以在eclipse里按快捷键 选择类 ctrl+t
channel有unsafe相应的实现类,反之亦是。其实功能是很简单的,划分太多对象目的是对某部分功能重用,有时也可能因过渡设计造成
对于channel我们主要分析 I/O read/write操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 1public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
2 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
3
4 //构造时就绑定SelectorProvider,然后注册OP_ACCEPT
5 public NioServerSocketChannel() {
6 this(newSocket(DEFAULT_SELECTOR_PROVIDER));
7 }
8
9 public NioServerSocketChannel(ServerSocketChannel channel) {
10 super(null, channel, SelectionKey.OP_ACCEPT);
11 config = new NioServerSocketChannelConfig(this, javaChannel().socket());
12 }
13
14
15 /**
16 server read操作对应的为readMessages
17 参数是个数组,是C语言书写风格,如果需要返回多种类型数据,那么传个对象进去外部就能获取到
18 这里比较重要,当有接收到socket时,生成NioSocketChannel对象
19 读者如果还有印象的话在讲NioEventLoop 有提示netty read 操作是不分 OP_ACCEPT、OP_READ的,可以在这方法打上断点观察
20 */
21 @Override
22 protected int doReadMessages(List<Object> buf) throws Exception {
23 SocketChannel ch = javaChannel().accept();
24 try {
25 if (ch != null) {
26 //生成NioSocketChannel
27 buf.add(new NioSocketChannel(this, ch));
28 return 1;
29 }
30 } catch (Throwable t) {
31 ch.close();
32 }
33
34 return 0;
35 }
36 //server 应该没有write操作才对,因为server是一对多处理,不知道发给那一个clinet
37 @Override
38 protected void doWrite(ChannelOutboundBuffer in) throws Exception {}
39}
40
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 1public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
2 public NioSocketChannel(Channel parent, SocketChannel socket) {
3 super(parent, socket);
4 config = new NioSocketChannelConfig(this, socket.socket());
5 }
6
7 //////////////////////////////这部分是unsafe底层调用上层的实现//////////////////////////////////////////////
8 @Override
9 protected int doReadBytes(ByteBuf byteBuf) throws Exception {
10 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
11 //这里设置byteBuf写入数据坐标
12 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
13 return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
14 }
15
16 @Override
17 protected int doWriteBytes(ByteBuf buf) throws Exception {
18 final int expectedWrittenBytes = buf.readableBytes();
19 return buf.readBytes(javaChannel(), expectedWrittenBytes);
20 }
21
22 @Override
23 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
24 for (;;) {
25 int size = in.size();
26 //没有数据退出
27 if (size == 0) {
28 clearOpWrite();
29 break;
30 }
31
32 long writtenBytes = 0; //记录写数据size
33 boolean done = false; //是否完成
34 boolean setOpWrite = false;
35
36
37 ByteBuffer[] nioBuffers = in.nioBuffers();
38 int nioBufferCnt = in.nioBufferCount();
39 long expectedWrittenBytes = in.nioBufferSize();
40 SocketChannel ch = javaChannel();
41
42 //这里有三种分支处理
43 //如果没有ByteBuffer 有可能只发送几个byte
44 //1跟default逻辑其实是一样的
45 switch (nioBufferCnt) {
46 case 0:
47 //调用父类 AbstractNioByteChannel doWrite,逻辑基本相同,不同的是AbstractNioByteChannel处理的是byte 实现调用的是 doWriteBytes(ByteBuf buf)方法。。。
48 super.doWrite(in);
49 return;
50 case 1:
51 //这里只循环16次,可以看出是复制下面代码的哈。。。
52 ByteBuffer nioBuffer = nioBuffers[0];
53 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
54 final int localWrittenBytes = ch.write(nioBuffer);
55 if (localWrittenBytes == 0) {
56 setOpWrite = true;
57 break;
58 }
59 expectedWrittenBytes -= localWrittenBytes;
60 writtenBytes += localWrittenBytes;
61 if (expectedWrittenBytes == 0) {
62 done = true;
63 break;
64 }
65 }
66 break;
67 default:
68 //多个ByteBuffer时跟上面逻辑一样
69 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
70 final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
71 if (localWrittenBytes == 0) {
72 setOpWrite = true;
73 break;
74 }
75 expectedWrittenBytes -= localWrittenBytes;
76 writtenBytes += localWrittenBytes;
77 if (expectedWrittenBytes == 0) {
78 done = true;
79 break;
80 }
81 }
82 break;
83 }
84
85 // Release the fully written buffers, and update the indexes of the partially written buffer.
86 in.removeBytes(writtenBytes);
87
88 if (!done) {
89 // Did not write all buffers completely.
90 incompleteWrite(setOpWrite);
91 break;
92 }
93 }
94 }
95}
96
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 1public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
2 //生成NioSocketChannel时就绑定 unsafe pipeline
3 protected AbstractChannel(Channel parent) {
4 this.parent = parent;
5 id = newId();
6 unsafe = newUnsafe();
7 pipeline = newChannelPipeline();
8 }
9}
10protected abstract class AbstractUnsafe implements Unsafe {
11 private void register0(ChannelPromise promise) {
12 try {
13 if (!promise.setUncancellable() || !ensureOpen(promise)) {
14 return;
15 }
16 boolean firstRegistration = neverRegistered;
17 doRegister();
18 // doRegister 是调用 AbstractNioChannel selectionKey = javaChannel().register(eventLoop().selector, 0, this);
19 neverRegistered = false;
20 registered = true;
21 //这里是添加 Handler 每个Handler会生成一个Context
22 pipeline.invokeHandlerAddedIfNeeded();
23
24 safeSetSuccess(promise);
25 //通知Handler Registered
26 pipeline.fireChannelRegistered();
27 if (isActive()) {
28 if (firstRegistration) {
29 //通知Handler Active
30 pipeline.fireChannelActive();
31 } else if (config().isAutoRead()) {
32 beginRead();
33 }
34 }
35 } catch (Throwable t) {
36 //.......
37 }
38 }
39}
40
小结:看似很复杂的Channel实现其实没想象难,大多数读写坐标记录交给ByteBuf处理掉了
1.server每个client连接转换成NioSocketChannel对象
2.构建NioSocketChannel时就已经生成 unsafe、pipeline