Netty In Action中文版 – 第六章:ChannelHandler

释放双眼,带上耳机,听听看~!

Netty In Action中文版 – 第六章ChannelHandler

Netty In Action中文版 - 第六章:ChannelHandler

本章介绍

  • ChannelPipeline
  • ChannelHandlerContext
  • ChannelHandler
  • Inbound vs outbound(入站和出站)

接受连接或创建他们只是你的应用程序的一部分虽然这些任何很重要但是一个网络应用程序旺旺是更复杂的需要更多的代码编写如处理传入和传出的数据。Netty提供了一个强大的处理这些事情的功能允许用户自定义ChannelHandler的实现来处理数据。使得ChannelHandler更强大的是可以连接每个ChannelHandler来实现任务这有助于代码的整洁和重用。但是处理数据只是ChannelHandler所做的事情之一也可以压制I/O操作例如写请求。所有这些都可以动态实现。

6.1 ChannelPipeline

        ChannelPipeline是ChannelHandler实例的列表用于处理或截获通道的接收和发送数据。ChannelPipeline提供了一种高级的截取过滤器模式让用户可以在ChannelPipeline中完全控制一个事件及如何处理ChannelHandler与ChannelPipeline的交互。

        对于每个新的通道会创建一个新的ChannelPipeline并附加至通道。一旦连接Channel和ChannelPipeline之间的耦合是永久性的。Channel不能附加其他的ChannelPipeline或从ChannelPipeline分离。

        下图描述了ChannelHandler在ChannelPipeline中的I/O处理一个I/O操作可以由一个ChannelInboundHandler或ChannelOutboundHandler进行处理并通过调用ChannelInboundHandler处理入站IO或通过ChannelOutboundHandler处理出站IO。

如上图所示ChannelPipeline是ChannelHandler的一个列表如果一个入站I/O事件被触发这个事件会从第一个开始依次通过ChannelPipeline中的ChannelHandler若是一个入站I/O事件则会从最后一个开始依次通过ChannelPipeline中的ChannelHandler。ChannelHandler可以处理事件并检查类型如果某个ChannelHandler不能处理则会跳过并将事件传递到下一个ChannelHandler。ChannelPipeline可以动态添加、删除、替换其中的ChannelHandler这样的机制可以提高灵活性。

        修改ChannelPipeline的方法

  • addFirst(…)添加ChannelHandler在ChannelPipeline的第一个位置
  • addBefore(…)在ChannelPipeline中指定的ChannelHandler名称之前添加ChannelHandler
  • addAfter(…)在ChannelPipeline中指定的ChannelHandler名称之后添加ChannelHandler
  • addLast(ChannelHandler…)在ChannelPipeline的末尾添加ChannelHandler
  • remove(…)删除ChannelPipeline中指定的ChannelHandler
  • replace(…)替换ChannelPipeline中指定的ChannelHandler

[java] view plain
copy

  1. ChannelPipeline pipeline = ch.pipeline();
  2. FirstHandler firstHandler = 

new FirstHandler();

  1. pipeline.addLast(

"handler1", firstHandler);

  1. pipeline.addFirst(

"handler2", 
new SecondHandler());

  1. pipeline.addLast(

"handler3", 
new ThirdHandler());

  1. pipeline.remove(

"handler3");

  1. pipeline.remove(firstHandler);
  2. pipeline.replace(

"handler2", 
"handler4", 
new FourthHandler());

被添加到ChannelPipeline的ChannelHandler将通过IO-Thread处理事件这意味了必须不能有其他的IO-Thread阻塞来影响IO的整体处理有时候可能需要阻塞例如JDBC。因此Netty允许通过一个EventExecutorGroup到每一个ChannelPipeline.add*方法自定义的事件会被包含在EventExecutorGroup中的EventExecutor来处理默认的实现是DefaultEventExecutorGroup。

        ChannelPipeline除了一些修改的方法还有很多其他的方法具体是方法及使用可以看API文档或源码。

