首页 > 其他分享 >Netty进阶-协议

Netty进阶-协议

时间:2022-10-24 15:11:19浏览次数:73  
标签:Netty 00 进阶 协议 61 73 65 new buf

5.1、Redis协议
//redis协议测试
public class TestRedis {
    public static void main(String[] args) {
        final byte[] line = {13, 10};
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new LoggingHandler())
                                    .addLast(new ChannelInboundHandlerAdapter() {
                                        @Override
                                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                            ByteBuf buf = ctx.alloc().buffer();
                                            buf.writeBytes("*3".getBytes());
                                            buf.writeBytes(line);
                                            buf.writeBytes("$3".getBytes());
                                            buf.writeBytes(line);
                                            buf.writeBytes("set".getBytes());
                                            buf.writeBytes(line);
                                            buf.writeBytes("$4".getBytes());
                                            buf.writeBytes(line);
                                            buf.writeBytes("name".getBytes());
                                            buf.writeBytes(line);
                                            buf.writeBytes("$8".getBytes());
                                            buf.writeBytes(line);
                                            buf.writeBytes("zhangsan".getBytes());
                                            buf.writeBytes(line);
                                            ctx.writeAndFlush(buf);
                                        }

                                        @Override
                                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                            System.out.println("redis 回复:" + msg);
                                        }
                                    });
                        }
                    }).connect("124.220.70.93", 6379).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
5.2、Http协议
@Slf4j
public class TestHttp {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();

        try {
            ChannelFuture channelFuture = new ServerBootstrap()
                    .group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline()
                                    .addLast(new LoggingHandler())
                                    //http 编解码处理器
                                    .addLast(new HttpServerCodec())
                                    //只处理http请求头和请求行,要处理请求体,泛型为HttpContent
                                    .addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, HttpRequest request) throws Exception {
                                            //获取请求路径
                                            log.info(request.uri());

                                            //响应
                                            DefaultFullHttpResponse response =
                                                    new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);

                                            byte[] bytes = "<h1>hello world</h1>".getBytes();
                                            response.headers().setInt(CONTENT_LENGTH, bytes.length);
                                            response.content().writeBytes(bytes);
                                            ctx.writeAndFlush(response);
                                        }
                                    });
                        }
                    }).bind(8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
5.3、自定义协议要素
  • 魔数,用来在第一时间判定是否是无效数据包
  • 版本号,可以支持协议的升级
  • 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
  • 指令类型,是登录、注册、单聊、群聊... 跟业务相关
  • 请求序号,为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

编解码器

根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发

@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 1. 4 字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版本号,
        out.writeByte(1);
        // 3. 1 字节的序列化方式 jdk 0 , json 1
        out.writeByte(0);
        // 4. 1 字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 个字节的序列号
        out.writeInt(msg.getSequenceId());
        // 无意义,对齐填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容
        out.writeBytes(bytes);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }
}

测试

public class Test {
    public static void main(String[] args) throws Exception {

        //测试编码,对象--》ByteBuf
        EmbeddedChannel channel = new EmbeddedChannel(
                //LTC解码器,解决黏包半包,最后一个参数=0,是因为所有信息我们手动去解析了
                new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
                new LoggingHandler(),
                new MessageCodec()
        );
        LoginRequestMessage message = new LoginRequestMessage("zhangsang", "123456", "张三");
        channel.writeOutbound(message);

        //测试解码,ByteBuf-->对象
        //1.先编码,用来测试
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
        new MessageCodec().encode(null, message, buf);

        channel.writeInbound(buf);
        //codec.decode(null, buf, new ArrayList<>());
    }
}

结果

