首页 > 其他分享 >Netty编解码&粘包拆包&心跳机制&断线自动重连

Netty编解码&粘包拆包&心跳机制&断线自动重连

时间:2023-08-19 20:03:20浏览次数:42  
标签:Netty protostuff 编解码 pipeline 粘包 new channel 客户端

Netty编解码
Netty涉及到编解码的组件有Channel、ChannelHandler、ChannelPipe等,先大概了解下这几个组件的作用。
ChannelHandler
ChannelHandler充当了处理入站和出站数据的应用程序逻辑容器。例如,实现ChannelInboundHandler接口(或
ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据随后会被你的应用程序的业务逻辑处理。当你要给连接
的客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。你的业务逻辑通常写在一个或者多个ChannelInboundHandler中。
ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的。
ChannelPipeline
ChannelPipeline提供了ChannelHandler链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这
些事件为出站的,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler(ChannelOutboundHandler
调用是从tail到head方向逐个调用每个handler的逻辑),并被这些Handler处理,反之则称为入站的,入站只调用pipeline里的
ChannelInboundHandler逻辑(ChannelInboundHandler调用是从head到tail方向逐个调用每个handler的逻辑)。

 

编码解码器
当你通过Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如java对
象);如果是出站消息,它会被编码成字节。
Netty提供了一系列实用的编码解码器,他们都实现了ChannelInboundHadnler或者ChannelOutboundHandler接口。在这些类中,
channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由已知解码器
所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。
Netty提供了很多编解码器,比如编解码字符串的StringEncoder和StringDecoder,编解码对象的ObjectEncoder和ObjectDecoder
等。
如果要实现高效的编解码可以用protobuf,但是protobuf需要维护大量的proto文件比较麻烦,现在一般可以使用protostuff。
protostuff是一个基于protobuf实现的序列化方法,它较于protobuf最明显的好处是,在几乎不损耗性能的情况下做到了不用我们
写.proto文件来实现序列化。使用它也非常简单,代码如下:
引入依赖:

<dependency>
     <groupId>com.dyuproject.protostuff</groupId>
     <artifactId>protostuff‐api</artifactId>
     <version>1.0.10</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff‐core</artifactId>
    <version>1.0.10</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff‐runtime</artifactId>
    <version>1.0.10</version>
</dependency>
View Code

protostuff使用示例:

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* protostuff 序列化工具类,基于protobuf封装
*/
public class ProtostuffUtil {

    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();

    private static <T> Schema<T> getSchema(Class<T> clazz) {
        @SuppressWarnings("unchecked")
        Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
        if (schema == null) {
            schema = RuntimeSchema.getSchema(clazz);
            if (schema != null) {
                cachedSchema.put(clazz, schema);
            }
        }
        return schema;
    }

    /**
    * 序列化
    *
    * @param obj
    * @return
    */
    public static <T> byte[] serializer(T obj) {
        @SuppressWarnings("unchecked")
        Class<T> clazz = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(clazz);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
    * 反序列化
    *
    * @param data
    * @param clazz
    * @return
    */
    public static <T> T deserializer(byte[] data, Class<T> clazz) {
        try {
            T obj = clazz.newInstance();
            Schema<T> schema = getSchema(clazz);
            ProtostuffIOUtil.mergeFrom(data, obj, schema);
            return obj;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        byte[] userBytes = ProtostuffUtil.serializer(new User(1, "zhuge"));
        User user = ProtostuffUtil.deserializer(userBytes, User.class);
        System.out.println(user);
    }
}
View Code

Netty粘包拆包
TCP是一个流协议,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区
的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成
一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。面向流的通信是无消息保护边界的。
如下图所示,client发了两个数据包D1和D2,但是server端可能会收到如下几种情况的数据。

 

解决方案
1)消息定长度,传输的数据大小固定长度,例如每段的长度固定为100字节,如果不够空位补空格
2)在数据包尾部添加特殊分隔符,比如下划线,中划线等,这种方法简单易行,但选择分隔符的时候一定要注意每条数据的内部一定不
能出现分隔符。
3)发送长度:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度
来判断每条数据的开始和结束。
Netty提供了多个解码器,可以进行分包的操作,如下:
LineBasedFrameDecoder (回车换行分包)
DelimiterBasedFrameDecoder(特殊分隔符分包)
FixedLengthFrameDecoder(固定长度报文来分包)
自定义长度分包解码器,参见项目示例com.tuling.netty.split包下代码
Netty心跳检测机制
所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性.
在 Netty 中, 实现心跳机制的关键是 IdleStateHandler, 看下它的构造器:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

