首页 > 其他分享 >netty WebSocket客户端实践

netty WebSocket客户端实践

时间:2023-08-28 21:32:09浏览次数:36  
标签:netty WebSocket ctx handler io import channel 客户端

在之前的Socket学习中,主要都是基于两个Socket客户端:WebSocketSocket.IO。在做测试的时候也是基于WebSocket消息的发送和接收为主要测试对象。但是对于超多Socket连接没有涉及。

在实践中会发现,这两个实现类都存在一个问题,为了维护1个Socket连接及其功能,通常需要创建多个线程。在计算机硬件资源有限的情况下,线程是稀缺资源,不仅仅是内存占用,也会增加CPU的负担。

之前解决这个问题的方案直接换成Go语言版本的Socket客户端。例如:/net/websocketgorilla/websocket

其实Java也有相对应的解决方案:netty。话不多说,上代码。

依赖

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.85.Final</version>
</dependency>

netty WebSocket客户端

客户端主要的功能就是创建连接,然后使用一个事件处理线程池管理连接以及收发消息io.netty.channel.EventLoopGroup,然后使用一个io.netty.bootstrap.Bootstrap来作为引导程序。

package com.funtester.socket.netty  
  
import com.funtester.frame.execute.ThreadPoolUtil  
import groovy.util.logging.Log4j2  
import io.netty.bootstrap.Bootstrap  
import io.netty.channel.*  
import io.netty.channel.group.ChannelGroup  
import io.netty.channel.group.DefaultChannelGroup  
import io.netty.channel.nio.NioEventLoopGroup  
import io.netty.channel.socket.SocketChannel  
import io.netty.channel.socket.nio.NioSocketChannel  
import io.netty.handler.codec.http.DefaultHttpHeaders  
import io.netty.handler.codec.http.HttpClientCodec  
import io.netty.handler.codec.http.HttpObjectAggregator  
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker  
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory  
import io.netty.handler.codec.http.websocketx.WebSocketVersion  
import io.netty.handler.stream.ChunkedWriteHandler  
import io.netty.util.concurrent.GlobalEventExecutor  
  
@Log4j2  
class WebSocketConnector {  
  
    static Bootstrap bootstrap = new Bootstrap()  
  
    /**  
     * 处理事件的线程池  
     */  
    static EventLoopGroup group = new NioEventLoopGroup(ThreadPoolUtil.getFactory("N"))  
  
    static {  
        bootstrap.group(group).channel(NioSocketChannel.class)  
    }  
  
    /**  
     * 用于记录和管理所有客户端的channel  
     */    
     static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)  
  
    WebSocketClientHandshaker handShaker  
  
    ChannelPromise handshakeFuture  
  
    String host  
  
    int port  
  
    /**  
     * 网络通道  
     */  
    Channel channel  
  
    WebSocketIoHandler handler  
  
    /**  
     * WebSocket协议类型的模拟客户端连接器构造方法  
     *  
     * @param serverIp  
     * @param serverSocketPort  
     * @param group  
     */    WebSocketConnector(String host, int port) {  
        this.host = host  
        this.port = port  
        String URL = this.host + ":" + this.port + "/test"  
        URI uri = new URI(URL)  
        handler = new WebSocketIoHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()))  
        bootstrap.option(ChannelOption.TCP_NODELAY, true)  
                .option(ChannelOption.SO_TIMEOUT, true)  
                .option(ChannelOption.SO_BROADCAST, true)  
                .option(ChannelOption.SO_KEEPALIVE, true)  
                .handler(new ChannelInitializer<SocketChannel>() {  
  
                    @Override  
                    protected void initChannel(SocketChannel ch) throws Exception {  
                        ChannelPipeline pipeline = ch.pipeline()  
                        pipeline.addLast(new HttpClientCodec())  
                        pipeline.addLast(new ChunkedWriteHandler())  
                        pipeline.addLast(new HttpObjectAggregator(1024 * 1024))  
                        pipeline.addLast(handler)  
                    }  
                })  
    }  
  
  
    /**  
     * 连接  
     */  
    void connect() {  
        try {  
            try {  
                ChannelFuture future = bootstrap.connect(this.host - "ws://" - "wss://", this.port).sync()  
                this.channel = future.channel()  
                clients.add(channel)  
            } catch (e) {  
                log.error("创建channel失败", e)  
            }  
        } catch (Exception e) {  
            log.error("连接服务失败", e)  
        } finally {  
            this.handshakeFuture = handler.handshakeFuture()  
        }  
    }  
  
    /**  
     * 关闭  
     */  
    void close() {  
        this.channel.close()  
    }  
  
}

