Netty源码分析第4章(pipeline)—->第2节: handler的添加

释放双眼,带上耳机,听听看~!

 

Netty源码分析第四章: pipeline

 

第二节: Handler的添加

 

 

添加handler, 我们以用户代码为例进行剖析:


1
2
3
4
5
6
7
8
1.childHandler(new ChannelInitializer<SocketChannel>() {
2    protected void initChannel(SocketChannel ch) throws Exception {
3        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()[0]));
4        ch.pipeline().addLast(new StringEncoder());
5        ch.pipeline().addLast(new SimpleHandler());
6    }
7});
8

用过netty的小伙伴们肯定对这段代码不会陌生, 通过addLast, 可以添加编解码器和我们自定义的handler, 某一个事件完成之后可以自动调用我们handler预先定义的方法, 具体添加和调用是怎么个执行逻辑, 在我们之后的内容会全部学习到, 以后再使用这类的功能会得心应手

 

在这里, 我们主要剖析 
ch.pipeline().addLast(
new SimpleHandler()) 这部分代码的addLast()方法

首先通过channel拿到当前的pipline, 这个上一小节进行剖析过相信不会陌生

拿到pipeline之后再为其添加handler, 因为channel初始化默认创建的是DefualtChannelPipeline

我们跟到其addLast()方法中:


1
2
3
4
1public final ChannelPipeline addLast(ChannelHandler... handlers) {
2    return addLast(null, handlers);
3}
4

首先看到这里的参数其实是一个可变对象, 也就是可以传递多个handler, 这里我们只传递了一个

我们继续跟addLast:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
2    if (handlers == null) {
3        throw new NullPointerException("handlers");
4    }
5    //传多个参数的时候通过for循环添加
6    for (ChannelHandler h: handlers) {
7        if (h == null) {
8            break;
9        }
10        addLast(executor, null, h);
11    }
12    return this;
13}
14

这里如果传入多个handler则会循环添加, 我们通常只添加一个

再继续跟到addLast()方法中去:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
2    final AbstractChannelHandlerContext newCtx;
3    synchronized (this) {
4        //判断handler是否被重复添加(1)
5        checkMultiplicity(handler);
6        //创建一个HandlerContext并添加到列表(2)
7        newCtx = newContext(group, filterName(name, handler), handler);
8
9        //添加HandlerContext(3)
10        addLast0(newCtx);
11
12        //是否已注册
13        if (!registered) {
14            newCtx.setAddPending();
15            callHandlerCallbackLater(newCtx, true);
16            return this;
17        }
18
19        EventExecutor executor = newCtx.executor();
20        if (!executor.inEventLoop()) {
21            newCtx.setAddPending();
22            //回调用户事件
23            executor.execute(new Runnable() {
24                @Override
25                public void run() {
26                    callHandlerAdded0(newCtx);
27                }
28            });
29            return this;
30        }
31    }
32    //回调添加事件(4)
33    callHandlerAdded0(newCtx);
34    return this;
35}
36

这部分代码比较长, 我们拆解为4个步骤:

1.重复添加验证

2.创建一个HandlerContext并添加到列表

  1. 添加context

  2. 回调添加事件

首先我们看第一步, 重复添加验证

我们跟到checkMultiplicity(handler)中:


1
2
3
4
5
6
7
8
9
10
11
12
13
1private static void checkMultiplicity(ChannelHandler handler) {
2    if (handler instanceof ChannelHandlerAdapter) {
3        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
4        if (!h.isSharable() && h.added) {
5            throw new ChannelPipelineException(
6                    h.getClass().getName() +
7                    " is not a @Sharable handler, so can't be added or removed multiple times.");
8        }
9        //满足条件设置为true, 代表已添加
10        h.added = true;
11    }
12}
13

首先判断是不是ChannelHandlerAdapter类型, 因为我们自定义的handler通常会直接或者间接的继承该接口, 所以这里为true

拿到handler之后转换成ChannelHandlerAdapter类型, 然后进行条件判断

 
if (!h.isSharable() && h.added) 代表如果不是共享的handler, 并且是未添加状态, 则抛出异常:

我们可以跟到isSharable()方法中去:


