1、扩展序列化算法
序列化、反序列化主要用在消息正文的转换上:
- 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[]);
- 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理。
目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下:
// 反序列化
byte[] body = new byte[bodyLength];
bytebuf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);
// 序列化
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();
为了支持更多序列化算法,抽象一个 Serializer 接口。
pulic interface Serializer {
// 反序列化方法
<T> T deserialize(Class<T> clazz, byte[] bytes);
// 序列化方法
<T> byte[] serialize(T object);
}
代码演示:
public interface Serializer {
<T> T deserialize(Class<T> clazz, byte[] bytes);
<T> byte[] serialize(T object);
enum Algorithm implements Serializer {
Java {
@Override
@SuppressWarnings("unchecked")
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
ObjectInputStream oiStream = null;
oiStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
return ((T) oiStream.readObject());
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化失败", e);
}
}
@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream baoStream = new ByteArrayOutputStream();
ObjectOutputStream ooStream = null;
ooStream = new ObjectOutputStream(baoStream);
ooStream.writeObject(object);
return baoStream.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化失败", e);
}
}
},
JSON {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
String json = new String(bytes, StandardCharsets.UTF_8);
return new Gson().fromJson(json, clazz);
}
@Override
public <T> byte[] serialize(T object) {
String json = new Gson().toJson(object);
return json.getBytes(StandardCharsets.UTF_8);
}
}
}
}
2、参数调优
2.1、CONNECT_TIMEOUT_MILLIS
- 属于 SocketChannel 参数;
- 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常;
- SO_TIMEOUT 主要用在阻塞IO,阻塞 IO 中 accept、read 等待都是无限等待的,如果不希望永远阻塞,使用它调整超时时间。
/**
*
// 客户端通过 .option() 方法配置参数
// 服务器端通过 .option() 方法给 ServerSocketChannel 配置参数;通过 .childOption() 方法给 SocketChannel 配置参数
*/
@Slf4j
public class TestConnectionTimeout {
public static void main(String[] args) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler());
ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
future.sync().channel().closeFuture().sync();
} catch (Exception ex) {
ex.printStackTrace();
log.debug("timeout");
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
2.2、SO_BACKLOG
属于 ServerSocketChannel 参数。
- 第1次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列;
- 第2次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server;
- 第3次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue。
在 Linux 2.2 之前,backlog 大小包括了2个队列的大小,在 2.2 之后,分别用下面两个参数来控制:
- sysc queue - 半连接队列:大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 sync cookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略;
- accept queue - 全连接队列:大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值。如果 accept queue 队列满了,server 将发送一个拒绝连接的错误信息到 client。
netty 中,可以通过 option(ChannelOption.SO_BACK_LOG, 值) 来设置大小。
2.3、ulimit -n
属于操作系统参数,限制一个进程能同时打开的最大文件描述符(FD)数量。建议放在启动脚本中。
2.4、TCP_NODELAY
属于 SocketChannel 参数,netty 默认为false,即开启了 nagle 算法,如果我们不想要延迟,则应设为 true。
2.5、SO_SNDBUF & SO_REVBUF
滑动窗口上限,我们一般不需要调整,因为操作系统会自动帮我们进行调整。
- SO_SNDBUF 属于 SocketChannel 参数;
- SO_REVBUF 既可以用于 SocketChannel 参数,也可以用于 ServerSocketChannel 参数(建议设置到 ServerSocketChannel 上)。
2.6、ALLOCATOR
- 属于 SocketChannel 参数;
- 用来分配 ByteBuf,ctx.alloc()。
2.7、REVBUF_ALLOCATOR
- 属于 SocketChannel 参数;
- 控制 netty 接收缓冲区的大小;
- 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定。
3、RPC 框架
为了简化起见,在原来聊天项目的基础上新增 RPC 请求和响应消息。
@Data
public abstract class Message implements Serializable {
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
private static final Map<Integer, Class<? extends Message>> MESSAGE_CLASSES = new ConcurrentHashMap<>();
static {
MESSAGE_CLASSES.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
MESSAGE_CLASSES.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
public static Class<?> getMassageClass(int messageType) {
return MESSAGE_CLASSES.get(messageType);
}
public abstract int getMessageType();
}
请求消息:
@Data
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {
private String interfaceName; // 调用的接口全限定名,服务端根据它找到实现
private String methodName; // 调用接口中的方法名
private Class<?> returnType; // 方法返回类型
private Class<?>[] parameterTypes; // 方法参数类型数组
private Object[] parameterValues; // 方法参数值数组
public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType,
Class<?>[] parameterTypes, Object[] parameterValues) {
super(sequenceId);
this.interfaceName = interfaceName;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameterValues = parameterValues;
}
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_REQUEST;
}
}
响应消息:
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
private Object returnValue;
private Exception exceptionValue;
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_RESPONSE;
}
}
服务端架子:
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
RpcResponseMessage responseMsg = new RpcResponseMessage();
responseMsg.setSequenceId(message.getSequenceId());
try {
HelloService helloService = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));
Method method = helloService.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
Object result = method.invoke(helloService, message.getParameterValues());
responseMsg.setReturnValue(result);
} catch (Exception e) {
e.printStackTrace();
responseMsg.setExceptionValue(new Exception("远程调用出错:" + e.getCause().getMessage()));
}
ctx.writeAndFlush(responseMsg);
}
}
@Slf4j
public class RpcServer {
public static void main(String[] args) {
NioEventLoopGroup boos = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodec MESSAGE_CODEC = new MessageCodec();
// rpc 请求消息处理器
RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boos, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boos.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
客户端架子:
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
log.debug("{}", msg);
}
}
@Slf4j
public class RpcClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(new MessageCodec());
ch.pipeline().addLast(RPC_HANDLER);
}
});
Channel channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();
ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
1,
"com.clp.test3.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"张三"}
)).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
Throwable cause = future.cause();
log.error("error: {}", cause.getMessage());
}
}
});
channel.closeFuture().sync().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
group.shutdownGracefully();
}
});
} catch (Exception e) {
log.error("client error", e);
}
}
}
RPC例子:
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
// Map<SequenceId, 结果容器Promise> ,用来接收结果
public static final Map<Integer, Promise<Object>> PROMISE_MAP = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
// 拿到空的 promise
Promise<Object> promise = PROMISE_MAP.remove(msg.getSequenceId());
if (promise != null) {
Object returnValue = msg.getReturnValue();
Exception exceptionValue = msg.getExceptionValue();
if (exceptionValue != null) {
promise.setFailure(exceptionValue);
} else {
promise.setSuccess(returnValue);
}
}
log.debug("{}", msg);
}
}
@Slf4j
public class RpcClientManager {
public static void main(String[] args) {
HelloService helloService = getProxyService(HelloService.class);
helloService.sayHello("张三");
helloService.sayHello("李四");
}
// 创建代理类,实现要远程调用的接口
@SuppressWarnings("unchecked")
public static <T> T getProxyService(Class<T> serviceClass) {
ClassLoader loader = serviceClass.getClassLoader();
Class<?>[] interfaces = new Class[]{serviceClass};
Object proxy = Proxy.newProxyInstance(loader, interfaces, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 1、将方法的调用转化成 消息对象
int sequenceId = SequenceIdGenerator.nextId();
RpcRequestMessage message = new RpcRequestMessage(
sequenceId,
serviceClass.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 2、将消息对象发送出去
getChannel().writeAndFlush(message);
// 3、准备一个空 Promise 对象,用来接收结果 指定 promise 对象异步接收结果的线程(即要回调 listener 的方法的线程)
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
RpcResponseMessageHandler.PROMISE_MAP.put(sequenceId, promise);
// 等待 promise 的结果(sync()方法会抛异常,而await()不会)
promise.await();
if (promise.isSuccess()) {
// 调用正常
return promise.getNow();
} else {
// 调用失败
throw new RuntimeException(promise.cause());
}
}
});
return ((T) proxy);
}
private static Channel channel = null;
private static final Object LOCK = new Object();
// 获取唯一的 channel 对象
public static Channel getChannel() {
if (channel != null) return channel;
synchronized (LOCK) {
if (channel != null) return channel;
initChannel();
return channel;
}
}
// 初始化 Channel
static void initChannel() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(new MessageCodec());
ch.pipeline().addLast(RPC_HANDLER);
}
});
try {
channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();
channel.closeFuture().addListener((ChannelFutureListener) future -> group.shutdownGracefully());
} catch (Exception e) {
log.error("client error", e);
}
}
}
标签:Netty,public,static,new,序列化,优化,class,channel From: https://www.cnblogs.com/aoe1231/p/17509514.html