一、Netty和网络通信
(一)Netty框架简介
1、BIO
存在性能问题、可靠性问题、可维护性问题
BIO的核心问题是同步阻塞
public void service() throws IOException {
ServerSocket serverSocket = new ServerSocket(port);
while (true) {
Socket receiveSocket = serverSocket.accept();
final Socket socket = receiveSocket;
threadPool.execute(new Runnable() {
public void run() {
try {
//处理请求
process(socket);
socket.close();
} catch (Exception e) {
......
}
}
});
}
}
2、NIO 模型
Java NIO的结构:一个线程对应Selector,一个Selector对应多个Channel(连接),Selector 会根据不同的事件,在各个Channel上切换,切换到哪个channel由事件决定。每个Channel都会对应一个Buffer,Buffer 就是一个内存块,底层是一个数组。
Java NIO的问题:
编程模型复杂,对开发人员不友好
功能支持不够,简单的拆包操作都需要手工实现
底层基于操作系统的Epoll实现,存在线程空轮询bug
维护成本较高,容易出现开发上的漏洞
3、Netty的解决方案:
高效API:内置一组辅助类,简化开发难度
多协议支持:TCP、UDP等
内置编解码:Java序列化、ProtoBuf等
Listenter机制:异步操作集成监听器回调
管道-过滤器:可插拔、高扩展架构
4、Netty逻辑架构
从上到下分为服务编排层、事件调度层、网络通讯层
服务编排层:负责组装各类服务,它是 Netty 的核心处理链,用以实现网络事件的动态编排和有序传播,对应ChannelPipeline、ChannelHandler、ChannelHandlerContext
事件调度层:通过 Reactor 模型对各类事件进行聚合处理,通过 Selector 主循环线程集成多种事件,对应EventLoopGroup、EventLoop
网络通信层:执行网络I/O操作,并触发各种网络事件,这些网络事件会分发给事件调度层进行处理,对应BootStrap、ServerBootStrap、Channel
5、Netty功能特性包括:编解码、粘包拆包、多协议、可靠性
(1)编解码:
Netty提供了MessageToMessage的编解码器和ByteToMessage编解码器。
(2)粘包拆包:
FixedLengthFrameDecoder:基于固定长度划分业务包
LengthFieldBasedFrameDecoder:使用特定协议头划分业务包
LineBasedFrameDecoder:基于换行符划分业务包
DelimiterBasedFrameDecoder:使用自定义的分隔符划分业务包
(3)多协议:
HTTP、SSH、FTP、TCP、UDP、自定义协议
(4)可靠性:
超时控制:通过ChannelConfig进行异步连接超时配置
心跳检测:通过IdleStateHandler进行链路空闲状态处理器
流量整形:通过AbstractTrafficShapingHandler进行可定制的流量整形处理器
6、Netty应用场景:
Netty在分布式系统构建过程中应用非常广泛,属于基础设施类框架,例如Dubbo默认使用Netty作为高性能异步通信框架,Avro的RPC框架使用Netty构建服务端和客户端通信服务。
(二)启动Netty客户端和服务端
1、启动Netty服务端:
public class Server {
public static void main(String[] strings) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker) // 初始化线程模型
.channel(NioServerSocketChannel.class) // 指定IO模型
.childHandler(new ChannelInitializer<niosocketchannel>() { // 可以指定一堆 handler
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 处理IO读写逻辑
......
}
});
// 绑定端口启动服务
bootstrap.bind(8080);
}
}
2、服务端绑定监听器:
ChannelFuture future = bootstrap.bind(port);
future.addListener(new ChannelFutureListener() {
// 监听服务端绑定事件
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()){
logger.info("server started! using port {} " , port);
}else {
logger.info("server start failed! using port {} " , port);
channelFuture.cause().printStackTrace();
System.exit(0);
} }
});
3、启动Netty客户端:
public class Client {
public static void main(String[] strings) {
NioEventLoopGroup worker = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(worker) // 初始化线程模型
.channel(NioSocketChannel.class) // 指定IO模型
.childHandler(new ChannelInitializer<niosocketchannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 处理IO读写逻辑
......
}
});
bootstrap.bind("time.geekbang.org",8080).addListener(new ChannelFutureListener() {
// 监听客户端绑定事件
public void operationComplete(ChannelFuture channelFuture) throws Exception {
......
}
});
}
}
4、设置和获取属性:
当需要在channel中设置一些属性时,可以 使用channel.attr来设置其属性值,例如下面的代码是设置登录状态。
public class LoginUtil {
// 设置通道属性
public static void markAsLogin(Channel channel) {
channel.attr(Attributes.LOGIN).set(true);
}
public static boolean hasLogin(Channel channel) {
// 获取通道属性
Attribute<boolean> login = channel.attr(Attributes.LOGIN);
//只要标志位不为空,即表示登录过
if (login.get() != null)
return true;
return false;
}
}
(三)实现Netty客户端和服务端双向通信
双向通道流程:客户端发送数据到服务端、服务端读取客户端数据、服务端返回数据到客户端、客户端读取服务端数据
1、客户端发送数据到服务端:
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("客户端与服务端链接成功");
//准备数据
ByteBuf byteBuf = ctx.alloc().buffer();
byte[] bytes = "你好,我是lcl".getBytes(StandardCharsets.UTF_8);
byteBuf.writeBytes(bytes);
//写入数据
ctx.channel().writeAndFlush(byteBuf);
}
}
2、服务端读取客户端数据
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("服务端收到来自客户端消息");
//转换收到的消息内容
ByteBuf byteBuf = (ByteBuf) msg;
//在客户端输出接收到的消息内容
System.out.println("收到客户端端数据:" + byteBuf.toString(StandardCharsets.UTF_8));
}
}
3、服务端返回数据到客户端
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("服务端收到来自客户端消息");
......
//向客户端输出内容
ByteBuf byteBufToClient = ctx.alloc().buffer();
byte[] bytes = "你好,我是服务端:".getBytes(StandardCharsets.UTF_8);
byteBufToClient.writeBytes(bytes);
ctx.channel().writeAndFlush(byteBufToClient);
}
}
4、客户端读取服务端数据
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("客户端读取服务端数据");
//转换接收到的消息内容
ByteBuf byteBuf = (ByteBuf) msg;
//在服务端输出接收到的消息内容
System.out.println("收到服务端数据:" + byteBuf.toString(StandardCharsets.UTF_8));
}
}
(四)基于Netty重构RPC架构
1、基于Netty重新实现RPC框架
(1)定义服务端handler
public class ServerProxyHandler extends SimpleChannelInboundHandler<object> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
Protocol protocol = (Protocol) o;
// 获取当前实现类
String className = protocol.getInterfaceName()+"Impl";
String methodName = protocol.getMethodName();
Class[] paramterTypes = protocol.getParamsTypes();
Object[] paramValues = protocol.getParameters();
// 反射获取实现类
Class aClass = Class.forName(className);
Object bean = aClass.newInstance();
Method method = aClass.getDeclaredMethod(methodName, paramterTypes);
// 调用
Object result = method.invoke(bean, paramValues);
channelHandlerContext.writeAndFlush(result);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
}
(2)服务端启动类
public class NettyRpcServer {
public static void main(String[] args) {
new NettyRpcServer().start(8111);
}
NioEventLoopGroup worker;
NioEventLoopGroup boss;
private void start(int port) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
worker = new NioEventLoopGroup();
boss = new NioEventLoopGroup();
serverBootstrap.group(worker, boss);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<socketchannel>(){
@Override
public void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast(new ObjectEncoder());
channelPipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
channelPipeline.addLast(new ServerProxyHandler());
}
});
try {
serverBootstrap.bind(port).sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
(3)客户端handler
public class ClientHandler extends SimpleChannelInboundHandler<object> {
private Protocol protocol;
private Object result;
private Channel channel;
public ClientHandler(Protocol protocol){
this.protocol = protocol;
}
public Object getResult(){
return result;
}
@Override
public void channelActive(ChannelHandlerContext cxt) {
channel = cxt.channel();
channel.writeAndFlush(protocol);
}
@Override
protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
this.result = o;
System.out.println("ClientHandler.channelRead0=======" + result);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
}
(4)客户端启动类
public class NettyRpcClient1 {
private NioEventLoopGroup work;
//private ClientProxyHandler clientProxyHandler;
public void start(String host, int port){
Protocol protocol = new Protocol();
protocol.setInterfaceName("com.lcl.galaxy.rpc.service.UserService");
protocol.setMethodName("getUserNameByCode");
Class[] paramterType = {String.class};
Object[] paramters = {"1"};
protocol.setParameters(paramters);
protocol.setParamsTypes(paramterType);
final ClientHandler clientHandler = new ClientHandler(protocol);
Bootstrap bootstrap = new Bootstrap();
work = new NioEventLoopGroup();
bootstrap.group(work).channel(NioSocketChannel.class).handler(new ChannelInitializer<socketchannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast(new ObjectEncoder());
channelPipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
channelPipeline.addLast(clientHandler);
}
});
try {
bootstrap.connect(host, port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("NettyRpcClient1.start======== " + clientHandler.getResult());
}
}
(5)客户端启动
NettyRpcClient1 nettyRpcClient1 = new NettyRpcClient1();
nettyRpcClient1.start("127.0.0.1", 8111);
(6)输出
输出有一个null,是有问题的,这是因为channel.writeAndFlush是异步的,调用完成后直接就返回了,此时在start方法中的输出为null,第二次输出是在channelRead0中输出的。
NettyRpcClient1.start======== null
ClientHandler.channelRead0=======lcl1
2、解决channel.writeAndFlush异步输出问题
(1)重新定义客户端handler
让handler继承SimpleChannelInboundHandler的同时,实现Callable接口,然后channelRead0方法和call方法同步阻塞,在call方法中进行等待,然后等netty服务端响应(触发channelRead0)方法时,再接触call()方法的阻塞,这样就可以在call()方法中获取到结果。
public class ClientProxyHandler extends SimpleChannelInboundHandler<object> implements Callable {
private Protocol protocol;
private Object result;
private Channel channel;
public void setProtocol(Protocol protocol){
this.protocol = protocol;
}
@Override
public void channelActive(ChannelHandlerContext cxt) {
channel = cxt.channel();
}
@Override
protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
this.result = o;
notify();
}
@Override
public synchronized Object call() throws Exception {
channel.writeAndFlush(protocol);
wait();
return result;
}
}
(2)重新定义启动类
在启动类中,直接调用clientProxyHandler的call方法即可。
public class NettyRpcClient2 {
private NioEventLoopGroup work;
private ClientProxyHandler clientProxyHandler;
public void start(String host, int port){
Protocol protocol = new Protocol();
protocol.setInterfaceName("com.lcl.galaxy.rpc.service.UserService");
protocol.setMethodName("getUserNameByCode");
Class[] paramterType = {String.class};
Object[] paramters = {"1"};
protocol.setParameters(paramters);
protocol.setParamsTypes(paramterType);
clientProxyHandler = new ClientProxyHandler();
clientProxyHandler.setProtocol(protocol);
Bootstrap bootstrap = new Bootstrap();
work = new NioEventLoopGroup();
bootstrap.group(work).channel(NioSocketChannel.class).handler(new ChannelInitializer<socketchannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast(new ObjectEncoder());
channelPipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
channelPipeline.addLast(clientProxyHandler);
}
});
try {
bootstrap.connect(host, port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println("NettyRpcClient1.start======== " + clientProxyHandler.call());
} catch (Exception e) {
e.printStackTrace();
}
}
}
3、解决客户端硬编码问题
在上述两个样例中,客户端调用都是使用硬编码调用的接口,在客户端增加代理,让其使用方法调用的方式进行调用。
主要就是让客户端启动和方法调用分离,客户端启动还和上面的一样,而服务端调用,则使用JDK动态代理生成代理对象,调用时设置Protocol协议,并且创建一个FutureTask任务异步执行,最终调用futureTask的get方法(实际上就是调用handler的call()方法)获取数据。
public class NettyRpcClient3 {
private NioEventLoopGroup work;
private ClientProxyHandler clientProxyHandler;
public void start(String host, int port){
Bootstrap bootstrap = new Bootstrap();
work = new NioEventLoopGroup();
clientProxyHandler = new ClientProxyHandler();
bootstrap.group(work).channel(NioSocketChannel.class).handler(new ChannelInitializer<socketchannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast(new ObjectEncoder());
channelPipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
channelPipeline.addLast(clientProxyHandler);
}
});
try {
bootstrap.connect(host, port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public <t> T proxy(final Class<t> target){
return (T) Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Protocol protocol = new Protocol();
protocol.setInterfaceName(target.getName());
protocol.setMethodName(method.getName());
protocol.setParameters(args);
protocol.setParamsTypes(method.getParameterTypes());
System.out.println("NettyRpcClient3.invoke======" + protocol);
clientProxyHandler.setProtocol(protocol);
FutureTask<object> futureTask = new FutureTask<>(clientProxyHandler);
new Thread(futureTask).start();
return futureTask.get();
}
});
}
调用:在方法调用时,将启动、获取代理对象、方法调用分开。
NettyRpcClient3 nettyRpcClient3 = new NettyRpcClient3();
nettyRpcClient3.start("127.0.0.1", 8111);
UserService userService = nettyRpcClient3.proxy(UserService.class);
String userNameByCode = userService.getUserNameByCode("2");
System.out.println(userNameByCode);
二、使用Netty实现IM系统
(一)IM系统简介
IM(Instant Messaging,即时通讯)又叫做实时通信,是如今最流行的通讯方式,可以让两人或多人通过网络传递文字、文件、图片、语音等消息。例如微信、QQ、钉钉等。
IM单聊过程:所有的用户都需要登录鉴权,鉴权通过后,用户A向聊天服务器发送消息,聊天服务器将聊天消息发送给用户B。
IM单聊步骤和指令:
IM群聊过程:
IM群聊步骤和指令
IM和Netty:
客户端实现逻辑:客户端创建指令并对指令进行编码,然后将指令发送到IM服务器,IM服务器将指令发送到客户端,客户端进行粘包拆包处理后进行指令解码。
服务端实现逻辑:与客户端类似,接收到客户端的请求后,进行粘包拆包处理并解码指令,然后处理指令,最后重新创建指令并对指令编码后返回给客户端。
(二)Pipeline与ChannelHandler
1、管道过滤器
管道-过滤器模式:通过Pipeline+Filter的方式形成一个闭环
Netty中的管道-过滤器:
一个Channel对应一个ChannelPipeline,一个ChannelPipeline对应对个ChannelHandler,使用ChannelHandlerContext在多个ChannelHandler中传递数据。
ChannelHandler:
ChannelHandler是一个接口,对应ChannelInboundHandler和ChannelOutboundHandler两个进口,分别表示读和写,但是一个类实现一个接口时,还是要实现所有的方法,因此Netty提供了ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter两个适配器,通过适配器,简化通道的开发。
2、代码样例
定义一个 ChannelInboundHandler:
// 定义InboundHandler
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
......
}
}
配置 ChannelInboundHandler
bootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<niosocketchannel>(){
@Override
protected void initChannel(NioSocketChannel ch)throws Exception{
//处理读数据逻辑
ch.pipeline().addLast(new InboundHandler1()); // 事件通过Pipeline传播
......
}
});
定义一个 ChannelOutboundHandler:
// 定义OutboundHandler
public class OutboundHandler1 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
......
}
}
配置 ChannelOutboundHandler
bootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<niosocketchannel>(){
@Override
protected void initChannel(NioSocketChannel ch)throws Exception{
//处理读数据逻辑
ch.pipeline().addLast(new InboundHandler1());
......
//处理写数据逻辑
ch.pipeline().addLast(new OutboundHandler1()); // 事件通过Pipeline传播
......
}
});
3、Netty内置ChannelHandlerAdapter:
(1)MessageToByteEncoder:
public abstract class MessageToByteEncoder<i> extends ChannelOutboundHandlerAdapter {
// 将自定义对象转换为ByteBuf
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf byteBuf) throws Exception;
}
自定义Handler继承MessageToByteEncoder
public class PacketEncoder extends MessageToByteEncoder<mymessage> {
@Override
protected void encode(ChannelHandlerContext ctx, MyMessage message, ByteBuf byteBuf) throws Exception {
// 完成自定义编码
ByteBuf byteBuf = ...
}
}
(2)ByteToMessageDecoder:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
// 将ByteBuf转换为自定义对象
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<object> out) throws Exception;
}
自定义Handler继承ByteToMessageDecoder
public class PacketDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {
//完成自定义解码
MyMessage message = ...;
list.add(message);
}
}
(3)SimpleChannelInboundHandler:
public abstract class SimpleChannelInboundHandler<i> extends ChannelInboundHandlerAdapter {
protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
}
自定义Handler继承SimpleChannelInboundHandler
public class LoginResponseHandler extends SimpleChannelInboundHandler<mymessage> {
// 和ChannelInboundHandlerAdapter的区别:该方法会自动释放系统资源
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MyMessage message) throws Exception {
//针对MyMessage处理登录逻辑
}
}
4、自定义Handler:AuthenticationHandler:
public class AuthenticationHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//如果认证通过则继续执行,否则直接断开连接
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//打印日志
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//执行下线
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//异常时断开连接
}
}
(三)IM单聊的原理与实现
1、设计通信协议
为了实现定制化和扩展性,通常都需要定义一套私有协议,例如Dubbo框架。而对于IM系统来说,也是需要自定义协议,或者说自定义数据结构。
IM自定义协议组成结构:魔数(4字节,区别协议) + 版本号(1字节) + 序列化(1字节) + 指令(1字节,表示登录、登出、聊天) + 数据长度(4字节) + 数据(N字节),通常情况下我们可以将自定义协议部分编码到一个字节对象中进行传递。
根据上述定义,首先需要定义编解码器。
(1)定义指令类型
public interface Command {
/**
* 心跳包
*/
final Byte HEART_BEAT = 0;
/**
* 登录请求
*/
final Byte LOGIN_REQUEST = 1;
/**
* 登录响应
*/
final Byte LOGIN_RESPONSE = 2;
/**
* 消息请求
*/
final Byte MESSAGE_REQUEST = 3;
/**
* 消息响应
*/
final Byte MESSAGE_RESPONSE = 4;
/**
* 默认错误
*/
final Byte DEFAULT_ERROR = 127;
}
(2)定义序列化接口
public interface Serializer {
/**
* 序列化算法
*/
byte getSerializerAlgorithm();
/**
* 将对象序列化成二进制
*
*/
byte[] serialize(Object object);
/**
* 将二进制反序列化为对象
*/
<t> T deSerialize(Class<t> clazz, byte[] bytes);
}
(3)定义不同序列化标识:这里暂时先只实现基于Json的序列化
public interface SerializeAlgorithm {
/**
* json序列化标识
*/
byte json = 1;
}
(4)定义Json序列化实现类
序列化类型为Json,序列化和反序列化使用FastJson进行处理。
public class JsonSerializer implements Serializer {
@Override
public byte getSerializerAlgorithm() {
return SerializeAlgorithm.json;
}
@Override
public byte[] serialize(Object object) {
return JSON.toJSONBytes(object);
}
@Override
public <t> T deSerialize(Class<t> clazz, byte[] bytes) {
return JSON.parseObject(bytes,clazz);
}
}
(5)定义数据包基类
定义了版本号,以及定义了抽象的方法getCommand,用以获取指令类型
@Data
public abstract class Packet {
private Byte version = 1;
public abstract Byte getCommand();
}
(6)定义编解码器
首先使用两个 map,一个存储不同指令对应的数据结构,一个存储序列化类型对应的序列化实现类。
按照上述 IM 自定义协议组成结构:魔数(4字节) + 版本号(1字节) + 序列化(1字节) + 指令(1字节) + 数据长度(4字节) + 数据(N字节),在序列化方法 encode 中将数据包转换为ByteBuf,在反序列化方法 decode 中,根据指令类型将ByteBuf转换为对应的数据包。
public class PacketCodeC {
/**
* 魔数
*/
public static final int MAGIC_NUMBER = 0x88888888;
public static PacketCodeC instance = new PacketCodeC();
/**
* 采用单例模式
*/
public static PacketCodeC getInstance() {
return instance;
}
private static final Map<byte, class<?="" extends="" packet="">> packetTypeMap;
private static final Map<byte, serializer=""> serializerMap;
static {
packetTypeMap = new HashMap<byte, class<?="" extends="" packet="">>();
packetTypeMap.put(Command.DEFAULT_ERROR, DefaultErrorPacket.class);
//packetTypeMap.put(Command.HEART_BEAT, HeartBeatPacket.class);
packetTypeMap.put(Command.LOGIN_REQUEST, LoginRequestPacket.class);
packetTypeMap.put(Command.LOGIN_RESPONSE, LoginResponsePacket.class);
packetTypeMap.put(Command.MESSAGE_REQUEST, MessageRequestPacket.class);
packetTypeMap.put(Command.MESSAGE_RESPONSE, MessageResponsePacket.class);
serializerMap = new HashMap<byte, serializer="">();
Serializer serializer = new JsonSerializer();
serializerMap.put(serializer.getSerializerAlgorithm(), serializer);
}
private PacketCodeC() {
}
/**
* 默认使用json序列化,如需修改,可以使用setSerializer(serializer)
*/
private static Serializer serializer = new JsonSerializer();
/**
* 编码
* <p>
* 魔数(4字节) + 版本号(1字节) + 序列化算法(1字节) + 指令(1字节) + 数据长度(4字节) + 数据(N字节)
*/
public ByteBuf encode(ByteBufAllocator alloc, Packet packet) {
//创建ByteBuf对象
ByteBuf buf = alloc.ioBuffer();
return encode(buf, packet);
}
public ByteBuf encode(ByteBuf buf, Packet packet) {
//序列化java对象
byte[] objBytes = serializer.serialize(packet);
//实际编码过程,即组装通信包
//魔数(4字节) + 版本号(1字节) + 序列化算法(1字节) + 指令(1字节) + 数据长度(4字节) + 数据(N字节)
buf.writeInt(MAGIC_NUMBER);
buf.writeByte(packet.getVersion());
buf.writeByte(serializer.getSerializerAlgorithm());
buf.writeByte(packet.getCommand());
buf.writeInt(objBytes.length);
buf.writeBytes(objBytes);
return buf;
}
public ByteBuf encode(Packet packet) {
return encode(ByteBufAllocator.DEFAULT, packet);
}
/**
* 解码
* </p><p>
* 魔数(4字节) + 版本号(1字节) + 序列化算法(1字节) + 指令(1字节) + 数据长度(4字节) + 数据(N字节)
*/
public Packet decode(ByteBuf buf) {
//魔数校验(暂不做)
buf.skipBytes(4);
//版本号校验(暂不做)
buf.skipBytes(1);
//序列化算法
byte serializeAlgorithm = buf.readByte();
//指令
byte command = buf.readByte();
//数据长度
int length = buf.readInt();
//数据
byte[] dataBytes = new byte[length];
buf.readBytes(dataBytes);
Class packetType = getRequestType(command);
Serializer serializer = getSerializer(serializeAlgorithm);
if (packetType != null && serializer != null) {
return serializer.deSerialize(packetType, dataBytes);
}
return null;
}
/**
* 根据序列化算法获取对应的serializer
*
* @param serializeAlgorithm
* @return serializer
*/
private Serializer getSerializer(byte serializeAlgorithm) {
return serializerMap.get(serializeAlgorithm);
}
/**
* 根据指令类型获取对应的packet
*
* @param command
* @return packet
*/
private Class getRequestType(byte command) {
return packetTypeMap.get(command);
}
/**
* 设置序列化方式
*/
public static void setSerializer(Serializer serializer) {
PacketCodeC.serializer = serializer;
}
}
2、业务场景设计:登录
客户端构建登录对象,将其序列化,然后通过通道将数据发送到服务端;
服务端收到数据后,反序列化,校验登录,然后构建登录响应对象,将其序列化,然后通过通道将数据发送到客户端;
客户端收到数据后,反序列化,处理响应结果。
(1)登录对象
主要是用户ID、用户名、密码,同时设置其对应的指令,即登录请求
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LoginRequestPacket extends Packet {
private String userId;
private String userName;
//private String password;
@Override
public Byte getCommand() {
return Command.LOGIN_REQUEST;
}
}
(2)客户端登录请求Handler:LoginHandler
作用是在客户端就绪时,构建登录对象,然后将数据发送到服务端
@Slf4j
public class LoginHandler extends ChannelInboundHandlerAdapter {
private String userId;
private String userName;
public LoginHandler(String userId, String userName){
this.userId = userId;
this.userName = userName;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 构建登录对象
LoginRequestPacket loginRequestPaket = new LoginRequestPacket();
loginRequestPaket.setUserId(userId);
loginRequestPaket.setUserName(userName);
log.info("用户{}登录中......", userId);
ctx.channel().writeAndFlush(loginRequestPaket);
}
}
(3)结果对象基类对象
由于结果对象都有状态码、信息等内容,因此定义一个统一的结果对象基类
@Data
public abstract class BaseReponsePacket extends Packet {
/**
* 返回状态码,0000-成功
*/
private String code = "0000";
/**
* 返回消息
*/
private String msg;
/**
* 判断是否操作成功
* @return
*/
public boolean success(){
return "0000".equals(code);
}
}
(4)登录结果对象
返回内容就是基类的状态码和消息体,而指令对应的是登录结果。
public class LoginResponsePacket extends BaseReponsePacket {
@Override
public Byte getCommand() {
return Command.LOGIN_RESPONSE;
}
}
(5)服务端处理登录请求:服务端LoginRequestHandler
主要是校验用户是否已经登陆,登录成功则设置用户登录状态,绑定Session。
@Slf4j
public class LoginRequestHandler extends SimpleChannelInboundHandler<loginrequestpacket> {
private LoginRequestHandler(){}
private static LoginRequestHandler instance = new LoginRequestHandler();
public static LoginRequestHandler getInstance(){
return instance;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, LoginRequestPacket loginRequestPaket) throws Exception {
LoginResponsePacket responsePaket = login(channelHandlerContext, loginRequestPaket);
channelHandlerContext.channel().writeAndFlush(responsePaket);
}
private LoginResponsePacket login(ChannelHandlerContext ctx, LoginRequestPacket paket) {
LoginResponsePacket response = new LoginResponsePacket();
if(checkLogin(ctx, paket)){
log.info("用户{}登录成功", paket.getUserId());
response.setCode("0000");
response.setMsg("登录成功");
// 设置用户登录状态
LoginUtil.markAsLogin(ctx.channel());
// 绑定 Session 和 Channel 的关系
SessionUtil.bindSession(new Session(paket.getUserId(), paket.getUserName()), ctx.channel());
}else {
log.info("用户{}登录失败", paket.getUserId());
response.setCode("1001");
response.setMsg("登录失败");
}
return response;
}
private boolean checkLogin(ChannelHandlerContext ctx, LoginRequestPacket paket) {
return !SessionUtil.hasLogin(ctx.channel());
}
}
(6)服务端发送登录响应:服务端LoginRequestHandler
@Slf4j
public class LoginResponseHandler extends SimpleChannelInboundHandler<loginresponsepacket> {
private LoginResponseHandler(){}
private static LoginResponseHandler instance = new LoginResponseHandler();
public static LoginResponseHandler getInstance(){
return instance;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, LoginResponsePacket loginResponsePaket) throws Exception {
if(loginResponsePaket.success()){
log.info("登录成功");
LoginUtil.markAsLogin(channelHandlerContext.channel());
}else {
log.info("登录失败:{}", loginResponsePaket.getMsg());
}
}
}
3、Session管理
上面提到了如果用户登录成功,则绑定 Session 和 Channel 的关系,那么就需要提供对应的处理。
(1)定义Session
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Session {
private String userId;
private String userName;
}
(2)Session工具类
提供一个Map,存储用户和channel的对应关系;
同时提供绑定Session和解除绑定的方法,在绑定方法中,将用户id和channel放入Map中,同时设置channel的session属性为当前Session;
提供根据session获取channel、根据用户id获取channel、根据channel获取session等方法;
提供判断是否已登录的hasLogin方法,在该方法中,从channel的session属性中获取对应的Session,如果Session不为空,则说明已经登陆。
public interface Attributes {
/**
* 登录状态,使用channel.attr()方法保存
*/
AttributeKey<boolean> LOGIN = AttributeKey.newInstance("login");
AttributeKey<session> SESSION = AttributeKey.newInstance("session");
}
public class SessionUtil {
private static final Map<string, channel=""> sessionMap = new ConcurrentHashMap<>();
public static void bindSession(Session session, Channel channel){
sessionMap.putIfAbsent(session.getUserId(), channel);
channel.attr(Attributes.SESSION).set(session);
}
public static void unBindSession(Session session, Channel channel){
sessionMap.remove(session.getUserId());
channel.attr(Attributes.SESSION).remove();
}
public static Channel getChannelBySession(Session session) {
return sessionMap.get(session.getUserId());
}
public static Map<string, channel=""> getAllSession() {
return sessionMap;
}
public static Channel getChannelByUserId(String userId){
return sessionMap.get(userId);
}
public static Session getSessionByChannel(Channel channel){
return channel.attr(Attributes.SESSION).get();
}
public static boolean hasLogin(Channel channel) {
Attribute<session> login = channel.attr(Attributes.SESSION);
// 只要标志位不为空,即表示登录过
if(login.get() != null){
return true;
}
return false;
}
}
4、登录状态管理
提供标记登录的方法markAsLogin,在该方法中设置channel的login属性为true;
提供判断是否登录的方法hasLogin,从channel中获取属性login对应的值,如果不为空则说明已经登录。
public class LoginUtil {
public static void markAsLogin(Channel channel){
channel.attr(Attributes.LOGIN).set(true);
}
public static boolean hasLgin(Channel channel) {
Attribute<boolean> login = channel.attr(Attributes.LOGIN);
if(login.get() != null){
return true;
}
return false;
}
}
5、消息收发
消息收发和登录操作一样处理,把登录对象转换为消息对象即可。
(1)消息请求对象
@Data
public class MessageRequestPacket extends Packet {
/**
* 消息内容
*/
private String message;
/**
* 消息接受者
*/
private String toUserId;
@Override
public Byte getCommand() {
return Command.MESSAGE_REQUEST;
}
}
(2)消息响应对象
@Data
public class MessageResponsePacket extends BaseReponsePacket {
/**
* 响应内容
*/
private String message;
/**
* 消息来源
*/
private String fromUserId;
private String fromUserName;
@Override
public Byte getCommand() {
return Command.MESSAGE_RESPONSE;
}
}
(3)处理请求handler
根据channel获取session信息,从session中获取消息来源,从消息体中获取消息接收方,然后根据接收用户id获取对应channel,并传递数据。
@Slf4j
public class MessageRequestHandler extends SimpleChannelInboundHandler<messagerequestpacket> {
private MessageRequestHandler(){}
private static MessageRequestHandler instance = new MessageRequestHandler();
public static MessageRequestHandler getInstance(){
return instance;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageRequestPacket messageRequestPacket) throws Exception {
log.info("收到来自客户端的消息: {}", messageRequestPacket.getMessage());
// 获取Session
Session session = SessionUtil.getSessionByChannel(channelHandlerContext.channel());
String fromUserId = session.getUserId();
String frromUserName = session.getUserName();
// 构造响应体
MessageResponsePacket response = new MessageResponsePacket();
response.setMessage(messageRequestPacket.getMessage());
response.setFromUserId(fromUserId);
response.setFromUserName(frromUserName);
// 发送聊天消息
p2pChat(channelHandlerContext, messageRequestPacket.getToUserId(), response);
}
private void p2pChat(ChannelHandlerContext channelHandlerContext, String toUserId, MessageResponsePacket response) {
Channel channel = SessionUtil.getChannelByUserId(toUserId);
if(channel != null && LoginUtil.hasLgin(channel)){
channel.writeAndFlush(response);
}else {
DefaultErrorPacket defaultErrorPacket = new DefaultErrorPacket();
defaultErrorPacket.setCode("3001");
defaultErrorPacket.setMsg("该用户没有登录,无法发送消息");
channel.writeAndFlush(defaultErrorPacket);
}
}
}
(4)处理响应handler
输出收到的消息即可。
@Slf4j
public class MessageResponseHandler extends SimpleChannelInboundHandler<messageresponsepacket> {
private MessageResponseHandler(){}
private static MessageResponseHandler instance = new MessageResponseHandler();
public static MessageResponseHandler getInstance(){
return instance;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageResponsePacket messageResponsePacket) throws Exception {
log.info("收到{}的消息:{}", messageResponsePacket.getFromUserId(), messageResponsePacket.getMessage());
}
}
6、单聊场景客户端和服务端Pipeline
对于整个单聊场景,首先客户端登录,然后通过编码器,
(1)客户端Pipeline
主要就是添加编解码处理器PacketCodecHandler、登录处理器LoginHandler、登录响应处理器LoginResponseHandler和消息处理器MessageResponseHandler
@Slf4j
public class ClientTest {
public static void main(String[] args) {
start();
}
private static String userId = "101";
private static String userName = "lcl";
private static String host = "127.0.0.1";
private static int port = 8886;
public static void start(){
NioEventLoopGroup work = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(work).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<niosocketchannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(PacketCodecHandler.getInstance());
nioSocketChannel.pipeline().addLast(new LoginHandler(userId, userName));
nioSocketChannel.pipeline().addLast(LoginResponseHandler.getInstance());
nioSocketChannel.pipeline().addLast(MessageResponseHandler.getInstance());
}
});
ChannelFuture future = bootstrap.connect(host, port).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()){
log.info("连接服务器成功");
}else {
log.info("连接服务器失败");
channelFuture.cause().printStackTrace();
System.exit(0);
}
}
});
try {
future.channel().closeFuture().sync();
log.info("与服务器断开连接!");
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
(2)服务端Pipeline
主要是添加编解码处理器PacketCodecHandler、登录请求处理器LoginRequestHandler和消息发送请求处理器MessageRequestHandler
@Slf4j
public class Server {
private static int port = 8886;
public static void start(){
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<niosocketchannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(PacketCodecHandler.getInstance());
nioSocketChannel.pipeline().addLast(LoginRequestHandler.getInstance());
nioSocketChannel.pipeline().addLast(MessageRequestHandler.getInstance());
}
});
ChannelFuture future = bootstrap.bind(port);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()){
log.info("server 启动成功,端口号:{}", port);
}else {
log.info("server 启动失败,端口号:{}", port);
channelFuture.cause().printStackTrace();
System.exit(0);
}
}
});
}
}
(四)客服系统案例演进
架构演进,抽离独立的IM服务,IM服务包括:im-server(IM服务端)、im-common(IM通用组件)、im-client(IM客户端)
im-common模块包含:IM单聊的原理与实现中通信协议、Session管理、登录状态管理;
im-client模块包含:客户端Pipeline以及登录处理器LoginHandler、登录结果处理器LoginResponseHandler、消息响应处理器MessageResponseHandler
im-server模块包括:服务端Pipeline、登录请求处理器LoginRequestHandler、消息请求处理器MessageRequestHandler
三、Netty可靠性和性能优化
(一)Netty可靠性分析和实现
Netty作为网络通信框架,无论是客户端自身问题还是网络问题,都可能出现网络通信异常情况的情况。
对于这种情况,需要进行进行空闲检测,如果检测出异常,可以关闭channel,或者发送邮件等进行告警。
Netty本身提供了空闲检测处理类IdleStateHandler,基于IdleStateHandler,不做处理Netty也可以做相关检测处理,但是一般情况下,为了做一些定制化处理,会实现自己的处理类。
1、服务端空闲检测
(1)自定义空闲检测处理类
创建自定义空闲检测处理类ServerIdleHandler并继承Netty自带的处理类IdleStateHandler,在构造函数中设置读空闲、写空闲、读写空闲,如下所示,设置读写空闲为150ms。
重写channelIdle方法,如果超过读写空闲阈值,则会触发IdleStateEvent事件,即执行channelIdle方法,在该方法中关闭channel。
// 服务端实现空闲检测,客户端实现方式类似
public class ServerIdleHandler extends IdleStateHandler {
private static int HERT_BEAT_TIME = 150;
// 参数设置:读空闲、写空闲、读写空闲
public ServerIdleHandler() {
super(0, 0, HERT_BEAT_TIME);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
//系统出现问题,关闭连接
ctx.channel().close();
}
}
(2)添加ServerIdleHandler到channelPipeline
bootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<niosocketchannel>(){
@Override
protected void initChannel(NioSocketChannel ch)throws Exception{
//放在Pipeline的最前面
ch.pipeline().addLast(new ServerIdleHandler());
......
}
});
2、自定义客户端心跳处理类
创建客户端心跳处理类 ClientIdleHandler,继承 IdleStateHandler 实现类,如果超过 50ms 没有读写操作,则发送一次心跳。
public class ClientIdleHandler extends IdleStateHandler {
private static Logger logger = LoggerFactory.getLogger(ClientIdleHandler.class);
private static final int HEART_BEAT_TIME = 50;
public ClientIdleHandler() {
super(0, 0, HEART_BEAT_TIME);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
logger.info("发送心跳....");
ctx.writeAndFlush(new HeartBeatPacket());
}
}
3、自定义服务端回复心跳Handler
创建服务端回复心跳处理类 HeartBeatHandler,在读取到心跳包之后,回复客户端收到心跳。
public class HeartBeatHandler extends SimpleChannelInboundHandler<heartbeatpacket> {
private static Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, HeartBeatPacket heartBeatPacket) throws Exception {
logger.info("收到心跳包:{}", JSON.toJSONString(heartBeatPacket));
ctx.writeAndFlush(heartBeatPacket);
}
}
4、自定义客户端断线重连处理类
创建自定义客户端断线重连处理类ReConnectHandler,重新userEventTriggered方法和channelRead方法。
如果超时则会触发超时事件userEventTriggered,在该方法中,当状态为 READER_IDLE 时,进行重连。
public class ReConnectHandler extends ChannelInboundHandlerAdapter {
private int retryCount;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
if (++retryCount > 3) {
// 比较重试次数,执行重连操作
closeAndReconnection(ctx.channel());
} else {
ctx.writeAndFlush(MyHeartbeat.getHeartbeatPingBuf());
}
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
retryCount = 0;
// 重置重试次数
super.channelRead(ctx, msg);
}
}
(二)Netty性能优化方法
1、避免线程阻塞
Netty性能优化最大的切入点就是避免线程阻塞操作,Netty本身是Nio,对于业务线程操作本质上是阻塞的,如果阻塞处理,就会影响其他请求的处理。
例如下面的代码,如果在handler中执行数据库持久化,则会同步阻塞主线程的操作。
@Override
protected void channelRead0(ChannelHandlerContext ctx, T msg) throws Exception {
//针对消息的业务处理
//执行数据库持久化(同步阻塞主线程的耗时操作)
ctx.writeAndFlush(msg);
//执行其他业务处理
}
减少阻塞主线程的操作,可以通过使用线程池或者使用消息中间件的方式处理。
(1)通过线程池进行异步化
@Override
protected void channelRead0(ChannelHandlerContext ctx, T msg) throws Exception {
threadPool.submit(new Runable() {
//针对消息的业务处理
//执行数据库持久化
ctx.writeAndFlush(msg);
//执行其他业务处理
});
}
(2)通过消息中间件进行异步化
@Override
protected void channelRead0(ChannelHandlerContext ctx, T msg) throws Exception {
//创建事件
MyEvent event = createEvent(msg);
//生成消息对象
Message<string> message = MessageBuilder.withPayload(event).build();
//发送信息
rocketMQTemplate.sendMessage("event_group", "topic_chat", message, null);
}
2、其他优化
对于其他优化,可以使用共享Handler、合并编解码器、合并平行Handler等优化手段。
(1)共享Handler
在Netty中,一个Channel新建立时会执行initHandler方法并组装Pipeline,如果客户端太多,就会创建很多对应的handler。为了节省创建成本,对于无状态类,可以使用单例模式可以节省类的创建成本。
如下代码所示,其使用了单例模式创建MyMessageHandler,并使用@ChannelHandler.Sharable标注其是一个可以共享的handler
// 设置该Handler为全局共享
@ChannelHandler.Sharable
public class MyMessageHandler extends SimpleChannelInboundHandler<mymessage> {
// 单例模式
private static MyMessageHandler instance = new MyMessageHandler();
public static MyMessageHandler getInstance() {
return instance;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) throws Exception {
......
}
}
在构建pipeline时,就可以使用MyMessageHandler.getInstance()添加该handler。
nioSocketChannel.pipeline().addLast(MyMessageHandler.getInstance());
(2)合并编解码器
对于编解码器,可以将其合二为一,这样在添加编解码器时,只需要添加一个即可。
@ChannelHandler.Sharable
public class PacketCodecHandler extends MessageToMessageCodec<bytebuf, packet=""> {
private static PacketCodecHandler instance = new PacketCodecHandler();
public static PacketCodecHandler getInstance() {
return instance;
}
protected void encode(ChannelHandlerContext ctx, Packet packet, List<object> list) throws Exception {
ByteBuf byteBuf = ctx.channel().alloc().ioBuffer();
// 合并编解码器
PacketCodeC.getInstance().encode(byteBuf, packet);
list.add(byteBuf);
}
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<object> list) throws Exception {
// 合并编解码器
list.add(PacketCodeC.getInstance().decode(buf));
}
}
(3)合并平行Handler
对于平行handler,即职责平行的handler,可以合并为一个handler,然后将平行handler放入map中,然后根据不同的指令获取不同的handler来处理。
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<mymessage> {
private static ServerHandler instance = new ServerHandler();
public static ServerHandler getInstance() {
return instance;
}
private static Map<byte, simplechannelinboundhandler<?="" extends="" mymessage="">> handlerMap = new ConcurrentHashMap<>();
static {
handlerMap.putIfAbsent(CASE1, Case1Handler.getInstance());
handlerMap.putIfAbsent(CASE2, Case2Handler.getInstance());
handlerMap.putIfAbsent(CASE3, Case3Handler.getInstance());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) throws Exception {
// 合并Handler,减少对象创建过程
SimpleChannelInboundHandler handler = handlerMap.get(msg.getCommand());
handler.channelRead(ctx, msg);
}
}
(4)添加日志监控
在Netty中,writeAndFlush方法是一个异步操作,对于获取时间操作,可以添加监听,在监听中计算执行时间。
@Override
protected void channelRead0(ChannelHandlerContext ctx, T msg) throws Exception {
Long startTime = System.currentTimeMillis();
//针对消息的业务处理
//执行数据库持久化
ctx.writeAndFlush(msg).addListener(future -> { // 在Listener中获取最终结果
if (future.isDone()) {
//执行其他业务处理
Long timeConsumed = System.currentTimeMillis() - startTime;
}
});
}
(三)客服系统案例演进
优化策略:添加IM聊天可靠性机制、提升组件运行时性能
1、添加IM聊天可靠性机制
(1)自定义心跳处理
如果150秒没有读写操作,则关闭channel通道。
@Slf4j
public class ServerIdelHandler extends IdleStateHandler {
private static final int HEAT_BEAT_TIME = 150;
public ServerIdelHandler() {
super(0, 0, HEAT_BEAT_TIME);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
log.warn("{} 时间内没有收到心跳", HEAT_BEAT_TIME);
ctx.channel().close();
}
}
(2)定义心跳包
@Data
public class HeartBeatPacket extends Packet {
private String msg = "heart-beat";
@Override
public Byte getCommand() {
return Command.HEART_BEAT;
}
}
(3)客户端发送心跳handler
如果50秒没有读写操作,则发送心跳请求。
@Slf4j
public class ClientIdleHandler extends IdleStateHandler {
private static final int HEAT_BEAT_TIME = 50;
public ClientIdleHandler() {
super(0, 0, HEAT_BEAT_TIME);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
log.warn("发送心跳");
ctx.channel().writeAndFlush(new HeartBeatPacket());
}
}
(4)服务端处理心跳handler
服务端收到心跳处理,则返回心跳包给客户端。
@Slf4j
public class HeartBeatHandler extends SimpleChannelInboundHandler<heartbeatpacket> {
private HeartBeatHandler(){}
private static HeartBeatHandler instance = new HeartBeatHandler();
public static HeartBeatHandler getInstance(){
return instance;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HeartBeatPacket heartBeatPacket) throws Exception {
log.info("收到心跳包:{}", JSON.toJSONString(heartBeatPacket));
channelHandlerContext.channel().writeAndFlush(heartBeatPacket);
}
}
(5)客户端添加心跳包处理和心跳处理
bootstrap.group(work).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<niosocketchannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ServerIdelHandler());
nioSocketChannel.pipeline().addLast(PacketCodecHandler.getInstance());
nioSocketChannel.pipeline().addLast(new ClientIdleHandler());
nioSocketChannel.pipeline().addLast(new LoginHandler(userId, userName));
nioSocketChannel.pipeline().addLast(LoginResponseHandler.getInstance());
nioSocketChannel.pipeline().addLast(MessageResponseHandler.getInstance());
}
});
(6)服务端添加心跳包处理和心跳处理
bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<niosocketchannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ServerIdelHandler());
nioSocketChannel.pipeline().addLast(PacketCodecHandler.getInstance());
nioSocketChannel.pipeline().addLast(LoginRequestHandler.getInstance());
nioSocketChannel.pipeline().addLast(HeartBeatHandler.getInstance());
nioSocketChannel.pipeline().addLast(MessageRequestHandler.getInstance());
}
});
2、提升组件运行时性能
主要就是将无状态处理类用单例处理。
@ChannelHandler.Sharable
public class PacketCodecHandler extends MessageToMessageCodec<bytebuf, packet=""> {
private PacketCodecHandler(){}
private static PacketCodecHandler instance = new PacketCodecHandler();
public static PacketCodecHandler getInstance(){
return instance;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Packet packet, List<object> list) throws Exception {
ByteBuf byteBuf = channelHandlerContext.channel().alloc().ioBuffer();
PacketCodeC.getInstance().encode(byteBuf, packet);
list.add(byteBuf);
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<object> list) throws Exception {
list.add(PacketCodeC.getInstance().decode(byteBuf));
}
}
3、数据存储
(1)模拟消息发送
通过调用Controller来模拟消息发送,在消息发送时存储消息数据。
这里只是模拟,因为数据库操作比较耗时,一般对于这种操作不会放在Netty的handler中处理(原因上面已经说明),一般会采用异步处理,这里暂时先放在Controller中处理,下面会进行优化。
@RestController
@RequestMapping("/p2p")
public class P2PChatController {
@Autowired
private ImMessageService imMessageService;
@PostMapping(value = "/chat")
public ChatResponse p2pChat(@RequestBody P2PChatRequest request) {
ChatResponse response = new ChatResponse();
String userId = request.getToUserId();
Channel channel = SessionUtil.getChannelByUserId(userId);
if (channel == null) {
response.setCode("4001");
response.setMsg(userId + "没有登录,无法向其发送即时消息!");
return response;
}
Session session = SessionUtil.getSessionByChannel(channel);
MessageResponsePacket responsePacket = new MessageResponsePacket();
responsePacket.setFromUserName(session.getUserName());
responsePacket.setFromUserId(request.getFromUserId());
responsePacket.setMessage(request.getMsg());
//向客户端写入数据
ChannelUtil.writeAndFlush(channel,responsePacket);
// 在业务层执行数据持久化操作
saveImMessage(request);
return response;
}
private void saveImMessage(P2PChatRequest request) {
ImMessage imMessage = new ImMessage();
imMessage.setFromUserId(request.getFromUserId())
.setFromUsername(request.getFromUserId())
.setToUserId(request.getToUserId())
.setToUsername(request.getToUserId())
.setMessage(request.getMsg());
imMessageService.saveImMessage(imMessage);
}
}
(2)数据存储
为了操作数据库不影响性能,这里使用线程池的方式操作。
@Service
public class ImMessageServiceImpl extends ServiceImpl<immessagemapper, immessage=""> implements ImMessageService {
ExecutorService pool = Executors.newFixedThreadPool(2);
@Override
public void saveImMessage(ImMessage imMessage) {
pool.submit(new ImMessageTask(imMessage));
}
class ImMessageTask implements Runnable {
private ImMessage imMessage;
public ImMessageTask(ImMessage imMessage){
this.imMessage = imMessage;
}
@Override
public void run() {
save(imMessage);
}
}
}
四、打造高伸缩性IM系统
(一)IM系统伸缩性设计方案
1、扩展性思考
上面的代码,所有客户端都是和单个服务端通信,这样服务器就会成为系统性能瓶颈,这样就要考虑系统的伸缩性。
如何实现 IM 架构中的伸缩性问题:
服务端:对于服务端而言,需要集群部署,同时要可以动态伸缩(动态新增服务器、自动剔除服务器)
客户端/服务端长连接:对于Netty来说,肯定都是长链接,客户端会选择一台服务器创建长连接,创建长链接的条件是客户端需要知道服务端地址列表;所以对于客户端来说,有一个服务获取的过程,有一个服务绑定(和服务端创建长链接)的过程。
路由层:随着客户端和服务端越来越多,如何将客户端和服务端的绑定关系更加友好的把握起来,一般会使用一个中间层(路由层)来处理,客户端IM消息发送请求不是直接到服务端,而是通过路由层转发到服务端。
另外客户端与服务端交互首先要登录,登录状态在哪维护是一个需要考虑的问题,如果服务端维护,就需要考虑整个服务端集群通信保证服务端信息同步,这个比较难处理,如果存在路由层,路由层由于是一个无状态服务,那么无论谁登录路由层都可以获取相关的登录状态,这样路由层就可以无限扩展。
这样当客户端访问路由层时,路由层可以告诉客户端其可以访问哪个服务端服务器,这样整个请求就很高效,就可以做到很好的扩展,对于登录等信息,还可以放在三方容器中,例如Redis、MongoDB等,这样就又可以做一些分布式的扩展。
2、登录和请求转发思路
根据上述分析,客户端首先访问路由层,路由层做登录和请求的转发。
对于登录请求,那么就需要知道请求的用户信息,远程服务器的ip和端口,而对于消息发送,主要是需要传递消息来源人、消息接收人、以及消息本身。
public class LoginRequest {
private String userid;
private String username;
private String serverHost;
private int nettyPort;
private int httpPort;
}
public class ChatRequest {
private String fromUserId;
private String toUserId;
private String msg;
}
3、整体架构:
客户端只会与一个服务器绑定,但是有多个服务路由层,路由层做了数据梳理和业务逻辑处理,这样客户端和服务端主要做的就是消息收发通道,不作为业务逻辑的处理。
这样的架构边界清晰,性能好,扩展性好。
(二)IM系统伸缩性实现技术
对于上述架构,需要做系统选型。
服务端:需要引入注册中心,使用其服务注册和发现机制。
路由层:路由层是一个无状态的HTTP服务,那么就可以使用各种web请求技术。
客户端/服务端长连接:路由层一般会使用一个分布式缓存来维护客户端登录状态,这样路由层也就不存在性能瓶颈,因为路由层的本质是转发,要求就是快。
运行流程:
首先服务端注册到注册中心
客户端通过路由器获取服务列表,而路由器通过注册中心获取服务端列表,然后通过一定的算法指定一个服务器
客户端启动并绑定服务端
(三)客服系统案例演进
之前是客户端直接连接服务端,由于增加了路由层,那么路由层就需要增加对应的Controller进行处理,主要有两条支线,一个是登录,一个是消息收发。
1、路由器服务:im-router
新增im-router服务作为理由层。
(1)登录
在路由层,对于登录而言,上面做的登录主要是对Netty的channel和Session做了一个绑定,而这里的登录主要是实际登录状态的存储和验证,是两个不同维度的处理。
对于业务处理来说,肯定不关心底层的channel等信息,而应该是一个loginController。
@Slf4j
@RestController
@RequestMapping("/login")
public class LoginController {
@Autowired
private LoginService loginService;
@PostMapping("/")
public IMLoginResponse login(@RequestBody IMLoginRequest request){
IMLoginResponse response = new IMLoginResponse();
if(loginService.hasLogin(request.getUserid())){
response.setCode("2001");
response.setMsg("重复登录");
return response;
}
loginService.login(request);
response.setCode("0000");
response.setMsg("登录成功");
log.info("用户: {} 登录成功", request.getUserid());
return response;
}
}
对于LoginService而言,一般会将登录信息存储在Redis等缓存中,本次先不做处理,直接Mock一个实现,存储在Map中。
@Service
public class MockLoginServiceImpl implements LoginService {
private static Map<string, imloginrequest=""> loginMap = new ConcurrentHashMap<>();
@Override
public void login(IMLoginRequest request) {
loginMap.putIfAbsent(request.getUserid(), request);
}
@Override
public boolean hasLogin(String userId) {
return loginMap.containsKey(userId);
}
@Override
public IMLoginRequest getLoginInfo(String userId) {
return loginMap.get(userId);
}
}
(2)获取服务信息
对于获取服务信息,可以使用注册中心获取服务列表,然后通过一定的算法选取一个服务。这里先不演示注册中心的处理,直接返回一个硬编码地址。
@Slf4j
@RestController
@RequestMapping("/serverInfo")
public class ServerInfoController {
@GetMapping("/")
public String getServiceInfo(){
// 模拟服务器信息
IMServerInfo imServerInfo = new IMServerInfo();
imServerInfo.setHost("127.0.0.1");
imServerInfo.setNettyPort(8886);
imServerInfo.setHttpPort(9001);
// 暂时先不用注册中心,模拟服务器地址
return imServerInfo.toString();
}
}
(3)消息发送
对于收发消息,首先要判断用户是否登录,如果已经登录成功,在发送消息。
@Slf4j
@RestController
@RequestMapping("/p2p")
public class P2pChatController {
@Autowired
private LoginService loginService;
@Autowired
private ChatService chatService;
@PostMapping("/chat")
public ChatResponse p2pChat(@RequestBody P2PChatRequest request){
ChatResponse response = new ChatResponse();
if(!loginService.hasLogin(request.getFromUserId())){
response.setCode("3001");
response.setMsg("请先登录");
return response;
}
if(!loginService.hasLogin(request.getToUserId())){
response.setCode("3002");
response.setMsg("对方请先登录");
return response;
}
response = chatService.p2pChat(request);
return response;
}
}
判断是否登录就是从map中查看是否已经存在用户id,这里既要判断消息发送的用户id,也要判断消息接收的用户id。
消息发送时,首先通过http请求获取用户的登录信息,然后根据登录信息中绑定的服务端ip和端口请求服务端。
@Service
public class ChatServiceImpl implements ChatService {
@Autowired
private LoginService loginService;
@Autowired
private RestTemplate restTemplate;
@Override
public ChatResponse p2pChat(P2PChatRequest request) {
// 先通过登录信息获取模板服务器地址
IMLoginRequest loginRequest = loginService.getLoginInfo(request.getToUserId());
// 向目标服务器地址发起聊天请求
ChatResponse response = restTemplate.postForObject("http://" + loginRequest.getServerHost() + ":" + loginRequest.getHttpPort() + "/p2p/chat", request, ChatResponse.class);
return response;
}
}
登录信息包括:用户id、用户名、绑定的服务端ip、服务端http端口、服务端netty端口
@Data
@AllArgsConstructor
@NoArgsConstructor
public class IMLoginRequest {
private String userid;
private String username;
private String serverHost;
private int nettyPort;
private int httpPort;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
2、服务端:im-server
服务端提供消息发送的接口,在实现中,主要就是获取消息接收的用户id,并获取其对应的channel,将消息写入该channel,同时保存数据。
@RestController
@RequestMapping("/p2p")
public class P2PChatController {
@Autowired
private ImMessageService imMessageService;
@PostMapping(value = "/chat")
public ChatResponse p2pChat(@RequestBody P2PChatRequest request) {
ChatResponse response = new ChatResponse();
String userId = request.getToUserId();
Channel channel = SessionUtil.getChannelByUserId(userId);
if (channel == null) {
response.setCode("4001");
response.setMsg(userId + "没有登录,无法向其发送即时消息!");
return response;
}
Session session = SessionUtil.getSessionByChannel(channel);
MessageResponsePacket responsePacket = new MessageResponsePacket();
responsePacket.setFromUserName(session.getUserName());
responsePacket.setFromUserId(request.getFromUserId());
responsePacket.setMessage(request.getMsg());
//向客户端写入数据
ChannelUtil.writeAndFlush(channel,responsePacket);
// 在业务层执行数据持久化操作
saveImMessage(request);
return response;
}
private void saveImMessage(P2PChatRequest request) {
ImMessage imMessage = new ImMessage();
imMessage.setFromUserId(request.getFromUserId())
.setFromUsername(request.getFromUserId())
.setToUserId(request.getToUserId())
.setToUsername(request.getToUserId())
.setMessage(request.getMsg());
imMessageService.saveImMessage(imMessage);
}
}
3、客户端:im-client
客户端主要分三步处理:
从Router获取ServiceInfo信息:调用路由器获取一个服务端节点
调用Router执行登录操作:调用路由器登录,即将登录信息存入路由器服务
与Server建立长链接:将当前客户端与获取到的服务端节点绑定
@Slf4j
public class ClientRouterTest1 {
public static void main(String[] args) {
start();
}
private static String userId = "101";
private static String userName = "lcl";
private static String routeHost = "127.0.0.1";
private static int routePort = 9003;
public static void start(){
// 1、从Router获取ServiceInfo信息
IMServerInfo imServerInfo = getIMServerInfoFromRouter(routeHost, routePort);
// 2、调用Router执行登录操作
loginRouter(imServerInfo);
// 3、与Server建立长链接
connectToServer(imServerInfo);
}
private static IMServerInfo getIMServerInfoFromRouter(String routeHost, int routePort){
RestTemplate restTemplate = new RestTemplate();
IMServerInfo imServerInfo = restTemplate.getForObject("http://" + routeHost + ":" + routePort + "/serverInfo/", IMServerInfo.class);
log.info("获取服务端信息:{}", imServerInfo);
return imServerInfo;
}
private static void loginRouter(IMServerInfo imServerInfo){
RestTemplate restTemplate = new RestTemplate();
IMLoginRequest loginRequest = new IMLoginRequest(userId, userName, imServerInfo.getHost(), imServerInfo.getNettyPort(), imServerInfo.getHttpPort());
IMLoginResponse response = restTemplate.postForObject("http://" + routeHost + ":" + routePort + "/login/", loginRequest, IMLoginResponse.class);
log.info("登录返回信息:{}", response);
if(response.success()){
log.info("登录成功");
}else {
log.info("登录失败:{},程序即将退出......", response.getMsg());
System.exit(0);
}
}
private static void connectToServer(IMServerInfo imServerInfo){
start(imServerInfo, userId, userName);
}
private static void start(IMServerInfo imServerInfo, String userId, String userName){
NioEventLoopGroup work = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(work).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<niosocketchannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ServerIdelHandler());
nioSocketChannel.pipeline().addLast(PacketCodecHandler.getInstance());
nioSocketChannel.pipeline().addLast(new ClientIdleHandler());
nioSocketChannel.pipeline().addLast(new LoginHandler(userId, userName));
nioSocketChannel.pipeline().addLast(LoginResponseHandler.getInstance());
nioSocketChannel.pipeline().addLast(MessageResponseHandler.getInstance());
}
});
ChannelFuture future = bootstrap.connect(imServerInfo.getHost(), imServerInfo.getNettyPort()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()){
log.info("连接服务器成功");
}else {
log.info("连接服务器失败");
channelFuture.cause().printStackTrace();
System.exit(0);
}
}
});
try {
future.channel().closeFuture().sync();
log.info("与服务器断开连接!");
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
4、测试
(1)多个客户端
为了模拟多个客户端通信,则再创建一个ClientRouterTest2,内容与ClientRouterTest1一致,除了用户ID和用户名
(2)模拟消息发送
手动调用路由器中的消息发送Controller
POST localhost:9003/p2p/chat
Content-Type: application/json
{
"fromUserId" : "101",
"toUserId" : "100",
"msg" : "你好"
}
</string,></immessagemapper,></bytebuf,></byte,></bytebuf,></string,></string,>
</byte,></byte,></byte,></byte,> 标签:05,void,通信,private,class,new,public,channel,分布式 From: https://www.cnblogs.com/liconglong/p/17305216.html