Netty websocket

释放双眼,带上耳机,听听看~!

Network protocols

WebSocket是一种高级网络协议,旨在提高Web应用程序的性能和响应能力。 我们将通过编写示例应用程序来探索Netty对它们的支持。

在第12章中,您将学习如何使用WebSocket实现双向数据传输,方法是构建一个聊天室服务器,其中多个浏览器客户端可以实时通信。 您还将看到如何通过检测客户端是否支持它,从应用程序中的HTTP切换到WebSocket协议。

我们将在第13章中总结第3部分,研究Netty对用户数据报协议(UDP)的支持。在这里,您将构建一个广播服务器和监视器客户端,可以适应许多实际用途。

本章讲介绍:

  • real-time web 的概念
  • WebSocket 协议
  • 使用Netty 创建一个基于 WebSocket 的聊天室服务端程序

如果您关注网络技术的最新发展,您很可能会遇到实时网络短语,如果您有工程领域的实时应用程序经验,您可能会对这个术语的含义持怀疑态度。

因此,我们首先要澄清的是,这不是所谓的硬实时服务质量(QoS),其中保证了在指定时间间隔内交付计算结果。 仅仅HTTP的请求/响应设计使得这个问题非常严重,因为过去设计的方法都没有提供令人满意的解决方案。

虽然已经有一些关于正式定义定时Web服务语义的学术讨论,但普遍接受的定义似乎并未出现。 所以现在我们将接受来自维基百科的以下非权威性描述:

The real-time web is a network web using technologies and practices that
enable users to receive information as soon as it is published by its
authors, rather than requiring that they or their software check a source
periodically for updates.
实时网络是一种使用技术和实践的网络网络,使用户能够在作者发布信息后立即接收信息,而不是要求他们或他们的软件定期检查信息源以进行更新。

简而言之,一个成熟的实时网络可能不会即将到来,但其背后的想法正在推动对几乎即时访问信息的不断增长的期望。 我们将在本章中讨论的WebSocket协议是朝着这个方向的良好支持的步骤。

12.1 Introducing WebSocket

WebSocket协议是从头开始设计的,旨在为Web上的双向数据传输问题提供实用的解决方案,允许客户端和服务器随时传输消息,从而要求它们异步处理消息接收。 (最新的浏览器支持WebSocket作为HTML5的客户端API。)

Netty对WebSocket的支持包括所有正在使用的主要实现,因此在您的下一个应用程序中采用它非常简单。 与Netty一样,您可以完全使用协议,而无需担心其内部实现细节。 我们将通过创建基于WebSocket的实时聊天应用程序来证明这一点。

12.2 Our example WebSocket application

我们的示例应用程序将通过使用WebSocket协议实现基于浏览器的聊天应用程序来演示实时功能,例如您可能在Facebook的文本消息功能中遇到过。 我们将通过允许多个用户同时相互通信来进一步发展。
图12.1说明了应用程序逻辑:

  1. 一个客户端发送一条消息。
  2. 这条消息广播到所有已经建立连接的其他客户端。

这就是您期望聊天室工作的方式:每个人都可以与其他人交谈。 在我们的示例中,我们将仅实现服务器端,客户端是通过网页访问聊天室的浏览器。 正如您将在接下来的几页中看到的那样,WebSocket使编写此服务器变得简单。

Netty websocket

12.3 Adding WebSocket support

称为升级握手的机制用于从标准HTTP或HTTPS协议切换到WebSocket。 因此,使用WebSocket的应用程序将始终以HTTP / S开头,然后执行升级。 当恰好发生这种情况时,应用程序是特定的; 它可能是在启动时或者在请求特定URL时。

我们的应用程序采用以下约定:如果请求的URL以/ ws结尾,我们将协议升级到WebSocket。 否则,服务器将使用基本HTTP / S. 连接升级后,所有数据都将使用WebSocket传输。 图12.2说明了服务器逻辑,它一如Netty,将由一组ChannelHandler实现。 在我们解释用于处理HTTP和WebSocket协议的技术时,我们将在下一节中对它们进行描述。

12.3.1 Handling HTTP requests

首先,我们将实现处理HTTP请求的组件。 此组件将提供访问聊天室的页面,并显示已连接客户端发送的消息。 代码清单12.1包含了这个HttpRequestHandler的代码,它为SimpleHttpRequest消息扩展了SimpleChannelInboundHandler。 请注意channelRead0() 的实现如何转发URI / ws的任何请求。

