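Netty is an open-source Java framework originally developed by JBoss and now maintained by the Netty project community. It is built on Java NIO and mainly targets highly concurrent, client-facing applications over TCP, as well as applications that continuously transfer large amounts of data in peer-to-peer scenarios.
Netty's key characteristics: non-blocking, event-driven, high performance, high reliability, and highly customizable.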
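Java supports three I/O models for network programming:
(1) BIO (Blocking I/O): synchronous and blocking (the traditional model). The server uses one thread per connection: whenever a client connects, the server starts a thread to handle it. If the connection then sits idle, that thread is wasted overhead;
(2) NIO (Non-blocking I/O; the JDK package name java.nio stands for "New I/O"): synchronous and non-blocking. One server thread handles many connections: client connections are registered with a multiplexer (selector), and a connection is processed when polling the multiplexer shows it has a pending I/O request;
(3) AIO (Asynchronous I/O): asynchronous and non-blocking. AIO introduces asynchronous channels and follows the Proactor pattern, which simplifies programming. A thread is started only for a valid request: the operating system completes the I/O operation first and then notifies the server program to start a thread to handle the result. It generally suits applications with many long-lived connections.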
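Typical use cases for each I/O model:
(1) BIO suits architectures with a small, fixed number of connections. It demands more server resources because each connection ties up a thread, and concurrency is limited to what the application itself can sustain. It was the only option before JDK 1.4, but the code is simple and easy to understand;
(2) NIO suits architectures with many short-lived connections (light operations), such as chat servers, bullet-comment (danmaku) systems, and server-to-server communication. Programming is more complex; supported since JDK 1.4;
(3) AIO suits architectures with many long-lived connections (heavy operations), such as photo album servers. It relies heavily on the OS for concurrent operations. Programming is more complex; supported since JDK 7.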
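The NIO model described above (one thread, many connections, one multiplexer) can be sketched with the JDK alone. The following is a minimal echo server, not Netty code; the class name NioEchoSketch and its helper methods are made up for illustration, and error handling is kept to a minimum.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical class name: a single thread serves every connection through one Selector.
class NioEchoSketch {

    // Opens a non-blocking server socket on an ephemeral loopback port.
    static ServerSocketChannel open() throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        server.configureBlocking(false);
        return server;
    }

    // Event loop: the selector (multiplexer) reports which channels are ready.
    static void serve(ServerSocketChannel server) throws IOException {
        try (Selector selector = Selector.open()) {
            server.register(selector, SelectionKey.OP_ACCEPT);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (server.isOpen()) {
                selector.select(200); // wake up regularly to notice a closed server
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        if (client != null) {
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        buffer.clear();
                        if (client.read(buffer) < 0) {
                            client.close(); // peer closed the connection
                            continue;
                        }
                        buffer.flip();
                        // Simplification: a real server would register OP_WRITE instead of spinning.
                        while (buffer.hasRemaining()) {
                            client.write(buffer);
                        }
                    }
                }
            }
        }
    }

    // Blocking client used only to exercise the server.
    static String roundTrip(int port, String text) throws IOException {
        InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), port);
        try (SocketChannel channel = SocketChannel.open(address)) {
            byte[] out = text.getBytes(StandardCharsets.UTF_8);
            channel.write(ByteBuffer.wrap(out));
            ByteBuffer in = ByteBuffer.allocate(out.length);
            while (in.hasRemaining() && channel.read(in) >= 0) {
                // keep reading until the whole echo has arrived
            }
            return new String(in.array(), 0, in.position(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        ServerSocketChannel server = open();
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
        Thread loop = new Thread(() -> {
            try {
                serve(server);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        loop.setDaemon(true);
        loop.start();
        System.out.println(roundTrip(port, "hello"));
        System.out.println(roundTrip(port, "netty"));
        server.close();
    }
}
```

Netty wraps this kind of selector loop in its EventLoopGroup and pipeline abstractions, which is why the server code later in this article never touches a Selector directly.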
Netty: http://netty.io/
Netty GitHub: https://github.com/netty/netty
1. Development environment
Windows version: Windows 10 Home (20H2)
IntelliJ IDEA (https://www.jetbrains.com/idea/download/): Community Edition for Windows 2020.1.4
Apache Maven (https://maven.apache.org/): 3.8.1
Redis for Windows: 5.0.14
Note: to set up the Spring development environment, see "Spring Basics (1) - Introduction to Spring, Spring Architecture, and Development Environment Setup".
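2. Create a basic Spring Boot project
Project name: SpringbootExample20
Spring Boot version: 2.6.6
Steps:
(1) Create the Maven project SpringbootExample20;
(2) Configure Spring Boot Web;
(3) Add the Thymeleaf dependency;
(4) Configure jQuery;
For details, see the SpringbootExample02 project in "Spring Series (2) - Using Thymeleaf, jQuery + Bootstrap, and Internationalization in a Spring Boot Project". The end of that article also explains how to run and package the project with the spring-boot-maven-plugin.
Unlike SpringbootExample02, SpringbootExample20 does not configure Bootstrap, template files (templates/*.html), or internationalization.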
3. Configure Netty
1) Add the Netty dependency
Go to http://www.mvnrepository.com/, search for netty, and update pom.xml:
    <project ... >
        ...
        <dependencies>
            ...
            <!-- Netty -->
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.89.Final</version>
            </dependency>
            ...
        </dependencies>
        ...
    </project>
In the IDE's project list, right-click SpringbootExample20 -> Maven -> Reload Project
2) Edit src/main/resources/application.properties as follows
    spring.main.banner-mode=off

    # Web server
    server.servlet.application-display-name=SpringbootExample20-Test
    server.address=localhost
    server.port=9090

    # Netty server
    netty.websocket.ip=localhost
    netty.websocket.port=9999
    netty.websocket.max-frame-size=10240
    netty.websocket.path=/websocket
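The netty.websocket.* keys are custom keys read by this project, not Spring Boot built-ins. As a rough illustration of how they map to typed values, here is a plain-Java sketch using java.util.Properties. The class NettyWebSocketSettings and its validation rules are assumptions made for this example, and the values in main are sample values only; in the project itself the values are injected with @Value.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical holder for the custom netty.websocket.* keys.
final class NettyWebSocketSettings {
    final String ip;
    final int port;
    final int maxFrameSize;
    final String path;

    NettyWebSocketSettings(Properties props) {
        this.ip = required(props, "netty.websocket.ip");
        this.port = Integer.parseInt(required(props, "netty.websocket.port"));
        // The frame size limit is an int in Netty's API, so it is parsed as an int here.
        this.maxFrameSize = Integer.parseInt(required(props, "netty.websocket.max-frame-size"));
        this.path = required(props, "netty.websocket.path");
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        if (!path.startsWith("/")) {
            throw new IllegalArgumentException("path must start with '/': " + path);
        }
    }

    private static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("missing property: " + key);
        }
        return value.trim();
    }

    // Parses properties-file text, e.g. the content of application.properties.
    static NettyWebSocketSettings parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return new NettyWebSocketSettings(props);
    }

    // The URL a WebSocket client would connect to.
    String url() {
        return "ws://" + ip + ":" + port + path;
    }

    public static void main(String[] args) throws IOException {
        String text = String.join("\n",
                "# Netty server (sample values)",
                "netty.websocket.ip=localhost",
                "netty.websocket.port=9999",
                "netty.websocket.max-frame-size=10240",
                "netty.websocket.path=/websocket");
        NettyWebSocketSettings settings = parse(text);
        System.out.println(settings.url());
    }
}
```

Note that in a .properties file a `#` starts a comment only at the beginning of a line, so comments must sit on their own lines as shown.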
3) Create src/main/java/com/example/netty/NettyBootsrapRunner.java
    package com.example.netty;

    import java.net.InetSocketAddress;

    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextClosedEvent;
    import org.springframework.stereotype.Component;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.HttpVersion;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;

    @Component
    public class NettyBootsrapRunner implements ApplicationRunner, ApplicationListener<ContextClosedEvent>, ApplicationContextAware {

        @Value("${netty.websocket.port}")
        private int port;

        @Value("${netty.websocket.ip}")
        private String ip;

        @Value("${netty.websocket.path}")
        private String path;

        // Declared as int to match the maxFrameSize parameter of WebSocketServerProtocolHandler
        @Value("${netty.websocket.max-frame-size}")
        private int maxFrameSize;

        private ApplicationContext applicationContext;
        private Channel serverChannel;

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }

        @Override
        public void run(ApplicationArguments args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup);
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.localAddress(new InetSocketAddress(this.ip, this.port));
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new HttpServerCodec());
                        pipeline.addLast(new ChunkedWriteHandler());
                        pipeline.addLast(new HttpObjectAggregator(65536));
                        pipeline.addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                if (msg instanceof FullHttpRequest) {
                                    FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
                                    String uri = fullHttpRequest.uri();
                                    if (!uri.equals(path)) {
                                        // Not the WebSocket URI: release the request and return 404
                                        fullHttpRequest.release();
                                        ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND))
                                                .addListener(ChannelFutureListener.CLOSE);
                                        return;
                                    }
                                }
                                super.channelRead(ctx, msg);
                            }
                        });
                        pipeline.addLast(new WebSocketServerCompressionHandler());
                        pipeline.addLast(new WebSocketServerProtocolHandler(path, null, true, maxFrameSize));

                        // Get the handler from the IoC container
                        pipeline.addLast(applicationContext.getBean(WebsocketMessageHandler.class));
                    }
                });

                System.out.println("NettyBootsrapRunner -> run(): (" + this.ip + ":" + this.port + ")");

                Channel channel = serverBootstrap.bind().sync().channel();
                this.serverChannel = channel;
                // Blocks until the server channel is closed (see onApplicationEvent)
                channel.closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }

        @Override
        public void onApplicationEvent(ContextClosedEvent event) {
            if (this.serverChannel != null) {
                this.serverChannel.close();
            }
            System.out.println("NettyBootsrapRunner -> onApplicationEvent(): stop websocket");
        }
    }
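One detail of the 404 filter in the runner: it compares the raw request URI with the configured path using equals, so a request that carries a query string (for example a token parameter) is rejected. The sketch below shows one possible way to compare only the path component with java.net.URI. WebSocketPathMatcher is a hypothetical helper, not part of Netty or of this project, and Netty's WebSocketServerProtocolHandler applies its own path check afterwards, so this covers only the filter step.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper: matches the path component of a request URI, ignoring any query string.
final class WebSocketPathMatcher {
    private final String expectedPath;

    WebSocketPathMatcher(String expectedPath) {
        this.expectedPath = expectedPath;
    }

    boolean matches(String requestUri) {
        try {
            // getPath() drops the query and fragment, e.g. "/websocket?token=abc" -> "/websocket"
            String path = new URI(requestUri).getPath();
            return expectedPath.equals(path);
        } catch (URISyntaxException e) {
            // Malformed URIs never match
            return false;
        }
    }

    public static void main(String[] args) {
        WebSocketPathMatcher matcher = new WebSocketPathMatcher("/websocket");
        System.out.println(matcher.matches("/websocket"));           // true
        System.out.println(matcher.matches("/websocket?token=abc")); // true
        System.out.println(matcher.matches("/other"));               // false
    }
}
```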
4) Create src/main/java/com/example/netty/WebsocketMessageHandler.java
    package com.example.netty;

    import org.springframework.stereotype.Component;

    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;

    @Sharable
    @Component
    public class WebsocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
            if (msg instanceof TextWebSocketFrame) {
                TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
                // Reply to client
                ctx.channel().writeAndFlush(new TextWebSocketFrame("Received your message -> " + textWebSocketFrame.text()));
            } else {
                // Invalid message type: send a close frame with the status, then close the channel
                ctx.channel().writeAndFlush(new CloseWebSocketFrame(WebSocketCloseStatus.INVALID_MESSAGE_TYPE))
                        .addListener(ChannelFutureListener.CLOSE);
            }
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
            System.out.println("WebsocketMessageHandler -> channelInactive(): " + ctx.channel().remoteAddress());
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            System.out.println("WebsocketMessageHandler -> channelActive(): " + ctx.channel().remoteAddress());
        }
    }
5) Run
Edit Configurations
Click "+" add new configuration -> Select "Maven"
Command line: clean spring-boot:run
Name: SpringbootExample20 [clean,spring-boot:run]
-> Apply / OK
Click Run "SpringbootExample20 [clean,spring-boot:run]"
...
Spring boot example project
Open http://localhost:9090/test
Test Page
Note: to package the application, change Command line to clean package spring-boot:repackage
4. Test example (Web mode)
1) Create src/main/resources/templates/client.html
    <html lang="en" xmlns:th="http://www.thymeleaf.org">
    <head>
        <meta charset="UTF-8">
        <title th:text="${var}">Client</title>
        <script language="javascript" th:src="@{/lib/jquery/jquery-3.6.0.min.js}"></script>
    </head>
    <body>

        <h4>Netty WebSocket - Client</h4>
        <p> </p>

        <p>
            <label><strong>WebSocket url:</strong></label><br>
            <input type="text" name="ws_url" id="ws_url" value="ws://localhost:9999/websocket" style="width: 50%; height: 32px;" /><br><br>
            <button type="button" id="btn_connect" class="btn btn-default btn-sm">Connect</button>
            <button type="button" id="btn_close" class="btn btn-default btn-sm" style="display: none;">Close</button>
        </p>
        <p id="message_area" style="display: none;">
            <label><strong>Message:</strong></label><br>
            <input type="text" name="message" id="message" style="width: 50%; height: 32px;" /><br><br>
            <button type="button" id="btn_send" class="btn btn-default btn-sm">Send</button>
        </p>

        <p> </p>
        <div id="result_area" style="padding: 15px; width: 50%; font-size: 12px; min-height: 120px;">
        </div>

        <script type="text/javascript">

            var globalSocket = null;

            $(document).ready(function() {

                $("#btn_connect").click(function(e) {
                    connectWebSocket();
                });

                $("#btn_close").click(function(e) {
                    closeWebSocket();
                });

                $("#btn_send").click(function(e) {
                    if (globalSocket != null) {
                        var msg = $("#message").val();
                        if (msg == '') {
                            alert("Please enter message");
                            $("#message").focus();
                            return;
                        }
                        globalSocket.send(msg);
                    }
                });
            });

            function connectWebSocket() {
                var wsUrl = $("#ws_url").val();
                if (wsUrl == '') {
                    alert("Please enter url");
                    $("#ws_url").focus();
                    return;
                }

                if (globalSocket == null) {
                    $("#result_area").html('');
                    // Disable the Connect button while a connection is open
                    $("#btn_connect").attr("disabled", "disabled");
                    createWebSocket(wsUrl);
                }
            }

            function createWebSocket(url) {
                if (globalSocket != null || url == '')
                    return;

                console.log("createWebSocket(): url = ", url);

                globalSocket = new WebSocket(url);
                globalSocket.onopen = funcWSOpen;
                globalSocket.onclose = funcWSClose;
                globalSocket.onerror = funcWSError;
                globalSocket.onmessage = funcWSMessage;
            }

            function closeWebSocket() {
                if (globalSocket != null) {
                    console.log("closeWebSocket(): close");
                    globalSocket.close();
                    $("#btn_close").attr("disabled", "disabled");
                    $("#message_area").css("display", "none");
                }
            }

            function funcWSOpen(e) {
                console.log("funcWSOpen(): ", e);
                $("#message_area").css("display", "");
                $("#btn_close").removeAttr("disabled");
                $("#btn_close").css("display", "");
                $("#result_area").append("<br>WSOpen: Connected<br>");
            }

            function funcWSClose(e) {
                console.log("funcWSClose(): ", e);
                $("#result_area").append("<br>WSClose: Close<br>");
                $("#btn_connect").removeAttr("disabled");
                $("#btn_close").css("display", "none");
                globalSocket = null;
            }

            function funcWSError(e) {
                console.error("funcWSError(): ", e);
                $("#result_area").append("<br>WSError: Error<br>");
                $("#btn_connect").removeAttr("disabled");
                $("#btn_close").css("display", "none");
                globalSocket = null;
            }

            function funcWSMessage(e) {
                console.log("funcWSMessage(): e.data = ", e.data);
                $("#result_area").append("<br>WSMessage: " + e.data + "<br>");
            }

        </script>
    </body>
    </html>
2) Edit src/main/java/com/example/controller/IndexController.java
    package com.example.controller;

    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;

    @Controller
    public class IndexController {

        @ResponseBody
        @RequestMapping("/test")
        public String test() {
            return "Test Page";
        }

        @RequestMapping("/client")
        public String client() {
            return "client";
        }
    }
Open http://localhost:9090/client. When running from the JAR, the WebSocket url on the page is ws://localhost:9999/websocket; click the Connect button to test.
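As an alternative to the browser page, the server can also be exercised from a small command-line client. The sketch below uses the JDK's built-in java.net.http.WebSocket API, which assumes Java 11 or later. The class WsClientSketch, the 5-second timeouts, and the default URL are choices made for this example and should be adjusted to your configuration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

// Hypothetical command-line client: sends one text message and waits for one reply.
final class WsClientSketch {

    // Collects the first complete text message, which may arrive in several fragments.
    static final class FirstMessage implements WebSocket.Listener {
        private final StringBuilder parts = new StringBuilder();
        final CompletableFuture<String> reply = new CompletableFuture<>();

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            parts.append(data);
            if (last) {
                reply.complete(parts.toString());
            }
            webSocket.request(1); // ask for the next message
            return null;
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            reply.completeExceptionally(error);
        }
    }

    static String sendAndAwait(URI uri, String text) throws Exception {
        FirstMessage listener = new FirstMessage();
        WebSocket webSocket = HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(uri, listener)
                .get(5, TimeUnit.SECONDS);
        webSocket.sendText(text, true).get(5, TimeUnit.SECONDS);
        String reply = listener.reply.get(5, TimeUnit.SECONDS);
        webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "done").get(5, TimeUnit.SECONDS);
        return reply;
    }

    public static void main(String[] args) throws Exception {
        // Assumed default; pass another URL as the first argument if your path or port differs.
        URI uri = URI.create(args.length > 0 ? args[0] : "ws://localhost:9999/websocket");
        System.out.println(sendAndAwait(uri, "hello"));
    }
}
```

With the Netty server from section 3 running, the printed reply should be the text that WebsocketMessageHandler sends back for a text frame. If the handshake fails, for example because the path does not match the configured one, sendAndAwait throws instead of returning.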