首页 > 其他分享 >Netty 原理解析与实战开发(二)

Netty 原理解析与实战开发(二)

时间:2023-08-03 17:36:31浏览次数:49  
标签:实战 Netty null WebSocket long new 解析 public

Netty 原理解析与开发实战

八、ChannelHandler

8.1 ChannelHandler介绍

我们对数据的处理都是在ChannelHandler中完成的,Netty提供了众多ChannelHandler的实现类来帮助我们实现一些网络编程中通用功能,比如最常用的心跳检测、数据编解码等。

Netty中的ChannelHandler分为两类,一类处理入站数据,都实现了ChannelInboundHandler接口,一类处理出站数据,都实现了ChannelOutboundHandler接口。前面章节所说的编解码器是一类比较特殊的Handler,解码器负责将入站byte数据解码为对象,编码器负责将出站对象数据编码为byte数据。

ChannelHandler在进行数据处理的时候有匹配机制,意思就说如果当前Handler所注册的类型不能够处理,则直接交给下一个处理器。具体的匹配方法大致是:在运行时获取到类的模版参数列表,获得运行时模版类的Class对象,然后当要处理数据时,调用class.isInstance(msg)来判断当前数据是否是该模版类中模版的对象,如果是,则处理,如果不是,则交给下一个处理器。