1
2
3
4
5
6
7
8
9
10
11
12
1public boolean isSharable() {
2    Class<?> clazz = getClass();
3    Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
4    Boolean sharable = cache.get(clazz);
5    if (sharable == null) {
6        //如果这个类注解了Sharable.class, 说明这个类会被多个channel共享
7        sharable = clazz.isAnnotationPresent(Sharable.class);
8        cache.put(clazz, sharable);
9    }
10    return sharable;
11}
12

首先拿到当前handler的class对象

然后再从netty自定义的一个ThreadLocalMap对象中获取一个盛放handler的class对象的map, 并获取其value

如果value值为空, 则会判断是否被Sharable注解, 并将自身handler的class对象和判断结果存入map对象中, 最后返回判断结果

这说明了被Sharable注解的handler是一个共享handler

从这个逻辑我们可以判断, 共享对象是可以重复添加的

 

 

我们回到checkMultiplicity(handler)方法中:

如果是共享对象或者没有被添加, 则将ChannelHandlerAdapter的added设置为true, 代表已添加

剖析完了重复添加验证, 回到addLast方法中, 我们看第二步, 创建一个HandlerContext并添加到列表:


1
2
1newCtx = newContext(group, filterName(name, handler), handler);
2

首先看filterName(name, handler)方法, 这个方法是判断添加handler的name是否重复

跟到filterName方法中:


1
2
3
4
5
6
7
8
9
10
1private String filterName(String name, ChannelHandler handler) {
2    if (name == null) {
3        //没有名字创建默认名字
4        return generateName(handler);
5    }
6    //检查名字是否重复
7    checkDuplicateName(name);
8    return name;
9}
10

因为我们添加handler时候, 不一定会会给handler命名, 所以这一步name有可能是null, 如果是null, 则创建一个默认的名字, 这里创建名字的方法我们就不往里跟了, 有兴趣的同学可以自己跟进去看

然后再检查名字是否重复

我们跟到checkDuplicateName(name)这个方法中:


1
2
3
4
5
6
7
1private void checkDuplicateName(String name) {
2    //不为空
3    if (context0(name) != null) {
4        throw new IllegalArgumentException("Duplicate handler name: " + name);
5    }
6}
7

这里有个context0(name)方法, 我们跟进去:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1private AbstractChannelHandlerContext context0(String name) {
2    //遍历pipeline
3    AbstractChannelHandlerContext context = head.next;
4    while (context != tail) {
5        //发现name相同, 说明存在handler
6        if (context.name().equals(name)) {
7            //返回
8            return context;
9        }
10        context = context.next;
11    }
12    return null;
13}
14

这里做的操作非常简单, 就是将pipeline中, 从head节点往下遍历HandlerContext, 一直遍历到tail, 如果发现名字相同则会认为重复并返回HandlerContext对象

我们回到addLast()方法中并继续看添加创建相关的逻辑:

newCtx = newContext(group, filterName(name, handler), handler)

filterName(name, handler)这步如果并没有重复则会返回handler的name

我们继续跟到newContext(group, filterName(name, handler), handler)方法中:


1
2
3
4
1private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
2    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
3}
4

这里我们看到创建了一个DefaultChannelHandlerContext对象, 构造方法的参数中, 第一个this代表当前的pipeline对象, group为null, 所以childExecutor(group)也会返回null, name为handler的名字, handler为新添加的handler对象

我们继续跟到DefaultChannelHandlerContext的构造方法中:


1
2
3
4
5
6
7
8
9
1DefaultChannelHandlerContext(
2        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
3    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
4    if (handler == null) {
5        throw new NullPointerException("handler");
6    }
7    this.handler = handler;
8}
9

我们看到首先调用了父类的构造方法, 之后将handler赋值为自身handler的成员变量, HandlerConext和handler关系在此也展现了出来, 是一种组合关系

我们首先看父类的构造方法, 有这么两个参数:isInbound(handler), isOutbound(handler), 这两个参数意思是判断需要添加的handler是inboundHandler还是outBoundHandler

跟到isInbound(handler)方法中:


1
2
3
4
1private static boolean isInbound(ChannelHandler handler) {
2    return handler instanceof ChannelInboundHandler;
3}
4

这里通过是否实现ChannelInboundHandler接口来判断是否为inboundhandler

同样我们看isOutbound(handler)方法:


1
2
3
4
1private static boolean isOutbound(ChannelHandler handler) {
2    return handler instanceof ChannelOutboundHandler;
3}
4

通过判断是否实现ChannelOutboundHandler接口判断是否为outboundhandler

 

