首页 > 其他分享 >Netty进阶

Netty进阶

时间:2023-01-04 23:24:11浏览次数:43  
标签:Netty 进阶 netty void io new import channel

1.Netty问题

TCP协议都存在着黏包和半包问题,但是UDP没有

1.粘包现象

发送方分10次发送,接收方一次接受了10次发送的消息

image-20221222122213019

2.半包现象

调整服务器的接受缓冲区大小(调小)

半包会导致服务器在1次接受时,无法接收到客户端1次的数据

image-20221222122352606

3.TCP滑动窗口

  1. 粘包:

    • 滑动窗口:假设一个报文为256字节,如果接收方处理不及时(没有及时把报文处理掉),并且滑动窗口的大小足够大,那么这256字节就会缓冲在窗口中,多个
    • Nagle算法:攒够一批足够再发,也会造成粘包
    • 应用层:ByteBuf设置很大,Netty默认时1024
  2. 半包:

    • 滑动窗口:接收方窗口只有128字节,发送方发送256字节,放不下只能先发送前128,等待ack后发送剩余的部分,造成了半包
    • MSS(链路层):(网卡/网络设备)发送数据超过MSS限制,会将数据切分发送,造成半包
    • 应用层:接收方ByteBuf小于实际的发送数据量

4.代码案例

Server:

package com.yikolemon.P91Question;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * 黏包现象 : 不调 服务端 接收缓冲区 SO_RCVBUF
 * 半包现象 : serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
 *            影响的底层接收缓冲区(即滑动窗口)大小,仅决定了 netty 读取的最小单位,netty 实际每次读取的一般是它的整数倍
 */
@Slf4j
public class HelloWorldServer {

    void start(){
        final NioEventLoopGroup boss = new NioEventLoopGroup();
        final NioEventLoopGroup worker = new NioEventLoopGroup();

        try{
            final ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            // head<<<<<<<<<
            // 接收缓冲区 设置字节 【使 发生半包】
            serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
            // =============
            // end>>>>>>>>>>
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                }
            });

            final ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
            channelFuture.channel().closeFuture().sync();

        }catch (InterruptedException e)
        {
            log.error("server error", e);

        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new HelloWorldServer().start();
    }

}

Client:

package com.yikolemon.P91Question;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HelloWorldClient {

    static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);

    public static void main(String[] args) {

        final NioEventLoopGroup worker = new NioEventLoopGroup();

        try{
            final Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        // channel连接建立好之后 出发 channelActive() 时间
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            // super.channelActive(ctx);
                            for (int i = 0; i < 10; i++) {
                                final ByteBuf buf = ctx.alloc().buffer(16);
                                buf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15});
                                ctx.writeAndFlush(buf);
                            }
                        }
                    });
                }
            });

            final ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e)
        {
            log.error("Client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}

2.问题解决方案

1.短连接

能够解决粘包问题,但不能解决半包问题,当Server的接受缓冲区很小,就会产生半包问题

一次发送就断开连接,不会产生粘包的问题

Clinet代码:

package com.yikolemon.P91Question;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HelloWorldClient {

    static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);

    public static void main(String[] args) {
        //发送10次短连接
        for (int i = 0; i < 10; i++) {
            send();
        }
    }
    public static void send() {

        final NioEventLoopGroup worker = new NioEventLoopGroup();

        try{
            final Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        // channel连接建立好之后 出发 channelActive() 时间
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            // super.channelActive(ctx);
                                final ByteBuf buf = ctx.alloc().buffer(16);
                                buf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15});
                                ctx.writeAndFlush(buf);
                                ctx.channel().close();
                        }
                    });
                }
            });
            final ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e)
        {
            log.error("Client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}

2.FixedLengthFrameDecoder定长帧解码器

FixedLengthFrameDecoder是Netty提供的一个解码器。发送方发送指定长度的消息,接收方如果接受不到定长,那么就会等到后面消息补齐最后拼接成为指定的长度。

Client:

package com.yikolemon.P91Question;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.Random;

@Slf4j
public class SloveClient {
    public static byte[] fill10Bytes(char c, int len) {
        byte[] bytes = new byte[10];
        Arrays.fill(bytes, (byte) '_');
        for (int i = 0; i < len; i++) {
            bytes[i] = (byte) c;
        }
        System.out.println(new String(bytes));
        return bytes;
    }

    public static void main(String[] args) {
        send();
    }
    public static void send() {

        final NioEventLoopGroup worker = new NioEventLoopGroup();

        try{
            final Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        // channel连接建立好之后 出发 channelActive() 时间
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            //把所有的数据全都写到一个Buf中,让服务器去解码
                            ByteBuf buf = ctx.alloc().buffer();
                            char c='0';
                            for (int i = 0; i < 10; i++) {
                                byte[] bytes = fill10Bytes(c, new Random().nextInt(10) + 1);
                                buf.writeBytes(bytes);
                                c++;
                            }
                            ctx.writeAndFlush(buf);
                        }
                    });
                }
            });
            final ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e)
        {
            log.error("Client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}

Server代码:

package com.yikolemon.P91Question;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * 黏包现象 : 不调 服务端 接收缓冲区 SO_RCVBUF
 * 半包现象 : serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
 *            影响的底层接收缓冲区(即滑动窗口)大小,仅决定了 netty 读取的最小单位,netty 实际每次读取的一般是它的整数倍
 */
@Slf4j
public class HelloWorldServer {

    void start(){
        final NioEventLoopGroup boss = new NioEventLoopGroup();
        final NioEventLoopGroup worker = new NioEventLoopGroup();

        try{
            final ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    //约定解码器,消息的长度为10
                    ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                }
            });

            final ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
            channelFuture.channel().closeFuture().sync();

        }catch (InterruptedException e)
        {
            log.error("server error", e);

        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new HelloWorldServer().start();
    }

}