15:54:44.259 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 257B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 f1 |................|
|00000010| ac ed 00 05 73 72 00 2c 63 6f 6d 2e 6a 70 79 2e |....sr.,com.jpy.|
|00000020| 63 68 61 74 64 65 6d 6f 2e 6d 65 73 73 61 67 65 |chatdemo.message|
|00000030| 2e 4c 6f 67 69 6e 52 65 71 75 65 73 74 4d 65 73 |.LoginRequestMes|
|00000040| 73 61 67 65 ec 28 6f bb c5 26 64 b8 02 00 03 4c |sage.(o..&d....L|
|00000050| 00 08 6e 69 63 6b 6e 61 6d 65 74 00 12 4c 6a 61 |..nicknamet..Lja|
|00000060| 76 61 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 4c |va/lang/String;L|
|00000070| 00 08 70 61 73 73 77 6f 72 64 71 00 7e 00 01 4c |..passwordq.~..L|
|00000080| 00 08 75 73 65 72 6e 61 6d 65 71 00 7e 00 01 78 |..usernameq.~..x|
|00000090| 72 00 20 63 6f 6d 2e 6a 70 79 2e 63 68 61 74 64 |r. com.jpy.chatd|
|000000a0| 65 6d 6f 2e 6d 65 73 73 61 67 65 2e 4d 65 73 73 |emo.message.Mess|
|000000b0| 61 67 65 9e 0b 0e cd d6 f3 a9 f5 02 00 02 49 00 |age...........I.|
|000000c0| 0b 6d 65 73 73 61 67 65 54 79 70 65 49 00 0a 73 |.messageTypeI..s|
|000000d0| 65 71 75 65 6e 63 65 49 64 78 70 00 00 00 00 00 |equenceIdxp.....|
|000000e0| 00 00 00 74 00 06 e5 bc a0 e4 b8 89 74 00 06 31 |...t........t..1|
|000000f0| 32 33 34 35 36 74 00 09 7a 68 61 6e 67 73 61 6e |23456t..zhangsan|
|00000100| 67                                              |g               |
+--------+-------------------------------------------------+----------------+
15:54:44.259 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
15:54:44.265 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 257B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 f1 |................|
|00000010| ac ed 00 05 73 72 00 2c 63 6f 6d 2e 6a 70 79 2e |....sr.,com.jpy.|
|00000020| 63 68 61 74 64 65 6d 6f 2e 6d 65 73 73 61 67 65 |chatdemo.message|
|00000030| 2e 4c 6f 67 69 6e 52 65 71 75 65 73 74 4d 65 73 |.LoginRequestMes|
|00000040| 73 61 67 65 ec 28 6f bb c5 26 64 b8 02 00 03 4c |sage.(o..&d....L|
|00000050| 00 08 6e 69 63 6b 6e 61 6d 65 74 00 12 4c 6a 61 |..nicknamet..Lja|
|00000060| 76 61 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 4c |va/lang/String;L|
|00000070| 00 08 70 61 73 73 77 6f 72 64 71 00 7e 00 01 4c |..passwordq.~..L|
|00000080| 00 08 75 73 65 72 6e 61 6d 65 71 00 7e 00 01 78 |..usernameq.~..x|
|00000090| 72 00 20 63 6f 6d 2e 6a 70 79 2e 63 68 61 74 64 |r. com.jpy.chatd|
|000000a0| 65 6d 6f 2e 6d 65 73 73 61 67 65 2e 4d 65 73 73 |emo.message.Mess|
|000000b0| 61 67 65 9e 0b 0e cd d6 f3 a9 f5 02 00 02 49 00 |age...........I.|
|000000c0| 0b 6d 65 73 73 61 67 65 54 79 70 65 49 00 0a 73 |.messageTypeI..s|
|000000d0| 65 71 75 65 6e 63 65 49 64 78 70 00 00 00 00 00 |equenceIdxp.....|
|000000e0| 00 00 00 74 00 06 e5 bc a0 e4 b8 89 74 00 06 31 |...t........t..1|
|000000f0| 32 33 34 35 36 74 00 09 7a 68 61 6e 67 73 61 6e |23456t..zhangsan|
|00000100| 67                                              |g               |
+--------+-------------------------------------------------+----------------+
15:54:44.296 [main] INFO com.jpy.chatdemo.protocol.MessageCodec - 16909060,1,0,0,0,241
15:54:44.296 [main] INFO com.jpy.chatdemo.protocol.MessageCodec - ==========
15:54:44.296 [main] INFO com.jpy.chatdemo.protocol.MessageCodec - LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=zhangsang, password=123456, nickname=张三)
5.4、@Sharable

思考1:上述代码中,LoggingHandler和LengthFieldBasedFrameDecoder是否可以提取出来供多个channel使用。

LoggingHandler只是用来打印的,不保存信息,无状态的,可以多个线程channel共用。

LengthFieldBasedFrameDecoder用来编解码的,可能会有多个channel数据拼接到了一起,不可共用。

结论:@Sharable 注解的类可以共用,否则不可共用

思考2:自定义的MessageCodec可否使用,查看源码,继承了ByteToMessageCodec,ByteToMessageCodec的子类不能使用@Sharable,就不能共享

解决:继承MessageToMessageCodec<ByteBuf, Message>

@Slf4j
@ChannelHandler.Sharable
/*
 * 必须和LengthFieldBasedFrameDecoder一起使用,确定ByteBuf消息是完整的
 */
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
        ByteBuf buf = ctx.alloc().buffer();
        //1.魔数:4个字节
        buf.writeBytes(new byte[]{1, 2, 3, 4});
        //2.版本号:1个字节
        buf.writeByte(1);
        //3.序列化方式:1个字节
        //0:jdk 1:json
        buf.writeByte(0);
        //4.指令类型:1个字节
        buf.writeByte(msg.getMessageType());
        //5.请求序号:4个字节
        buf.writeInt(msg.getSequenceId());
        //无意义,对齐
        buf.writeByte(0xff);

        //msg-->bytebuf
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(msg);
        byte[] bytes = baos.toByteArray();
        int length = bytes.length;
        //6.内容长度:4个字节
        buf.writeInt(length);
        //7.内容
        buf.writeBytes(bytes);

        out.add(buf);

    }

    //解码:Bytebuf-->Message
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serizal = in.readByte();
        byte reqType = in.readByte();
        int reqNum = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();

        log.info("{},{},{},{},{},{}", magicNum, version, serizal, reqType, reqNum, length);
        log.info("==========");
        log.info("{}", message);

        out.add(message);
    }
}