Netty websocket


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
1// Extends SimpleChannelInboundHandler to handle FullHttpRequest messages
2public class HttpRequestHandler
3    extends SimpleChannelInboundHandler<FullHttpRequest> {
4    private final String wsUri;
5    private static final File INDEX;
6    
7    static {
8        URL location = HttpRequestHandler.class
9                         .getProtectionDomain()
10                         .getCodeSource().getLocation();
11        try {
12            String path = location.toURI() + "index.html";
13            path = !path.contains("file:") ? path : path.substring(5);
14            INDEX = new File(path);
15        } catch (URISyntaxException e) {
16            throw new IllegalStateException("Unable to locate index.html", e);
17        }
18     }
19
20     public HttpRequestHandler(String wsUri) {
21         this.wsUri = wsUri;
22     }
23  
24     @Override
25     public void channelRead0(ChannelHandlerContext ctx,
26         FullHttpRequest request) throws Exception {
27         // If a WebSocket upgrade is requested, increments the reference count(retain) and passes it to the next ChannelInboundHandler
28         if(wsUri.equalsIgnoreCase(request.getUri())) {
29             ctx.fireChannelRead(request.retain());
30         } else {
31           // Handlers 100 Continue requests in conformity with HTTP 1.1
32           if(HttpHeaders.is100ContinueExpected(request)) {
33               send100Continue(ctx);
34           }
35           // Reads index.html
36           RandomAccessFile file = new RandomAccessFile(INDEX, "r");
37           HttpResponse response = new DefaultHttpResponse(
38               request.getProtocolVersion(), HttpResponseStatus.OK);
39           response.headers().set(
40               HttpHeaders.Names.CONTENT_TYPE,
41               "text/plain; charset=UTF-8");
42           boolean keepAlive = HttpHeaders.isKeepAlive(request);
43           // If keepalive is requested, adds the required headers
44           if(keepAlive) {                                        
45              response.headers().set(
46                  HttpHeaders.Names.CONTENT_LENGTH, file.length());
47              response.headers().set(HttpHeaders.Names.CONNECTION,
48                  HttpHeaders.Values.KEEP_ALIVE);
49           }
50
51           // Writes the HttpResponse to the client
52           ctx.write(response);
53           // Writes index.html to the client
54           if (ctx.pipeline().get(SslHandler.class) == null) {
55               ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
56           } else {
57              ctx.write(new ChunkedNioFile(file.getChannel()));
58           }
59           // Writes and flushes the LastHttpContent to the client
60           ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
61           if(!keepAlive) {
62              future.addListener(ChannelFutureListener.CLOSE);
63            }
64        }
65    }
66
67    private static void send100Continue(ChannelHandlerContext ctx) {
68       FullHttpResponse response = new DefaultFullHttpResponse{
69           HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
70           ctx.writeAndFlush(response);
71    }
72
73    @Override
74    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
75        throws Exception {
76        cause.printStackTrace();
77        ctx.close();
78    }
79}
80                                      
81
82

如果HTTP请求引用URI / ws,则HttpRequestHandler在FullHttpRequest上调用 retain() 并通过调用 fireChannelRead(msg) 将其转发到下一个 ChannelInboundHandler . 需要调用retain(),因为在 channelRead() 完成后,它将调用 FullHttpRequest 上的 release() 来释放其资源。(请参阅第6章中对SimpleChannelInboundHandler的讨论。)

如果客户端发送HTTP 1.1标头Expect:100-continue,则HttpRequestHandler发送100 Continue响应。 在设置标头后,HttpRequestHandler将HttpResponse d写回客户端。 这不是FullHttpResponse,因为它只是响应的第一部分。 此外,此处不调 writeAndFlush()。
这是在最后完成的。

如果既不需要加密也不需要压缩,则可以通过将index.html e的内容存储在DefaultFileRegion中来实现最高效率。 这将利用零拷贝来执行传输。 因此,您需要检查ChannelPipeline中是否存在SslHandler。 或者,您使用ChunkedNioFile。

HttpRequestHandler写一个LastHttpContent来标记响应的结束。 如果未请求keepalive,则HttpRequestHandler将ChannelFutureListener添加到上次写入的 ChannelFuture 并关闭连接。 这是您调用 writeAndFlush() 来刷新所有以前写入的消息的地方。

这代表聊天服务器的第一部分,它管理纯HTTP请求和响应。 接下来我们将处理WebSocket帧,它们传输实际的聊天消息。

WebSocket Frames WebSockets以帧的形式传输数据,每个帧都代表消息的一部分。 完整的消息可能包含许多帧。

12.3.2 Handling WebSocket frames

由IETF发布的WebSocket RFC定义了六个帧; Netty为每个人提供POJO实施。 表12.1列出了帧类型并描述了它们的用法。

BinaryWebSocketFrame
包含 binary data
TextWebSocketFrame
包含 text data
ContinuationWebSocketFrame
包含 text 或者 binary 数据,他属于前一个 BinaryWebSocketFrame 或者 TextWebSocketFrame
CloseWebSocketFrame
表示CLOSE请求,包含关闭状态代码和短语
PingWebSocketFrame
请求传输PongWebSocketFrame
PongWebSocketFrame
作为响应发送给一个 PingWebSocketFrame

Netty websocket

我们的聊天程序将会使用下面这些 frame 类型:

  • CloseWebSocketFrame
  • PingWebSocketFrame
  • PongWebSocketFrame
  • TextWebSocketFrame

TextWebSocketFrame是我们实际需要处理的唯一一个。 根据WebSocket RFC,Netty提供了一个WebSocketServerProtocolHandler来管理其他的。

下面的清单显示了TextWebSocketFrames的ChannelInboundHandler,它还将跟踪其ChannelGroup中的所有活动WebSocket连接。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1// Extends SimpleChannelInboundHandler and handle TextWebSocketFramemessages
2pyblic class TextWebSocketFrameHandler
3    extends SimpleChannelInboundHandler<TextWebSocketFrame> {
4    private final ChannelGroup group;
5  
6    public TextWebSocketFrameHandler(ChannelGroup group) {
7        this.group = group;
8    }
9
10    // Overrides userEventTriggered() to handle custome events
11    @Override
12    public void userEventTriggered(ChannelHandlerContext ctx,
13        Object evt) throws Exception {
14        if(evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE)    
15        {
16        ctx.pipeline().remove(HttpRequestHandler.class);
17        // Notifies all connected WebSocket clients that new Client has connected
18        group.writeAndFlush(new TextWebSocketFrame(
19            "Client" + ctx.channel() + " joined"));
20        group.add(ctx.channel());
21        } else {
22            super.userEventTriggered(ctx, evt);
23        }
24    }
25
26    @Override
27    public void channelRead0(ChannelHandlerContext ctx,
28        TextWebSocketFrame msg) throws Exception {
29        // Increments the reference count of the message and writes it to all connected clients in the ChannelGroup
30        group.writeAndFlush(msg.retain());
31    }
32}          
33            
34
35

TextWebSocketFrameHandler只有很少的职责。 当与新客户端的WebSocket握手成功完成时,它通过写入ChannelGroup中的所有Channel来通知所有连接的客户端,然后将新Channel添加到ChannelGroup.

如果收到TextWebSocketFrame,它会调用retain()并使用writeAndFlush()将其传输到ChannelGroup,以便所有连接的WebSocket Channel都接收它。

和以前一样,调用 retain() 是必需的,因为当 channelRead0() 返回时TextWebSocketFrame 的引用计数将减少。 由于所有操作都是异步的,因此writeAndFlush() 可能会在以后完成,并且它不能访问已变为无效的引用。

由于Netty在内部处理大部分剩余功能,因此现在唯一要做的就是为每个创建的新Channel初始化ChannelPipeline。 为此,我们需要一个ChannelInitializer。

12.3.3 Initializing the ChannelPipeline

如您所知,要在 ChannelPipeline 中安装 ChannelHandler,您需要扩展 ChannelInitializer并实现 initChannel() 。 以下清单显示了生成的ChatServerInitializer的代码。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1// Extends ChannelInitializer
2public class ChatServerIntializer extends ChannelIntializer<Channel> {
3    private final ChannelGroup group;
4
5    public ChatServerIntializer(ChannelGroup group) {
6        this.group = group;
7    }
8
9    // Adds all needed ChannelHandlers to the ChannelPipeline
10    @Override
11    protected void initChannel(Channel ch) throws Exception {
12        ChannelPipeline pipeline = ch.pipeline();
13        pipeline.addLast(new HttpServerCodec());
14        pipeline.addLast(new ChunkedWriteHandler());
15        pipeline.addLast(new HttpObjectAggregator(64 * 1024));
16        pipeline.addLast(new HttpRequestHandler("/ws"));
17        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
18        pipeline.addLast(new TextWebSocketFrameHandler(group));
19    }
20}            
21
22

对 initChannel() 的调用通过安装所有必需的 ChannelHandler 来设置新注册 Channel 的ChannelPipeline。 表12.2总结了这些内容及其各自的职责。

HttpServerCodec
将字节解码为HttpRequest,HttpContent,和LastHttpContent。 编码HttpRequest,HttpContent和LastHttpContent到字节。
ChunkedWriteHandler
写入文件的内容。
HttpObjectAggregator
将HttpMessage及其后续HttpContent聚合到单个FullHttpRequest或FullHttpResponse中(取决于它是否用于处理请求或响应)。 安装此选项后,管道中的下一个ChannelHandler将仅接收完整的HTTP请求。
HttpRequestHandler
处理FullHttpRequest(未发送到 /ws URI)。
WebSocketServerProtocolHandler
根据WebSocket规范的要求,处理WebSocket升级握手,PingWebSocketFrames,PongWebSocketFrames和CloseWebSocketFrames。
TextWebSocketFrameHandler
处理TextWebSocketFrames和握手完成事件

Netty websocket

Netty websocket
Netty 的 WebSocketServerProtocolHandler 处理所有强制 WebSocket 帧类型和升级握手本身。 如果握手成功,则将所需的ChannelHandler添加到管道中,并删除不再需要的那些。

Netty websocket

升级前的管道状态如图12.3所示。 这表示ChatServerInitializer初始化后的ChannelPipeline。

升级完成后,WebSocketServerProtocolHandler将替换带有WebSocketFrameDecoder的HttpRequestDecoder和带有WebSocketFrameEncoder的HttpResponseEncoder。 为了最大限度地提高性能,它将删除WebSocket连接不需要的任何ChannelHandler。 这些将包括图12.3中所示的HttpObjectAggregator和HttpRequestHandler。

图12.4显示了这些操作完成后的ChannelPipeline。 请注意,Netty目前支持四种版本的WebSocket协议,每种版本都有自己的实现类。 根据客户端(此处为浏览器)支持的内容,自动执行正确版本的WebSocketFrameDecoder和WebSocketFrameEncoder的选择。

Netty websocket

12.3.4 Bootstrapping

图片的最后一部分是引导服务器并安装ChatServerInitializer的代码。 这将由ChatServer类处理,如此处所示。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
1public class ChatServer {
2    // Creates DefaultChannelGroup that will hold all connected WebSocket channels
3    private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
4    private final EventLoopGroup group = new NioEventLoopGroup();
5    private Channel channel;
6
7    public ChannelFuture start(InetSocketAddress address) {
8        // Bootstraps the server
9        ServerBootstrap bootstrap = new ServerBootstrap();
10        bootstrap.group(group)
11            .channel(NioServerSocketChannel.class)
12            .childHandler(createInitializer(channelGroup));
13        ChannelFuture future = bootstrap.bind(address);
14        future.syncUninterruptibly();
15        channel = future.channel();
16        return future;
17   }
18
19
20   // Creates the ChatServerInitializer
21   protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
22       return new ChatServerInitializer(group);
23   }
24
25   // Handles server shutdown and releases all resources
26   public void destroy() {
27       if(channel != null) {
28           channel.close();
29       }
30       channelGroup.close();
31       group.shutdownGracefully();
32   }
33  
34   public static void main(String[] args) throws Exception {
35       if (args.length != 1) {
36           System.err.println("Please give port as argument.");
37           System.exit(1);
38       }
39       int port = Integer.praseInt(args[0]);
40       final ChatServer endpoint = new ChatServer();
41       ChannelFuture future = endpoint.start( new InetSocketAddress(port));
42       Runtime.getRuntime().addShutdownHook(new Thread() {
43           @Override
44           public void run() {
45               endpoint.destroy();
46           }
47       });
48       future.channel().closeFuture().syncUninterruptibly();
49   }
50 }                                  
51
52

