Network protocols
WebSocket是一种高级网络协议,旨在提高Web应用程序的性能和响应能力。 我们将通过编写示例应用程序来探索Netty对它们的支持。
在第12章中,您将学习如何使用WebSocket实现双向数据传输,方法是构建一个聊天室服务器,其中多个浏览器客户端可以实时通信。 您还将看到如何通过检测客户端是否支持它,从应用程序中的HTTP切换到WebSocket协议。
我们将在第13章中总结第3部分,研究Netty对用户数据报协议(UDP)的支持。在这里,您将构建一个广播服务器和监视器客户端,可以适应许多实际用途。
本章讲介绍:
- real-time web 的概念
- WebSocket 协议
- 使用Netty 创建一个基于 WebSocket 的聊天室服务端程序
如果您关注网络技术的最新发展,您很可能会遇到实时网络短语,如果您有工程领域的实时应用程序经验,您可能会对这个术语的含义持怀疑态度。
因此,我们首先要澄清的是,这不是所谓的硬实时服务质量(QoS),其中保证了在指定时间间隔内交付计算结果。 仅仅HTTP的请求/响应设计使得这个问题非常严重,因为过去设计的方法都没有提供令人满意的解决方案。
虽然已经有一些关于正式定义定时Web服务语义的学术讨论,但普遍接受的定义似乎并未出现。 所以现在我们将接受来自维基百科的以下非权威性描述:
The real-time web is a network web using technologies and practices that
enable users to receive information as soon as it is published by its
authors, rather than requiring that they or their software check a source
periodically for updates.
实时网络是一种使用技术和实践的网络网络,使用户能够在作者发布信息后立即接收信息,而不是要求他们或他们的软件定期检查信息源以进行更新。
简而言之,一个成熟的实时网络可能不会即将到来,但其背后的想法正在推动对几乎即时访问信息的不断增长的期望。 我们将在本章中讨论的WebSocket协议是朝着这个方向的良好支持的步骤。
12.1 Introducing WebSocket
WebSocket协议是从头开始设计的,旨在为Web上的双向数据传输问题提供实用的解决方案,允许客户端和服务器随时传输消息,从而要求它们异步处理消息接收。 (最新的浏览器支持WebSocket作为HTML5的客户端API。)
Netty对WebSocket的支持包括所有正在使用的主要实现,因此在您的下一个应用程序中采用它非常简单。 与Netty一样,您可以完全使用协议,而无需担心其内部实现细节。 我们将通过创建基于WebSocket的实时聊天应用程序来证明这一点。
12.2 Our example WebSocket application
我们的示例应用程序将通过使用WebSocket协议实现基于浏览器的聊天应用程序来演示实时功能,例如您可能在Facebook的文本消息功能中遇到过。 我们将通过允许多个用户同时相互通信来进一步发展。
图12.1说明了应用程序逻辑:
- 一个客户端发送一条消息。
- 这条消息广播到所有已经建立连接的其他客户端。
这就是您期望聊天室工作的方式:每个人都可以与其他人交谈。 在我们的示例中,我们将仅实现服务器端,客户端是通过网页访问聊天室的浏览器。 正如您将在接下来的几页中看到的那样,WebSocket使编写此服务器变得简单。
12.3 Adding WebSocket support
称为升级握手的机制用于从标准HTTP或HTTPS协议切换到WebSocket。 因此,使用WebSocket的应用程序将始终以HTTP / S开头,然后执行升级。 当恰好发生这种情况时,应用程序是特定的; 它可能是在启动时或者在请求特定URL时。
我们的应用程序采用以下约定:如果请求的URL以/ ws结尾,我们将协议升级到WebSocket。 否则,服务器将使用基本HTTP / S. 连接升级后,所有数据都将使用WebSocket传输。 图12.2说明了服务器逻辑,它一如Netty,将由一组ChannelHandler实现。 在我们解释用于处理HTTP和WebSocket协议的技术时,我们将在下一节中对它们进行描述。
12.3.1 Handling HTTP requests
首先,我们将实现处理HTTP请求的组件。 此组件将提供访问聊天室的页面,并显示已连接客户端发送的消息。 代码清单12.1包含了这个HttpRequestHandler的代码,它为SimpleHttpRequest消息扩展了SimpleChannelInboundHandler。 请注意channelRead0() 的实现如何转发URI / ws的任何请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 1// Extends SimpleChannelInboundHandler to handle FullHttpRequest messages
2public class HttpRequestHandler
3 extends SimpleChannelInboundHandler<FullHttpRequest> {
4 private final String wsUri;
5 private static final File INDEX;
6
7 static {
8 URL location = HttpRequestHandler.class
9 .getProtectionDomain()
10 .getCodeSource().getLocation();
11 try {
12 String path = location.toURI() + "index.html";
13 path = !path.contains("file:") ? path : path.substring(5);
14 INDEX = new File(path);
15 } catch (URISyntaxException e) {
16 throw new IllegalStateException("Unable to locate index.html", e);
17 }
18 }
19
20 public HttpRequestHandler(String wsUri) {
21 this.wsUri = wsUri;
22 }
23
24 @Override
25 public void channelRead0(ChannelHandlerContext ctx,
26 FullHttpRequest request) throws Exception {
27 // If a WebSocket upgrade is requested, increments the reference count(retain) and passes it to the next ChannelInboundHandler
28 if(wsUri.equalsIgnoreCase(request.getUri())) {
29 ctx.fireChannelRead(request.retain());
30 } else {
31 // Handlers 100 Continue requests in conformity with HTTP 1.1
32 if(HttpHeaders.is100ContinueExpected(request)) {
33 send100Continue(ctx);
34 }
35 // Reads index.html
36 RandomAccessFile file = new RandomAccessFile(INDEX, "r");
37 HttpResponse response = new DefaultHttpResponse(
38 request.getProtocolVersion(), HttpResponseStatus.OK);
39 response.headers().set(
40 HttpHeaders.Names.CONTENT_TYPE,
41 "text/plain; charset=UTF-8");
42 boolean keepAlive = HttpHeaders.isKeepAlive(request);
43 // If keepalive is requested, adds the required headers
44 if(keepAlive) {
45 response.headers().set(
46 HttpHeaders.Names.CONTENT_LENGTH, file.length());
47 response.headers().set(HttpHeaders.Names.CONNECTION,
48 HttpHeaders.Values.KEEP_ALIVE);
49 }
50
51 // Writes the HttpResponse to the client
52 ctx.write(response);
53 // Writes index.html to the client
54 if (ctx.pipeline().get(SslHandler.class) == null) {
55 ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
56 } else {
57 ctx.write(new ChunkedNioFile(file.getChannel()));
58 }
59 // Writes and flushes the LastHttpContent to the client
60 ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
61 if(!keepAlive) {
62 future.addListener(ChannelFutureListener.CLOSE);
63 }
64 }
65 }
66
67 private static void send100Continue(ChannelHandlerContext ctx) {
68 FullHttpResponse response = new DefaultFullHttpResponse{
69 HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
70 ctx.writeAndFlush(response);
71 }
72
73 @Override
74 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
75 throws Exception {
76 cause.printStackTrace();
77 ctx.close();
78 }
79}
80
81
82
如果HTTP请求引用URI / ws,则HttpRequestHandler在FullHttpRequest上调用 retain() 并通过调用 fireChannelRead(msg) 将其转发到下一个 ChannelInboundHandler . 需要调用retain(),因为在 channelRead() 完成后,它将调用 FullHttpRequest 上的 release() 来释放其资源。(请参阅第6章中对SimpleChannelInboundHandler的讨论。)
如果客户端发送HTTP 1.1标头Expect:100-continue,则HttpRequestHandler发送100 Continue响应。 在设置标头后,HttpRequestHandler将HttpResponse d写回客户端。 这不是FullHttpResponse,因为它只是响应的第一部分。 此外,此处不调 writeAndFlush()。
这是在最后完成的。
如果既不需要加密也不需要压缩,则可以通过将index.html e的内容存储在DefaultFileRegion中来实现最高效率。 这将利用零拷贝来执行传输。 因此,您需要检查ChannelPipeline中是否存在SslHandler。 或者,您使用ChunkedNioFile。
HttpRequestHandler写一个LastHttpContent来标记响应的结束。 如果未请求keepalive,则HttpRequestHandler将ChannelFutureListener添加到上次写入的 ChannelFuture 并关闭连接。 这是您调用 writeAndFlush() 来刷新所有以前写入的消息的地方。
这代表聊天服务器的第一部分,它管理纯HTTP请求和响应。 接下来我们将处理WebSocket帧,它们传输实际的聊天消息。
WebSocket Frames WebSockets以帧的形式传输数据,每个帧都代表消息的一部分。 完整的消息可能包含许多帧。
12.3.2 Handling WebSocket frames
由IETF发布的WebSocket RFC定义了六个帧; Netty为每个人提供POJO实施。 表12.1列出了帧类型并描述了它们的用法。
BinaryWebSocketFrame
包含 binary data
TextWebSocketFrame
包含 text data
ContinuationWebSocketFrame
包含 text 或者 binary 数据,他属于前一个 BinaryWebSocketFrame 或者 TextWebSocketFrame
CloseWebSocketFrame
表示CLOSE请求,包含关闭状态代码和短语
PingWebSocketFrame
请求传输PongWebSocketFrame
PongWebSocketFrame
作为响应发送给一个 PingWebSocketFrame
我们的聊天程序将会使用下面这些 frame 类型:
- CloseWebSocketFrame
- PingWebSocketFrame
- PongWebSocketFrame
- TextWebSocketFrame
TextWebSocketFrame是我们实际需要处理的唯一一个。 根据WebSocket RFC,Netty提供了一个WebSocketServerProtocolHandler来管理其他的。
下面的清单显示了TextWebSocketFrames的ChannelInboundHandler,它还将跟踪其ChannelGroup中的所有活动WebSocket连接。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 1// Extends SimpleChannelInboundHandler and handle TextWebSocketFramemessages
2pyblic class TextWebSocketFrameHandler
3 extends SimpleChannelInboundHandler<TextWebSocketFrame> {
4 private final ChannelGroup group;
5
6 public TextWebSocketFrameHandler(ChannelGroup group) {
7 this.group = group;
8 }
9
10 // Overrides userEventTriggered() to handle custome events
11 @Override
12 public void userEventTriggered(ChannelHandlerContext ctx,
13 Object evt) throws Exception {
14 if(evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE)
15 {
16 ctx.pipeline().remove(HttpRequestHandler.class);
17 // Notifies all connected WebSocket clients that new Client has connected
18 group.writeAndFlush(new TextWebSocketFrame(
19 "Client" + ctx.channel() + " joined"));
20 group.add(ctx.channel());
21 } else {
22 super.userEventTriggered(ctx, evt);
23 }
24 }
25
26 @Override
27 public void channelRead0(ChannelHandlerContext ctx,
28 TextWebSocketFrame msg) throws Exception {
29 // Increments the reference count of the message and writes it to all connected clients in the ChannelGroup
30 group.writeAndFlush(msg.retain());
31 }
32}
33
34
35
TextWebSocketFrameHandler只有很少的职责。 当与新客户端的WebSocket握手成功完成时,它通过写入ChannelGroup中的所有Channel来通知所有连接的客户端,然后将新Channel添加到ChannelGroup.
如果收到TextWebSocketFrame,它会调用retain()并使用writeAndFlush()将其传输到ChannelGroup,以便所有连接的WebSocket Channel都接收它。
和以前一样,调用 retain() 是必需的,因为当 channelRead0() 返回时TextWebSocketFrame 的引用计数将减少。 由于所有操作都是异步的,因此writeAndFlush() 可能会在以后完成,并且它不能访问已变为无效的引用。
由于Netty在内部处理大部分剩余功能,因此现在唯一要做的就是为每个创建的新Channel初始化ChannelPipeline。 为此,我们需要一个ChannelInitializer。
12.3.3 Initializing the ChannelPipeline
如您所知,要在 ChannelPipeline 中安装 ChannelHandler,您需要扩展 ChannelInitializer并实现 initChannel() 。 以下清单显示了生成的ChatServerInitializer的代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 1// Extends ChannelInitializer
2public class ChatServerIntializer extends ChannelIntializer<Channel> {
3 private final ChannelGroup group;
4
5 public ChatServerIntializer(ChannelGroup group) {
6 this.group = group;
7 }
8
9 // Adds all needed ChannelHandlers to the ChannelPipeline
10 @Override
11 protected void initChannel(Channel ch) throws Exception {
12 ChannelPipeline pipeline = ch.pipeline();
13 pipeline.addLast(new HttpServerCodec());
14 pipeline.addLast(new ChunkedWriteHandler());
15 pipeline.addLast(new HttpObjectAggregator(64 * 1024));
16 pipeline.addLast(new HttpRequestHandler("/ws"));
17 pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
18 pipeline.addLast(new TextWebSocketFrameHandler(group));
19 }
20}
21
22
对 initChannel() 的调用通过安装所有必需的 ChannelHandler 来设置新注册 Channel 的ChannelPipeline。 表12.2总结了这些内容及其各自的职责。
HttpServerCodec
将字节解码为HttpRequest,HttpContent,和LastHttpContent。 编码HttpRequest,HttpContent和LastHttpContent到字节。
ChunkedWriteHandler
写入文件的内容。
HttpObjectAggregator
将HttpMessage及其后续HttpContent聚合到单个FullHttpRequest或FullHttpResponse中(取决于它是否用于处理请求或响应)。 安装此选项后,管道中的下一个ChannelHandler将仅接收完整的HTTP请求。
HttpRequestHandler
处理FullHttpRequest(未发送到 /ws URI)。
WebSocketServerProtocolHandler
根据WebSocket规范的要求,处理WebSocket升级握手,PingWebSocketFrames,PongWebSocketFrames和CloseWebSocketFrames。
TextWebSocketFrameHandler
处理TextWebSocketFrames和握手完成事件
Netty 的 WebSocketServerProtocolHandler 处理所有强制 WebSocket 帧类型和升级握手本身。 如果握手成功,则将所需的ChannelHandler添加到管道中,并删除不再需要的那些。
升级前的管道状态如图12.3所示。 这表示ChatServerInitializer初始化后的ChannelPipeline。
升级完成后,WebSocketServerProtocolHandler将替换带有WebSocketFrameDecoder的HttpRequestDecoder和带有WebSocketFrameEncoder的HttpResponseEncoder。 为了最大限度地提高性能,它将删除WebSocket连接不需要的任何ChannelHandler。 这些将包括图12.3中所示的HttpObjectAggregator和HttpRequestHandler。
图12.4显示了这些操作完成后的ChannelPipeline。 请注意,Netty目前支持四种版本的WebSocket协议,每种版本都有自己的实现类。 根据客户端(此处为浏览器)支持的内容,自动执行正确版本的WebSocketFrameDecoder和WebSocketFrameEncoder的选择。
12.3.4 Bootstrapping
图片的最后一部分是引导服务器并安装ChatServerInitializer的代码。 这将由ChatServer类处理,如此处所示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 1public class ChatServer {
2 // Creates DefaultChannelGroup that will hold all connected WebSocket channels
3 private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
4 private final EventLoopGroup group = new NioEventLoopGroup();
5 private Channel channel;
6
7 public ChannelFuture start(InetSocketAddress address) {
8 // Bootstraps the server
9 ServerBootstrap bootstrap = new ServerBootstrap();
10 bootstrap.group(group)
11 .channel(NioServerSocketChannel.class)
12 .childHandler(createInitializer(channelGroup));
13 ChannelFuture future = bootstrap.bind(address);
14 future.syncUninterruptibly();
15 channel = future.channel();
16 return future;
17 }
18
19
20 // Creates the ChatServerInitializer
21 protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
22 return new ChatServerInitializer(group);
23 }
24
25 // Handles server shutdown and releases all resources
26 public void destroy() {
27 if(channel != null) {
28 channel.close();
29 }
30 channelGroup.close();
31 group.shutdownGracefully();
32 }
33
34 public static void main(String[] args) throws Exception {
35 if (args.length != 1) {
36 System.err.println("Please give port as argument.");
37 System.exit(1);
38 }
39 int port = Integer.praseInt(args[0]);
40 final ChatServer endpoint = new ChatServer();
41 ChannelFuture future = endpoint.start( new InetSocketAddress(port));
42 Runtime.getRuntime().addShutdownHook(new Thread() {
43 @Override
44 public void run() {
45 endpoint.destroy();
46 }
47 });
48 future.channel().closeFuture().syncUninterruptibly();
49 }
50 }
51
52
这完成了应用程序本身。 现在让我们来测试吧。
12.4 Testing the application
chapter12目录中的示例代码包含构建和运行服务器所需的一切。 (如果您尚未设置包括Apache Maven在内的开发环境,请参阅第2章中的说明。)
我们将使用以下Maven命令来构建和启动服务器:
1
2
3 1mvn -PChatServer clean package exec:exec
2
3
项目文件pom.xml配置为在端口9999上启动服务器。要使用其他端口,您可以编辑文件中的值或使用System属性覆盖它:
1
2
3 1mvn -PChatServer -Dport=1111 clean package exec:exec
2
3
以下清单显示了命令的主要输出(已删除非必要行)。
您可以通过将浏览器指向http:// localhost:9999来访问该应用程序。 图12.5显示了Chrome浏览器中的UI。
该图显示了两个连接的客户端。 第一个是使用顶部的界面连接。 第二个客户端通过底部的Chrome浏览器命令行连接。 您会注意到两个客户端都发送了消息,并且每条消息都显示在两个客户端上。
这是一个非常简单的演示,说明WebSocket如何在浏览器中实现实时通信。
12.4.1 What about encryption?
在现实生活中,您很快就会被要求为此服务器添加加密。 使用Netty,只需将SslHandler添加到ChannelPipeline并进行配置即可。 以下清单显示了如何通过扩展ChatServerInitializer来创建SecureChatServerInitializer来完成此操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 1// Adding encryption to the ChannelPipeline
2// Extends ChatServerInitializer to add encryption
3public class SecureChatServerInitializer extends ChatServerInitializer {
4 private final SslContext context;
5
6 public SecureChatServerInitializer(ChannelGroup group,
7 SslContext context) {
8 super(group);
9 this.context = context;
10 }
11
12 @Override
13 protected void initChannel(Channel ch) throws Exception {
14 // Calls the parent's initChannel()
15 super.initChannel(ch);
16 SSLEngine engine = context.newEngine(ch.alloc());
17 // Adds the SslHandler to the ChannelPipeline
18 ch.pipeline().addFirst(new SslHandler(engine));
19 }
20}
21
22
最后一步是调整ChatServer以使用SecureChatServerInitializer,以便在管道中安装SslHandler。 这给了我们这里显示的SecureChatServer。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 1// Adding encryption to the ChatServer
2// SecureChatServer extends ChatServer to support encryption
3public class SecureChatServer extends ChatServer {
4 private final SslContext context;
5
6 public SecureChatServer(SslContext context) {
7 this.context = context;
8 }
9
10 @Override
11 protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
12 // Returns the previously created SecureChatServerInitializer to enable encryption
13 return new SecureChatServerInitializer(group, context);
14 }
15
16 public static void main(String[] args) throws Exception {
17 if(args.length != 1) {
18 System.err.println("Please give port as argument");
19 System.exit(1);
20 }
21 int port = Integer.parseInt(args[0]);
22 SelfSignedCertificate cert = new SelfSignedCertificate();
23 SslContext context = SslContext.newServerContext(
24 cert.certificate(), cert.privateKey());
25
26 final SecureChatServer endpoint = new SecureChatServer(context);
27 ChannelFuture future = endpoint.start(new InetSocketAddress(port));
28 Runtime.getRuntime().addShutdownHook(new Thread() {
29 @Override
30 public void run() {
31 endpoint.destroy();
32 }
33 });
34 future.channel().closeFuture().syncUninterruptibly();
35 }
36 }
37
38
这就是为所有通信启用SSL / TLS加密所需的全部内容。 和以前一样,您可以使用Apache Maven来运行应用程序。 它还将检索任何所需的依赖项。
现在,您可以从其HTTPS URL访问SecureChatServer:https://localhost:9999。
12.5 Summary
在本章中,您学习了如何使用Netty的WebSocket实现来管理Web应用程序中的实时数据。 我们介绍了支持的数据类型,并讨论了您可能遇到的限制。 虽然在所有情况下都可能无法使用WebSocket,但应该清楚它代表了一个重要的进步网络技术。