Maven 依赖(pom.xml)
<!-- Spring Boot starter for WebSocket support. No <version> is declared here,
     so it is presumably managed by the Spring Boot parent POM / BOM (TODO confirm). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
WebSocket.java
import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.CloseReason.CloseCodes;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@Component
@ServerEndpoint("/websocket/{idCard}")
@Slf4j
public class WebSocket {
private static final Set<WebSocketEntity> connections = new CopyOnWriteArraySet<>();
static {
startHeart();
}
/**
* 连接方法
*/
@OnOpen
public synchronized void onOpen(@PathParam("idCard") String idCard, Session session) {
addUser(idCard, session);
}
/**
* 收到消息执行
*/
@OnMessage
public synchronized void onMessage(@PathParam("idCard") String idCard, String message) {
WebSocketMessage msg = JSONUtil.toBean(message, WebSocketMessage.class);
if (WebSocketMessage.MSG_TYPE_HEART.equals(msg.getMsgType())) {
log.info("【websocket心跳(接收)】:收到{}的心跳:{}", idCard, message);
WebSocketEntity entity = getUserEntity(idCard);
if (entity != null) {
entity.setHeartbeat(true);
entity.setLastTime(System.currentTimeMillis());
}
}
}
/**
* 链接错误执行
*/
@OnError
public synchronized void one rror(@PathParam("idCard") String idCard, Throwable error) {
log.info("【websocket消息推送(异常)】:{}:发生了错误:{}", idCard, error);
removeUser(idCard, new CloseReason(CloseCodes.NO_EXTENSION, "客户端异常"));
}
@OnClose
public static synchronized void onClose(@PathParam("idCard") String idCard, CloseReason reason) {
removeUser(idCard, reason);
}
/**
* 获取在线人数
*/
private static synchronized int getUserOnlineNum() {
return connections.size();
}
/**
* 用户上线
*/
private synchronized void addUser(String idCard, Session session) {
//查询当前用户是否已在集合
WebSocketEntity entity = getUserEntity(idCard);
if (null == entity) {
WebSocketEntity webSocket = new WebSocketEntity();
webSocket.setIdCard(idCard);
webSocket.setSession(session);
webSocket.setHeartbeat(true);
webSocket.setLastTime(System.currentTimeMillis());
connections.add(webSocket);
log.info("【websocket消息推送(上线)】:\"{}\"用户已上线,当前人数为:{}", idCard, getUserOnlineNum());
}
}
/**
* 根据idCard获取实体类
*/
private static WebSocketEntity getUserEntity(String idCard) {
WebSocketEntity entity = null;
if (connections.size() == 0) {
return null;
}
for (WebSocketEntity webSocketEntity : connections) {
if (webSocketEntity.getIdCard().contentEquals(idCard)) {
entity = webSocketEntity;
break;
}
}
return entity;
}
/**
* 根据多个idCard获取多个实体类
*/
private static List<WebSocketEntity> getUserEntities(List<String> idCard) {
List<WebSocketEntity> entities = new ArrayList<>();
if (connections.size() == 0) {
return entities;
}
for (WebSocketEntity webSocketEntity : connections) {
if (idCard.contains(webSocketEntity.getIdCard())) {
entities.add(webSocketEntity);
}
}
return entities;
}
/**
* 用户下线
*/
private static void removeUser(String idCard, CloseReason reason) {
WebSocketEntity entity = getUserEntity(idCard);
if (null != entity) {
connections.remove(entity);
log.info("【websocket消息推送(下线)】:\"{}\"用户因{}已下线,当前人数为:{}", idCard, reason.getReasonPhrase(), getUserOnlineNum());
}
}
/**
* 发送消息
*/
public static synchronized void sendMsg(String idCard, String message) {
WebSocketEntity userEntity = getUserEntity(idCard);
if (userEntity != null) {
Session session = userEntity.getSession();
if (session != null) {
synchronized (session) {
try {
session.getBasicRemote().sendText(message);
log.info("【websocket消息推送(发送)】:发送给\"{}\"用户消息:{}", userEntity.getIdCard(), message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
/**
* 批量发送消息
*/
public static synchronized void sendMsgToUsers(List<String> idCard, String message) {
List<WebSocketEntity> userEntities = getUserEntities(idCard);
if (userEntities.size() != 0) {
userEntities.forEach(userEntity -> {
Session session = userEntity.getSession();
if (session != null) {
synchronized (session) {
try {
session.getBasicRemote().sendText(message);
log.info("【websocket消息推送(发送)】:发送给\"{}\"用户消息:{}", userEntity.getIdCard(), message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
}
}
/**
* 启动心跳包
*/
private static synchronized void startHeart() {
ExamineHeartThread examineHeart = new ExamineHeartThread();
Thread examineThread = new Thread(examineHeart);
KeepHeartThread keepHeart = new KeepHeartThread();
Thread keepThread = new Thread(keepHeart);
keepThread.start();
examineThread.start();
}
/**
* 发送心跳包
*/
public static synchronized void sendPing(String message) {
if (connections.size() <= 0) {
return;
}
log.info("【websocket心跳】:发送心跳包当前人数为:" + getUserOnlineNum());
for (WebSocketEntity webSocketEntity : connections) {
synchronized (webSocketEntity) {
webSocketEntity.setLastTime(System.currentTimeMillis());
webSocketEntity.setHeartbeat(false);
try {
webSocketEntity.getSession().getBasicRemote().sendText(message);
log.info("【websocket心跳(发送)】:发送给\"{}\"用户消息:{}", webSocketEntity.getIdCard(), message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送心跳包 10秒一次
*/
private static class KeepHeartThread implements Runnable {
@SneakyThrows
@Override
public void run() {
WebSocketMessage msg = new WebSocketMessage();
msg.setMsgType(WebSocketMessage.MSG_TYPE_HEART);
String message = JSONUtil.toJsonStr(msg);
while (true) {
try {
sendPing(message);
Thread.sleep(10000);
} catch (Exception e) {
log.error("【websocket心跳(异常)】:发送心跳包异常:", e);
}
}
}
}
/**
* 检测是否收到client心跳 每秒一次
*/
private static class ExamineHeartThread implements Runnable {
@Override
public void run() {
while (true) {
try {
// 心跳过期时间
long now = System.currentTimeMillis();
for (WebSocketEntity entity : connections) {
// 心跳过期时间
long heartbeatExpirationTime = entity.getLastTime() + 3000;
if (!entity.isHeartbeat() && entity.getLastTime() != 0 && now > heartbeatExpirationTime) {
onClose(entity.getIdCard(), new CloseReason(CloseCodes.NORMAL_CLOSURE, "没有收到心跳"));
}
}
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("【websocket心跳(异常)】:检测心跳异常:", e);
}
}
}
}
}
WebSocketEntity.java
import lombok.Data;
import javax.websocket.Session;
/**
 * State kept for one registered WebSocket connection.
 *
 * <p>{@code heartbeat} and {@code lastTime} are written by the endpoint's message
 * callback and by the heartbeat sender, and read by the heartbeat checker, without a
 * common lock. They are therefore {@code volatile} so that every thread sees the
 * latest value.
 */
@Data
public class WebSocketEntity {

    /** Identifier of the connected user. */
    private String idCard;

    /** Session used to send messages to the user. */
    private Session session;

    /** {@code true} once the client answered the most recent heartbeat; cleared when a heartbeat is sent. */
    private volatile boolean heartbeat;

    /** Epoch milliseconds of the last heartbeat sent to, or answered by, the client. */
    private volatile long lastTime;
}
WebSocketMessage.java
import lombok.Data;
/**
 * Message exchanged over the WebSocket connection, together with the constants for
 * its message types and result codes.
 */
@Data
public class WebSocketMessage {

    /** Message type: heartbeat. */
    public static final Integer MSG_TYPE_HEART = 0;

    /** Message type: message for the PC client. */
    public static final Integer MSG_TYPE_PC_MESSAGE = 1;

    /** Message type: message for the app client. */
    public static final Integer MSG_TYPE_APP_MESSAGE = 2;

    /** Message type: ask the client to refresh the page. */
    public static final Integer MSG_TYPE_REFRESH_PAGE = 3;

    /** Result code: request succeeded. */
    public static final String SUCCESS_CODE = "200";

    /** Result code: request failed. */
    public static final String FAILURE_CODE = "201";

    /** Message type; one of the {@code MSG_TYPE_*} constants. */
    private Integer msgType;

    /** Request result code. */
    private String code;

    /** Token. */
    private String authorization;

    /** Message content. */
    private String message;
}
标签:WebSocket,String,idCard,private,static,心跳,import,机制,public
From: https://www.cnblogs.com/zhimi/p/17326565.html