业务需求: 此项目为一气象项目的小部分,在内网条件下,实现网页端的聊天系统,要求可以进行点对点聊天以及群聊,可以发送文字,文件,图片等内容。 技术选型: 此业务要求实时接收人和群组发送的消息,必须使用长链接的服务器,选用netty建立长链接,此外部分业务是传通的crud场景,使用springboot,tomcat,mabytis-plus等进行开发。数据库使用postgre与redis。 技术设计: 整体分为tomcat与netty两部分,tomcat负责传统crud,文件oss等内容,由springboot,mabytis-plus等进行实现,这里不再赘述。netty负责实现聊天,实时消息等。 netty服务器的整个链路从nettyServer开始,在启动tomcat时初始化启动一个NettyServer线程,在线程里建立netty服务器。
@Override public Channel call() throws Exception { ChannelFuture channelFuture = null; try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(parentGroup,childGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,128) .childHandler(new MyChannelInitializer()); channelFuture = bootstrap.bind(new InetSocketAddress(nettyNetPort)).syncUninterruptibly(); this.channel = channelFuture.channel(); } catch (Exception e) { log.error("socket server start error", e.getMessage()); } finally { if (null != channelFuture && channelFuture.isSuccess()) { log.info("socket server start done. "); } else { log.error("socket server start error. "); } } return channel; }此时注意的是通道处理器,项目里使用的MyChannelInitializer,这一步做的是对接收到的消息的解码对返回消息的编码等工作,由于是内网,前端是网页,所以这里只对网页端发送的消息进行简单解码,不在这里进行消息分类(如果是客户端,就可以进行自定义编码了,可以在这里加密,对消息进行分类等)
@Component public class MyChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel channel) throws Exception { //因为基于http协议,使用http的编码和解码器 channel.pipeline().addLast(new HttpServerCodec()); //是以块方式写,添加ChunkedWriteHandler处理器 channel.pipeline().addLast(new ChunkedWriteHandler()); /* 1.http数据在传输过程中是分段,HttpObjectAggregator就是可以将多个段聚合 2.这就是为什么,当浏览器发送大量数据时,就会发出多次http请求 */ channel.pipeline().addLast(new HttpObjectAggregator(8192)); /* 1.对应websocket,它的数据是以帧(frame)形式传递 2.可以看到WebSocketFrame下面的六个子类 3.浏览器请求时,ws://localhost:7000/hello表示请求的url 4.WebSocketServerProtocolHandler核心功能是将http协议升级为ws协议,保持长连接 5.是通过一个状态码101 */ channel.pipeline().addLast(new WebSocketServerProtocolHandler("/")); // 在管道中添加我们自己的接收数据实现方法 channel.pipeline().addLast(new MyTextWebSocketFrameHandler()); } }
接下来就是对建立的通道进行处理了,我们已经在MyChannelInitializer加入了MyTextWebSocketFrameHandler适配器。
@Slf4j @Component public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { String text = msg.text(); Packet packet = JSON.parseObject(text, Packet.class); Integer command = packet.getCommand(); Packet sonPacket = JSON.parseObject(text, Packet.get(command)); Channel channel = ctx.channel(); IHandlerAlgorithm handlerAlgorithm = HandlerConfig.handlerAlgorithmMap.get(command); handlerAlgorithm.msgHandle(channel, sonPacket); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { SocketChannelUtil.removeChannel(ctx.channel().id().toString()); SocketChannelUtil.removeChannelGroupByChannel(ctx.channel()); log.info("客户端断开通知:{}", ctx.channel()); } ... }
这里主要是接受消息与链接断开,链接断开即用户下线,要在我们维护的用户与通道的关系map以及用户与群组的map中删除用户。 接受消息(channelRead0方法)这里使用了策略模式,先把消息解码为提前定义好的类Packet,再根据packet的command解码为对应的packet的子类,再使用IHandlerAlgorithm的实现类进行业务处理。 数据设计 基础数据包括用户,群组,用户之间的好友关系,用户加入群组,消息,对话框(用户与用户的,用户与群组的) 标签:netty,网页,系统,群组,ctx,用户,聊天,new,channel From: https://www.cnblogs.com/ssqswyf/p/17222821.html