这里解释下三个参数的含义:
readerIdleTimeSeconds: 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的
IdleStateEvent 事件.
writerIdleTimeSeconds: 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的
IdleStateEvent 事件.
allIdleTimeSeconds: 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.
注:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法:

IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)

要实现Netty服务端心跳检测机制需要在服务器端的ChannelInitializer中加入如下的代码:

pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));

初步地看下IdleStateHandler源码,先看下IdleStateHandler中的channelRead方法:

 

红框代码其实表示该方法只是进行了透传,不做任何业务逻辑处理,让channelPipe中的下一个handler处理channelRead方法
我们再看看channelActive方法:

 这里有个initialize的方法,这是IdleStateHandler的精髓,接着探究:

 这边会触发一个Task,ReaderIdleTimeoutTask,这个task里的run方法源码是这样的:

 

第一个红框代码是用当前时间减去最后一次channelRead方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead已经是6s
之前的事情了,你设置的是5s,那么nextDelay则为-1,说明超时了,那么第二个红框代码则会触发下一个handler的
userEventTriggered方法:

 如果没有超时则不触发userEventTriggered方法。

Netty心跳检测代码示例:

//服务端代码
 public class HeartBeatServer {

     public static void main(String[] args) throws Exception {
         EventLoopGroup boss = new NioEventLoopGroup();
         EventLoopGroup worker = new NioEventLoopGroup();
         try {
             ServerBootstrap bootstrap = new ServerBootstrap();
             bootstrap.group(boss, worker)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline pipeline = ch.pipeline();
                     pipeline.addLast("decoder", new StringDecoder());
                     pipeline.addLast("encoder", new StringEncoder());
                     //IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接,
                     //会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须
                     //实现userEventTriggered方法处理对应事件
                     pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
                     pipeline.addLast(new HeartBeatHandler());
                 }
             });
             System.out.println("netty server start。。");
             ChannelFuture future = bootstrap.bind(9000).sync();
             future.channel().closeFuture().sync();
         } catch (Exception e) {
            e.printStackTrace();
         } finally {
             worker.shutdownGracefully();
             boss.shutdownGracefully();
         }
     }
 }
View Code
//服务端处理handler
 public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {

     int readIdleTimes = 0;

     @Override
     protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
         System.out.println(" ====== > [server] message received : " + s);
         if ("Heartbeat Packet".equals(s)) {
            ctx.channel().writeAndFlush("ok");
         } else {
            System.out.println(" 其他信息处理 ... ");
         }
    }

     @Override
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
         IdleStateEvent event = (IdleStateEvent) evt;
         String eventType = null;
         switch (event.state()) {
             case READER_IDLE:
                 eventType = "读空闲";
                 readIdleTimes++; // 读空闲的计数加1
                 break;
             case WRITER_IDLE:
                 eventType = "写空闲";
                 // 不处理
                 break;
             case ALL_IDLE:
                 eventType = "读写空闲";
                 // 不处理
                 break;
         }
         System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);
         if (readIdleTimes > 3) {
             System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");
             ctx.channel().writeAndFlush("idle close");
             ctx.channel().close();
         }
     }

     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
     }
 }
View Code
//客户端代码
 public class HeartBeatClient {
     public static void main(String[] args) throws Exception {
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
         try {
             Bootstrap bootstrap = new Bootstrap();
             bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline pipeline = ch.pipeline();
                     pipeline.addLast("decoder", new StringDecoder());
                     pipeline.addLast("encoder", new StringEncoder());
                     pipeline.addLast(new HeartBeatClientHandler());
                 }
             });

             System.out.println("netty client start。。");
             Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel();
             String text = "Heartbeat Packet";
             Random random = new Random();
             while (channel.isActive()) {
                 int num = random.nextInt(10);
                 Thread.sleep(num * 1000);
                 channel.writeAndFlush(text);
             }
         } catch (Exception e) {
            e.printStackTrace();
         } finally {
            eventLoopGroup.shutdownGracefully();
         }
     }

     static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {

         @Override
         protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
             System.out.println(" client received :" + msg);
             if (msg != null && msg.equals("idle close")) {
                 System.out.println(" 服务端关闭连接,客户端也关闭");
             ctx.channel().closeFuture();
            }
         }
     }
 }
View Code

 