这完成了应用程序本身。 现在让我们来测试吧。

12.4 Testing the application

chapter12目录中的示例代码包含构建和运行服务器所需的一切。 (如果您尚未设置包括Apache Maven在内的开发环境,请参阅第2章中的说明。)

我们将使用以下Maven命令来构建和启动服务器:


1
2
3
1mvn -PChatServer clean package exec:exec
2
3

项目文件pom.xml配置为在端口9999上启动服务器。要使用其他端口,您可以编辑文件中的值或使用System属性覆盖它:


1
2
3
1mvn -PChatServer -Dport=1111 clean package exec:exec
2
3

以下清单显示了命令的主要输出(已删除非必要行)。

Netty websocket

Netty websocket

您可以通过将浏览器指向http:// localhost:9999来访问该应用程序。 图12.5显示了Chrome浏览器中的UI。

该图显示了两个连接的客户端。 第一个是使用顶部的界面连接。 第二个客户端通过底部的Chrome浏览器命令行连接。 您会注意到两个客户端都发送了消息,并且每条消息都显示在两个客户端上。

这是一个非常简单的演示,说明WebSocket如何在浏览器中实现实时通信。

12.4.1 What about encryption?

在现实生活中,您很快就会被要求为此服务器添加加密。 使用Netty,只需将SslHandler添加到ChannelPipeline并进行配置即可。 以下清单显示了如何通过扩展ChatServerInitializer来创建SecureChatServerInitializer来完成此操作。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1// Adding encryption to the ChannelPipeline
2// Extends ChatServerInitializer to add encryption
3public class SecureChatServerInitializer extends ChatServerInitializer {
4    private final SslContext context;
5
6    public SecureChatServerInitializer(ChannelGroup group,
7        SslContext context) {
8        super(group);
9        this.context = context;
10   }
11
12   @Override
13   protected void initChannel(Channel ch) throws Exception {
14       // Calls the parent's initChannel()
15       super.initChannel(ch);
16       SSLEngine engine = context.newEngine(ch.alloc());
17       // Adds the SslHandler to the ChannelPipeline
18       ch.pipeline().addFirst(new SslHandler(engine));  
19  }
20}    
21
22