上面的匹配方式也给了我一些启发,即如何模版类如何在运行时获取到模版所代表的类型,模版类即public class ClassA<T,I>(其中T和I被称为模版,这在写框架的时候应该很有用。示例代码如下:

// 示例父类
public class Role <T>{
    T t;
}
// 示例子类,继承了 Role<T>,该子类必须指定T的类型,不然会报错
public class Admin extends Role<String>{

}

// 下面的代码会输出 true false
Role<?> role = new Admin();
TypeParameterMatcher matcher = TypeParameterMatcher.find(role, Role.class, "T");
System.out.println(matcher.match(""));
System.out.println(matcher.match(0));

ChannelHandler为什么会一直传下去呢?
下图展示了ChannelHandler从调用Pipeline.fireChannelRead()函数后ChannelHandler是如何传播OP_READ事件的。

Alt text
如上图所示,OP_READ事件的传播过程如下:

  1. 当调用事件循环器检测到有OP_READ事件后,调用unsafe.read()来读取从java.nio.channels.SocketChannel中读取数据;
  2. 然后调用pipeline.fireChannelRead方法将数据传入处理器链中,(pipeline中包含了一个双向链表,链表头为head,链表尾为tail)
  3. 然后pipeline调用head.invokeChannelRead方法,该方法会主动调用该ChannelHandlerContext下的handler的channelRead方法(channelRead方法是我们经常来处理数据的地方),在该方法内,用户可自己选择是否将事件传播到下一个处理器中
  4. 当用户可以通过主动调用ChannelHandlerContext对象的fireChannelRead方法时,该方法会自动找到下一个入站事件处理器,然后调用该事件处理器的invokeChannelRead方法,从而将事件传播下去。

OP_READ事件处理流程

  1. 通过Selector.selectNow()检测到读事件
  2. 检查Channel.config是否读取,如果可读取,则调用channel.unsafe.read()方法
  3. unsafe通过RecvByteBufAllocator分配接收缓存,然后将javaChannel中的数据写入道该缓存中
  4. unsafe将读取到的数据放入当前Channel的pipeline中。

OP_WRITE事件处理流程

  1. 用户调用ctx.write()方法,最终会调用当前channel的unsafe的write方法将数据写入到ChannelOutboundBuffer中,ChannelOutboundBuffer是Netty的出站数据缓存。
  2. 用户调用ctx.flush()方法,最终会调用当前channel的unsafe的flush方法,将ChannelOutboundBuffer中的数据写入到javaChannel中。

8.2 超时处理、心跳检测

首先来说心跳检测,Netty提供了IdleStateHandler类来帮助我们完成心跳检测功能,该类实现了ChannelDuplexHandler类,说明该类是一个双向处理器,可以处理入站和出站事件,主要的构造方法如下:

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
  • observeOutput:如果设置该值为true,Netty将考虑是否是接受端接收数据缓慢的问题(通过检查出站缓存区)导致写超时。如果出站缓慢,则Netty不认为是这是空闲。

  • readerIdleTime:表示读空闲,多长时间未接收到数据后将触发READER_IDLE_STATE_EVENT事件

  • writerIdleTime:表示写空闲,多长时间未写入数据后将触发WRITER_IDLE_STATE_EVENT事件

  • allIdleTime:表示读写都空闲,多长时间未读入或者写入后将触发ALL_IDLE_STATE_EVENT事件

  • unit:表示以上参数的时间单位

一般需要将IdleStateHandler处理器加入到处理链的最前面,并且要注意的是该处理器仅仅按以上逻辑来触发一个用户事件,对事件的处理还需要用户自定义处理器来实现,并且该自定义处理器需要实现userEventTriggered方法。

示例代码如下:

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent event = (IdleStateEvent) evt;
        switch (event.state()) {
            case READER_IDLE: {
                // 处理读空闲
            }break;
            case WRITER_IDLE: {
                // 处理写空闲
            }break;
            case ALL_IDLE: {
                // 处理读写空闲
            }
        }
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

读超时:读超时的实现有ReadTimeoutHandler,当发生读超时,会触发ReadTimeoutException异常并关闭通道。

写超时:写超时的实现有WriteTimeoutHandler,当写入数据时,如果任务过了设定的时间后还没开始做,则会触发WriteTimeoutExeception。底层通过定时任务+异步编程实现,提交一个定时任务,当时间到达后,如果给定的Promise的isDone()返回false,则表明发生写超时。

8.3 Netty日志框架

Netty内部有自己的日志框架,但是可配置性不高,因此我们在这里引入log4j日志框架。

首先,引入pom依赖包:

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>2.0.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>2.0.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>2.0.5</version>
</dependency>

然后配置Log4j,resources/log4j.properties,完毕,Netty会自动探测log4j。
log4j配置详见log4j配置详解

8.4 IP地址过滤

Netty提供了IP地址过滤的Handler,只需配置一些规则,即可使用。主要的Handler如下:

  • RuleBasedIpFilter:基于规则的IP过滤器
// 创建规则
IpFilterRule ipFilterRule = new IpSubnetFilterRule("127.0.0.1", 32, IpFilterRuleType.ACCEPT);
new RuleBasedIpFilter(ipFilterRule);
  • IpSubnetFilter:Ip子网过滤器,也需要配置规则
IpSubnetFilterRule ipFilterRule = new IpSubnetFilterRule("127.0.0.1", 32, IpFilterRuleType.ACCEPT);
new IpSubnetFilter(ipFilterRule);
  • UniqueIpFilter:用来确保一个IP只建立一个连接的过滤器

8.5 大数据流的处理

Netty中提供了对大数据流的处理,当我们想向通道中写大数据流的时候可以使用netty提供的方法。几个主要的类如下:

  • ChunkedInput:是对输入数据的抽象,提供了ChunkedFileChunkedNioFileChunkedStream等实现。

  • ChunkedWriteHandler:Netty提供的大数据写入的处理器,该处理器接收ChunkedInput对象,并对该对象进行处理,将数据以块的方式写入到Channel中。

示例代码如下:

pipeline.addLast(new ChunkedWriteHandler())
    .addLast(new MyChannelHandler());

//然后可以直接使用下面方式写入文件等数据流,ChunckedWriteHandler会自动对ChunkedInput的子类进行处理
ctx.writeAndFlush(new ChunkedNioFile(FileChannel.open(Paths.get("t.txt"))));

8.6 数据安全,使用SSL/TLS协议

SSL/TLS 中文全称为安全套接字层(Secure Sockets Layer),首先我们需要生成密钥仓库,生成方式详见:使用 keytool 生成密钥对 + keytool 命令详解

一般认证方式有两种,双向认证和单向认证,双向认证需要双方都持有对方的证书,单向认证中,一般是客户端需持有服务器的证书(1.先导出服务器的cer证书 2.将服务器的cer证书导入到客户端的秘钥仓库中)。

在Netty中使用SSL/TLS协议非常简单,有以下几步(该方式较为第二步较为繁琐,可看下面的方法):

  1. 通过keytool生成密钥仓库,具体生成命令可以见使用 keytool 生成密钥对 + keytool 命令详解
  2. 编写工具类,该工具类的作用是读取密钥仓库初始化SSLContext对象。
    具体代码如下:

SslUtil.java


import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.*;
import java.security.cert.CertificateException;


public class SslUtil {

    public static SSLContext getClientContext(String pkPath, String password) {
        return getContext(pkPath, null, password);
    }
    
    public static SSLContext getServerContext(String pkPath, String caPath, String password) {
        return getContext(password, caPath, password);
    }

    /**
     * 获取SSL上下文
     * @param pkPath jks密钥仓库文件,单向验证中服务器方需要提供
     * @param caPath ca仓库文件
     * @param password 密码
     * @return SSLContext
     */
    public static SSLContext getContext(String pkPath, String caPath, String password) {
        KeyManagerFactory kmf = null;
        TrustManagerFactory tfm = null;
        if (pkPath != null) {
            try (InputStream pkIn = new FileInputStream(pkPath)) {

                KeyStore ks = KeyStore.getInstance("JKS");
                ks.load(pkIn, password.toCharArray());
                kmf = KeyManagerFactory.getInstance("SunX509");
                kmf.init(ks, password.toCharArray());
            } catch (KeyStoreException | CertificateException | IOException | NoSuchAlgorithmException | UnrecoverableKeyException e) {
                e.printStackTrace();
            }
        }
        if (caPath != null) {
            try (InputStream caIn = new FileInputStream(caPath)) {
                KeyStore tks = null;
                tks = KeyStore.getInstance("JKS");
                tfm = TrustManagerFactory.getInstance("SunX509");
                tfm.init(tks);
                tks.load(caIn, password.toCharArray());
            } catch (CertificateException | IOException | NoSuchAlgorithmException | KeyStoreException e) {
                e.printStackTrace();
            }
        }
        try {
            SSLContext sslContext = SSLContext.getInstance("TLS");
            sslContext.init(kmf == null ? null : kmf.getKeyManagers(),
                    tfm == null ? null : tfm.getTrustManagers(), null);
            return sslContext;
        } catch (NoSuchAlgorithmException | KeyManagementException e) {
            e.printStackTrace();
        }
        return null;
    }
}

SslContextFactory.java


public class SslContextFactory {
    private static SSLContext SSL_CONTEXT;

    public static SSLContext getSslContext(String pkPath, String caPath, String password) {
        if (SSL_CONTEXT != null) {
            return SSL_CONTEXT;
        }
        // 这里也可以根据情况调用SslUtil.getClientContext()或者SslUtil.getServerContext()方法
        SSL_CONTEXT = SslUtil.getContext(pkPath, caPath, password);
        return SSL_CONTEXT;
    }
}

  1. 将SslHandler添加到Pipeline中,SslHandler的构造方法中需要SSLEngine,可以通过SSLContext获取。代码如下:
// 提供jks密钥仓库文件的路径
String jksPath = Paths.get(System.getProperty("user.dir"), "nettyServer.jks").toString();
// 一般情况下在开发过程中,密钥仓库和证书仓库是同一个,其他的不一定。
SSLEngine sslEngine = SslContextFactory.getSslContext(jksPath, jksPath, "123456")
    .createSSLEngine();
    // 设置是否需要验证客户端
    //sslEngine.setNeedClientAuth(true);
    // 设置当前SSLEngine的模式,true-客户端模式 false-服务器模式
sslEngine.setUseClientMode(false);

//....

//一般将SslHandler添加到Pipeline的最前面
pipe.addLast(new SslHandler(sslEngine))
    //.addLast(...)

使用Netty提供的方法生成SslHandler

// 指定密钥仓库文件地址
Path path = Paths.get(System.getProperty("user.dir"), "nettyServer.jks");
try (InputStream is = Files.newInputStream(path)) {
    // 新建一个KeyStore对象,从流中加载密钥仓库,密钥仓库类型为JKS
    KeyStore keyStore = KeyStore.getInstance("JKS");
    keyStore.load(is, "123456".toCharArray());
    // 获取私钥
    PrivateKey privateKey = (PrivateKey) keyStore.getKey("nettyserver", "123456".toCharArray());
    // 获取证书
    X509Certificate certificate = (X509Certificate) keyStore.getCertificate("nettyserver");
    // 通过SslContextBuilder构建SslContext
    SslContext sslContext = SslContextBuilder.forServer(privateKey, certificate).build();
    // 通过SslContext创建新的SslHandler
    SslHandler sslHandler = sslContext.newHandler(PooledByteBufAllocator.DEFAULT);
}

以上是生成服务端Sslhandler的方法,如果是客户端,只需要提供X509Certificate对象,然后调用SslContextBuilder.forClient()方法即可。如下:

// 指定证书文件地址
Path path = Paths.get(System.getProperty("user.dir"), "nettyServer.cer");
try (InputStream is = Files.newInputStream(path)) {
    // 从文件中获取证书
    X509CertImpl x509Cert = new X509CertImpl(is);
    // 使用SslContextBuilder构建SslContext,这里trustManager可以传入多个
    SslContext sslContext = SslContextBuilder.forClient().trustManager(x509Cert).build();
    // 通过SslContext新建SslHandler
    SslHandler sslHandler = sslContext.newHandler(PooledByteBufAllocator.DEFAULT);
}

如果需要双向认证,则客户端可以进行如下配置:

 // 指定证书文件地址
Path path = Paths.get(System.getProperty("user.dir"), "nettyServer.cer");
Path jksPath = Paths.get(System.getProperty("user.dir"), "nettyClient.jks");
try (InputStream is = Files.newInputStream(path);
        InputStream jksIs = Files.newInputStream(jksPath)) {
    // 从文件中获取证书
    X509CertImpl x509Cert = new X509CertImpl(is);

    // 加载密钥和公钥
    KeyStore keyStore = KeyStore.getInstance("JKS");
    keyStore.load(jksIs, "123456".toCharArray());
    PrivateKey key = (PrivateKey) keyStore.getKey("nettyclient", "123456".toCharArray());
    X509Certificate x509Certificate = (X509Certificate) keyStore.getCertificate("nettyclient");

    // 使用SslContextBuilder构建SslContext
    SslContext sslContext = SslContextBuilder.forClient()
            .trustManager(x509Cert)
            // 客户端的私钥和公钥
            .keyManager(key, x509Certificate)
            .build();
    // 通过SslContext新建SslHandler
    SslHandler sslHandler = sslContext.newHandler(PooledByteBufAllocator.DEFAULT);
}

8.7 流量整形

前面有讲过使用ChannelOption配置Channel的高低水位,通过高低水位可以控制Channel的写入速度(写入前需要调用channel.isWritable()方法来判断通道是否可写,如果直接写,可能导入写入失败)。本小节来讲流量整形,它是流量控制的一种机制。流量整形是一种主动调整流量输出速率的措施。它的思路如下:

  • 写入:对通道的写入事件进行监控,来实时计算写入速率,然后每次写入前计算一个写入等待时间(该等待时间是根据写入速率求得的,如果速率过快,则等待时间越大),如果等待时间大于10ms,则会将数据缓存到一个队列中,再在流量计量算法的控制下“均匀”地发送这些被缓存的数据。当缓存队列满或者等待时间超过最大等待时间时,会设置channel.isWritable()方法返回false。
  • 读取:对通道的读取事件进行监控,计算得到读入速率,然后计算读等待时间wait,如果等待时间大于10ms,则设置通道的autoRead为false,并暂停通道读取,然后启动一个定时任务,等待wait时间后,设置autoRead为true,开始读通道。当暂停通道读事件的时候,会导致操作系统的端口缓存被占满(如果速率过快),因为TCP的滑动窗口机制,发送方会自动调节发送速率。

流量整形可能会增加延迟。

Netty提供了3种流量整形方式:

  • ChannelTrafficShapingHandler(通道流量整形):只对当前通道起作用
// writeLimit 写速率限制,单位bytes/s
// readLimit 读速率限制,单位bytes/s
// checkInterval 两次速率计算之间的时间间隔
// maxTime 流量速率过快时的最大等待时间
public ChannelTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime)
  • GlobalChannelTrafficShapingHandler(全局通道流量整形):一般用在服务器端,需要注意的是,全部通道需公用一个该类的对象。该类会对所有的通道和全局流量速率都做计算。
