首页 > 其他分享 >Netty解决粘包半包问题

Netty解决粘包半包问题

时间:2024-12-22 22:57:48浏览次数:5  
标签:Netty 字节 new 粘包 header 消息 byteBuf 长度 半包

1.定长,每次读取固定的数据量

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new FixedLengthFrameDecoder(10)); // 每条消息长度固定为10字节
pipeline.addLast(new YourBusinessHandler());

每条消息长度固定,接收端读取固定字节数作为一个完整的消息。

  • 粘包问题: 即使多个消息被合并在一起,定长解码器可以通过固定的长度正确拆分数据。

  • 半包问题: 如果数据不完整,Netty 会等待剩余数据到达再进行组装。

  • 不灵活,只适用于固定长度的协议。

    如果消息长度不一致,需要填充或裁剪数据,浪费存储空间。

2.分隔符

每条消息使用特定的分隔符(如 \n)进行分隔。

粘包问题: 分隔符明确了每条消息的边界,无论消息是否粘连都可以正确拆分。

半包问题: 如果分隔符未到达,Netty 会缓冲当前数据,等待剩余数据到达后组装完整消息。

  • 如果分隔符是消息内容的一部分,可能会导致解析错误。

  • 每条消息都需要附加分隔符,增加了一些开销。

ChannelPipeline pipeline = ch.pipeline();
ByteBuf delimiter = Unpooled.copiedBuffer("\n".getBytes()); // 使用换行符作为分隔符
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
pipeline.addLast(new YourBusinessHandler());

3.基于长度字段的解决方案

消息头中包含一个字段表示消息体的长度,接收端根据长度字段解析完整的消息。

粘包问题: 长度字段明确了每条消息的大小,即使多条消息粘连,解码器可以逐条解析。

半包问题: 如果接收的数据不足以包含完整消息,Netty 会缓冲数据,等待剩余部分到达再处理。

  • 适用于变长消息,灵活且高效。

  • 不依赖分隔符,节省了数据的额外开销。

  • 需要协议设计时明确定义长度字段。

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(
    1024, 0, 4, 0, 4)); 
    // 最大帧长度1024,长度字段偏移量为0,长度字段长度为4字节,无附加偏移,去掉长度字段的头部字节
pipeline.addLast(new YourBusinessHandler());

一共五个参数

maxFrameLength:帧的最大长度

lengthFieldOffset:长度的偏移量,用于获取长度,比如你先写入长度4个字节,那就无需偏移,因为是先写入的,所以直接读长度的字节大小可以直接得到。如果先写了header,比如CAFE占用了1个字节,那长度偏移量就得是1了,此时他会跳过从头开始数的第一个字节,然后读取消息的长度。有了消息的长度就可以读取消息了。

lengthFieldLength:长度占用的字节大小,跟上面lengthFieldOffset的来确定发送消息的长度。

initialBytesstrip:剥离字节长度,比如我要剥离掉这个header1字节,还有这个记录长度的字节4字节,我需要指定剥离的字节大小(1字节+4字节),就能只留下消息了。

lengthAdjustment:他指的是从长度之后应该跳过几个字节的内容,比如我在长度和消息之间又加了一个版本号1字节

现在组成:header 1字节 ,长度 4字节 ,版本号 1字节 ,消息("helloworld")

此时maxFrameLength=1024

lengthFieldOffset=1(跳过header)

lengthFieldLength=4(长度的字节大小)

lengthAdjustment=1(长度以后跳过1个字节才是消息)

initialBytesstrip=6 (去掉header、长度、版本号共6个字节)此时才会得到真正的helloworld

4.自定义协议

如果协议复杂或者不符合上述通用解码器的场景,可以手动编写解码器。

通过分析接收到的数据流,根据协议规则解析完整的消息。

粘包问题: 自定义逻辑中明确解析每条消息的边界。

半包问题: 使用 Netty 提供的缓冲区特性,确保接收完整数据后再解析。

@Slf4j
public class ProtocolMessageEncoderDecoder extends ByteToMessageCodec<ProtocolMessage<?>> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMessage<?> protocolMessage, ByteBuf byteBuf) throws Exception {
        //判断protocolMessage为空
        if(protocolMessage == null || protocolMessage.getHeader() == null){
            byteBuf.writeBytes(new byte[]{});
        }
        //得到请求头
        ProtocolMessage.Header header = protocolMessage.getHeader();

