Springboot Series (22) - Springboot+Netty | A Messaging Example Using Netty's WebSocket Support

Date: 2023-02-22 13:22:36
Tags: Netty http Springboot 22 netty io new import channel


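Netty is an open-source Java framework that originated at JBoss. It is built on Java NIO and mainly targets highly concurrent, client-facing applications over TCP, as well as peer-to-peer scenarios that stream large volumes of data continuously.

Netty's characteristics: non-blocking, event-driven, high performance, high reliability, and highly customizable.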

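Java supports three network I/O programming models:

    (1) BIO (Blocking I/O): synchronous and blocking (the traditional model). The server uses one thread per connection: each client connection request makes the server start a thread to handle it, so a connection that sits idle still costs a thread;
    (2) NIO (Non-blocking I/O): synchronous and non-blocking. One server thread handles many connections: client connections are registered with a multiplexer (selector), and the multiplexer polls them and processes any connection that has an I/O request pending;
    (3) AIO (Asynchronous I/O): asynchronous and non-blocking. AIO introduces asynchronous channels and follows the Proactor pattern, which simplifies programming. A thread is started only for a valid request: the operating system completes the I/O first and then notifies the server program to start a thread to process it. It generally suits applications with many long-lived connections.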
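To make the NIO model in (2) concrete, here is a minimal sketch that uses only the JDK (no Netty). The class name `NioEchoServer` and the helper `roundTrip` are hypothetical names chosen for illustration. A single thread registers the server channel and every client channel with one `Selector` and echoes back whatever it reads. It assumes small messages and skips the write-readiness handling a production server would need.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** Hypothetical sketch: one thread serves every connection through a Selector. */
final class NioEchoServer implements Runnable, AutoCloseable {
    private final Selector selector;
    private final ServerSocketChannel server;
    final int port;

    NioEchoServer() throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        // Port 0 asks the OS for any free port.
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        server.configureBlocking(false);
        // Register interest in new connections with the multiplexer.
        server.register(selector, SelectionKey.OP_ACCEPT);
        port = ((InetSocketAddress) server.getLocalAddress()).getPort();
    }

    @Override
    public void run() {
        try {
            while (selector.isOpen()) {
                selector.select(); // blocks until a channel is ready or the selector is closed
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        if (client != null) {
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        echo((SocketChannel) key.channel());
                    }
                }
            }
        } catch (ClosedSelectorException | CancelledKeyException | IOException e) {
            // The selector or server channel was closed: the loop simply ends.
        }
    }

    private void echo(SocketChannel client) {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            if (client.read(buffer) < 0) { // peer closed the connection
                client.close();
                return;
            }
            buffer.flip();
            // Simplification: spin until everything is written instead of waiting for OP_WRITE.
            while (buffer.hasRemaining()) {
                client.write(buffer);
            }
        } catch (IOException e) {
            try { client.close(); } catch (IOException ignored) { }
        }
    }

    @Override
    public void close() throws IOException {
        selector.close(); // also wakes a thread blocked in select()
        server.close();
    }

    /** Starts a server, sends one message with a blocking client, and returns the echo. */
    static String roundTrip(String message) {
        try (NioEchoServer echo = new NioEchoServer()) {
            Thread loop = new Thread(echo, "nio-selector");
            loop.setDaemon(true);
            loop.start();
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), echo.port)) {
                byte[] out = message.getBytes(StandardCharsets.UTF_8);
                socket.getOutputStream().write(out);
                byte[] in = new byte[out.length];
                new DataInputStream(socket.getInputStream()).readFully(in);
                return new String(in, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `NioEchoServer.roundTrip("hello")` starts the selector loop on one daemon thread and returns the echoed text. Netty's `NioEventLoopGroup` is built around the same kind of selector loop, with far more complete handling.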

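Typical use cases for each I/O model:

    (1) BIO suits architectures with a small, fixed number of connections. It demands more server resources, and concurrency is limited to what the application itself provides. It was the only option before JDK 1.4, but the code is simple and easy to understand;
    (2) NIO suits architectures with many short-lived connections (light operations), such as chat servers, bullet-comment (danmaku) systems, and server-to-server communication. Programming is more complex; supported since JDK 1.4;
    (3) AIO suits architectures with many long-lived connections (heavy operations), such as photo album servers. It relies fully on the OS for concurrent operations. Programming is more complex; supported since JDK 7.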
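For comparison, the AIO (Proactor) model can be sketched with the JDK 7 asynchronous channels. The names `AioEchoServer` and `roundTrip` are hypothetical. No thread waits on the socket here: each `accept`, `read`, and `write` takes a `CompletionHandler` that the runtime invokes after the operation has completed. This is an illustration under simplifying assumptions, not a tuned server.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: completion callbacks drive the server instead of a polling loop. */
final class AioEchoServer implements AutoCloseable {
    private final AsynchronousServerSocketChannel server;
    final int port;

    AioEchoServer() throws IOException {
        // Port 0 asks the OS for any free port.
        server = AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        port = ((InetSocketAddress) server.getLocalAddress()).getPort();
        acceptNext();
    }

    private void acceptNext() {
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Void attachment) {
                acceptNext();        // keep accepting further connections
                readAndEcho(client); // start the first asynchronous read
            }

            @Override
            public void failed(Throwable error, Void attachment) {
                // Reached when the server channel is closed; nothing else to clean up.
            }
        });
    }

    private void readAndEcho(final AsynchronousSocketChannel client) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer buf) {
                if (bytesRead < 0) { // peer closed the connection
                    closeQuietly(client);
                    return;
                }
                buf.flip();
                client.write(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer written, ByteBuffer b) {
                        if (b.hasRemaining()) {
                            client.write(b, b, this); // partial write: continue
                        } else {
                            readAndEcho(client);      // wait for the next message
                        }
                    }

                    @Override
                    public void failed(Throwable error, ByteBuffer b) {
                        closeQuietly(client);
                    }
                });
            }

            @Override
            public void failed(Throwable error, ByteBuffer buf) {
                closeQuietly(client);
            }
        });
    }

    private static void closeQuietly(AsynchronousSocketChannel channel) {
        try { channel.close(); } catch (IOException ignored) { }
    }

    @Override
    public void close() throws IOException {
        server.close();
    }

    /** Starts a server, sends one message with a blocking client, and returns the echo. */
    static String roundTrip(String message) {
        try (AioEchoServer echo = new AioEchoServer();
             Socket socket = new Socket(InetAddress.getLoopbackAddress(), echo.port)) {
            byte[] out = message.getBytes(StandardCharsets.UTF_8);
            socket.getOutputStream().write(out);
            byte[] in = new byte[out.length];
            new DataInputStream(socket.getInputStream()).readFully(in);
            return new String(in, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The nested callbacks show why the text calls AIO programming more complex even though the model itself is simple: the control flow is split across completion handlers.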

Netty: http://netty.io/
Netty GitHub: https://github.com/netty/netty

 

1. Development environment

    Windows version: Windows 10 Home (20H2)
    IntelliJ IDEA (https://www.jetbrains.com/idea/download/): Community Edition for Windows 2020.1.4
    Apache Maven (https://maven.apache.org/): 3.8.1
    Redis for Windows: 5.0.14

    Note: to set up the Spring development environment, see "Spring Basics (1) - Introduction to Spring, Spring Architecture, and Development Environment Setup".

 

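2. Create the base Spring Boot project

    Project name: SpringbootExample20
    Spring Boot version: 2.6.6

    Steps:

        (1) Create the Maven project SpringbootExample20;
        (2) Configure Spring Boot Web;
        (3) Add the Thymeleaf dependency;
        (4) Configure jQuery.

    For the detailed steps, see the SpringbootExample02 project in "Spring Series (2) - Using Thymeleaf, JQuery+Bootstrap, and Internationalization in a Spring Boot Project". The end of that article also explains how to run and package the project with the spring-boot-maven-plugin.

    Unlike SpringbootExample02, SpringbootExample20 does not configure Bootstrap, the template files (templates/*.html), or internationalization.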

 

3. Configure Netty

    1) Add the Netty dependency

        Go to http://www.mvnrepository.com/, search for netty, and update pom.xml

            <project ... >
                ...
                <dependencies>
                    ...

                    <!-- Netty -->
                    <dependency>
                        <groupId>io.netty</groupId>
                        <artifactId>netty-all</artifactId>
                        <version>4.1.89.Final</version>
                    </dependency>

                    ...
                </dependencies>

                ...
            </project>


        In the IDE's project list -> SpringbootExample20 -> right-click -> Maven -> Reload Project

    2) Update src/main/resources/application.properties as follows

        spring.main.banner-mode=off

        # Web server
        server.servlet.application-display-name=SpringbootExample20-Test
        server.address=localhost
        server.port=9090

        # Netty server
        netty.websocket.ip=localhost
        netty.websocket.port=9999
        netty.websocket.max-frame-size=10240
        netty.websocket.path=/websocket
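In the project these four `netty.websocket.*` keys are injected by Spring through `@Value`. As a Spring-free illustration of what each key is expected to hold, the sketch below parses the same keys with `java.util.Properties` and applies a few sanity checks. The class `WebSocketSettings` and its validation rules are assumptions made for this example, not part of the original project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical holder for the netty.websocket.* settings. */
final class WebSocketSettings {
    final String ip;
    final int port;
    final int maxFrameSize; // maximum frame payload length, in bytes
    final String path;

    private WebSocketSettings(String ip, int port, int maxFrameSize, String path) {
        this.ip = ip;
        this.port = port;
        this.maxFrameSize = maxFrameSize;
        this.path = path;
    }

    /** Parses properties-file text and validates the four WebSocket settings. */
    static WebSocketSettings parse(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String ip = required(props, "netty.websocket.ip");
        int port = Integer.parseInt(required(props, "netty.websocket.port"));
        int maxFrameSize = Integer.parseInt(required(props, "netty.websocket.max-frame-size"));
        String path = required(props, "netty.websocket.path");

        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        if (maxFrameSize <= 0) {
            throw new IllegalArgumentException("max-frame-size must be positive: " + maxFrameSize);
        }
        if (!path.startsWith("/")) {
            throw new IllegalArgumentException("path must start with '/': " + path);
        }
        return new WebSocketSettings(ip, port, maxFrameSize, path);
    }

    private static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("missing property: " + key);
        }
        return value.trim();
    }
}
```

Passing the "Netty server" block of the file above to `WebSocketSettings.parse(...)` yields a port of 9999 and a maximum frame size of 10240 bytes. A missing key fails fast with an `IllegalArgumentException`.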


    3) Create src/main/java/com/example/netty/NettyBootsrapRunner.java

        package com.example.netty;

        import java.net.InetSocketAddress;
        import org.springframework.beans.BeansException;
        import org.springframework.beans.factory.annotation.Value;
        import org.springframework.boot.ApplicationArguments;
        import org.springframework.boot.ApplicationRunner;
        import org.springframework.context.ApplicationContext;
        import org.springframework.context.ApplicationContextAware;
        import org.springframework.context.ApplicationListener;
        import org.springframework.context.event.ContextClosedEvent;
        import org.springframework.stereotype.Component;
        import io.netty.bootstrap.ServerBootstrap;
        import io.netty.channel.Channel;
        import io.netty.channel.ChannelFutureListener;
        import io.netty.channel.ChannelHandlerContext;
        import io.netty.channel.ChannelInboundHandlerAdapter;
        import io.netty.channel.ChannelInitializer;
        import io.netty.channel.ChannelPipeline;
        import io.netty.channel.EventLoopGroup;
        import io.netty.channel.nio.NioEventLoopGroup;
        import io.netty.channel.socket.SocketChannel;
        import io.netty.channel.socket.nio.NioServerSocketChannel;
        import io.netty.handler.codec.http.DefaultFullHttpResponse;
        import io.netty.handler.codec.http.FullHttpRequest;
        import io.netty.handler.codec.http.HttpObjectAggregator;
        import io.netty.handler.codec.http.HttpResponseStatus;
        import io.netty.handler.codec.http.HttpServerCodec;
        import io.netty.handler.codec.http.HttpVersion;
        import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
        import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
        import io.netty.handler.stream.ChunkedWriteHandler;

        @Component
        public class NettyBootsrapRunner implements ApplicationRunner, ApplicationListener<ContextClosedEvent>, ApplicationContextAware {

            @Value("${netty.websocket.port}")
            private int port;
            @Value("${netty.websocket.ip}")
            private String ip;
            @Value("${netty.websocket.path}")
            private String path;
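            // Declared as int so that the 4-argument WebSocketServerProtocolHandler constructor below
            // receives it as maxFrameSize; a long would match the handshake-timeout overload instead
            @Value("${netty.websocket.max-frame-size}")
            private int maxFrameSize;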
            private ApplicationContext applicationContext;
            private Channel serverChannel;

            public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
                this.applicationContext = applicationContext;
            }

            public void run(ApplicationArguments args) throws Exception {

                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                try {
                    ServerBootstrap serverBootstrap = new ServerBootstrap();
                    serverBootstrap.group(bossGroup, workerGroup);
                    serverBootstrap.channel(NioServerSocketChannel.class);
                    serverBootstrap.localAddress(new InetSocketAddress(this.ip, this.port));
                    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new ChunkedWriteHandler());
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            pipeline.addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    if(msg instanceof FullHttpRequest) {
                                        FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
                                        String uri = fullHttpRequest.uri();
                                        if (!uri.equals(path)) {
                                            // Not the WebSocket URI: release the aggregated request
                                            // (it is not passed on), reply 404, then close
                                            fullHttpRequest.release();
                                            ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND))
                                                    .addListener(ChannelFutureListener.CLOSE);
                                            return;
                                        }
                                    }
                                    super.channelRead(ctx, msg);
                                }
                            });
                            pipeline.addLast(new WebSocketServerCompressionHandler());
                            pipeline.addLast(new WebSocketServerProtocolHandler(path, null, true, maxFrameSize));

                            // Get handler from IOC
                            pipeline.addLast(applicationContext.getBean(WebsocketMessageHandler.class));
                        }
                    });

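                    System.out.println("NettyBootsrapRunner -> run(): (" + this.ip + ":" + this.port + ")");
                    // Bind and wait until the server socket is listening
                    Channel channel = serverBootstrap.bind().sync().channel();
                    this.serverChannel = channel;
                    // Block this runner thread until the server channel is closed (see onApplicationEvent)
                    channel.closeFuture().sync();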

                } finally {
                    bossGroup.shutdownGracefully();
                    workerGroup.shutdownGracefully();
                }
            }

            public void onApplicationEvent(ContextClosedEvent event) {
                if (this.serverChannel != null) {
                    this.serverChannel.close();
                }

                System.out.println("NettyBootsrapRunner -> onApplicationEvent(): stop websocket");
            }
        }
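In the pipeline above, `WebSocketServerProtocolHandler` is the handler that upgrades the HTTP request to a WebSocket connection. As background, the sketch below shows the one calculation at the heart of that handshake as defined by RFC 6455: the server appends a fixed GUID to the client's `Sec-WebSocket-Key`, hashes the result with SHA-1, and returns the Base64 form as `Sec-WebSocket-Accept`. The class `WebSocketAccept` is a hypothetical illustration and is not Netty's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical helper that computes the Sec-WebSocket-Accept header value (RFC 6455). */
final class WebSocketAccept {
    // Fixed GUID defined by RFC 6455 for the opening handshake.
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    static String acceptFor(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }
}
```

With the sample key from RFC 6455, `WebSocketAccept.acceptFor("dGhlIHNhbXBsZSBub25jZQ==")` returns `s3pPLMBiTxaQ9kYGzzhZRbK+xOo=`. The browser verifies this value before it fires the `onopen` event.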


    4) Create src/main/java/com/example/netty/WebsocketMessageHandler.java

        package com.example.netty;

        import org.springframework.stereotype.Component;
        import io.netty.channel.ChannelHandlerContext;
        import io.netty.channel.SimpleChannelInboundHandler;
        import io.netty.channel.ChannelFutureListener;
        import io.netty.channel.ChannelHandler.Sharable;
        import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
        import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
        import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
        import io.netty.handler.codec.http.websocketx.WebSocketFrame;

        @Sharable
        @Component
        public class WebsocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

            @Override
            protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
                if (msg instanceof TextWebSocketFrame) {
                    TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;

                    // Reply to client
                    ctx.channel().writeAndFlush(new TextWebSocketFrame("Received your message -> " + textWebSocketFrame.text()));
                } else {

                    // Invalid message type: send a close frame carrying the status, then close the channel
                    ctx.channel().writeAndFlush(new CloseWebSocketFrame(WebSocketCloseStatus.INVALID_MESSAGE_TYPE))
                            .addListener(ChannelFutureListener.CLOSE);
                }
            }

            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                super.channelInactive(ctx);
                System.out.println("WebsocketMessageHandler -> channelInactive(): " + ctx.channel().remoteAddress());
            }

            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                super.channelActive(ctx);
                System.out.println("WebsocketMessageHandler -> channelActive(): " + ctx.channel().remoteAddress());
            }
        }
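The handler above replies with a `TextWebSocketFrame`, and Netty's frame encoder turns it into bytes on the wire. To show what such a frame looks like, and what the `netty.websocket.max-frame-size` limit is measured against, here is a sketch of the RFC 6455 layout for an unmasked, unfragmented server-to-client text frame. `TextFrameEncoder` is a hypothetical class written for this illustration. It ignores the permessage-deflate extension that `WebSocketServerCompressionHandler` may negotiate.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical encoder for a single unmasked WebSocket text frame (RFC 6455). */
final class TextFrameEncoder {

    static byte[] encode(String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream(payload.length + 10);

        out.write(0x81); // FIN=1, RSV1-3=0, opcode=0x1 (text)

        // The payload length uses 7 bits, or 7 bits + 16 bits, or 7 bits + 64 bits.
        if (payload.length < 126) {
            out.write(payload.length);
        } else if (payload.length <= 0xFFFF) {
            out.write(126);
            out.write(payload.length >>> 8);
            out.write(payload.length & 0xFF);
        } else {
            out.write(127);
            for (int shift = 56; shift >= 0; shift -= 8) {
                out.write((int) (((long) payload.length >>> shift) & 0xFF));
            }
        }

        // Server-to-client frames are not masked, so the payload follows directly.
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }
}
```

For example, `TextFrameEncoder.encode("Hi")` produces the four bytes `0x81 0x02 0x48 0x69`. Frames sent by the browser use the same layout but additionally set the mask bit and carry a 4-byte masking key.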


    5) Run

        Edit Configurations

            Click "+" add new configuration -> Select "Maven"

                Command line: clean spring-boot:run
                Name: SpringbootExample20 [clean,spring-boot:run]

            -> Apply / OK

        Click Run "SpringbootExample20 [clean,spring-boot:run]"

            ...

            Spring boot example project       

        Visit http://localhost:9090/test

            Test Page

        Note: to build a package, change Command line to clean package spring-boot:repackage

 

4. Test example (web mode)

    1) Create src/main/resources/templates/client.html

        <html lang="en" xmlns:th="http://www.thymeleaf.org">
        <head>
            <meta charset="UTF-8">
            <title>Client</title>
            <script language="javascript" th:src="@{/lib/jquery/jquery-3.6.0.min.js}"></script>
        </head>
        <body>

        <h4>Netty WebSocket - Client</h4>
        <p>&nbsp;</p>

        <p>
            <label><strong>WebSocket url:</strong></label><br>
            <input type="text" name="ws_url" id="ws_url" value="ws://localhost:9999/websocket" style="width: 50%; height: 32px;" /><br><br>
            <button type="button" id="btn_connect" class="btn btn-default btn-sm">Connect</button>
            <button type="button" id="btn_close" class="btn btn-default btn-sm" style="display: none;">Close</button>
        </p>

        <p id="message_area" style="display: none;">
            <label><strong>Message:</strong></label><br>
            <input type="text" name="message" id="message" style="width: 50%; height: 32px;" /><br><br>
            <button type="button" id="btn_send" class="btn btn-default btn-sm">Send</button>
        </p>

        <p>&nbsp;</p>

        <div id="result_area" style="padding: 15px; width: 50%;  font-size: 12px; min-height: 120px;">
        </div>

        <script type="text/javascript">
            var globalSocket = null;

            $(document).ready(function() {
                $("#btn_connect").click(function(e) {
                    connectWebSocket();
                });

                $("#btn_close").click(function(e) {
                    closeWebSocket();
                });

                $("#btn_send").click(function(e) {
                    if (globalSocket != null) {
                        var msg = $("#message").val();
                        if (msg == '') {
                            alert("Please enter message");
                            $("#message").focus();
                            return;
                        }

                        globalSocket.send(msg);
                    }
                });
            });

            function connectWebSocket() {
                var wsUrl = $("#ws_url").val();
                if (wsUrl == '') {
                    alert("Please enter url");
                    $("#ws_url").focus();
                    return;
                }

                if (globalSocket == null) {
                    $("#result_area").html('');
                    $("#btn_connect").attr("disabled", "disabled");

                    createWebSocket(wsUrl);
                }
            }

            function createWebSocket(url) {
                if (globalSocket != null || url == '')
                    return;

                console.log("createWebSocket(): url = ", url);
                globalSocket = new WebSocket(url);
                globalSocket.onopen = funcWSOpen;
                globalSocket.onclose = funcWSClose;
                globalSocket.onerror = funcWSError;
                globalSocket.onmessage = funcWSMessage;
            }

            function closeWebSocket() {
                if (globalSocket != null) {
                    console.log("closeWebSocket(): close");
                    globalSocket.close();
                    $("#btn_close").attr("disabled", "disabled");
                    $("#message_area").css("display", "none");
                }
            }

            function funcWSOpen(e) {
                console.log("funcWSOpen(): ", e);

                $("#message_area").css("display", "");
                $("#btn_close").removeAttr("disabled");
                $("#btn_close").css("display", "");

                $("#result_area").append("<br>WSOpen: Connected<br>");
            }

            function funcWSClose(e) {
                console.log("funcWSClose(): ", e);

                $("#result_area").append("<br>WSClose: Close<br>");
                $("#btn_connect").removeAttr("disabled");
                $("#btn_close").css("display", "none");
                globalSocket = null;
            }

            function funcWSError(e) {
                console.error("funcWSError(): ", e);

                $("#result_area").append("<br>WSError: Error<br>");
                $("#btn_connect").removeAttr("disabled");
                $("#btn_close").css("display", "none");
                globalSocket = null;
            }

            function funcWSMessage(e) {
                console.log("funcWSMessage(): e.data = ", e.data);
                $("#result_area").append("<br>WSMessage: " + e.data + "<br>");
            }
        </script>
        </body>
        </html>


    2) Update src/main/java/com/example/controller/IndexController.java

        package com.example.controller;

        import org.springframework.stereotype.Controller;
        import org.springframework.web.bind.annotation.RequestMapping;
        import org.springframework.web.bind.annotation.ResponseBody;

        @Controller
        public class IndexController {
            @ResponseBody
            @RequestMapping("/test")
            public String test() {
                return "Test Page";
            }

            @RequestMapping("/client")
            public String client() {
                return "client";
            }

        }


    Visit http://localhost:9090/client (this also works when running from the packaged JAR). The WebSocket url field on the page shows ws://localhost:9999/websocket; click the Connect button to test.
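    The server's 404 check compares the request URI with the configured `netty.websocket.path` using exact string equality. The path in the URL entered on this page therefore has to match that property exactly, and a URL with a query string (for example a token parameter) would be rejected. A more tolerant comparison could look like the sketch below. `WebSocketPathMatcher` is a hypothetical helper, and Netty's `WebSocketServerProtocolHandler` applies its own path check, so the two would need to agree before such a change has any effect.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper: compares only the path component of a request URI. */
final class WebSocketPathMatcher {

    static boolean matches(String requestUri, String configuredPath) {
        try {
            // getPath() drops the query string and fragment, e.g. "/ws?token=abc" -> "/ws".
            String path = new URI(requestUri).getPath();
            return configuredPath.equals(path);
        } catch (URISyntaxException e) {
            // A malformed request URI never matches.
            return false;
        }
    }
}
```

With this helper, `matches("/ws?token=abc", "/ws")` is true while `matches("/websocket", "/ws")` is false, whereas the plain `equals` comparison rejects both.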



From: https://www.cnblogs.com/tkuang/p/17144002.html