// executor:调度处理器,用来做速率计算、读写任务调度
// 下面的参数同上
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
            long writeGlobalLimit, long readGlobalLimit,
            long writeChannelLimit, long readChannelLimit,
            long checkInterval, long maxTime)
  • GlobalTrafficShapingHandler(全局流量整形):该类和GlobalChannelTrafficShapingHandler的区别是,它只对全局流量做监控,对单个通道不做更细的监控。
// 同上
public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit,
            long checkInterval, long maxTime)

九、常用的协议

Netty支持的传输层协议有TCP、UDP等,本章主要介绍应用层协议,因此仅仅涉及Handler的使用。

Netty支持的应用层协议有:SSL、HTTP、WebSocket、FTP、SMTP等。

协议本身是一种人为设定的规约,就好比语言一样,两个人对话,只有使用相同的语言,才能更好的交流。

只需要在接受到数据后对数据进行解码,发送数据时对数据进行编码,而这两个操作均可在ChannelHandler中实现。

9.1 Http协议的使用

Netty中使用http协议非常方便,Netty本身提供了http的编解码器,只需要在pipeline中添加即可,需要注意的是:此类协议需要最先添加。

HttpServerCodec类继承了CombinedChannelDuplexHandler<HttpResponseDecoder, HttpRequestEncoder>,可以看到它是将Http编解码器进行了组合。也可以用HttpResponseDecoderHttpRequestEncoder替换HttpServerCodec,不过一般不必。