        //拼装请求
        //魔数 1字节
        byteBuf.writeBytes(new byte[]{header.getMagic()});


        //版本号 1字节
        byteBuf.writeBytes(new byte[]{header.getVersion()});
        //序列化器 1字节
        byteBuf.writeBytes(new byte[]{header.getSerializer()});
        //类型 1字节
        byteBuf.writeBytes(new byte[]{header.getType()});
        //状态 1字节
        byteBuf.writeBytes(new byte[]{header.getStatus()});
        //请求id 8字节
        byteBuf.writeLong(header.getRequestId());
        //消息体长度 4字节
//        byteBuf.writeInt(header.getBodyLength());
        // 获取序列化器
        ProtocolMessageSerializerEnum enumByKey = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
        if(enumByKey == null){
            throw new RuntimeException("序列化协议不存在:"+ enumByKey.getValue());
        }
        //利用key得到序列化器
        Serializer serializer = SerializerFactory.getInstance(enumByKey.getValue());
        //序列化请求体
        byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());
        //写入请求体长度
        byteBuf.writeInt(bodyBytes.length);
        //写入请求体
        byteBuf.writeBytes(bodyBytes);
        //完成自定义协议编码
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // 分别从指定位置读出 Buffer
        ProtocolMessage.Header header = new ProtocolMessage.Header();
        //魔数
        byte magic = byteBuf.readByte();
        //校验魔数
        if(magic != ProtocolConstant.PROTOCOL_MAGIC){
//            throw new RuntimeException("消息 magic 非法" + magic);
            throw new RpcException(ErrorCode.ConsumerError,"消息 magic 非法" + magic);
        }
        //版本号
        byte version = byteBuf.readByte();
        //序列化器
        byte serializer = byteBuf.readByte();
        //类型
        byte type = byteBuf.readByte();
        //状态
        byte status = byteBuf.readByte();
        //请求id
        long RequestId = byteBuf.readLong();
        //消息体长度
        int BodyLength = byteBuf.readInt();
        //写入header
        header.setMagic(magic);
        header.setSerializer(serializer);
        header.setVersion(version);
        header.setType(type);
        header.setStatus(status);
        header.setRequestId(RequestId);

        header.setBodyLength(BodyLength);
        //获得请求体数据,以上一共17个字节
        byte[] bodyBytes = new byte[BodyLength];

        //写入请求体数据, 解决粘包问题,只读指定长度的数据
        byteBuf.readBytes(bodyBytes,0,BodyLength);
        //解析消息体
        // 获取序列化器
        ProtocolMessageSerializerEnum serializerEnum   = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
        if(serializerEnum  == null){
        throw new RuntimeException("序列化消息的协议不存在");
        }
        //得到序列化器
        Serializer serializer1 = SerializerFactory.getInstance(serializerEnum .getValue());
        ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByKey(header.getType());
        if (messageTypeEnum == null) {
            throw new RuntimeException("序列化消息的类型不存在");
        }
        switch (messageTypeEnum) {
            case REQUEST:
                RpcRequest request  = serializer1.deserialize(bodyBytes, RpcRequest.class);
                list.add(new ProtocolMessage<>(header,request));
                return;
            case RESPONSE:
                RpcResponse response = serializer1.deserialize(bodyBytes, RpcResponse.class);
                list.add(new ProtocolMessage<>(header,response));
                return;
            case HEART_BEAT:return;
            case OTHERS:return;
            default:
                throw new RuntimeException("暂不支持该消息类型");
        }
    }


}

灵活,适用于复杂协议。

方法粘包问题解决半包问题解决使用场景优缺点
定长消息精确分割等待补充数据消息固定长度的协议简单但不灵活,适合定长数据
分隔符根据分隔符拆分等待完整数据使用明确分隔符的协议实现简单,但需要额外的分隔符
长度字段按长度截取等待完整数据消息中包含长度字段的协议灵活高效,适合大多数场景
自定义解码器自行定义逻辑自行定义逻辑协议复杂或不规则灵活性最高,但开发成本高

