目录
sse工作原理
SSE 的工作原理基于 HTTP 协议,它通过以下几个步骤完成:
-
客户端请求:客户端通过
GET
请求来开启 SSE 通道。客户端的请求必须设置适当的请求头,告诉服务器它希望接收 SSE 数据流。 -
服务器响应:服务器响应时设置适当的 HTTP 头信息,告诉客户端它将发送一个事件流。然后,服务器保持这个连接,定期推送数据(事件)给客户端。
-
数据流传输:服务器通过持续的 HTTP 连接,将数据以事件流的形式发送给客户端。每个事件数据块会以
data:
开头,之后是数据内容。事件之间通常以\n\n
(两个换行符)分隔。 -
客户端处理:客户端通过 JavaScript 监听 SSE 事件流,并在收到数据时进行相应的处理。
SSE 的优缺点
优点:
- 简单:相比 WebSocket,SSE 实现起来相对简单,尤其适合只需要服务器推送数据的场景。
- 基于 HTTP:使用标准的 HTTP 协议,不需要额外的协议或库支持,因此很容易部署和维护。
- 自动重连:如果连接断开,客户端会自动尝试重新连接。
缺点:
- 单向通信:只支持服务器到客户端的数据传输,无法实现双向通信(如 WebSocket)。
- 浏览器支持:虽然现代浏览器都支持 SSE,但 Internet Explorer 和早期版本的 Edge 不支持。
Servlet 方式的基本逻辑
- 客户端通过 HTTP 请求访问服务器的一个特定 URL(如
/sse
)。 - 服务器响应时设置
Content-Type
为text/event-stream
,并保持连接不关闭。 - 服务器通过该连接持续推送数据,每个数据块以
data:
开头,并用两个换行符分隔。 - 客户端通过 JavaScript 监听这些数据并更新页面。
SSE 与 WebSocket 的对比
虽然 SSE 和 WebSocket 都可以用于实时通信,但它们各自的特点适用于不同的场景:
特性 | SSE | WebSocket |
---|---|---|
连接方式 | 单向通信,客户端从服务器接收数据 | 双向通信,客户端和服务器都可以发送和接收数据 |
协议 | 基于 HTTP(默认端口 80 或 443) | 独立协议,基于 TCP(默认端口 80 或 443) |
客户端支持 | 大多数现代浏览器支持,IE 不支持 | 大多数现代浏览器支持 |
传输延迟 | 通常较低,适合推送消息 | 延迟通常较低,适合实时互动 |
应用场景 | 服务器推送数据到客户端,如实时通知、股票行情等 | 双向实时互动,如多人游戏、聊天室等 |
示例代码
maven依赖
<dependencies>
<!-- Spring Boot Starter Web 依赖(包含了Servlet API、Tomcat等) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
controller层
package com.ybk.common.controller;
import com.ybk.common.api.ApiResult;
import com.ybk.common.api.Result;
import com.ybk.common.core.sse.SseServer;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
* @author
* @date
*/
@RestController
@RequestMapping("/pos/sse")
public class SseController {
/**
* 用户SSE连接
* 它返回一个SseEmitter实例,这时候连接就已经创建了.
*
* @return
*/
@GetMapping(value = "/connect",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect(@RequestParam String snNo) {
return SseServer.createConnect(snNo);
}
@GetMapping("/sendMessage")
public Result<?> sendMessage(@RequestParam String snNo,@RequestParam String message) {
/**
* 一般取登录用户账号作为 messageId。分组的话需要约定 messageId的格式。
* 这里模拟创建一个用户连接
*/
SseServer.sendMessage(snNo,message);
return ApiResult.success();
}
}
SseServer
package com.ybk.common.core.sse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
* @author
* @date
*/
@Slf4j
public class SseServer {
/**
* 当前连接总数
*/
private static AtomicInteger currentConnectTotal = new AtomicInteger(0);
/**
* messageId的 SseEmitter对象映射集
*/
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
public static SseEmitter createConnect(String messageId) {
SseEmitter sseEmitter = new SseEmitter(0L);
sseEmitter.onCompletion(completionCallBack(messageId));
sseEmitter.onTimeout(timeOutCallBack(messageId));
sseEmitter.onError(errorCallBack(messageId));
sseEmitterMap.put(messageId, sseEmitter);
currentConnectTotal.incrementAndGet();
return sseEmitter;
}
public static void sendMessage(String messageId, String message) {
if (sseEmitterMap.containsKey(messageId)) {
try {
sseEmitterMap.get(messageId).send(message);
} catch (IOException e) {
log.error("发送消息异常 ==> messageId={}, 异常信息:", messageId, e.getMessage());
throw new RuntimeException(e);
}
} else {
throw new RuntimeException("连接不存在或者超时, messageId=" + messageId);
}
}
/**
* 移除 MessageId
*
* @param messageId
*/
public static void removeMessageId(String messageId) {
sseEmitterMap.remove(messageId);
//数量-1
currentConnectTotal.getAndDecrement();
log.info("remove messageId={}", messageId);
}
/**
* 断开SSE连接时的回调
*
* @param messageId
* @return
*/
private static Runnable completionCallBack(String messageId) {
return () -> {
log.info("结束连接 ==> messageId={}", messageId);
removeMessageId(messageId);
};
}
/**
* 连接超时时回调触发
*
* @param messageId
* @return
*/
private static Runnable timeOutCallBack(String messageId) {
return () -> {
log.info("连接超时 ==> messageId={}", messageId);
removeMessageId(messageId);
};
}
/**
* 连接报错时回调触发。
*
* @param messageId
* @return
*/
private static Consumer<Throwable> errorCallBack(String messageId) {
return throwable -> {
log.error("连接异常 ==> messageId={}", messageId);
removeMessageId(messageId);
};
}
}
总结
- SSE 是一种服务器推送技术,通过 HTTP 协议将数据流传输到客户端,适合用于实时更新场景。
- Java Servlet 和 Spring Boot 都可以很方便地实现 SSE,分别使用标准的
HttpServlet
和SseEmitter
类。 - SSE 的优点是实现简单,且基于 HTTP 协议,但不支持双向通信;相比之下,WebSocket 支持双向通信,适用于更复杂的实时互动场景。