示例代码如下:

public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec())
                .addLast(new HttpObjectAggregator())
                .addLast(new MyHttpChannelInboundHandler());
    }

    public static class MyHttpChannelInboundHandler extends SimpleChannelInboundHandler<HttpObject> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
            if (msg instanceof HttpRequest) {
                HttpRequest request = (HttpRequest) msg;
                URI uri = new URI(request.uri());
                if ("/favicon.ico".equals(uri.getPath()))
                    return;
                ByteBuf byteBuf = Unpooled.buffer(100);
                byteBuf.writeCharSequence("Hello world!", StandardCharsets.UTF_8);

                HttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                        HttpResponseStatus.OK, byteBuf);
                httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
                HttpUtil.setContentLength(httpResponse, byteBuf.readableBytes());

                ctx.writeAndFlush(httpResponse);

            }
        }
    }
}

上面的代码中,创建了一个Channel初始化器,并且在pipeline中添加了HttpServerCodec对象、HttpObjectAggregator对象、自定义的MyHttpChannelInboundHandler对象。

HttpObjectAggregator的作用是将HttpMessage和HttpContent进行聚合,组成FullHttpRequest或者FullHttpResponse对象。

服务端Channel的初始化代码如下:

public class NettyServer {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup boss = null;
        NioEventLoopGroup worker = null;
        try {
            boss = new NioEventLoopGroup(1);
            worker = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .handler(new DetectChannelHandler())
                    .childHandler(new MyChannelInitializer());
            ChannelFuture sync = serverBootstrap.bind(8080).sync();
            sync.channel().closeFuture().sync();
        } finally {
            if (boss != null) boss.shutdownGracefully();
            if (worker != null) worker.shutdownGracefully();
        }
    }

}