标签:Netty,字节,new,粘包,header,消息,byteBuf,长度,半包
From: https://blog.csdn.net/format_push/article/details/144654373

相关文章

  • 千峰教育--Netty 再学习 1 网络模型概述(BIO、NIO、AIO)、BIO 逻辑实现及其局限性(单线程
    课程介绍1网络模型概述2Channel详解3Buffer详解4Selector详解5NIO综合案例-聊天室6AIO概念及实现 1网络编程IO模型介绍1.1BLockingIOBlockingIO也称BIO,及同步阻塞IO。Java的io包基于流模型实现,提供了FIle,FileInputStream,FileOutputStream等输入输出流......
  • Netty的高性能之道
    一、概述1.1惊人的性能数据最近一个圈内朋友通过私信告诉我,通过使用Netty4+Thrift压缩二进制编解码技术,他们实现了10WTPS(1K的复杂POJO对象)的跨节点远程服务调用。相比于传统基于Java序列化+BIO(同步阻塞IO)的通信框架,性能提升了8倍多。事实上,我对这个数据并不感到惊讶,根据我5......
  • netty echo例子
    netty使用方法:1.选择事件处理线程池EventLoopGroup,要与下面的管道选择对应名称,服务端要两个(一个是接收客户端连接,另一个是处理客户端请求),客户端只需要一个(处理客户端请求)2.创建Bootstrap对象,配置事件处理线程池(上面new的Group)3.设置管道(有NioSocketChannel,Ep......
  • Netty出现的异常【已解决】:An exceptionCaught() event was fired, and it reached at
    修改方案:byteBuf.retain();ByteBuf后面添加这个retain(),这个添加原则是这样,如果你消耗了一次ByteBuf,你的下游Handler还需要再次消耗的话,就需要添加这个retain(),以此类推,一定要要注意添加的时机,不然的话可能需要自己手动释放;原因分析:这个错误是因为ByteBuf的已经被逻辑释放......
  • Netty网络框架详细讲解
    一、Netty基本内容1.什么是netty?Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。异步的:事件驱动:基于JavaNIO(Non-blockingI/O)的Selector实现的。Netty的核心设计目标是:高性能:充分利用JavaNIO的非阻塞特性。可扩......
  • 【Netty】IO模型
    官方参考:https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf 1)BIO:一个连接一个线程,客户端有连接请求时服务器端就需要启动一个线程进行处理,线程开销大。2)NIO:一个请求一个线程,客户端发送的连接请求会注册到多路复用器上,多路复用器轮询到该连接有I/O请求时才启动一个线程进行处理......
  • TCP 数据传输的拆包和粘包了解吗?
    前言:上一篇我们了解了什么是TCP协议,以及TCP协议3次握手4次挥手的原因,本篇来分享一下TCP数据的传输过程的拆包和粘包,以及TCP数据传输过程中的一些细节。计算机网络往期文章TCP为什么是3次握手4次挥手?TCP数据是如何发送接收的?我们知道TCP是传输层协议......
  • Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (服务器端)
    录Netty源码分析之番外篇JavaNIO的前生今世JavaNIO的前生今世之一简介JavaNIO的前生今世之二NIOChannel小结JavaNIO的前生今世之三NIOBuffer详解JavaNIO的前生今世之四NIOSelector详解Netty源码分析之零磨刀不误砍柴工源码分......
  • Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (客户端)
    永顺 2016-10-26阅读 20 分钟41 目录Netty源码分析之番外篇JavaNIO的前生今世JavaNIO的前生今世之一简介JavaNIO的前生今世之二NIOChannel小结JavaNIO的前生今世之三NIOBuffer详解JavaNIO的前生今世之四N......
  • Netty 源码分析之 二 贯穿Netty 的大动脉 ── ChannelPipeline (一)
    目录源码之下无秘密──做最好的Netty源码分析教程Netty源码分析之番外篇JavaNIO的前生今世JavaNIO的前生今世之一简介JavaNIO的前生今世之二NIOChannel小结JavaNIO的前生今世之三NIOBuffer详解JavaNIO的前生今世之四NIOSelector......