6.2 ChannelHandlerContext

        每个ChannelHandler被添加到ChannelPipeline后都会创建一个ChannelHandlerContext并与之创建的ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler实现进行交互这是相同ChannelPipeline的一部分。ChannelHandlerContext不会改变添加到其中的ChannelHandler因此它是安全的。

6.2.1 通知下一个ChannelHandler

        在相同的ChannelPipeline中通过调用ChannelInboundHandler和ChannelOutboundHandler中各个方法中的一个方法来通知最近的handler通知开始的地方取决你如何设置。下图显示了ChannelHandlerContext、ChannelHandler、ChannelPipeline的关系

        如果你想有一些事件流全部通过ChannelPipeline有两个不同的方法可以做到

  • 调用Channel的方法
  • 调用ChannelPipeline的方法

这两个方法都可以让事件流全部通过ChannelPipeline。无论从头部还是尾部开始因为它主要依赖于事件的性质。如果是一个“入站”事件它开始于头部若是一个“出站”事件则开始于尾部。

        下面的代码显示了一个写事件如何通过ChannelPipeline从尾部开始

[java] view plain
copy

  1. @Override

protected 
void initChannel(SocketChannel ch) 
throws Exception {

  1.     ch.pipeline().addLast(

new ChannelInboundHandlerAdapter() {

  1.         

@Override

  1.         

public 
void channelActive(ChannelHandlerContext ctx) 
throws Exception {

  1.             

//Event via Channel

  1.             Channel channel = ctx.channel();
  2.             channel.write(Unpooled.copiedBuffer(

"netty in action", CharsetUtil.UTF_8));

  1.             

//Event via ChannelPipeline

  1.             ChannelPipeline pipeline = ctx.pipeline();
  2.             pipeline.write(Unpooled.copiedBuffer(

"netty in action", CharsetUtil.UTF_8));

  1.         }
  2.     });
  3. }

下图表示通过Channel或ChannelPipeline的通知

可能你想从ChannelPipeline的指定位置开始不想流经整个ChannelPipeline如下情况

  • 为了节省开销不感兴趣的ChannelHandler不让通过
  • 排除一些ChannelHandler

在这种情况下你可以使用ChannelHandlerContext的ChannelHandler通知起点。它使用ChannelHandlerContext执行下一个ChannelHandler。下面代码显示了直接使用ChannelHandlerContext操作

[java] view plain
copy

  1. // Get reference of ChannelHandlerContext
  2. ChannelHandlerContext ctx = ..;
  3. // Write buffer via ChannelHandlerContext
  4. ctx.write(Unpooled.copiedBuffer(

"Netty in Action", CharsetUtil.UTF_8));

该消息流经ChannelPipeline到下一个ChannelHandler在这种情况下使用ChannelHandlerContext开始下一个ChannelHandler。下图显示了事件流

如上图显示的从指定的ChannelHandlerContext开始跳过前面所有的ChannelHandler使用ChannelHandlerContext操作是常见的模式最常用的是从ChannelHanlder调用操作也可以在外部使用ChannelHandlerContext因为这是线程安全的。

6.2.2 修改ChannelPipeline

        调用ChannelHandlerContext的pipeline()方法能访问ChannelPipeline能在运行时动态的增加、删除、替换ChannelPipeline中的ChannelHandler。可以保持ChannelHandlerContext供以后使用如外部Handler方法触发一个事件甚至从一个不同的线程。

        下面代码显示了保存ChannelHandlerContext供之后使用或其他线程使用

[java] view plain
copy

public 
class WriteHandler 
extends ChannelHandlerAdapter {

  1.     

private ChannelHandlerContext ctx;
1.

  1.     

@Override

  1.     

public 
void handlerAdded(ChannelHandlerContext ctx) 
throws Exception {

  1.         

this.ctx = ctx;

  1.     }
  2.     

public 
void send(String msg){

  1.         ctx.write(msg);
  2.     }
  3. }

