前言
这篇随笔属实没想到一个好名字,起因是在项目中遇到了一个springboot服务会发出多个socket服务的场景,而且我们使用的是socketIO服务,为了减少调试工作和重复的开发工作,让开发在项目中专注于业务编写,因此封装了一个在启动springboot服务时,自动创建socketIONamespace的逻辑
依赖
在使用此依赖时,我的项目版本为:
因为是要跟公司其他团队的架构保持一致,所以我们的socketIo的版本偏低,使用的是
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.19</version>
</dependency>
<dependency>
<groupId>io.socket</groupId>
<artifactId>socket.io-client</artifactId>
<version>1.0.0</version>
</dependency>
逻辑
首先需要添加一个socketIo的配置类
import com.corundumstudio.socketio.HandshakeData;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class SocketIoConfig {
@Value("${socketio.port}")
private Integer port;
@Value("${socketio.bossCount}")
private int bossCount;
@Value("${socketio.workCount}")
private int workCount;
@Value("${socketio.allowCustomRequests}")
private boolean allowCustomRequests;
@Value("${socketio.upgradeTimeout}")
private int upgradeTimeout;
@Value("${socketio.pingTimeout}")
private int pingTimeout;
@Value("${socketio.pingInterval}")
private int pingInterval;
@Bean
public SocketIOServer socketIoServer() {
SocketConfig socketConfig = new SocketConfig();
socketConfig.setTcpNoDelay(true);
socketConfig.setSoLinger(0);
// 因为使用了Spring的
com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
config.setSocketConfig(socketConfig);
// 设置授权监听器
config.setAuthorizationListener(new AuthorizationListener() {
@Override
public boolean isAuthorized(HandshakeData data) {
// 在这里添加你的授权逻辑
// 例如,检查握手数据中的 token
String token = data.getSingleUrlParam("token");
return "valid_token".equals(token);
}
});
config.setPort(port);
config.setBossThreads(bossCount);
config.setWorkerThreads(workCount);
config.setAllowCustomRequests(allowCustomRequests);
config.setUpgradeTimeout(upgradeTimeout);
config.setPingTimeout(pingTimeout);
config.setPingInterval(pingInterval);
return new SocketIOServer(config);
}
}
这些类的配置,我是使用的 application.yml
进行管理,在 application.yml
中添加了相应的配置.
对于授权监听器这块的逻辑,我一直没有真正的使用起来。我对于token的处理是放在了建立socket链接后进行处理
需要添加一个策略接口 SocketIoStrategy
和一个初始化加载的类 SocketInitHandle
public interface SocketIoStrategy {
/**
* 定义命名空间的Url
* * @return 命名空间url
*/
String defineNamespaceUrl();
/**
* 链接后方法
*
* @param client socket客户端信息
*/
void connected(SocketIOClient client);
/**
* 自定义监听器
*
* @param socketIoNamespace socketIo命名空间
*/
void customListener(SocketIONamespace socketIoNamespace);
/**
* 链接后方法
*
* @param client socket客户端信息
*/
void disconnect(SocketIOClient client);
}
import com.corundumstudio.socketio.SocketIONamespace;
import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
@Slf4j
@Service
public class SocketInitHandle implements ApplicationListener<ContextRefreshedEvent> {
@Resource
private SocketIOServer socketIoServer;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
ApplicationContext applicationContext = event.getApplicationContext();
// 获取继承了ISocketIoService的bean
applicationContext.getBeansOfType(SocketIoStrategy.class).forEach((beanName, socketService) -> {
log.info("{} socket io namespace:{}", beanName, socketService.defineNamespaceUrl());
String namespaceUrl = socketService.defineNamespaceUrl();
SocketIONamespace socketIoNamespace = socketIoServer.addNamespace(namespaceUrl);
socketIoNamespace.addConnectListener(socketService::connected);
socketIoNamespace.addDisconnectListener(socketService::disconnect);
socketService.customListener(socketIoNamespace);
});
// 启动服务
socketIoServer.start();
}
/**
* Spring IoC容器在销毁SocketIOServiceImpl Bean之前关闭,避免重启项目服务端口占用问题
*/
@PreDestroy
private void autoStop() {
stop();
}
public void stop() {
if (socketIoServer != null) {
socketIoServer.stop();
socketIoServer = null;
}
}
}
这样,在项目启动时,会自动识别到实现了SocketIoStrategy这个接口的类,下面是一个这个接口实现的样例
@Slf4j
@Service
public class DemoSocketIoStrategyImpl implements SocketIoStrategy {
/**
* 存放已连接的客户端
*/
private static final Map<String, SocketUtil.SocketClientInfo> CLIENT_MAP = new ConcurrentHashMap<>();
@Override
public String defineNamespaceUrl() {
return "/demo";
}
@Override
public void connected(SocketIOClient client) {
SocketUtil.SocketClientInfo clientInfo = SocketUtil.formationSocketInfoWithClient(client);
bizLog.info("************ 客户端:{} 已连接 ************", clientInfo.getClientId());
// 自定义事件`connected` -> 与客户端通信 (也可以使用内置事件,如:Socket.EVENT_CONNECT)
client.sendEvent(Socket.EVENT_CONNECT, "成功连接");
CLIENT_MAP.put(clientInfo.getClientId(), clientInfo);
}
@Override
public void customListener(SocketIONamespace socketIoNamespace) {
socketIoNamespace.addEventListener("PUSH_DATA_EVENT", String.class, (client, data, ackSender) -> {
// 客户端推送`client_info_event`事件时,onData接受数据,这里是string类型的json数据,还可以为Byte[],object其他类型
String clientId = SocketUtil.getClientIdByClient(client);
bizLog.info("client{} push msg:{}", clientId, data);
});
}
@Override
public void disconnect(SocketIOClient client) {
String clientId = SocketUtil.getClientIdByClient(client);
bizLog.info("{} *********************** 客户端已断开连接", clientId);
if (clientId != null) {
CLIENT_MAP.remove(clientId);
client.disconnect();
}
}
public void pushBroadcastMessages(String eventType, String msgContent) {
CLIENT_MAP.forEach((clientId, clientInfo) -> {
bizLog.info("send fence msg to {}, content:{}", clientId, msgContent);
clientInfo.getClient().sendEvent(eventType, msgContent);
});
}
}
这个类实现了socketIO策略,还提供了一个广播的实现方法,在服务中需要广播消息时,执行消息的类型和内容即可发送
前端逻辑
下面是一个基于react的使用逻辑
const mySocket = useRef<any>(null);
const startSocket = () => {
if (!mySocket) {
// socketIoUrl是对应后端服务的域名,/demo是对应链接的链接路径,用于区分一个服务中的多个socket逻辑
mySocket = io(`${socketIoUrl}/demo`, {
reconnectionDelayMax: 10000,
query: {
// 一些在链接的时候需要携带的参数
},
});
mySocket.on('connect', (ev: any) => {
console.log('socket 连接成功', ev);
// 在链接成功后,发送一个emitEventType类型的事件消息
mySocket.emit('emitEventType',"emit data")
});
// 此处监听后端服务的eventType类型的事件的数据
mySocket.on('eventType', (data: any) => {
// 接收到数据后的逻辑
});
}
};
useEffect(()=>{
// 在加载的时候建立链接
startSocket();
return () => {
//断开socket连接
if (mySocket.current) {
mySocket.current.closeSocket();
}
};
},[])
标签:socketIO,Springboot,启动,void,socketio,client,import,config,public
From: https://www.cnblogs.com/xing-chen-d/p/18452150