要使用Netty框架实现WebSocket服务端与客户端通信,你需要按照以下步骤进行:
1、添加依赖
首先,在你的项目中引入Netty及其WebSocket支持所需的依赖。如果你使用Maven或Gradle构建项目,可以在pom.xml或build.gradle文件中添加相应的依赖项(下面以Maven为例)。
Maven:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.77.Final</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
2、创建WebSocket服务器端
package com.ruoyi.rests.websocket.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
/**
 * Bootstraps a Netty server that upgrades HTTP connections on {@code /websocket}
 * to WebSocket and hands text frames to {@link MyWebSocketHandler}.
 */
public class NettyServer {
    /** TCP port the server listens on. */
    private final int port;

    /**
     * @param port TCP port to bind when {@link #start()} is called
     */
    public NettyServer(int port) {
        this.port = port;
    }

    /**
     * Binds the configured port and blocks the calling thread until the server
     * channel is closed. Both event loop groups are shut down before returning.
     *
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void start() throws Exception {
        // The boss group only accepts connections on the single bound port; one thread is enough.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // The worker group performs the I/O of accepted connections.
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.option(ChannelOption.SO_BACKLOG, 1024);
            // ServerBootstrap.group takes (parentGroup, childGroup): acceptor first, I/O workers second.
            sb.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // NIO server channel implementation
                    .localAddress(this.port) // port to listen on
                    .childHandler(new ChannelInitializer<SocketChannel>() { // runs once per accepted connection
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            System.out.println("收到新连接");
                            // NOTE(review): this runs on a Netty I/O thread, where a servlet request is
                            // normally not bound, so this pair of calls most likely has no useful effect.
                            // Kept as-is to preserve behavior; confirm and remove.
                            ServletRequestAttributes sra = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
                            RequestContextHolder.setRequestAttributes(sra, true);
                            ch.pipeline()
                                    // The WebSocket handshake is an HTTP request, so an HTTP codec is required.
                                    .addLast(new HttpServerCodec())
                                    // Supports writing large content in chunks.
                                    .addLast(new ChunkedWriteHandler())
                                    // Aggregates HTTP message parts into a full message (max 8192 bytes).
                                    .addLast(new HttpObjectAggregator(8192))
                                    // Args: path, subprotocols, allowExtensions, maxFrameSize,
                                    // allowMaskMismatch, checkStartsWith.
                                    .addLast(new WebSocketServerProtocolHandler("/websocket", null, true, 65536 * 10, false, true))
                                    .addLast(new MyWebSocketHandler());
                        }
                    });
            ChannelFuture cf = sb.bind().sync(); // bind and wait until the bind has completed
            System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
            cf.channel().closeFuture().sync(); // block until the server channel is closed
        } finally {
            // Nested try/finally so the boss group is still shut down if waiting
            // for the worker group shutdown throws (e.g. on interruption).
            try {
                workerGroup.shutdownGracefully().sync();
            } finally {
                bossGroup.shutdownGracefully().sync();
            }
        }
    }
}
package com.ruoyi.rests.websocket.netty;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.util.TypeUtils;
import com.ruoyi.rests.api.domain.dto.Protocol;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
 * Static registry of connected WebSocket channels together with helpers for
 * pushing {@link Protocol} messages to them as JSON text frames.
 */
public class NettyConfig {
    private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /**
     * Maps a connection key to its channel. Lookups in this class match keys by
     * substring ({@code contains}), not by equality.
     */
    private static final ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
    private static final AtomicInteger counter = new AtomicInteger();

    /**
     * @return the shared counter instance
     */
    public static AtomicInteger getCounter() {
        return counter;
    }

    /**
     * @return the shared channel group
     */
    public static ChannelGroup getChannelGroup() {
        return channelGroup;
    }

    /**
     * @return the live (mutable) key-to-channel map
     */
    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        return channelMap;
    }

    /**
     * Writes {@code protocol} to {@code channel}. When the target id contains
     * {@code "3DPC"}, a second copy addressed to {@code senderUserId + "3DDEVPC"}
     * is written to the same channel.
     *
     * <p>The caller's {@code protocol} is left with its original target id, so
     * sending one message to several channels produces the same pair of frames
     * for each of them. A {@code null} target id simply skips the second copy.
     *
     * @param protocol message to serialize and send
     * @param channel  destination channel
     */
    public static void sendMessage(Protocol protocol, Channel channel) {
        sendMessageAll(protocol, channel);
        String originalTargetId = protocol.getTargetId();
        if (originalTargetId != null && originalTargetId.contains("3DPC")) {
            protocol.setTargetId(protocol.getSenderUserId() + "3DDEVPC");
            try {
                sendMessageAll(protocol, channel);
            } finally {
                // The frame above already holds the serialized text, so restoring is safe.
                protocol.setTargetId(originalTargetId);
            }
        }
    }