这里用到了一个保存现在的所有的活跃channel的类io.netty.channel.group.ChannelGroup,有点就是可以自动管理所有的channel,还能自动剔除已经关闭的channel

这里还有补充2个发送消息的方法:

/**  
 * 发送文本消息  
 */  
void sendText(String msg) {  
    channel.writeAndFlush(new TextWebSocketFrame(msg))  
}  
  
/**  
 * 发送ping消息  
 */  
void ping() {  
    channel.writeAndFlush(new PingWebSocketFrame())  
}

消息处理器

这里需要处理的消息各种类型,继承io.netty.channel.SimpleChannelInboundHandler实现不同的方法即可。

这里有个泛型设置可以直接设置成不同的消息类型,例如io.netty.handler.codec.http.websocketx.WebSocketFrame及其子类,如果确定服务端发来消息的类型的话,可以更加省事儿。

package com.funtester.socket.netty  
  
import groovy.util.logging.Log4j2  
import io.netty.channel.*  
import io.netty.channel.group.ChannelGroup  
import io.netty.channel.group.DefaultChannelGroup  
import io.netty.handler.codec.http.FullHttpResponse  
import io.netty.handler.codec.http.websocketx.*  
import io.netty.handler.timeout.IdleState  
import io.netty.handler.timeout.IdleStateEvent  
import io.netty.util.concurrent.GlobalEventExecutor  
/**  
 * WebSocket协议类型的模拟客户端IO处理器类  
 */  
@Log4j2  
class WebSocketIoHandler extends SimpleChannelInboundHandler<Object> {  
  
    /**  
     * 用于记录和管理所有客户端的channel  
     */    private ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)  
  
    private final WebSocketClientHandshaker handShaker  
  
    private ChannelPromise handshakeFuture  
  
    WebSocketIoHandler(WebSocketClientHandshaker handShaker) {  
        this.handShaker = handShaker  
    }  
  
    ChannelFuture handshakeFuture() {  
        return handshakeFuture  
    }  
  
    @Override  
    void handlerAdded(ChannelHandlerContext ctx) {  
        handshakeFuture = ctx.newPromise()  
    }  
  
    @Override  
    void channelActive(ChannelHandlerContext ctx) {  
        handShaker.handshake(ctx.channel());  
    }  
  
    @Override  
    void channelInactive(ChannelHandlerContext ctx) {  
        ctx.close()  
        try {  
            super.channelInactive(ctx)  
        } catch (Exception e) {  
            log.error("channelInactive 异常.", e)  
        }  
        log.warn("WebSocket链路与服务器连接已断开.")  
    }  
  
    @Override  
    void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {  
        Channel ch = ctx.channel()  
        if (!handShaker.isHandshakeComplete()) {  
            try {  
                handShaker.finishHandshake(ch, (FullHttpResponse) msg)  
                handshakeFuture.setSuccess()  
            } catch (WebSocketHandshakeException e) {  
                log.warn("WebSocket Client failed to connect",e)  
                handshakeFuture.setFailure(e)  
            }  
            return  
        }  
  
        WebSocketFrame frame = (WebSocketFrame) msg  
        if (frame instanceof TextWebSocketFrame) {  
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame  
            String s = textFrame.text()  
        } else if (frame instanceof CloseWebSocketFrame) {  
            log.info("WebSocket Client closing")  
            ch.close()  
        }  
    }  
  
    @Override  
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
        log.error("WebSocket链路由于发生异常,与服务器连接已断开.", cause)  
        if (!handshakeFuture.isDone()) {  
            handshakeFuture.setFailure(cause)  
        }  
        ctx.close()  
        super.exceptionCaught(ctx, cause)  
    }  
  
    @Override  
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
        if (evt instanceof IdleStateEvent) {  
            IdleStateEvent event = (IdleStateEvent) evt  
            // 如果写通道处于空闲状态,就发送心跳命令  
            if (IdleState.WRITER_IDLE == event.state() || IdleState.READER_IDLE == event.state()) {  
                // 发送心跳数据  
                def channel = ctx.channel()  
                channel.writeAndFlush(new TextWebSocketFrame("dsf"))  
            }  
        } else {  
            super.userEventTriggered(ctx, evt)  
        }  
    }  
}