3.使用分割符作为解码器

1)LineBasedFrameDecoder

支持\n和\r\n,需要指定最大长度,如果在最大长度内没有找到分割符,就抛出异常

Client代码:

package com.yikolemon.P91Question;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;

@Slf4j
public class SolveClient2 {
    public static StringBuilder makeString(char c, int len) {
        StringBuilder sb = new StringBuilder(len + 2);
        for (int i = 0; i < len; i++) {
            sb.append(c);
        }
        sb.append("\n");
        return sb;
    }
    public static void main(String[] args) {
        send();
    }
    public static void send() {

        final NioEventLoopGroup worker = new NioEventLoopGroup();

        try{
            final Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        // channel连接建立好之后 出发 channelActive() 时间
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            //把所有的数据全都写到一个Buf中,让服务器去解码
                            ByteBuf buf = ctx.alloc().buffer();
                            char c='0';
                            for (int i = 0; i < 10; i++) {
                                int nextInt = new Random().nextInt(256+1);
                                StringBuilder str = makeString(c, nextInt);
                                byte[] bytes = str.toString().getBytes();
                                buf.writeBytes(bytes);
                                c++;
                            }
                            ctx.writeAndFlush(buf);
                        }
                    });
                }
            });
            final ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e)
        {
            log.error("Client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}

Server代码:

package com.yikolemon.P91Question;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HelloWorldServer {

    void start(){
        final NioEventLoopGroup boss = new NioEventLoopGroup();
        final NioEventLoopGroup worker = new NioEventLoopGroup();

        try{
            final ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    //1024表示最大的检验长度,如果超过了长度仍然没有检测到换行符就会抛出异常
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                }
            });

            final ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
            channelFuture.channel().closeFuture().sync();

        }catch (InterruptedException e)
        {
            log.error("server error", e);

        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new HelloWorldServer().start();
    }

}

2)其他

其他的分割解码器,可以自行进行测试

4.LengthFieldBasedFrameDecoder帧解码器

四个构造参数的含义:

  • lengthFieldOffset :长度字段偏移量
  • lengthFieldLength :长度字段长度
  • lengthAdjustment :长度字段为基准,还有几个字节是内容
  • initialBytesToStrip :从头剥离几个字节

例子1:

  lengthFieldOffset   = 0
  lengthFieldLength   = 2
  lengthAdjustment    = 0
  initialBytesToStrip = 0 (= do not strip header)
 
  BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
  +--------+----------------+      +--------+----------------+
  | Length | Actual Content |----->| Length | Actual Content |
  | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
  +--------+----------------+      +--------+----------------+

例子2:最后剥离出长度字段

  lengthFieldOffset   = 0
  lengthFieldLength   = 2
  lengthAdjustment    = 0
  initialBytesToStrip = 2 (= the length of the Length field)
 
  BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
  +--------+----------------+      +----------------+
  | Length | Actual Content |----->| Actual Content |
  | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
  +--------+----------------+      +----------------+

例子3:

cafe占据两个字节,offset代表长度的偏移量,length代表长度的长度

  lengthFieldOffset   = 2 (= the length of Header 1)
  lengthFieldLength   = 3
  lengthAdjustment    = 0
  initialBytesToStrip = 0
 
  BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
  +----------+----------+----------------+      +----------+----------+----------------+
  | Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
  |  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
  +----------+----------+----------------+      +----------+----------+----------------+

例子4:

lengthAdjustment代表,从length之后,需要跳过2字节(类似Http的Header),才是真正的内容

  lengthFieldOffset   = 0
  lengthFieldLength   = 3
  lengthAdjustment    = 2 (= the length of Header 1)
  initialBytesToStrip = 0
 
  BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
  +----------+----------+----------------+      +----------+----------+----------------+
  |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
  | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
  +----------+----------+----------------+      +----------+----------+----------------+

例子5:

 lengthFieldOffset   = 1 (= the length of HDR1)
  lengthFieldLength   = 2
  lengthAdjustment    = 1 (= the length of HDR2)
  initialBytesToStrip = 3 (= the length of HDR1 + LEN)
 
  BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
  +------+--------+------+----------------+      +------+----------------+
  | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
  | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
  +------+--------+------+----------------+      +------+----------------+

代码测试:

package com.yikolemon.Decoder;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class LengthFieldDecoderTest {

    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel(
                //int length是4个字节
                new LengthFieldBasedFrameDecoder(1024,0,4,4,0),
                new LoggingHandler(LogLevel.INFO)
        );
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
        send(buf,"Fuck u");
        send(buf,"Shit");
        channel.writeInbound(buf);
    }
    public static void send(ByteBuf buf,String str) {
        //4个字节的内容长度,    实际内容
        byte[] bytes =str.getBytes();
        int length = bytes.length;//实际内容长度
        buf.writeInt(length);
        //假装写入了一个版本号,所有需要adjustment对此(header)造成的影响进行修正
        buf.writeBytes("why:".getBytes());
        buf.writeBytes(bytes);
    }
}

3.协议的设计和解析

1.Redis协议

根据Redis的协议,使用Netty根据协议发送命令

Redis网络传输协议:

set ke vvvv 对应的网络数据发送:

*3

$3

set

$2

ke

$4

vvvv

首先在本机上搭建Redis,然后Netty发送命令

FakeRedisClient代码:

package com.yikolemon.FakeRedis;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;

public class FakeRedisClient {
    public static void main(String[] args) throws InterruptedException {
        final byte[] LINE={13,10};
        NioEventLoopGroup worker = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(worker);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                    @Override
                    //第一次连接调用
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        ByteBuf buf = ctx.alloc().buffer();
                        buf.writeBytes("*3".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("$3".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("set".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("$2".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("ke".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("$4".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("vvvv".getBytes());
                        buf.writeBytes(LINE);
                        ctx.writeAndFlush(buf);
                    }

                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        System.out.println(buf.toString(Charset.defaultCharset()));
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
        channelFuture.channel().closeFuture().sync();
    }
}

2.Http协议

使用Netty提供的HttpServerCodec(),此Handler能够解码Http请求。(它既是入站处理器也是出站处理器)

解码后,会生成为HttpRequest和HttpContent两个类,有两种方式对消息内容进行处理

  1. 方式一

    使用ChannelInboundHandlerAdapter()类处理,此方式处理需要对msg类型进行instance判断,需要if else,比较麻烦

  2. 方式二

    使用SimpleChannelInboundHandler,这里的T泛型类即此Handler会处理的消息,如果发送来的msg不为此类型,那么不会处理

DefaultFullHttpResponse作为HttpResponse,发送给Server,写入content和在header中的length

代码:

package com.yikolemon.FakeProtocol;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;

@Slf4j
public class FakeHttp {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup boss=new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss,worker);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                ch.pipeline().addLast(new HttpServerCodec());
                /*ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        log.info("{}",msg.getClass());
                        if (msg instanceof HttpRequest){
                            //请求行,请求头
                        }
                        else if (msg instanceof HttpContent){
                            //请求体
                        }
                    }
                });*/
                ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) throws Exception {
                        String uri = httpRequest.getUri();
                        log.info(uri);
                        DefaultFullHttpResponse response =
                                new DefaultFullHttpResponse(httpRequest.getProtocolVersion(), HttpResponseStatus.OK);
                        byte[] bytes = "<h1>Fuck this</h1>".getBytes();
                        response.content().writeBytes(bytes);
                        //告诉浏览器,content长度,防止浏览器一直读取
                        response.headers().setInt(CONTENT_LENGTH,bytes.length);
                        channelHandlerContext.writeAndFlush(response);
                    }
                });
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
        channelFuture.channel().closeFuture().sync();
    }
}