请注意ChannelHandler实例如果带有@Sharable注解则可以被添加到多个ChannelPipeline。也就是说单个ChannelHandler实例可以有多个ChannelHandlerContext因此可以调用不同ChannelHandlerContext获取同一个ChannelHandler。如果添加不带@Sharable注解的ChannelHandler实例到多个ChannelPipeline则会抛出异常使用@Sharable注解后的ChannelHandler必须在不同的线程和不同的通道上安全使用。怎么是不安全的使用看下面代码

[java] view plain
copy

  1. @Sharable

public 
class NotSharableHandler 
extends ChannelInboundHandlerAdapter {
1.

  1.     

private 
int count;
1.

  1.     

@Override

  1.     

public 
void channelRead(ChannelHandlerContext ctx, Object msg) 
throws Exception {

  1.         count++;
  2.         System.out.println(

"channelRead(…) called the " + count + 
" time");

  1.         ctx.fireChannelRead(msg);
  2.     }
  3. }

上面是一个带@Sharable注解的Handler它被多个线程使用时里面count是不安全的会导致count值错误。
为什么要共享ChannelHandler使用@Sharable注解共享一个ChannelHandler在一些需求中还是有很好的作用的如使用一个ChannelHandler来统计连接数或来处理一些全局数据等等。

6.3 状态模型

        Netty有一个简单但强大的状态模型并完美映射到ChannelInboundHandler的各个方法。下面是Channel生命周期四个不同的状态

  • channelUnregistered
  • channelRegistered
  • channelActive
  • channelInactive

Channel的状态在其生命周期中变化因为状态变化需要触发下图显示了Channel状态变化

        还可以看到额外的状态变化因为用户允许从EventLoop中注销Channel暂停事件执行然后再重新注册。在这种情况下你会看到多个channelRegistered和channelUnregistered状态的变化而永远只有一个channelActive和channelInactive的状态因为一个通道在其生命周期内只能连接一次之后就会被回收重新连接则是创建一个新的通道。

        下图显示了从EventLoop中注销Channel后再重新注册的状态变化

6.4 ChannelHandler和其子类

        Netty中有3个实现了ChannelHandler接口的类其中2个是接口一个是抽象类。如下图

6.4.1 ChannelHandler中的方法

        Netty定义了良好的类型层次结构来表示不同的处理程序类型所有的类型的父类是ChannelHandler。ChannelHandler提供了在其生命周期内添加或从ChannelPipeline中删除的方法。

  • handlerAddedChannelHandler添加到实际上下文中准备处理事件
  • handlerRemoved将ChannelHandler从实际上下文中删除不再处理事件
  • exceptionCaught处理抛出的异常

上面三个方法都需要传递ChannelHandlerContext参数每个ChannelHandler被添加到ChannelPipeline时会自动创建ChannelHandlerContext。ChannelHandlerContext允许在本地通道安全的存储和检索值。Netty还提供了一个实现了ChannelHandler的抽象类ChannelHandlerAdapter。ChannelHandlerAdapter实现了父类的所有方法基本上就是传递事件到ChannelPipeline中的下一个ChannelHandler直到结束。

6.4.2 ChannelInboundHandler

        ChannelInboundHandler提供了一些方法再接收数据或Channel状态改变时被调用。下面是ChannelInboundHandler的一些方法

  • channelRegisteredChannelHandlerContext的Channel被注册到EventLoop
  • channelUnregisteredChannelHandlerContext的Channel从EventLoop中注销
  • channelActiveChannelHandlerContext的Channel已激活
  • channelInactiveChannelHanderContxt的Channel结束生命周期
  • channelRead从当前Channel的对端读取消息
  • channelReadComplete消息读取完成后执行
  • userEventTriggered一个用户事件被处罚
  • channelWritabilityChanged改变通道的可写状态可以使用Channel.isWritable()检查
  • exceptionCaught重写父类ChannelHandler的方法处理异常

