首页 > 其他分享 >Netty——4、优化

Netty——4、优化

时间:2023-06-28 14:55:55浏览次数:37  
标签:Netty public static new 序列化 优化 class channel

1、扩展序列化算法

序列化、反序列化主要用在消息正文的转换上:

  • 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[]);
  • 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理。

目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下:

// 反序列化
byte[] body = new byte[bodyLength];
bytebuf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);

// 序列化
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();

为了支持更多序列化算法,抽象一个 Serializer 接口。

pulic interface Serializer {
    // 反序列化方法
    <T> T deserialize(Class<T> clazz, byte[] bytes);
    
    // 序列化方法
    <T> byte[] serialize(T object);
}

代码演示:

public interface Serializer {

    <T> T deserialize(Class<T> clazz, byte[] bytes);

    <T> byte[] serialize(T object);

    enum Algorithm implements Serializer {
        Java {
            @Override
            @SuppressWarnings("unchecked")
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                try {
                    ObjectInputStream oiStream = null;
                    oiStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
                    return ((T) oiStream.readObject());
                } catch (IOException | ClassNotFoundException e) {
                    throw new RuntimeException("反序列化失败", e);
                }
            }

            @Override
            public <T> byte[] serialize(T object) {
                try {
                    ByteArrayOutputStream baoStream = new ByteArrayOutputStream();
                    ObjectOutputStream ooStream = null;
                    ooStream = new ObjectOutputStream(baoStream);
                    ooStream.writeObject(object);
                    return baoStream.toByteArray();
                } catch (IOException e) {
                    throw new RuntimeException("序列化失败", e);
                }
            }
        },

        JSON {
            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                String json = new String(bytes, StandardCharsets.UTF_8);
                return new Gson().fromJson(json, clazz);
            }

            @Override
            public <T> byte[] serialize(T object) {
                String json = new Gson().toJson(object);
                return json.getBytes(StandardCharsets.UTF_8);
            }
        }
    }

}

2、参数调优

2.1、CONNECT_TIMEOUT_MILLIS

  • 属于 SocketChannel 参数;
  • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常;
  • SO_TIMEOUT 主要用在阻塞IO,阻塞 IO 中 accept、read 等待都是无限等待的,如果不希望永远阻塞,使用它调整超时时间。
/**
 *
 // 客户端通过 .option() 方法配置参数
 // 服务器端通过 .option() 方法给 ServerSocketChannel 配置参数;通过 .childOption() 方法给 SocketChannel 配置参数
 */
@Slf4j
public class TestConnectionTimeout {
    public static void main(String[] args) {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler());
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
            future.sync().channel().closeFuture().sync();
        } catch (Exception ex) {
            ex.printStackTrace();
            log.debug("timeout");
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

2.2、SO_BACKLOG

属于 ServerSocketChannel 参数。

  • 第1次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列;
  • 第2次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server;
  • 第3次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue。

在 Linux 2.2 之前,backlog 大小包括了2个队列的大小,在 2.2 之后,分别用下面两个参数来控制:

  • sysc queue - 半连接队列:大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 sync cookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略;
  • accept queue - 全连接队列:大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值。如果 accept queue 队列满了,server 将发送一个拒绝连接的错误信息到 client。

netty 中,可以通过 option(ChannelOption.SO_BACK_LOG, 值) 来设置大小。

2.3、ulimit -n

属于操作系统参数,限制一个进程能同时打开的最大文件描述符(FD)数量。建议放在启动脚本中。

2.4、TCP_NODELAY

属于 SocketChannel 参数,netty 默认为false,即开启了 nagle 算法,如果我们不想要延迟,则应设为 true。

2.5、SO_SNDBUF & SO_REVBUF

滑动窗口上限,我们一般不需要调整,因为操作系统会自动帮我们进行调整。

  • SO_SNDBUF 属于 SocketChannel 参数;
  • SO_REVBUF 既可以用于 SocketChannel 参数,也可以用于 ServerSocketChannel 参数(建议设置到 ServerSocketChannel 上)。

2.6、ALLOCATOR

  • 属于 SocketChannel 参数;
  • 用来分配 ByteBuf,ctx.alloc()。

2.7、REVBUF_ALLOCATOR

  • 属于 SocketChannel 参数;
  • 控制 netty 接收缓冲区的大小;
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定。

3、RPC 框架

为了简化起见,在原来聊天项目的基础上新增 RPC 请求和响应消息。

@Data
public abstract class Message implements Serializable {

    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
    
    private static final Map<Integer, Class<? extends Message>> MESSAGE_CLASSES = new ConcurrentHashMap<>();
    
    static {
        MESSAGE_CLASSES.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        MESSAGE_CLASSES.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }

    public static Class<?> getMassageClass(int messageType) {
        return MESSAGE_CLASSES.get(messageType);
    }

    public abstract int getMessageType();
    
}

请求消息:

@Data
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {
    private String interfaceName; // 调用的接口全限定名,服务端根据它找到实现
    private String methodName; // 调用接口中的方法名
    private Class<?> returnType; // 方法返回类型
    private Class<?>[] parameterTypes; // 方法参数类型数组
    private Object[] parameterValues; // 方法参数值数组

    public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType,
                             Class<?>[] parameterTypes, Object[] parameterValues) {
        super(sequenceId);
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.returnType = returnType;
        this.parameterTypes = parameterTypes;
        this.parameterValues = parameterValues;
    }


    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_REQUEST;
    }
}