    /**
     * Writes {@code protocol} to {@code channel} exactly once, as a JSON text frame.
     *
     * @param protocol message to serialize and send
     * @param channel  destination channel
     */
    public static void sendMessageAll(Protocol protocol, Channel channel) {
        TypeUtils.compatibleWithJavaBean = true;
        channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(protocol)));
    }

    /**
     * Sends {@code protocol} via {@link #sendMessage} to every channel whose key
     * contains {@code protocol.getTargetAliasUTenant()}.
     *
     * @param protocol message to send
     */
    public static void broadcast(Protocol protocol) {
        TypeUtils.compatibleWithJavaBean = true;
        getChannelMap().entrySet().stream()
                .filter(it -> it.getKey().contains(protocol.getTargetAliasUTenant()))
                .forEach(it -> sendMessage(protocol, it.getValue()));
    }

    /**
     * Removes every map entry whose channel is no longer open.
     */
    public static void autoFlush() {
        getChannelMap().values().removeIf(channel -> !channel.isOpen());
    }

    /**
     * @param aliasId substring to look for in the connection keys
     * @return channels of all entries whose key contains {@code aliasId}; never {@code null}
     */
    public static List<Channel> getChannel(String aliasId) {
        return getChannelMap().entrySet().stream()
                .filter(it -> it.getKey().contains(aliasId))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }
}
3、消息处理
package com.ruoyi.rests.websocket.netty;
import com.alibaba.fastjson.util.TypeUtils;
import com.alibaba.fastjson2.JSON;
import com.ruoyi.common.core.context.SecurityContextHolder;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.rests.api.domain.dto.Protocol;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
/**
 * Inbound handler for WebSocket text frames. Each frame is expected to be a JSON
 * encoded {@link Protocol}; the sending channel is registered in
 * {@link NettyConfig#getChannelMap()} and the message is dispatched through
 * {@code PPSWokerCenter}.
 */
@Component
@ChannelHandler.Sharable
@Slf4j
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    /**
     * Parses an incoming text frame and routes it.
     *
     * <p>Frames that are not valid JSON are ignored. Messages missing tenant id,
     * sender user id or session id, or carrying an unregistered conversation
     * type, are answered on the same channel with an explanatory content text.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String text = msg.text();
        log.info("服务器收到消息:{}", text);
        if (!JSON.isValid(text)) {
            return;
        }
        Protocol protocol = JSON.parseObject(text, Protocol.class);
        if (protocol == null) {
            // The literal JSON value null is valid JSON but yields no object to work with.
            log.warn("Ignoring WebSocket frame that does not contain a JSON object");
            return;
        }
        boolean identified = StringUtils.isNotEmpty(protocol.getTenantId())
                && StringUtils.isNotEmpty(protocol.getSenderUserId())
                && StringUtils.isNotEmpty(protocol.getSessionId());
        if (!identified) {
            reply(ctx, protocol, "{tenantId,senderUserId,sessionId} 请求参数其中可能为空");
            return;
        }
        // Associate this channel with the sender: key is "<selfAliasUTenant>:<sessionId>".
        String uniqueKey = String.format("%s:%s", protocol.getSelfAliasUTenant(), protocol.getSessionId());
        NettyConfig.getChannelMap().put(uniqueKey, ctx.channel());
        if (StringUtils.isNotEmpty(protocol.getConversationType()) && PPSWokerCenter.exits(protocol.getConversationType())) {
            SecurityContextHolder.setTenantId(protocol.getTenantId());
            PPSWokerCenter.handler(protocol);
        } else {
            reply(ctx, protocol, "暂不支持该协议");
        }
    }

    /**
     * Called when this handler is removed from the pipeline; drops every registry
     * entry that points at the channel.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端断开: 租户xx:{}", ctx.channel().id().asLongText());
        // No isActive() guard: by the time a disconnected channel's handler is removed the
        // channel is already inactive, and that is exactly when its entries must be dropped.
        removeUser(ctx);
    }

    /**
     * Forwards the exception to the rest of the pipeline, then unregisters and
     * closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        removeUser(ctx);
        ctx.close();
    }

    /**
     * Sets {@code contentText} on {@code protocol} and writes it back to the
     * channel of {@code ctx} as a JSON text frame.
     */
    private void reply(ChannelHandlerContext ctx, Protocol protocol, String contentText) {
        protocol.setContentText(contentText);
        // NOTE(review): TypeUtils is imported from fastjson 1.x while JSON is imported from
        // fastjson2 in this file; confirm this flag actually influences the serialization below.
        TypeUtils.compatibleWithJavaBean = true;
        ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(protocol)));
    }

    /**
     * Removes all registry entries whose value is the channel of {@code ctx},
     * whether or not that channel is still active.
     */
    private void removeUser(ChannelHandlerContext ctx) {
        NettyConfig.getChannelMap().values().removeIf(channel -> Objects.equals(channel, ctx.channel()));
    }
}
package com.ruoyi.rests.websocket.netty;
import com.ruoyi.common.core.utils.MGThreadPool;
import com.ruoyi.rests.api.domain.dto.Protocol;
import com.ruoyi.rests.websocket.ProtocolHandler;
import com.ruoyi.rests.websocket.protocol.*;
import java.util.HashMap;
import java.util.Map;
/**
 * Registry that maps a conversation type string to the {@link ProtocolHandler}
 * responsible for it, and dispatches incoming {@link Protocol} messages.
 */