Netty提供了一个实现了ChannelInboundHandler接口并继承ChannelHandlerAdapter的类ChannelInboundHandlerAdapter。ChannelInboundHandlerAdapter实现了ChannelInboundHandler的所有方法作用就是处理消息并将消息转发到ChannelPipeline中的下一个ChannelHandler。ChannelInboundHandlerAdapter的channelRead方法处理完消息后不会自动释放消息若想自动释放收到的消息可以使用SimpleChannelInboundHandler<I>

        看下面代码

[java] view plain
copy

  1. /**
  2.  * 实现ChannelInboundHandlerAdapter的Handler不会自动释放接收的消息对象
  3.  * @author c.k
  4.  *
  5.  */

public 
class DiscardHandler 
extends ChannelInboundHandlerAdapter {

  1.     

@Override

  1.     

public 
void channelRead(ChannelHandlerContext ctx, Object msg) 
throws Exception {

  1.         

//手动释放消息

  1.         ReferenceCountUtil.release(msg);
  2.     }
  3. }

[java] view plain
copy

  1. /**
  2.  * 继承SimpleChannelInboundHandler会自动释放消息对象
  3.  * @author c.k
  4.  *
  5.  */

public 
class SimpleDiscardHandler 
extends SimpleChannelInboundHandler<Object> {

  1.     

@Override

  1.     

protected 
void channelRead0(ChannelHandlerContext ctx, Object msg) 
throws Exception {

  1.         

//不需要手动释放

  1.     }
  2. }

如果需要其他状态改变的通知可以重写Handler的其他方法。通常自定义消息类型来解码字节可以实现ChannelInboundHandler或ChannelInboundHandlerAdapter。有一个更好的解决方法使用编解码器的框架可以很容的实现。使用ChannelInboundHandler、ChannelInboundHandlerAdapter、SimpleChannelInboundhandler这三个中的一个来处理接收消息使用哪一个取决于需求大多数时候使用SimpleChannelInboundHandler处理消息使用ChannelInboundHandlerAdapter处理其他的“入站”事件或状态改变。
ChannelInitializer用来初始化ChannelHandler将自定义的各种ChannelHandler添加到ChannelPipeline中。

6.4.3 ChannelOutboundHandler

        ChannelOutboundHandler用来处理“出站”的数据消息。ChannelOutboundHandler提供了下面一些方法

  • bindChannel绑定本地地址
  • connectChannel连接操作
  • disconnectChannel断开连接
  • close关闭Channel
  • deregister注销Channel
  • read读取消息实际是截获ChannelHandlerContext.read()
  • write写操作实际是通过ChannelPipeline写消息Channel.flush()属性到实际通道
  • flush刷新消息到通道

ChannelOutboundHandler是ChannelHandler的子类实现了ChannelHandler的所有方法。所有最重要的方法采取ChannelPromise因此一旦请求停止从ChannelPipeline转发参数则必须得到通知。Netty提供了ChannelOutboundHandler的实现ChannelOutboundHandlerAdapter。ChannelOutboundHandlerAdapter实现了父类的所有方法并且可以根据需要重写感兴趣的方法。所有这些方法的实现在默认情况下都是通过调用ChannelHandlerContext的方法将事件转发到ChannelPipeline中下一个ChannelHandler。

        看下面的代码

[java] view plain
copy

public 
class DiscardOutboundHandler 
extends ChannelOutboundHandlerAdapter {

  1.     

@Override

  1.     

public 
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 
throws Exception {

  1.         ReferenceCountUtil.release(msg);
  2.         promise.setSuccess();
  3.     }
  4. }

重要的是要记得释放致远并直通ChannelPromise若ChannelPromise没有被通知可能会导致其中一个ChannelFutureListener不被通知去处理一个消息。
如果消息被消费并且没有被传递到ChannelPipeline中的下一个ChannelOutboundHandler那么就需要调用ReferenceCountUtil.release(message)来释放消息资源。一旦消息被传递到实际的通道它会自动写入消息或在通道关闭是释放。

原文地址http://www.bieryun.com/2162.html

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全技术

Mina、Netty、Twisted一起学(一):实现简单的TCP服务器

2022-1-11 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索