可以看到,初始化Channel时,同一般的TCP服务没有区别。

9.2 Http2

Http2具有如下特点:

1.二进制分帧层

img

如图所示,二进制分帧层是加载SSL/TLS(如果有)之上的,Http2将所有传输的信息分隔为更小的消息和帧,并采用二进制格式来对它们进行编码。

2.数据流、消息和帧
新的二进制分帧机制改变了客户端和服务端之间数据交换的方式。为了说明这个过程,需要了解下面三个概念:

  • 数据流(Stream):已建立的连接内的双向字节流,可以承载一条或者多条消息
  • 消息(Message):与逻辑请求或响应消息对应的一系列帧
  • 帧(Frame):Http2的最小通信单位,每个帧都包含有帧头,至少也会标识出当前帧所属的数据流

这些概念之间的关系如下:

  • 所有通信都是在一条TCP连接上完成,此连接可以承载任意数量的双向数据流
  • 每一个数据流都有一个唯一的标识符和优先级信息,用于承载双向消息
  • 每条消息都是一条逻辑Http消息(例如请求或响应),包含一个或多个帧
  • 帧是最小通信单位,承载特定类型的数据,如Http标头、消息负载等。来自不同数据流的帧可以交错发送,再根据每个帧头的数据流表示标识重新组装。