标签:Netty,00,进阶,协议,61,73,65,new,buf
From: https://www.cnblogs.com/jpymll/p/16821516.html

相关文章

  • Netty入门-ButeBuf
    3.5、ByteBuf3.5.1、创建//结果:初始容量256,扩容到512//PooledUnsafeDirectByteBuf(ridx:0,widx:0,cap:256)//PooledUnsafeDirectByteBuf(ridx:0,widx:300,cap......
  • Netty入门-Future & Promise
    3.3、Future&PromiseNetty中的Future与Jdk中Future同名,但是是两个接口,继承关系:Promise---extends-->Future(Netty)-----extend--->Future(JDK)区别:jdkFuture只能......
  • Qt使用https协议发送带参数的post请求
    背景:现在公司项目需要做一个检测更新的功能,通过访问后台接口,判断是否需要更新。后台接口是https协议的,接口需要post请求,需要带参数,来判断是哪个软件需要做检测更新的操......
  • Netty入门-Handler & Pipeline
    3.4、Handler&PipelineChannelHandler用来处理Channel上的各种事件,分为入站、出站两种。所有ChannelHandler被连成一串,就是Pipeline入站处理器通常是ChannelIn......
  • Netty入门-Hello World
    Netty入门1、Netty优势NettyVSNIO,工作量大,bug多需要自己构建协议解决TCP传输问题,如黏包,半包epoll空轮询导致cpu100%对API进行增强,ThreadLocal-->FastThreadLocal......
  • 标准SPI协议应用笔记:四种模式区分(CPOL/CPHA)
    前言标准的四线SPI包括CS、MISO、MOSI、CLK四根线,同时根据时钟线的相位和极性可以分为四种模式CPOL代表时钟的极性Polarity,即时钟线有效时(active)和无效时(inacitve)的电平......
  • 【kubernetes入门到精通】Kubernetes架构分析介绍篇「进阶篇」
    意志的出现不是对愿望的否定,而是把愿望合并和提升到一个更高的意识水平上。——罗洛·梅官方网站​​Kubernetes中文官方网站​​​​Kubernetes英文官方网站​​Kubernetes......
  • Prometheus之PromQL进阶
    一Prometheus的聚合函数一般来说,单个指标的价值不大,监控场景中往往需要联合并可视化一组指标,这种联合机制即是指“聚合”操作,例如,将计算数、求和、平均值。分位数、标准差......
  • HTTP协议
    一、简介HTTP(HypertextTransferProtocol)超文本传输协议,帮助客户端访问万维网,网页浏览器通过翻译HTML(超文本标识语言)文件来表现文本、图像等对象。 客户端通过浏览器......
  • CSMA/CD协议
    由于早期以太网连接在一根总线上,在同一时间只能有有一台计算机发送数据,所以就采用载波监听多点接入/碰撞检测。多点接入:总线型网络,许多计算机连入总线。载波监听:边发送边......