[编织消息框架][netty源码分析]8 Channel 实现类NioSocketChannel职责与实现

释放双眼,带上耳机,听听看~!

Unsafe是托委访问socket,那么Channel是直接提供给开发者使用的

Channel 主要有两个实现 NioServerSocketChannel同NioSocketChannel 致于其它不常用不在研究范围内

NioServerSocketChannel 是给server用的,程序由始至终只有一个NioServerSocketChannel

NioSocketChannel 是给客户端用的,每个连接生成一个NioSocketChannel 对象

 

NioSocketChannel同NioSocketChannel的继承关系

NioSocketChannel -> AbstractNioByteChannel ->** AbstractNioChannel -> AbstractChannel**

NioServerSocketChannel -> AbstractNioMessageChannel->** AbstractNioChannel -> AbstractChannel**

小提示:如果看文字不够直观可以在eclipse里按快捷键 选择类 ctrl+t 

[编织消息框架][netty源码分析]8 Channel 实现类NioSocketChannel职责与实现

channel有unsafe相应的实现类,反之亦是。其实功能是很简单的,划分太多对象目的是对某部分功能重用,有时也可能因过渡设计造成

对于channel我们主要分析 I/O read/write操作


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
1public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
2    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
3
4    //构造时就绑定SelectorProvider,然后注册OP_ACCEPT
5    public NioServerSocketChannel() {
6        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
7    }
8    
9    public NioServerSocketChannel(ServerSocketChannel channel) {
10        super(null, channel, SelectionKey.OP_ACCEPT);
11        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
12    }
13    
14
15    /**
16    server read操作对应的为readMessages
17    参数是个数组,是C语言书写风格,如果需要返回多种类型数据,那么传个对象进去外部就能获取到
18    这里比较重要,当有接收到socket时,生成NioSocketChannel对象  
19   读者如果还有印象的话在讲NioEventLoop 有提示netty read 操作是不分 OP_ACCEPT、OP_READ的,可以在这方法打上断点观察
20    */
21    @Override
22    protected int doReadMessages(List<Object> buf) throws Exception {
23        SocketChannel ch = javaChannel().accept();
24        try {
25            if (ch != null) {
26                //生成NioSocketChannel
27                buf.add(new NioSocketChannel(this, ch));
28                return 1;
29            }
30        } catch (Throwable t) {
31            ch.close();
32        }
33
34        return 0;
35    }
36    //server 应该没有write操作才对,因为server是一对多处理,不知道发给那一个clinet
37    @Override
38    protected void doWrite(ChannelOutboundBuffer in) throws Exception {}
39}
40

 

 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
1public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
2    public NioSocketChannel(Channel parent, SocketChannel socket) {
3        super(parent, socket);
4        config = new NioSocketChannelConfig(this, socket.socket());
5    }
6    
7    //////////////////////////////这部分是unsafe底层调用上层的实现//////////////////////////////////////////////
8    @Override
9    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
10        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
11        //这里设置byteBuf写入数据坐标
12        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
13        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
14    }
15
16    @Override
17    protected int doWriteBytes(ByteBuf buf) throws Exception {
18        final int expectedWrittenBytes = buf.readableBytes();
19        return buf.readBytes(javaChannel(), expectedWrittenBytes);
20    }
21
22    @Override
23    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
24        for (;;) {
25            int size = in.size();
26            //没有数据退出
27            if (size == 0) {
28                clearOpWrite();
29                break;
30            }
31            
32            long writtenBytes = 0;    //记录写数据size
33            boolean done = false;    //是否完成
34            boolean setOpWrite = false;
35
36
37            ByteBuffer[] nioBuffers = in.nioBuffers();
38            int nioBufferCnt = in.nioBufferCount();
39            long expectedWrittenBytes = in.nioBufferSize();
40            SocketChannel ch = javaChannel();
41
42            //这里有三种分支处理
43            //如果没有ByteBuffer 有可能只发送几个byte
44            //1跟default逻辑其实是一样的
45            switch (nioBufferCnt) {
46                case 0:
47                    //调用父类 AbstractNioByteChannel doWrite,逻辑基本相同,不同的是AbstractNioByteChannel处理的是byte 实现调用的是 doWriteBytes(ByteBuf buf)方法。。。
48                    super.doWrite(in);
49                    return;
50                case 1:
51                    //这里只循环16次,可以看出是复制下面代码的哈。。。
52                    ByteBuffer nioBuffer = nioBuffers[0];
53                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
54                        final int localWrittenBytes = ch.write(nioBuffer);
55                        if (localWrittenBytes == 0) {
56                            setOpWrite = true;
57                            break;
58                        }
59                        expectedWrittenBytes -= localWrittenBytes;
60                        writtenBytes += localWrittenBytes;
61                        if (expectedWrittenBytes == 0) {
62                            done = true;
63                            break;
64                        }
65                    }
66                    break;
67                default:
68                    //多个ByteBuffer时跟上面逻辑一样
69                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
70                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
71                        if (localWrittenBytes == 0) {
72                            setOpWrite = true;
73                            break;
74                        }
75                        expectedWrittenBytes -= localWrittenBytes;
76                        writtenBytes += localWrittenBytes;
77                        if (expectedWrittenBytes == 0) {
78                            done = true;
79                            break;
80                        }
81                    }
82                    break;
83            }
84
85            // Release the fully written buffers, and update the indexes of the partially written buffer.
86            in.removeBytes(writtenBytes);
87
88            if (!done) {
89                // Did not write all buffers completely.
90                incompleteWrite(setOpWrite);
91                break;
92            }
93        }
94    }
95}
96

 

 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
1public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
2    //生成NioSocketChannel时就绑定 unsafe pipeline
3    protected AbstractChannel(Channel parent) {
4        this.parent = parent;
5        id = newId();
6        unsafe = newUnsafe();
7        pipeline = newChannelPipeline();
8    }
9}
10protected abstract class AbstractUnsafe implements Unsafe {
11    private void register0(ChannelPromise promise) {
12        try {
13            if (!promise.setUncancellable() || !ensureOpen(promise)) {
14                return;
15            }
16            boolean firstRegistration = neverRegistered;
17            doRegister();
18            // doRegister 是调用 AbstractNioChannel selectionKey = javaChannel().register(eventLoop().selector, 0, this);
19            neverRegistered = false;
20            registered = true;
21            //这里是添加 Handler 每个Handler会生成一个Context
22            pipeline.invokeHandlerAddedIfNeeded();
23
24            safeSetSuccess(promise);
25            //通知Handler Registered
26            pipeline.fireChannelRegistered();
27            if (isActive()) {
28                if (firstRegistration) {
29                    //通知Handler Active
30                    pipeline.fireChannelActive();
31                } else if (config().isAutoRead()) {
32                    beginRead();
33                }
34            }
35        } catch (Throwable t) {
36            //.......
37        }
38    }
39}
40

小结:看似很复杂的Channel实现其实没想象难,大多数读写坐标记录交给ByteBuf处理掉了

1.server每个client连接转换成NioSocketChannel对象

2.构建NioSocketChannel时就已经生成 unsafe、pipeline

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全网络

Java NIO框架Netty教程 (十) Object对象编/解码

2021-8-18 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索