这里处理接收到消息的时候并没有选择保存消息的功能,因为netty WebSocket使用场景就是超大量(超过1w)连接。保留返回消息,进行业务验证通常不是这类测试场景的首要目的。所以以后等用到了再说吧。

后面会对比这3种Socket客户端包括Go语言两种Socket客户端在超大量连接方面的资源占用。

标签:netty,WebSocket,ctx,handler,io,import,channel,客户端
From: https://blog.51cto.com/FunTester/7267351

相关文章

  • Python Tornado实现WebSocket通讯
    以下实例有点像广播通讯,支持多个客户端连接代码参考别人的,稍微做了一点点修改服务器端websocket_demo.py #-*-coding:utf-8-*-from__future__importunicode_literals,print_functionfromdatetimeimportdatetimeimportosimporttornadofromtornado.opti......
  • 某集成电路企业采用宁盾无客户端网络准入控制方案让终端管理省事更省心!
    宁盾无客户端网络准入控制方案正在成为先进制造、高科技互联网企业等创新型客户的优选方案。创新型客户以技术密集型、研发人员占比高著称,在进行网络准入建设时,如何平衡好用户体验与顺利达成项目预期之间的矛盾,是创新企业IT安全团队格外关注的重点。本次,小盾通过集成电路企业的准......
  • 《MySQL命令行客户端》的使用方法
    MySQL客户端连接工具有多种,但最常用的是MySQL命令行客户端。下面是MySQL命令行客户端的语法:mysql-hhostname-uusername-ppassword-Ddatabase_name其中:-h 参数指定要连接的MySQL服务器的主机名或IP地址。-u 参数指定要连接的MySQL服务器的用户名。-p 参数提示输......
  • MQTT服务器的搭建与MQTT客户端的使用
    一、MQTT服务器(emqx)搭建1、下载MQTTBroker官方下载地址:emqx-5.0.4-windows-amd64.tar.gz。百度网盘地址emqx-windows-4.3.10.zip:链接:https://pan.baidu.com/s/1XaPkWTI_AtYmWVuMD8d5HQ?pwd=n99m提取码:n99m其他版本选择:https://www.emqx.io/downloads?os=Windows。 2.不用......
  • JSP获得服务端与客户端信息
    System.out.println("Protocol:"+request.getProtocol());System.out.println("Scheme:"+request.getScheme());System.out.println("ServerName:"+request.getServerName());System.out.println("ServerPort:"+re......
  • Netty 的 ChannelOption.SO_BACKLOG 知识点整理
    Netty的ChannelOption.SO_BACKLOG知识点整理 一个基于Netty的应用,在压力测试时,Socket请求数量一多,就发送失败,监测JVM内存大小比较稳定,猜测可能是ChannelOption.SO_BACKLOG这个配置导致的,设置的值是128。调整为1024后,连接失败的次数确实减少了一些,那么这个配置到......
  • 在SpringBoot中使用WebSocket
    一、概述最近在SpringBoot中用到了WebSocket的功能,在此记录一下使用步骤。SpringBoot使用WebSocket还是比较简单的,只需要4步就行了。如下二、使用步骤第一步:在pom.xml文件中导入坐标<!--websocketdependency--><dependency><grou......
  • ETL之apache hop系列3-hop Server环境部署与客户端发布管道工作流
    前言该文档主要是apachehop2.5的Windows10和Linuxdocker环境部署和客户端发布工作流和管道的相关内容不使用Docker直接使用应用程序包,下载压缩包文件后,需要解压缩文件并使用以下命令启动HopGUI:在Linux上:./hop-gui.sh在Windows上(需要JDK11):Hop-gui.bat一、Windo......
  • HTML5客户端数据存储
    HTML5使在不影响网站性能的情况下存储大量数据成为可能。之前,这些都是由cookie完成的,cookie不适合大量数据的存储,因为会影响速度。举个例子:varobj={x:1};//例1localStorage.obj=obj;//不能直接存储非字符串类型alert(localStorage.obj.x);//undefinedlocalStorage.......
  • Netty源码学习3——Channel ,ChannelHandler,ChannelPipeline
    系列文章目录和关于我零丶引入在Netty源码学习2——NioEventLoop的执行中,我们学习了NioEventLoop是如何进行事件循环以及如何修复NIO空轮询的bug的,但是没有深入了解IO事件在netty中是如何被处理的,下面我们以服务端demo代码为例子,看下和IO事件处理密切的Channel如上在编写nett......