public class PPSWokerCenter {
    /** Conversation type to handler. Populated by the static initializer and {@link #register}. */
    private static final Map<String, ProtocolHandler> protocolHandlerMap = new HashMap<>();

    static {
        register("REGISTER", new Register());
        register("SYSTEMHEATBIT", new SystemHeatbit());
        register("ONLINE", new Online());
        register("MODELBYAREA", new ModelByArea());
        register("RESET3DCAMERA", new Reset3DCamera());
        register("SET3DCAMERA", new Set3DCamera());
        register("EXCEEDAREA", new ExceedArea());
        register("PERSONPOINTALL", new PersonPointAll());
        register("PERSONPOINTHIDDEN", new PersonPointHidden());
        register("PERSONPOINT", new PersonPoint());
        register("CAMERAPOINTALL", new CameraPointAll());
        register("CAMERAPOINTHIDDEN", new CameraPointHidden());
        register("SENSORPOINTALL", new SensorPointAll());
        register("SENSORPOINTHIDDEN", new SensorPointHidden());
        register("PERSONSTREAM", new PersonStream());
        register("OPENVIDEO", new OpenVideo());
        register("VIDEOSTREAM", new VideoStream());
        register("HISTORYGET", new HistoryGet());
        register("HISTORYREADY", new HistoryReady());
        register("HISTORYPLAY", new HistoryPlay());
        register("HISTORYSTOP", new HistoryStop());
        register("HISTORYCLEAR", new HistoryClear());
        register("SYSTEMALARMWEB", new SystemAlarmWeb());
        // NOTE(review): same handler class as SYSTEMALARMWEB - confirm this is intended
        // and not a copy-paste slip.
        register("SYSTEMALARMLIST", new SystemAlarmWeb());
        register("WEBSOCKETLOGOUT", new WebSocketLogOut());
    }

    /**
     * Registers (or replaces) the handler for a conversation type.
     *
     * @param type            conversation type key
     * @param protocolHandler handler to invoke for that type
     */
    public static void register(String type, ProtocolHandler protocolHandler) {
        protocolHandlerMap.put(type, protocolHandler);
    }

    /**
     * Submits {@code protocol} to {@code MGThreadPool.runAsync}; the task looks up
     * the handler for {@code protocol.getConversationType()} and invokes it.
     *
     * <p>If no handler is registered for the type, the task fails with an
     * {@link IllegalStateException} naming the type. Callers are expected to
     * check {@link #exits(String)} first.
     *
     * @param protocol message to dispatch
     */
    public static void handler(Protocol protocol) {
        MGThreadPool.runAsync(() -> {
            String conversationType = protocol.getConversationType();
            ProtocolHandler protocolHandler = protocolHandlerMap.get(conversationType);
            if (protocolHandler == null) {
                throw new IllegalStateException(
                        "No ProtocolHandler registered for conversationType: " + conversationType);
            }
            protocolHandler.handler(protocol);
        });
    }