3.请求与响应复用

  • 并行交错地发送多个请求,请求之间互不影响
  • 并行交错地发送多个响应,响应之间互不干扰
  • 使用一个连接并行发送多个请求或响应
  • 消除不必要的连接和提高了现有网络容量的利用率,从而减少页面加载时间

4.头部压缩算法(HPack)
头部压缩算法需要再Http客户端和服务器端进行如下操作:

  • 维护一份相同的静态表,包含常见的头部名称以及特别常见的头部名称和值的组合
  • 维护一份相同的动态表,可以动态地添加内容
  • 基于静态哈夫曼码表的哈夫曼编码

头部压缩算法原理就是使用静态表和动态表对头部字段进行替换(用表索引),然后对于动态表中不存在的内容,还可以使用哈夫曼编码来减小体积。

5.协商机制
虽然http2是比http1更加优秀的协议,但是目前仍有很多公司使用http1.1或http1.0,因此需要协商机制来保障不同协议之间的兼容。

通过协商机制,如果双方都支持http2,则会进行协议升级(Upgrade)。

这部分实战内容暂时空着,有点麻烦。。

9.3 WebSocket

使用WebSocket可以实现如在线聊天室、在线推送等功能。

WebSocket协议是建立在http协议之上的,在建立websocket时会进行协议升级(upgrade),WebSocket数据传输的最小单位为帧Frame,数据将被承载到一个或多个帧(如果数据过大)中进行传输。

  1. 因为WebSocket是建立在Http协议之上的,因此使用的编解码器仍然是HttpServerCodec
  2. 如果数据过大,WebSocket会将数据分为多个帧,因此需要将他们聚合起来,因此需要使用HttpObjectAggregator(http也类似)
  3. 然后使用WebSocketServerProtocolHandler解析WebSocket数据帧

初始化Pipeline代码如下:

// 设置WebSocket的端点
WebSocketServerProtocolHandler webSocketServerProtocolHandler = new WebSocketServerProtocolHandler("/hello",
                                    null, true, false, 10000, null);
ch.pipeline()
        // 添加心跳检测
        .addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS))
        // 添加http编解码器
        .addLast(new HttpServerCodec())
        // 添加http对象聚合器
        .addLast(new HttpObjectAggregator(8192))
        // 添加WebSocket帧编解码器
        .addLast(webSocketServerProtocolHandler)
        // 添加文本帧处理器,继承自SimpleChannelInboundHandler<TextWebSocketFrame>
        .addLast(new WebSocketTextHandler())
        .addLast(new EchoTextHandler());

WebSocket的帧类型
帧的抽象为WebSocketFrame接口。

  • BinaryWebSocketFrame:包含二进制数据
  • CloseWebSocketFrame:Close帧,用于关闭连接
  • ContinuationWebSocketFrame:包含延续的文本或二进制数据
  • PingWebSocketFrame:Ping帧
  • PongWebSocketFrame:Pong帧
  • TextWebSocketFrame:包含文本数据