响应消息:

@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
    private Object returnValue;
    private Exception exceptionValue;
    
    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_RESPONSE;
    }
}

服务端架子:

@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
        RpcResponseMessage responseMsg = new RpcResponseMessage();
        responseMsg.setSequenceId(message.getSequenceId());
        try {
            HelloService helloService = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));
            Method method = helloService.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
            Object result = method.invoke(helloService, message.getParameterValues());

            responseMsg.setReturnValue(result);

        } catch (Exception e) {
            e.printStackTrace();
            responseMsg.setExceptionValue(new Exception("远程调用出错:" + e.getCause().getMessage()));
        }
        ctx.writeAndFlush(responseMsg);
    }
}

@Slf4j
public class RpcServer {
    public static void main(String[] args) {
        NioEventLoopGroup boos = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();

        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodec MESSAGE_CODEC = new MessageCodec();
        // rpc 请求消息处理器
        RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boos, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProtocolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boos.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

客户端架子:

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
    }
}

@Slf4j
public class RpcClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProtocolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(new MessageCodec());
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();

            ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
                    1,
                    "com.clp.test3.HelloService",
                    "sayHello",
                    String.class,
                    new Class[]{String.class},
                    new Object[]{"张三"}
            )).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        Throwable cause = future.cause();
                        log.error("error: {}", cause.getMessage());
                    }
                }
            });

            channel.closeFuture().sync().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    group.shutdownGracefully();
                }
            });

        } catch (Exception e) {
            log.error("client error", e);
        }
    }
}

RPC例子:

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
    // Map<SequenceId, 结果容器Promise> ,用来接收结果
    public static final Map<Integer, Promise<Object>> PROMISE_MAP = new ConcurrentHashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        // 拿到空的 promise
        Promise<Object> promise = PROMISE_MAP.remove(msg.getSequenceId());
        if (promise != null) {
            Object returnValue = msg.getReturnValue();
            Exception exceptionValue = msg.getExceptionValue();
            if (exceptionValue != null) {
                promise.setFailure(exceptionValue);
            } else {
                promise.setSuccess(returnValue);
            }
        }

        log.debug("{}", msg);
    }
}

@Slf4j
public class RpcClientManager {
    public static void main(String[] args) {
        HelloService helloService = getProxyService(HelloService.class);
        helloService.sayHello("张三");
        helloService.sayHello("李四");
    }

    // 创建代理类,实现要远程调用的接口
    @SuppressWarnings("unchecked")
    public static <T> T getProxyService(Class<T> serviceClass) {
        ClassLoader loader = serviceClass.getClassLoader();
        Class<?>[] interfaces = new Class[]{serviceClass};
        Object proxy = Proxy.newProxyInstance(loader, interfaces, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 1、将方法的调用转化成 消息对象
                int sequenceId = SequenceIdGenerator.nextId();
                RpcRequestMessage message = new RpcRequestMessage(
                        sequenceId,
                        serviceClass.getName(),
                        method.getName(),
                        method.getReturnType(),
                        method.getParameterTypes(),
                        args
                );
                // 2、将消息对象发送出去
                getChannel().writeAndFlush(message);
                // 3、准备一个空 Promise 对象,用来接收结果        指定 promise 对象异步接收结果的线程(即要回调 listener 的方法的线程)
                DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
                RpcResponseMessageHandler.PROMISE_MAP.put(sequenceId, promise);
                // 等待 promise 的结果(sync()方法会抛异常,而await()不会)
                promise.await();
                if (promise.isSuccess()) {
                    // 调用正常
                    return promise.getNow();
                } else {
                    // 调用失败
                    throw new RuntimeException(promise.cause());
                }
            }
        });
        return ((T) proxy);
    }

    private static Channel channel = null;
    private static final Object LOCK = new Object();

    // 获取唯一的 channel 对象
    public static Channel getChannel() {
        if (channel != null) return channel;
        synchronized (LOCK) {
            if (channel != null) return channel;
            initChannel();
            return channel;
        }
    }

    // 初始化 Channel
    static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ProtocolFrameDecoder());
                ch.pipeline().addLast(LOGGING_HANDLER);
                ch.pipeline().addLast(new MessageCodec());
                ch.pipeline().addLast(RPC_HANDLER);
            }
        });
        try {
            channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();
            channel.closeFuture().addListener((ChannelFutureListener) future -> group.shutdownGracefully());
        } catch (Exception e) {
            log.error("client error", e);
        }
    }
}

 