    /**
     * @param conversationType conversation type key
     * @return {@code true} if a handler is registered for {@code conversationType}
     */
    public static boolean exits(String conversationType) {
        return protocolHandlerMap.containsKey(conversationType);
    }
}
package com.ruoyi.rests.websocket;
import com.ruoyi.rests.api.domain.dto.Protocol;
/**
 * Strategy for processing one kind of {@link Protocol} message. Implementations
 * are registered by conversation type in {@code PPSWokerCenter}, which invokes
 * them from a task submitted to {@code MGThreadPool.runAsync}.
 */
public interface ProtocolHandler {
/**
 * Processes a single message.
 *
 * @param protocol the message to handle
 */
void handler(Protocol protocol);
}
附:ProtocolHandler 实现类示例
package com.ruoyi.rests.websocket.protocol;
import com.ruoyi.rests.api.domain.dto.Protocol;
import com.ruoyi.rests.websocket.ProtocolHandler;
import com.ruoyi.rests.websocket.netty.NettyConfig;
/**
 * Handler that relays the received message to every channel currently held in
 * {@link NettyConfig#getChannelMap()}.
 */
public class SystemHeatbit implements ProtocolHandler {
    /**
     * Sends {@code protocol} to each registered channel via {@link NettyConfig#sendMessage}.
     *
     * @param protocol the message to relay
     */
    @Override
    public void handler(Protocol protocol) {
        NettyConfig.getChannelMap()
                .values()
                .forEach(channel -> NettyConfig.sendMessage(protocol, channel));
    }
}
package com.ruoyi.rests.websocket.protocol;
import com.ruoyi.rests.api.domain.dto.Protocol;
import com.ruoyi.rests.websocket.ProtocolHandler;
import com.ruoyi.rests.websocket.netty.NettyConfig;
import io.netty.channel.Channel;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Handles the registration message: acknowledges the registering connection and
 * sends a {@code WEBSOCKETLOGOUT} message to the same user's other sessions of
 * the same client kind (keys containing {@code "3D"} versus keys that do not).
 */