与websocket服务器通信的浏览器代码如下:

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
	<script type="text/javascript">
		var socket;
		if (!window.WebSocket) {
			window.WebSocket = window.MozWebSocket;
		}
		if (window.WebSocket) {
			socket = new WebSocket("ws://localhost:8080/ws");
			socket.onmessage = function(event) {
				var ta = document.getElementById('responseText');
				ta.value = ta.value + '\n' + event.data
			};
			socket.onopen = function(event) {
				var ta = document.getElementById('responseText');
				ta.value = "连接开启!";
			};
			socket.onclose = function(event) {
				var ta = document.getElementById('responseText');
				ta.value = ta.value + "连接被关闭";
			};
		} else {
			alert("你的浏览器不支持 WebSocket!");
		}

		function send(message) {
			if (!window.WebSocket) {
				return;
			}
			if (socket.readyState == WebSocket.OPEN) {
				socket.send(message);
			} else {
				alert("连接没有开启.");
			}
		}
	</script>
	<form onsubmit="return false;">
		<h3>WebSocket 聊天室:</h3>
		<textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
		<br> 
		<input type="text" name="message"  style="width: 300px" value="Welcome to waylau.com">
		<input type="button" value="发送消息" onclick="send(this.form.message.value)">
		<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录">
	</form>
	<br> 
	<br> 
	更多例子请访问  waylau.com
</body>
</html>

十、测试

10.1 使用EmbeddedChannel测试ChannelHandler

首先导入Junit测试框架依赖包

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
    <scope>test</scope>
</dependency>

然后编写测试类,在这里我们测试了FixedLengthFrameDecoder固定长度解码器

public class TestHandler {
    @Test
    public void test() {
        EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(8));
        ByteBuf buf = Unpooled.copiedBuffer("124523452435534541225320".getBytes(StandardCharsets.UTF_8));
        channel.writeInbound(buf);
        ByteBuf bfuf;
        while ((bfuf = channel.readInbound()) != null) {
            System.out.println(bfuf);
        }
    }
}

测试函数的输出如下:

UnpooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: UnpooledHeapByteBuf(ridx: 24, widx: 24, cap: 24/24))
UnpooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: UnpooledHeapByteBuf(ridx: 24, widx: 24, cap: 24/24))
UnpooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: UnpooledHeapByteBuf(ridx: 24, widx: 24, cap: 24/24))

可以看到,FixedLengthFrameDecoder对输入的ByteBuf对象进行了正确的解码。

10.2 使用Apache JMeter来对网络程序进行压力测试

测试TCP连接

  1. 首先从Apache Jimeter处下载 jimeter程序包,然后运行
  2. 点击保存,保存测试计划
  3. 右键测试计划,选择add->Threads->Threads Group,设置 Thread Properties
  • Number of Threads(users):线程的数量,开启多少个线程,即模拟多少个用户

  • Ramp-up period(seconds):加速时间,即上述每个线程启动的间隔时间

  • Loop Count:循环的次数

  1. 配置好Thread Group后,鼠标右键Thread Group,选择add->Sampler->TCP Sampler,设置TCP连接的Server Name or Ip 和 port、Text to Send。

  2. 配置监听,鼠标右键 TCP Sampler,选择add->Linsteners->Summary Report

  3. 启动测试任务

十一、案例分析

11.1 RocketMQ

11.2 Eclipse Vert.x

Vert.x的API很友好,可以用极少的代码量实现所需的功能,个人认为它对于小项目非常友好,因此也非常适合微服务,SpringBoot对于微服务来说可能有点庞大,或许Vert.x刚好。

Vert.x底层使用Netty实现的,支持高并发。如果项目需要快速上线,或许Vert.x是一个不错的选择。

Vert.x更像是一个工具的集合,它隐藏了复杂的实现细节,只暴露了简单的调用接口就可实现功能。

img

标签:实战,Netty,null,WebSocket,long,new,解析,public
From: https://www.cnblogs.com/zolmk/p/17570799.html

