1.引入依赖
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>2.0.3</version>
</dependency>
2.基本配置
server:
port: 8081
socketio:
host: localhost
port: 9091
# 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
maxFramePayloadLength: 1048576
# 设置http交互最大内容长度
maxHttpContentLength: 1048576
# socket连接数大小(如只监听一个端口boss线程组为1即可)
bossCount: 1
workCount: 100
allowCustomRequests: true
# 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
upgradeTimeout: 10000
# Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
pingTimeout: 60000
# Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
pingInterval: 25000
3.创建配置类
@Configuration
public class SocketConfig {
@Value("${socketio.host}")
private String host;
@Value("${socketio.port}")
private Integer port;
@Value("${socketio.bossCount}")
private int bossCount;
@Value("${socketio.workCount}")
private int workCount;
@Value("${socketio.allowCustomRequests}")
private boolean allowCustomRequests;
@Value("${socketio.upgradeTimeout}")
private int upgradeTimeout;
@Value("${socketio.pingTimeout}")
private int pingTimeout;
@Value("${socketio.pingInterval}")
private int pingInterval;
@Bean
public SocketIOServer socketIOServer() {
com.corundumstudio.socketio.SocketConfig socketConfig = new com.corundumstudio.socketio.SocketConfig();
socketConfig.setReuseAddress(true);
socketConfig.setTcpNoDelay(true);
socketConfig.setSoLinger(0);
com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration();
configuration.setSocketConfig(socketConfig);
// host在本地测试可以设置为localhost或者本机IP,在Linux服务器跑可换成服务器IP
configuration.setHostname(host);
configuration.setPort(port);
// socket连接数大小(如只监听一个端口boss线程组为1即可)
configuration.setBossThreads(bossCount);
configuration.setWorkerThreads(workCount);
configuration.setAllowCustomRequests(allowCustomRequests);
// 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
configuration.setUpgradeTimeout(upgradeTimeout);
// Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
configuration.setPingTimeout(pingTimeout);
// Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
configuration.setPingInterval(pingInterval);
// 鉴权
configuration.setAuthorizationListener(handshakeData -> {
//获取socket链接发来的token参数
String token = handshakeData.getSingleUrlParam("token");
//如果验签通过就是true,否则false, false的话就不允许建立socket链接
return true;
});
SocketIOServer socketIOServer = new SocketIOServer(configuration);
return socketIOServer;
}
}
4.服务端实现
4.1前端事件监听
@Component
public class EventListenner {
@OnEvent("messageEvent")
public void onEvent(SocketIOClient client, String message, AckRequest request) {
System.out.println("收到推送消息:" + message);
/*
* 前端事件监听,自定义事件名称@OnEvent("messageEvent")
* 逻辑处理
* */
request.sendAckData("推送已收到");//数据返回
}
}
4.2创建通道连接缓存类
public class ClientCache {
/**
* 本地缓存已连接的客户端
*/
private static final Map<UUID, SocketIOClient> concurrentHashMap = new ConcurrentHashMap<>();
/**
* 存入本地缓存
*
* @param sessionId 页面sessionID
* @param socketIOClient 页面对应的通道连接信息
*/
public static void saveClient(UUID sessionId, SocketIOClient socketIOClient) {
concurrentHashMap.put(sessionId, socketIOClient);
}
/**
* 根据用户ID获取通道信息
*
* @param sessionId
* @return
*/
public static SocketIOClient getUserClient(UUID sessionId) {
return concurrentHashMap.get(sessionId);
}
/**
* 根据页面sessionID删除页面链接信息
*
* @param sessionId
*/
public static void deleteSessionClient(UUID sessionId) {
concurrentHashMap.remove(sessionId);
}
/**
* 客户端集合实例获取
*/
public static Map<UUID, SocketIOClient> getConcurrentHashMap(){
return concurrentHashMap;
}
}
4.3事件响应接口和实现类
public interface SocketIOService {
/**
* 启动服务
**/
void start();
/**
* 停止服务
**/
void stop();
}
@Slf4j
@Service
public class SocketIOServiceImpl implements SocketIOService {
@Resource
private SocketIOServer socketIOServer;
@Resource
private EventListenner eventListenner;
/**
* Spring IoC容器创建之后,在加载SocketIOServiceImpl Bean之后启动
*/
@PostConstruct
private void autoStartup() {
start();
}
/**
* Spring IoC容器在销毁SocketIOServiceImpl Bean之前关闭,避免重启项目服务端口占用问题
*/
@PreDestroy
private void autoStop() {
stop();
}
@Override
public void start() {
// 监听客户端连接
socketIOServer.addConnectListener(client -> {
try {
UUID sessionId = client.getSessionId();
ClientCache.saveClient(sessionId, client);
Map<UUID, SocketIOClient> concurrentHashMap = ClientCache.getConcurrentHashMap();
log.info("连接成功,sessionId:{},当前有{}个连接", sessionId,concurrentHashMap.size());
} catch (Exception e) {
log.info("连接异常:"+e);
client.sendEvent("connect",e.getMessage());
throw new RuntimeException(e);
}
});
// 监听客户端断开连接
socketIOServer.addDisconnectListener(client -> {
try {
UUID sessionId = client.getSessionId();
ClientCache.deleteSessionClient(sessionId);
Map<UUID, SocketIOClient> concurrentHashMap = ClientCache.getConcurrentHashMap();
log.info("关闭连接,sessionId:{},当前有{}个连接", sessionId,concurrentHashMap.size());
} catch (Exception e) {
log.info("断开异常:"+e);
client.sendEvent("disconnect",e.getMessage());
throw new RuntimeException(e);
}
});
// 处理自定义的事件,与连接监听类似,event为事件名,PushMessage为参数实体类 添加监听类
// 监听前端发送的事件消息推送事件
socketIOServer.addListeners(eventListenner);
log.info("消息推送服务启动完毕!");
socketIOServer.start();
}
@Override
public void stop() {
if (socketIOServer != null) {
socketIOServer.stop();
socketIOServer = null;
}
log.info("消息推送服务已关闭!");
}
}
4.4推送事件接口和实现类
public interface PushService {
/**
* 推送消息给客户
*
* @param message
* @param sessionId
*/
void pushUser(String message, UUID sessionId);
/**
* 推送消息给所有客户
*
* @param pushMessage
*/
void pushChatMessage(String pushMessage);
}
@Service
public class PushServiceImpl implements PushService {
/**
* 推送给单个客户端
*/
@Override
public void pushUser(String message, UUID sessionId) {
SocketIOClient socketIOClient = ClientCache.getUserClient(sessionId);
if(socketIOClient!=null){
socketIOClient.sendEvent("pushEvent", message);
}
}
/**
* 推送给所有客户端
*/
@Override
public void pushChatMessage(String pushMessage) {
Map<UUID, SocketIOClient> concurrentHashMap = ClientCache.getConcurrentHashMap();
concurrentHashMap.forEach((uuid, socketIOClient)->{
socketIOClient.sendEvent("pushEvent",pushMessage);
});
}
}
5.客户端实现
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
<script src="https://cdn.bootcss.com/socket.io/2.1.1/socket.io.js"></script>
<script type="text/javascript">
var socket = io.connect('http://localhost:9091', {
'reconnection delay' : 2000,
'force new connection' : true,
});
socket.on('pushEvent', function(data) {
// here is your handler on messages from server
console.log(data)
alert(data)
});
socket.on('connect', function(data) {
console.log("连接失败异常:"+data);
});
socket.on('disconnect', function (data) {
console.log("连接断开异常:"+data)
});
socket.emit('messageEvent', "cx",(data) => {
console.log(data)
});
</script>
标签:集成,springboot,socketio,void,private,sessionId,configuration,public
From: https://www.cnblogs.com/lshwl/p/18244363