public class Register implements ProtocolHandler {
    /**
     * @param protocol the registration message; its content text is set to
     *                 {@code true} before it is echoed back
     */
    @Override
    public void handler(Protocol protocol) {
        protocol.setContentText(true);

        // Acknowledge only the connection that just registered.
        String uniqueKey = String.format("%s:%s", protocol.getSelfAliasUTenant(), protocol.getSessionId());
        for (Channel candidate : NettyConfig.getChannel(protocol.getSelfAliasUTenant())) {
            if (candidate.equals(NettyConfig.getChannelMap().get(uniqueKey))) {
                NettyConfig.sendMessageAll(protocol, candidate);
            }
        }

        // A sender whose id contains "3D" displaces sessions whose key contains "3D";
        // any other sender displaces sessions whose key does not contain it.
        boolean senderIs3d = protocol.getSenderUserId().contains("3D");
        List<Channel> staleChannels = NettyConfig.getChannelMap().entrySet().stream()
                .filter(entry -> entry.getKey().contains(protocol.getSelfAliasUTenant())
                        && entry.getKey().contains("3D") == senderIs3d
                        && !entry.getKey().contains(protocol.getSessionId()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());

        Protocol logout = new Protocol();
        logout.setConversationType("WEBSOCKETLOGOUT");
        for (Channel stale : staleChannels) {
            // The stale channel is only notified; it is not closed here.
            NettyConfig.sendMessageAll(logout, stale);
        }
    }
}
package com.ruoyi.rests.websocket.protocol;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import com.ruoyi.basis.api.RemoteAreaService;
import com.ruoyi.basis.api.RemoteBuildingService;
import com.ruoyi.basis.api.RemoteFloorService;
import com.ruoyi.basis.api.domain.area.Area;
import com.ruoyi.basis.api.domain.area.Building;
import com.ruoyi.basis.api.domain.area.Floor;
import com.ruoyi.basis.api.domain.area.type.AreaType;
import com.ruoyi.basis.api.domain.dto.AreaDto;
import com.ruoyi.common.core.constant.Constants;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.utils.SpringUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.common.redis.service.RedisService;
import com.ruoyi.rests.api.domain.dto.Protocol;
import com.ruoyi.rests.domain.dto.AreaTypeDto;
import com.ruoyi.rests.websocket.ProtocolHandler;
import com.ruoyi.rests.websocket.netty.NettyConfig;
import com.ruoyi.system.api.RemoteModelService;
import com.ruoyi.system.api.domain.Model;
import java.util.List;
/**
 * Handles the request for the models bound to an area, building, floor or room.
 * The answer is written into the message's content text, and the message is then
 * sent to every channel whose key contains the message's target alias.
 */
public class ModelByArea implements ProtocolHandler {
    /**
     * Reads {@code areaId} and {@code areaType} from the JSON content text, looks
     * up coordinates and bound models through the remote services, and sends the
     * result. When {@code areaId} is missing or empty the message is forwarded
     * with its content unchanged.
     *
     * @param protocol the request message; mutated to carry the response
     */
    @Override
    public void handler(Protocol protocol) {
        RemoteModelService modelService = SpringUtils.getBean(RemoteModelService.class);
        RemoteAreaService areaService = SpringUtils.getBean(RemoteAreaService.class);
        RemoteBuildingService buildingService = SpringUtils.getBean(RemoteBuildingService.class);
        RemoteFloorService floorService = SpringUtils.getBean(RemoteFloorService.class);
        RedisService redisService = SpringUtils.getBean(RedisService.class);

        JSONObject request = JSON.parseObject(protocol.getContentText().toString(), JSONObject.class);
        String areaId = request.getString("areaId");
        String areaType = request.getString("areaType");

        AreaTypeDto response = new AreaTypeDto();
        response.setAreaId(areaId);

        if (areaId == null || StringUtils.isEmpty(areaId)) {
            // Nothing to look up: forward the message as received.
            sendToTargets(protocol);
            return;
        }

        AreaDto query = new AreaDto();
        query.setAreaType(areaType);
        query.setTenantId(protocol.getTenantId());

        if (AreaType.AREA.equals(areaType)) {
            query.setAreaId(areaId);
            R<Area> areaResult = areaService.queryByAreaId(query);
            if (areaResult.getCode() == Constants.SUCCESS && areaResult.getData() != null) {
                Area area = areaResult.getData();
                response.setX(area.getX());
                response.setY(area.getY());
                response.setZ(area.getZ());
            }
        } else if (AreaType.BUILDING.equals(areaType)) {
            R<Building> buildingResult = buildingService.selectByBuildingId(areaId);
            if (buildingResult.getCode() == Constants.SUCCESS && buildingResult.getData() != null) {
                Building building = buildingResult.getData();
                response.setX(building.getX());
                response.setY(building.getY());
                response.setZ(building.getZ());
            }
            query.setBuildingId(areaId);
        } else if (AreaType.FLOOR.equals(areaType)) {
            R<Floor> floorResult = floorService.selectByFloorId(areaId);
            if (floorResult.getCode() == Constants.SUCCESS && floorResult.getData() != null) {
                Floor floor = floorResult.getData();
                response.setX(floor.getX());
                response.setY(floor.getY());
                response.setZ(floor.getZ());
            }
            query.setFloorId(areaId);
        } else if (AreaType.ROOM.equals(areaType)) {
            // Rooms carry no coordinates in the response; only the query is narrowed.
            query.setRoomId(areaId);
        }

        R<List<Model>> modelResult = modelService.areaBindModel(query);
        if (modelResult.getCode() == Constants.SUCCESS && modelResult.getData() != null) {
            List<Model> models = modelResult.getData();
            // Cache the model list under "<tenantId>:<senderUserId>".
            redisService.setCacheObject(protocol.getTenantId() + ":" + protocol.getSenderUserId(), models);
            response.setAreaType(areaType);
            response.setModelList(models);
            protocol.setContentText(response);
        } else {
            protocol.setContentText(Lists.newArrayList());
        }

        sendToTargets(protocol);
    }

    /** Sends {@code protocol} to every channel whose key contains the target alias. */
    private static void sendToTargets(Protocol protocol) {
        NettyConfig.getChannelMap().entrySet().stream()
                .filter(entry -> entry.getKey().contains(protocol.getTargetAliasUTenant()))
                .forEach(entry -> NettyConfig.sendMessage(protocol, entry.getValue()));
    }
}
标签:Netty,WebSocket,协议,ruoyi,protocol,new,import,com,channel
From: https://blog.csdn.net/m0_69146489/article/details/137346082