最后一步是调整ChatServer以使用SecureChatServerInitializer,以便在管道中安装SslHandler。 这给了我们这里显示的SecureChatServer。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
1// Adding encryption to the ChatServer
2// SecureChatServer extends ChatServer to support encryption
3public class SecureChatServer extends ChatServer {
4    private final SslContext context;
5
6    public SecureChatServer(SslContext context) {
7        this.context = context;
8    }
9
10    @Override
11    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
12        // Returns the previously created SecureChatServerInitializer to enable encryption
13        return new SecureChatServerInitializer(group, context);
14   }
15
16   public static void main(String[] args) throws Exception {
17       if(args.length != 1) {
18           System.err.println("Please give port as argument");
19           System.exit(1);
20       }
21       int port = Integer.parseInt(args[0]);
22       SelfSignedCertificate cert = new SelfSignedCertificate();
23       SslContext context = SslContext.newServerContext(
24       cert.certificate(), cert.privateKey());
25
26       final SecureChatServer endpoint = new SecureChatServer(context);
27       ChannelFuture future = endpoint.start(new InetSocketAddress(port));
28       Runtime.getRuntime().addShutdownHook(new Thread() {
29           @Override
30           public void run() {
31               endpoint.destroy();
32           }
33       });
34       future.channel().closeFuture().syncUninterruptibly();
35    }
36 }                                  
37
38

这就是为所有通信启用SSL / TLS加密所需的全部内容。 和以前一样,您可以使用Apache Maven来运行应用程序。 它还将检索任何所需的依赖项。

Netty websocket
现在,您可以从其HTTPS URL访问SecureChatServer:https://localhost:9999。

12.5 Summary

在本章中,您学习了如何使用Netty的WebSocket实现来管理Web应用程序中的实时数据。 我们介绍了支持的数据类型,并讨论了您可能遇到的限制。 虽然在所有情况下都可能无法使用WebSocket,但应该清楚它代表了一个重要的进步网络技术。

给TA打赏
共{{data.count}}人
人已打赏
安全技术

Bootstrap 间隔 (Spacing)

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索