Netty断线自动重连实现(省略)
1、客户端启动连接服务端时,如果网络或服务端有问题,客户端连接失败,可以重连,重连的逻辑加在客户端。
2、系统运行过程中网络故障或服务端故障,导致客户端与服务端断开连接了也需要重连,可以在客户端处理数据的Handler的
channelInactive方法中进行重连。

 

标签:Netty,protostuff,编解码,pipeline,粘包,new,channel,客户端
From: https://www.cnblogs.com/ladeng19/p/17642964.html

相关文章

  • Netty核心功能与线程模型
    Netty初探NIO的类库和API繁杂,使用麻烦:需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。开发工作量和难度都非常大:例如客户端面临断线重连、网络闪断、心跳处理、半包读写、网络拥塞和异常流的处理等等。Netty对JDK自带的NIO的API进行......
  • Netty源码学习2——NioEventLoop的执行
    系列文章目录和关于我零丶引入在《Netty源码学习1——NioEventLoopGroup的初始化》中,我们学习了NioEventLoopGroup和NioEventLoop的初始化,在下面netty服务端启动的demo中会在ServerBootStrap中指定Channel为Nio类型的Channel,然后启动的时候绑定端口,之前我们解释道NioEventLoop......
  • linux系统句柄限制调整,当使用netty/socket触发达到系统最大连接数时查看
    socket原理:客户端使用tcp端口连接至服务端,服务端会打开一个句柄文件和客户端保持连接,注意并不是一个连接就会占用一个服务器端口,所以socket连接数跟系统端口最大连接数无关,不然系统防火墙不就没啥用,默认系统每个进程打开的句柄是有限制的,另外整个系统还有一个句柄限制总数,所以soc......
  • Netty源码学习1——NioEventLoopGroup的初始化
    系列文章目录和关于我零丶引入netty源码学习中,大家maybe都接触到如下的helloworld——netty客户端启动的demo:映入眼帘的第一个类就是NioEventLoopGroup,很多文章上来就是是Netty中的核心类,啥Channel,Pipeline,Context,Boostrap一通劈里啪啦,我看起来比较费劲。so本文不会上来就给......
  • 解码Transformer:自注意力机制与编解码器机制详述与代码实现
    本文全面探讨了Transformer及其衍生模型,深入分析了自注意力机制、编码器和解码器结构,并列举了其编码实现加深理解,最后列出基于Transformer的各类模型如BERT、GPT等。文章旨在深入解释Transformer的工作原理,并展示其在人工智能领域的广泛影响。作者TechLead,拥有10+年互联网服......
  • 网络传输数据的编解码
    网络传输数据的类型(二进制)网络传输是以二进制数据进行传输的,因此在网络传输数据的时候,数据需要先编码转化为二进制(bytes)数据类型数据的编解码 在Python中进行网络数据传输编解码通常涉及到将数据转换为字节流进行传输,并在接收方将字节流转换回原始数据。编码编码是将数据......
  • netty_2、Java NIO
    参考:NIO详解(Java):https://juejin.cn/post/6844903605669986317ByteBuffer(内存缓冲区):https://blog.csdn.net/u010430495/article/details/860871541、NIO基础JavaIO是阻塞的,当用户进行数据读写时,首先会由系统去等待数据准备(查看内核空间中有没有数据),等待内核数据准备......
  • C# 使用TouchSocket实现Tcp协议通讯,并且解决分包、粘包的问题
    我们知道如果Socket传输数据太频繁并且数据量级比较大,就很容易出现分包(一个包的内容分成了两份)、粘包(前一个包的内容分成了两份,其中一份连着下一个包的内容)的情况。粘包的处理方式有很多种,常见的三种是:每个包都在头部增加一个当前传输包的int4字节大小作为包头。每次接收到数据......
  • C# 使用FFmpeg.Autogen对byte[]进行编解码
    C#使用FFmpeg.Autogen对byte[]进行编解码,参考:https://github.com/vanjoge/CSharpVideoDemo入口调用类:usingSystem;usingSystem.IO;usingSystem.Drawing;usingSystem.Runtime.InteropServices;usingFFmpeg.AutoGen;namespaceFFmpegAnalyzer{publicclassFFm......
  • SpringBoot Netty socket使用
    SpringBootNettysocket使用Netty是由JBOSS提供的一个java开源框架,现为Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。这里springBoot整合起来使用测试,性能怎么的不怎么了解,至少能用maven引用依......