相关文章

  • Java(从零到企业级电商项目实战)学习笔记
    资料网站:http://learning.happymmall.com/env.html一、mybatis三剑客:generator,plugin,pagehelperpagehelper->https://github.com/pagehelper/Mybatis-PageHelper二、spring例子:https://github.com/spring-projects/spring-mvc-showcasehttps://github.com/spring-proj......
  • R&S 图片或者文件数据解析
    当我们使用SCPI命令获取图片或者文件内容时,需要对返回的数据进行解析一般来说返回的数据以#6123456XXXXXXXXXXXXXXX。。。。开始现在对数据解析:#表示数据的开始#后的第一个数字为以多少位数字来表示文件的大小(字节长度)从以上的例子看6代表后面的6位数字代表文件数据的大小,所......
  • 小柏实战学习FineBI(图文教程二)
    前言:一定要知道百度,必应,谷歌这个三个网站,这三个不知道的话也要会使用ChatGPT,并且要学会看报错信息,学会优雅的提问. 本节课主题:快速上手做个报表出来,不然你怎么证明自己没有摸鱼? 一:打开FineBI后台:http://localhost:37799/webroot/decision#/ 二:在「我的分析>......
  • 解析机房IP代理优缺点
    IP代理可以分为两种类型:住宅IP代理和机房IP代理(也称为数据中心IP代理)。住宅代理是由互联网服务提供商(ISP)分配给家庭用户的真实IP地址,与用户的实际位置相关。而机房代理则是来自许多服务器机房提供的IP地址,与住宅代理不同,它并不由ISP拥有。今天,我们来看一下机房IP代理的优缺......
  • Go 语言入门指南: 环境搭建、基础语法和常用特性解析 | 青训营
    Go语言入门指南:环境搭建、基础语法和常用特性解析|青训营从零开始Go语言简介Go是一个开源的编程语言,它能让构造简单、可靠且高效的软件变得容易。Go是从2007年末由RobertGriesemer,RobPike,KenThompson主持开发,后来还加入了IanLanceTaylor,RussCox等人,并最终......
  • 活动回顾|阿里云 Serverless 技术实战与创新成都站回放 & PPT 下载
    7月29日“阿里云Serverless技术实战与创新”成都站圆满落幕。活动受众以关注Serverless技术的开发者、企业决策人、云原生领域创业者为主,活动形式为演讲、动手实操,让开发者通过一个下午的时间增进对Serverless技术的理解,快速上手Serverless,拥抱云计算新范式带来的技术红......
  • 活动回顾|阿里云 Serverless 技术实战与创新成都站回放 & PPT 下载
    7月29日“阿里云Serverless技术实战与创新”成都站圆满落幕。活动受众以关注Serverless技术的开发者、企业决策人、云原生领域创业者为主,活动形式为演讲、动手实操,让开发者通过一个下午的时间增进对Serverless技术的理解,快速上手Serverless,拥抱云计算新范式带来的技术......
  • Nuxt.JS实战指南:从入门到精通的练习之旅
    官网:Nuxt.js-Vue.js通用应用框架|Nuxt.js中文网搭建Nuxt2-参考文献:Nuxt-超详细环境搭建及创建项目整体流程(create-nuxt-app)_王佳斌的博客-CSDN博客一、为什么用NuxtSEO:所搜引擎优化1.1如何进行搜索引擎优化?多页面Title、描述、关键字网站内容1.2-预渲染1.2.1-预渲染图解1.2......
  • Unity的IFilterBuildAssemblies:深入解析与实用案例
    UnityIFilterBuildAssembliesUnityIFilterBuildAssemblies是Unity引擎中的一个非常有用的功能,它可以让开发者在构建项目时自定义哪些程序集需要被包含在构建中,哪些程序集需要被排除在建之外。这个功能可以帮助开发者更好地控制项目的构建过程,减少构建时间和构建大小。在本文中,......
  • go 语言实战入门案例之猜数字
    文章和代码已经归档至【Github仓库:<https://github.com/timerring/backend-tutorial>】或者公众号【AIShareLab】回复go也可获取。猜数字第一个例子里面,我们会使用Golang来构建一个猜数字游戏。在这个游戏里面,程序首先会生成一个介于1到100之间的随机整数,然后提示玩家进行......