文章目录
一、数据实时推送实现
在现代Web应用中,实现数据的实时推送是提升用户体验的重要手段。以下是三种常见的实现方式:
1. 轮询(Polling)
描述:
客户端通过定时向服务器发送AJAX请求,服务器接收请求并返回响应信息。
优点:
- 实现简单,适用于小型应用。
缺点:
- 当服务器数据未更新时,重复请求会浪费带宽和服务器资源。
- 延迟较高,不适合高实时性需求。
2. 长连接(Long Polling)
描述:
客户端发起一个长时间的HTTP请求,服务器在有数据更新时才响应。连接关闭后,客户端立即重新发起新的请求,保持持续连接。
优点:
- 实现比轮询更高效的数据推送。
- 数据实时性较好,避免了频繁的无效请求。
缺点:
- 需要维护较多的长连接,增加服务器资源开销。
- 实现复杂度高于轮询。
3. WebSocket
描述:
WebSocket 是 HTML5 提供的一种客户端与服务器之间进行全双工通信的协议。它在单个TCP连接上进行全双工通信,实现客户端和服务器的实时双向数据交互。
优点:
- 实时双向通信,延迟低。
- 高效资源利用,连接一次保持长连接。
- 支持多种应用场景,如聊天室、实时监控、通知系统等。
缺点:
- 部分旧浏览器不支持,需要考虑兼容性。
- 安全性要求较高,需要通过加密(如 wss)传输敏感数据。
应用场景:
- 聊天室
- 实时数据监控(如股票K线图)
- 消息提醒系统
- 实时协作工具(如协同编辑)
建立连接过程:
- 客户端发送一次常规的 HTTP GET 请求,其中包含
Upgrade: websocket
头部,表明希望升级协议。 - 服务器同意后,协议从 HTTP 升级为 WebSocket。
- 连接建立后,客户端和服务器可以随时进行数据交互,直到连接关闭。
二、代码实现
以下将介绍如何在 Spring Boot 后端和前端实现 WebSocket 通信。
后端实现
1. 引入依赖
在 pom.xml
中添加 Spring Boot WebSocket 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2. 配置类
创建一个 WebSocket 配置类,用于注册 ServerEndpointExporter
,使得 @ServerEndpoint
注解生效。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
/**
* 注册 ServerEndpointExporter, 自动注册使用 @ServerEndpoint 注解声明的 WebSocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
3. WebSocket 服务类
创建一个 WebSocket 服务类,处理 WebSocket 的生命周期事件和消息传输。
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
@Component
@Data
@ServerEndpoint(value = "/websocket/{userId}")
@Slf4j
public class WebSocketServer {
// 线程安全的 Set,用于存放每个客户端的 WebSocketServer 实例
private static final CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();
// private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<>();
private Session session;
private String userId;
/**
* 连接建立成功调用的方法
*
* @param session WebSocket 会话
* @param userId 用户唯一标识,从路径参数获取
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
log.info("WebSocket连接成功,用户ID: {}", userId);
this.session = session;
this.userId = userId;
// // 需要用hashmap的时候,就可以考虑是否不用循环遍历
// 移除已有的相同用户ID的连接
webSocketSet.stream()
.filter(webSocket -> webSocket.getUserId().equals(userId))
.findFirst()
.ifPresent(existingWebSocket -> {
webSocketSet.remove(existingWebSocket);
log.info("移除旧的WebSocket连接,用户ID: {}", userId);
try {
// 不要对集合用remove。。血一般的教训
existingWebSocket.session.close();
} catch (IOException e) {
log.error("关闭旧的WebSocket连接时出错: {}", e.getMessage());
}
});
webSocketSet.add(this);
log.info("当前连接数:{}", webSocketSet.size());
// 发送欢迎消息
sendMessage("连接成功,欢迎使用WebSocket服务!");
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this);
log.info("WebSocket连接关闭,用户ID: {}", userId);
log.info("当前连接数:{}", webSocketSet.size());
}
/**
* 接收到客户端消息时调用的方法
*
* @param message 客户端发送的消息
*/
@OnMessage
public void onMessage(String message) {
log.info("收到来自用户ID {} 的消息:{}", userId, message);
// 可以在这里处理业务逻辑,例如广播消息或发送给特定用户
// 示例:回复客户端
sendMessage("服务器收到消息:" + message);
}
/**
* 发生错误时调用的方法
*
* @param throwable 异常信息
*/
@OnError
public void one rror(Throwable throwable) {
log.error("WebSocket发生错误,用户ID: {},错误信息: {}", userId, throwable.getMessage());
}
/**
* 发送消息到客户端
*
* @param message 要发送的消息
*/
public void sendMessage(String message) {
if (this.session != null && this.session.isOpen()) {
synchronized (this.session) {
try {
// 这里有getBasicRemote 和 getAsyncRemote方法
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("发送消息失败,用户ID: {},错误信息: {}", userId, e.getMessage());
}
}
} else {
log.warn("尝试发送消息时,WebSocket会话已关闭,用户ID: {}", userId);
}
}
/**
* 群发消息到所有连接的客户端
*
* @param message 要发送的消息
* @param userId 指定的用户ID,如果为null或空,则广播给所有用户
*/
public static void sendAllMessage(String message, String userId) {
webSocketSet.forEach(webSocket -> {
if (userId == null || userId.isEmpty() || webSocket.getUserId().equals(userId)) {
webSocket.sendMessage(message);
}
});
}
}
4. 定时任务
创建一个定时任务,定期向所有连接的客户端推送消息。
本人理解是,后端要实时向前端推送数据,所以使用定时任务推数据。如果大家有什么更好的想法,可以在评论区或者私信交流一下。
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.CopyOnWriteArraySet;
@Component
@EnableScheduling
@RequiredArgsConstructor
@Slf4j
public class WebSocketScheduledTask {
private final RandomDataService randomDataService;
// 每5秒执行一次
@Scheduled(cron = "0/5 * * * * ?")
public void scheduledTask() {
log.info("执行定时任务,推送随机数据");
CopyOnWriteArraySet<WebSocketServer> webSocketSet = WebSocketServer.getWebSocketSet();
String message1 = randomDataService.getRandomData1();
String message2 = randomDataService.getRandomData2();
String message4 = randomDataService.getRandomData4();
String message5 = randomDataService.getRandomData5();
// 举个例子,实际中我是定义一个VO,一个接口把所有数据返回给前端
String combinedMessage = String.join(" | ", message1, message2, message4, message5);
webSocketSet.forEach(webSocket -> webSocket.sendMessage(combinedMessage));
}
}
前端实现
创建一个简单的前端页面,实现 WebSocket 的连接、消息发送和接收。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket 测试</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
#message {
border: 1px solid #ccc;
padding: 10px;
height: 300px;
overflow-y: scroll;
margin-top: 10px;
}
.controls {
margin-bottom: 10px;
}
</style>
</head>
<body>
<h2>WebSocket 测试</h2>
<div class="controls">
<label for="userId">用户ID:</label>
<input id="userId" type="text" value="123" />
<button onclick="connectWebSocket()">连接</button>
<button onclick="disconnectWebSocket()">断开连接</button>
</div>
<div class="controls">
<input id="text" type="text" placeholder="输入消息" />
<button onclick="sendMessage()">发送消息</button>
</div>
<div id="message"></div>
<script type="text/javascript">
let websocket = null;
/**
* 连接 WebSocket
*/
function connectWebSocket() {
const userId = document.getElementById("userId").value.trim();
if (!userId) {
alert('请输入用户ID');
return;
}
const wsUrl = `ws://${window.location.hostname}:8080/websocket/${userId}`;
websocket = new WebSocket(wsUrl);
// 连接发生错误的回调方法
websocket.onerror = function () {
showMessage("连接发生错误");
};
// 连接成功建立的回调方法
websocket.onopen = function () {
showMessage("连接成功");
};
// 接收到消息的回调方法
websocket.onmessage = function (event) {
showMessage("收到服务端消息:" + event.data);
};
// 连接关闭的回调方法
websocket.onclose = function () {
showMessage("连接已关闭");
};
// 监听窗口关闭事件,主动关闭 WebSocket 连接
window.onbeforeunload = function () {
if (websocket) {
websocket.close();
}
};
}
/**
* 断开 WebSocket 连接
*/
function disconnectWebSocket() {
if (websocket) {
websocket.close();
websocket = null;
}
}
/**
* 发送消息到服务器
*/
function sendMessage(){
if (websocket && websocket.readyState === WebSocket.OPEN) {
const message = document.getElementById("text").value.trim();
if (message) {
websocket.send(message);
showMessage("发送消息:" + message);
document.getElementById("text").value = '';
} else {
alert('请输入要发送的消息');
}
} else {
alert('WebSocket未连接');
}
}
/**
* 显示消息到页面
* @param {string} message
*/
function showMessage(message) {
const messageDiv = document.getElementById("message");
const timestamp = new Date().toLocaleTimeString();
messageDiv.innerHTML += `[${timestamp}] ${message}<br/>`;
messageDiv.scrollTop = messageDiv.scrollHeight;
}
</script>
</body>
</html>
优化点说明:
- 用户ID输入: 添加用户ID输入框,方便测试不同用户的连接。
- 样式优化: 增加简单的样式,使页面更易读。
- 消息展示: 添加时间戳,方便查看消息的接收时间。
- 连接管理: 分离连接和发送消息的逻辑,增强用户体验。
测试工具推荐
可以使用以下在线工具进行 WebSocket 调试和测试:
三、常见错误及解决方案
错误 1: 参数缺失或注解错误
错误信息:
A parameter of type [class java.lang.String] was found on method [onError] of class [WebSocketServer] that did not have a @PathParam annotation
原因:
@OnError
方法的参数缺少必要的注解,导致 WebSocket 容器无法正确识别参数。
解决方案:
确保 @OnError
方法的参数不需要 @PathParam
注解。通常,@OnError
方法只需要一个 Throwable
参数。
示例修正:
@OnError
public void one rror(Throwable error) {
log.error("WebSocket发生错误,用户ID: {},错误信息: {}", userId, error.getMessage());
}
错误 2: 遍历集合时修改集合导致的并发错误
错误描述:
在遍历 webSocketSet
时尝试移除元素,导致 ConcurrentModificationException
异常。
别问为什么,也别问我怎么知道的。。。
解决方案:
使用线程安全的集合类 CopyOnWriteArraySet
,并避免在遍历时修改集合。若需移除元素,可在遍历前或遍历后进行。
示例修正:
在 onOpen
方法中,先找到需要移除的 WebSocket 实例,然后再进行移除操作。
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
// ...(其他代码)
// 移除已有的相同用户ID的连接
webSocketSet.stream()
.filter(webSocket -> webSocket.getUserId().equals(userId))
.findFirst()
.ifPresent(existingWebSocket -> {
webSocketSet.remove(existingWebSocket);
log.info("移除旧的WebSocket连接,用户ID: {}", userId);
try {
existingWebSocket.session.close();
} catch (IOException e) {
log.error("关闭旧的WebSocket连接时出错: {}", e.getMessage());
}
});
// ...(其他代码)
}
错误 3: 客户端无法连接 WebSocket
可能原因及解决方案:
-
端口不正确或服务器未启动:
- 确保 Spring Boot 应用正在运行,并且监听的端口(如 8080)正确。
-
WebSocket 路径错误:
- 确认前端连接的 WebSocket URL 与后端
@ServerEndpoint
路径一致。
- 确认前端连接的 WebSocket URL 与后端
-
跨域问题:
- 确保服务器允许跨域连接,必要时在 WebSocket 配置中添加跨域支持。
-
防火墙或代理阻挡:
- 检查服务器防火墙或代理设置,确保 WebSocket 的端口开放。
-
使用
ws
与wss
协议不匹配:- 如果服务器使用 HTTPS,客户端应使用
wss://
协议连接 WebSocket。
- 如果服务器使用 HTTPS,客户端应使用
参考资料
- WebSocket 基本连接、定时任务、动态多定时任务、解决不能注入,前端 Json 消息解码 - 哔哩哔哩
- 万字详解,带你彻底掌握 WebSocket 用法 - 知乎
- 【后端】WebSocket 学习笔记 - CSDN 博客