标签:Netty,public,static,new,序列化,优化,class,channel
From: https://www.cnblogs.com/aoe1231/p/17509514.html

相关文章

  • QT性能优化实战 QML优化 QT高性能 QT6系列视频课程 QT6 性能优化实战 QT高性能 QT原理
      QT性能优化实战视频课程QT6Widgets高性能应用编程 1.课前考试2.字符串优化(上)3.字符串优化(下)4.绘图优化(上)5.绘图优化(下) 6.QT界面优化(上)7.QT界面优化(下)8.QT高性能统计图优化 9.QT高性能图形视图图元场景优化......
  • Shuffle Cards (牛客多校) (rope 块状链表 用作可持续优化平衡树, 用于区间的整体移动
    rope:#include<ext/rope>usingnamespace__gnu_cxx; 定义方法:rope<变量类型>变量名称;人话解释:超级string算法解释:块状链表(即讲链表与数组的优势结合,形成分块思想)用途解释:这本来是一个用于快速操作string的工具,却一般被定义成int,然后用作可持久化线段树!insert(intpos,s......
  • Android性能优化:微信自用高性能持久化框架——MMKV组件原理
    MMKVMMKV——基于mmap的高性能通用key-value组件,底层序列化/反序列化使用protobuf实现,性能高,稳定性强。githubMMKV是基于mmap内存映射的移动端通用key-value组件,底层序列化/反序列化使用protobuf实现,性能高,稳定性强。从2015年中至今,在iOS微信上使用已有近3年,其......
  • 精选Android中高级面试题:性能优化,JNI,设计模式
    性能优化1、图片的三级缓存中,图片加载到内存中,如果内存快爆了,会发生什么?怎么处理?参考回答:首先我们要清楚图片的三级缓存是如何的:如果内存足够时不回收。内存不够时就回收软引用对象2、内存中如果加载一张500*500的png高清图片。应该是占用多少的内存?不考虑屏幕比的话:占用内存......
  • BAT 大厂Android研发岗必刷真题:Android异常与性能优化相关面试问题
    今天来讲一讲在面试中碰到的Android异常与性能优化相关问题:1、anr异常面试问题讲解a)什么是anr?应用程序无响应对话框b)造成anr的原因?**主线程中做了耗时操作c)android中那些操作是在主线程呢?activity的所有生命周期回调都是执行在主线程的Service默认是执行在主线程的BroadcastR......
  • ffmpeg播放RTSP的一点优化
    简单记录一下最近使用ffmpeg播放RTSP做的一点参数优化。先做如下定义:AVDictionary*options=NULL;1.画质优化原生的ffmpeg参数在对1920x1080的RTSP流进行播放时,花屏现象很严重,根据网上查的资料,可以通过增大“buffer_size”参数来提高画质,减少花屏现象如:av_dict_set(&op......
  • mysql优化原则
    1.尽量不要在列上运算,这样会导致索引失效例如:select*fromadminwhereyear(admin_time)>2014优化为:select*fromadminwhereadmin_time>'2014-01-01′2.limit的基数比较大时,使用betweenand代替例如:select*fromadminorderbyadmin_idlimit100000,10优化为:se......
  • 如何高度优化适用于企业的AI (二) 数据准备
    我们先从快速浏览整个过程首先,我们需要数据,要尽可能使用CSV/Json格式,基于这些信息,我们要让AI获取所有所需要的信息来正确的完成它的工作准备数据,微调过程需要很长的书剑才能运行,我们肯定是不希望出现错误的,所以我们使用OpenAI的数据准备工具(CLItool)来处理我们......
  • 数据库架构优化
    数据库架构优化一.分库分表1.整除取余比如有user有10个库,每个库有100张表userid=100087存放的数据库和表格的路径100087%10=7第7个库10087%100=87第87张表2.根据id最后1位和两位插入数据库和表Id的最后1位为数据库的位置Id的最后两位为表格的位置二.读写分离,主从配置主数据库进......
  • 代码优化
    代码优化1.使用对象池减少对重复对象的创建,比如tcp链接、数据库链接、多线程2.调整连接数,连接池、数据库链接数、tomcat、nginx连接数3.利用缓存技术增加缓存、本地缓存(tomcat内存之中,不走网络)、redis缓存4.串行改并行,即单线程改多线程5.同步改异步,使用场景:本次调用接口的结果会影......