源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 1public final class Server {
2
3 public static void main(String[] args) throws Exception {
4 EventLoopGroup bossGroup = new NioEventLoopGroup(1);
5 EventLoopGroup workerGroup = new NioEventLoopGroup();
6
7 try {
8 ServerBootstrap b = new ServerBootstrap();
9 b.group(bossGroup, workerGroup)
10 .channel(NioServerSocketChannel.class)
11 .childOption(ChannelOption.TCP_NODELAY, true)
12 //给每个客户端的连接设置一些基本的tcp属性
13 .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
14 //创建每个客户端链接的时候绑定一些基本的属性
15 .handler(new ServerHandler())
16 //服务器启动过程中的一些逻辑
17 .childHandler(new ChannelInitializer<SocketChannel>() {
18 @Override
19 public void initChannel(SocketChannel ch) {
20 ch.pipeline().addLast(new AuthHandler());
21 //..
22
23 }
24 });
25
26 ChannelFuture f = b.bind(8888).sync();
27
28 f.channel().closeFuture().sync();
29 } finally {
30 bossGroup.shutdownGracefully();
31 workerGroup.shutdownGracefully();
32 }
33 }
34}
35
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 1
2import io.netty.channel.ChannelHandlerContext;
3import io.netty.channel.ChannelInboundHandlerAdapter;
4
5import java.util.concurrent.TimeUnit;
6
7public class ServerHandler extends ChannelInboundHandlerAdapter {
8 @Override
9 public void channelActive(ChannelHandlerContext ctx) {
10 System.out.println("channelActive");
11 }
12
13 @Override
14 public void channelRegistered(ChannelHandlerContext ctx) {
15 System.out.println("channelRegistered");
16 }
17
18 @Override
19 public void handlerAdded(ChannelHandlerContext ctx) {
20 System.out.println("handlerAdded");
21 }
22
23 @Override
24 public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
25 super.channelRead(ctx, msg);
26
27 new Thread(new Runnable() {
28 public void run() {
29 // 耗时的操作
30 String result = loadFromDB();
31
32 ctx.channel().writeAndFlush(result);
33 ctx.executor().schedule(new Runnable() {
34 public void run() {
35 // ...
36 }
37 }, 1, TimeUnit.SECONDS);
38
39 }
40 }).start();
41 }
42
43 private String loadFromDB() {
44 return "hello world!";
45 }
46}
47
48
服务端Channel启动过程:
基本过程
- 调jdk底层的api创建jdk的channel,netty将其包装成自己的channel,创建一些基本组件绑定在此channel上,比如pipeline
- 基于此channel初始化一些基本属性,添加一些逻辑处理器
- 将jdk底层的channel注册到事件轮训处理器上,并把netty的服务端channel绑定在jdk底层服务端channel上
- 调jdk底层api,实现对本地端口的监听,绑定成功后,netty会重新向selector注册一个op_accept事件,netty就可以接收新的连接了
创建channel
AbstractChannel是服务端channel和客户端channel****的一个抽象
问题:
服务端的socket****在哪初始化?
newSocket()[通过jdk来创建底层jdk channel]
初始化channel
add ServerBootstrapAcceptor****给接收到的一个新连接分配一个nio的线程
注册selector
绑定线程:把对应的Nio线程和当前的chanel进行绑定, 后续所有I/O事件相关的操作都交给eventloop****处理
实际注册:将jdk底层的channel注册到事件轮训处理器上
**通过invkeHanderAddedIfNeeded()**和fireChannelRegistered两个事件传播到用户方法中
绑定端口
**doBind()**将端口实际绑定到本地
HeadContext.readIfIsAutoRead**:将之前注册到selector的事件重新绑定为accept事件,有新连接进入,selector就会轮训到accept事件,将连接交给netty****处理**
问题:
在哪里accetp****连接
绑定成功后,netty会重新向selector注册一个op_accept事件,netty就可以接收新的连接了