在跟到其父类AbstractChannelHandlerContext的构造方法中:


1
2
3
4
5
6
7
8
9
10
1AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
2                              boolean inbound, boolean outbound) {
3    this.name = ObjectUtil.checkNotNull(name, "name");
4    this.pipeline = pipeline;
5    this.executor = executor;
6    this.inbound = inbound;
7    this.outbound = outbound;
8    ordered = executor == null || executor instanceof OrderedEventExecutor;
9}
10

一切都不陌生了, 因为我们tail节点和head节点创建的时候同样走到了这里

这里初始化了name, pipeline, 以及标识添加的handler是inboundhanlder还是outboundhandler

我们回到最初的addLast()方法中:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
2    final AbstractChannelHandlerContext newCtx;
3    synchronized (this) {
4        //判断handler是否被重复添加(1)
5        checkMultiplicity(handler);
6        //创建一个HandlerContext并添加到列表(2)
7        newCtx = newContext(group, filterName(name, handler), handler);
8
9        //添加HandlerContext(3)
10        addLast0(newCtx);
11
12        //是否已注册
13        if (!registered) {
14            newCtx.setAddPending();
15            callHandlerCallbackLater(newCtx, true);
16            return this;
17        }
18
19        EventExecutor executor = newCtx.executor();
20        if (!executor.inEventLoop()) {
21            newCtx.setAddPending();
22            //回调用户事件
23            executor.execute(new Runnable() {
24                @Override
25                public void run() {
26                    callHandlerAdded0(newCtx);
27                }
28            });
29            return this;
30        }
31    }
32    //回调添加事件(4)
33    callHandlerAdded0(newCtx);
34    return this;
35}
36

我们跟完了创建HandlerContext的相关逻辑, 我们继续跟第三步, 添加HandlerContext

我们跟进addLast0(newCtx)中:


1
2
3
4
5
6
7
8
9
10
11
12
13
1private void addLast0(AbstractChannelHandlerContext newCtx) {
2    //拿到tail节点的前置节点
3    AbstractChannelHandlerContext prev = tail.prev;
4    //当前节点的前置节点赋值为tail节点的前置节点
5    newCtx.prev = prev;
6    //当前节点的下一个节点赋值为tail节点
7    newCtx.next = tail;
8    //tail前置节点的下一个节点赋值为当前节点
9    prev.next = newCtx;
10    //tail节点的前一个节点赋值为当前节点
11    tail.prev = newCtx;
12}
13

这一部分也非常简单, 做了一个指针的指向操作, 将新添加的handlerConext放在tail节点之前, 之前tail节点的上一个节点之后, 熟悉双向链表的同学对此逻辑应该不会陌生, 如果是第一次添加handler, 那么添加后的结构入下图所示:

Netty源码分析第4章(pipeline)---->第2节: handler的添加

4-2-1

添加完handler之后, 这里会判断当前channel是否已经注册, 这部分逻辑我们之后再进行剖析, 我们继续往下走

之后会判断当前线程线程是否为eventLoop线程, 如果不是eventLoop线程, 就将添加回调事件封装成task交给eventLoop线程执行, 否则, 直接执行添加回调事件callHandlerAdded0(newCtx)

跟进callHandlerAdded0(newCtx):


1
2
3
4
5
6
7
8
9
1private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
2    try {
3        ctx.handler().handlerAdded(ctx);
4        ctx.setAddComplete();
5    } catch (Throwable t) {
6        //忽略代码
7    }
8}
9

我们重点关注这句


1
2
1ctx.handler().handlerAdded(ctx);
2

 

其中ctx是我们新创建的HandlerContext, 通过handler()方法拿到绑定的handler, 也就是新添加的handler, 然后执行handlerAdded(ctx)方法, 如果我们没有重写这个方法, 则会执行父类的该方法

在ChannelHandlerAdapter类中定义了该方法的实现:


1
2
3
4
1@Override
2public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
3}
4

我们看到没做任何操作, 也就是如果我们没有重写该方法时, 如果添加handler之后将不会做任何操作, 这里如果我们需要做一些业务逻辑, 可以通过重写该方法进行实现

以上就是添加handler的有关的业务逻辑

 

上一节: pipeline的创建

下一节: handler的删除

 

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全经验

分布式版本控制系统Git的安装与使用

2021-10-11 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索