3.自定义协议的要素

  • 魔数:判断包是否无效:例如CAFEBABE
  • 版本号:可以支持协议的升级
  • 序列化算法:消息正文采用哪种序列化/反序列化方式,由此拓展,如json,protobuf,(hessian,jdk这俩为二进制)
  • 指令类型:登录,注册,单聊,群聊...
  • 请求序号:为了双工通信,提供异步的能力
  • 正文长度
  • 消息正文:使用json或者xml,或者对象流

4.聊天室消息协议

MessageCodec:

package com.yikolemon.Codec;

import com.yikolemon.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;

/**
 * 自定义编解码器
 */
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        //魔数写入
        out.writeBytes(new byte[]{1,2,3,4});
        //版本
        out.writeByte(1);
        //序列化算法 0:jdk 1:json
        out.writeByte(0);
        //指令类型
        out.writeByte(msg.getMessageType());
        //4个字节的请求序号
        out.writeInt(msg.getSequenceId());
        //对齐填充用
        out.writeByte(0xff);
        //长度
        //内容字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos=new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        //长度
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes=new byte[length];
        in.readBytes(bytes,0,length);
        if (serializerType==0){
            //使用jdk反序列化
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
            ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
            Message message = (Message)objectInputStream.readObject();
            log.info("{},{},{},{},{},{}",magicNum,version,serializerType,messageType,sequenceId,length);
            log.info("{}",message);
            out.add(message);
        }
    }
}

测试类:

package com.yikolemon.Codec;

import com.yikolemon.message.LoginRequestMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class MessageCodecTest {
    public static void main(String[] args) throws Exception {
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(
                //使用帧解码器能够解决半包和粘包问题
                new LengthFieldBasedFrameDecoder(1024,12,4,0,0),
                new LoggingHandler(LogLevel.INFO),
                new MessageCodec()
        );
        //encode
        LoginRequestMessage message = new LoginRequestMessage("yiko", "111");
        embeddedChannel.writeOutbound(message);

        //decode
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
        new MessageCodec().encode(null,message,buf);
        //入站
        embeddedChannel.writeInbound(buf);
    }
}

4.Handler线程安全

如果你使用的Handler没有被标注为Shareable,在addLast时不能共用Handler,否则会出错抛出异常

  1. 加上了@Shareable注解的Handler是线程安全的

    这些编码和解码器能够公共使用

  2. 没有加的时候需要注意线程安全问题(编码解码器)

    ByteToMessage的子类不能被标注为@Sharable

将3中的MessageCodec改为可以标注为Sharable的类:

package com.zko0.protocol;

import com.zko0.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;

/**
 * 在前面必须添加LengthFieldBasedFrameDecoder处理器
 * 确保接到的ByteBuf消息完整
 */
@Slf4j
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> list) throws Exception {
        ByteBuf out = ctx.alloc().buffer();
        //魔数写入
        out.writeBytes(new byte[]{1,2,3,4});
        //版本
        out.writeByte(1);
        //序列化算法 0:jdk 1:json
        out.writeByte(0);
        //指令类型
        out.writeByte(msg.getMessageType());
        //4个字节的请求序号
        out.writeInt(msg.getSequenceId());
        //对齐填充用
        out.writeByte(0xff);
        //长度
        //内容字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos=new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        //长度
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
        list.add(out);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes=new byte[length];
        in.readBytes(bytes,0,length);
        if (serializerType==0){
            //使用jdk反序列化
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
            ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
            Message message = (Message)objectInputStream.readObject();
            log.info("{},{},{},{},{},{}",magicNum,version,serializerType,messageType,sequenceId,length);
            log.info("{}",message);
            out.add(message);
        }
    }
}

5.连接异常断开

channelInactive和exceptionCaught,分别处理channel连接断开和异常捕捉,重新实现该两个方法,可以实现客户端退出,服务端的对应处理

package com.zko0.server.handler;


import com.zko0.server.session.SessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {

    // 当连接断开时触发 inactive 事件
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.info("{} 已经断开", ctx.channel());
    }

    // 当出现异常时触发
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.info("{} 已经异常断开 异常是{}", ctx.channel(), cause.getMessage());
    }
}

6.心跳机制

连接假死问题:

  1. 当设备出现故障,底层TCP连接已经断开,但是应用程序没有感知到连接已经端口,会保持资源的占用
  2. 公网不稳定,丢包。连续丢包体现为:客户端数据无法发送,服务端无法接收到消息
  3. 应用线程阻塞

解决方式:使用心跳机制,一段事件内没有发送/接收到消息,触发事件,进行心跳检测

new IdleStateHandler()为Netty提供的Handler,能够在Channel长时间没有数据读/写的情况下,触发IdleState XXX事件

服务端检测:

在init内加入Handler

//判断是否读空闲时间过长,或者写空闲时间过长
//5s内没有收到channel数据,会出发IdleState#Read_IDLE事件
ch.pipeline().addLast(new IdleStateHandler(5,0,0));
// ChannelDuplexHandler可以作为入站或者出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler(){
    //用来触发特殊事件
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    IdleStateEvent event = (IdleStateEvent) evt;
    //触发读共享事件
        if (event.state()== IdleState.READER_IDLE){
            log.info("5s没有读到数据了");
            ctx.channel().close();
        }
    }
});

客户端检测:

ch.pipeline().addLast(new IdleStateHandler(0,3,0));
ch.pipeline().addLast(new ChannelDuplexHandler(){

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent) evt;
        //触发读共享事件
        if (event.state()== IdleState.WRITER_IDLE){
            log.info("3s没有写数据了");
            ctx.writeAndFlush(new PingMessage());
        }
    }
});

标签:Netty,进阶,netty,void,io,new,import,channel
From: https://www.cnblogs.com/zko0/p/17026291.html

相关文章

  • 牛客进阶题目5:信号发生器
    这个题目有点离谱,题里什么也没给,需要去题解中才知道方波、锯齿波和三角波最大值都为20,方波周期20,锯齿波周期21,三角波周期40对三种波形具体分析方波:周期为20且最大值也为2......
  • 牛客进阶题目4:输入序列不连续的序列检测
    跟上一题基本类似,多了个valid判定当前输入数据是否有效`timescale1ns/1nsmodulesequence_detect( inputclk, inputrst_n, inputdata, inputdata_valid, outpu......
  • Netty面试题及答案整理(2023最新版)
    **Netty面试题及答案**,每道都是认真筛选出的高频面试题,助力大家能找到满意的工作!###**下载链接**:[**全部面试题及答案PDF**](https://gitee.com/woniu201/interview-refere......
  • Netty-架构设计及入门程序-3
    一、原生NIO存在的问题1、NIO的类库和API繁杂,使用麻烦:需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。2、需要具备其他的额外技能:要熟......
  • 牛客进阶题目3:不重叠序列检测
    还是移位寄存器,加一个计数器来限制周期题目要求状态机,懒得画了,移位寄存器可根据时序图直接写`timescale1ns/1nsmodulesequence_detect( inputclk, inputrst_n, i......
  • 牛客进阶题目2:含有无关项的序列检测
    跟上一题类似这里有人可能会用到casex,最好别用,有的工具可能不支持`timescale1ns/1nsmodulesequence_detect( inputclk, inputrst_n, inputa, outputregmatch......
  • 牛客进阶题目1:输入序列连续检测
    检测01110001序列,满足序列则拉高match可以用状态机和移位寄存器,懒得画状态转移图,直接用移位寄存器解注意题中match在检测到序列后的下一周期拉高,所以需要延一拍`timesca......
  • C++进阶(智能指针)
    智能指针原理C++程序设计中使用堆内存是非常频繁的操作,堆内存的申请和释放都由程序员自己管理。程序员自己管理堆内存可以提高了程序的效率,但是整体来说堆内存的管理是麻......
  • 算法竞赛进阶指南 0x43 线段树
    文章目录​​线段树简介​​​​线段树的简单代码实现​​​​建树代码​​​​修改操作​​​​查询操作​​​​线段树的查询操作的时间复杂度分析:​​​​[AcWing245.你......
  • 音视频开发进阶|第七讲:分辨率与帧率·上篇
     在视频系列的前几篇推文中,我们已经接触了视频相关的不少概念,它们都是围绕着几个核心角色:色彩、像素、图像和视频来展开的。这几个核心角色之间的关系,大家应该都有了基本......