文章目录
背景介绍
用过GPT类语言模型的同学都知道,其在返回数据时都是一个字或几个字的显示,你是否思考过它是怎么传输的?经过一番查询学习,了解到了SSE,GPT就是通过SSE流式传输方式进行传输的。
SSE 全称为 Server-sent-events , 是一种基于 HTTP 协议的通信技术,允许服务器主动向客户端(通常是Web浏览器)发送更新。它是 HTML5 标准的一部分,设计初衷是用来建立一个单向的服务器到客户端连接,使得服务器可以实时地向客户端发送数据。这种服务端实时向客户端发送数据的传输方式,其实就是流式传输。今天就来模拟实践一番SSE传输。
目标
实现SSE流式数据传输
技术细节
1、客户端
Server-Sent Events(SSE)是 HTML5 的一部分,用于从服务器实时接收更新,目前大部分主流浏览器都提供了支持:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>Demo</title>
</head>
<body>
<div id="result"></div>
<script>
let result = "";
if ("EventSource" in window) {
// 连接服务器
var sseSource = new EventSource(
"http://127.0.0.1:8080/test/createSse?uid=123"
);
// 连接打开
sseSource.onopen = function () {
console.log("连接打开");
document.getElementById("result").innerText += '连接打开\n';
};
// 连接错误
sseSource.onerror = function (err) {
console.log("连接错误:", err);
};
sseSource.onclose = function () {
console.log("连接关闭");
};
// 接收到数据
sseSource.onmessage = function (event) {
console.log("接收到数据:", event.data);
if (event.data.startsWith("[DONE]")) {
document.getElementById("result").innerText+='数据传输完毕\n'
} else {
handleReceiveData(event);
}
};
// 关闭链接
function handleCloseSse() {
console.log("关闭链接");
sseSource.close();
}
// 处理服务器返回的数据
function handleReceiveData(data) {
try {
let json = JSON.parse(data.data);
document.getElementById("result").innerText += json.message;
} catch (error) {}
//
}
}
</script>
</body>
</html>
2、服务端
我们目前服务端主要使用 SpringBoot,其对 SSE 主要提供了两种支持:
- Spring WebMVC:传统的基于 Servlet 的同步阻塞编程模型,即 同步模型Web框架。
- Spring WebFlux:异步非阻塞的响应式编程模型,即 异步模型Web框架。
今天使用spring-webmvc,我用的版本是5.2.15
首先在springBoot项目中新建个SseClient.java类,代码如下
public class SseClient {
private static final Logger logger = LoggerFactory.getLogger(SseClient.class);
private static final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* 创建连接
*/
public SseEmitter createSse(String uid) {
//默认30秒超时,设置为0L则永不超时
SseEmitter sseEmitter = new SseEmitter(0l);
//完成后回调
sseEmitter.onCompletion(() -> {
logger.info("[{}]结束连接...................", uid);
sseEmitterMap.remove(uid);
});
//超时回调
sseEmitter.onTimeout(() -> {
logger.info("[{}]连接超时...................", uid);
});
//异常回调
sseEmitter.onError(
throwable -> {
try {
logger.info("[{}]连接异常,{}", uid, throwable.toString());
sseEmitter.send(SseEmitter.event()
.id(uid)
.name("发生异常!")
.data("发生异常请重试!")
.reconnectTime(3000));
sseEmitterMap.put(uid, sseEmitter);
} catch (IOException e) {
e.printStackTrace();
}
}
);
try {
sseEmitter.send(SseEmitter.event().reconnectTime(5000));
} catch (IOException e) {
e.printStackTrace();
}
sseEmitterMap.put(uid, sseEmitter);
logger.info("[{}]创建sse连接成功!", uid);
return sseEmitter;
}
/**
* 给指定用户发送消息
*
*/
public boolean sendMessage(String uid,String messageId, String message) {
if (StrUtil.isBlank(message)) {
logger.info("参数异常,msg为null", uid);
return false;
}
SseEmitter sseEmitter = sseEmitterMap.get(uid);
if (sseEmitter == null) {
logger.info("消息推送失败uid:[{}],没有创建连接,请重试。", uid);
return false;
}
try {
sseEmitter.send(SseEmitter.event().id(messageId).reconnectTime(1*60*1000L).data(message));
logger.info("用户{},消息id:{},推送成功:{}", uid,messageId, message);
return true;
}catch (Exception e) {
sseEmitterMap.remove(uid);
logger.info("用户{},消息id:{},推送异常:{}", uid,messageId, e.getMessage());
sseEmitter.complete();
return false;
}
}
/**
* 断开
* @param uid
*/
public void closeSse(String uid){
if (sseEmitterMap.containsKey(uid)) {
SseEmitter sseEmitter = sseEmitterMap.get(uid);
sseEmitter.complete();
sseEmitterMap.remove(uid);
}else {
logger.info("用户{} 连接已关闭",uid);
}
}
}
接着新建一个Controller,内容如下
@RestController
@RequestMapping(value ="/test")
public class TestController {
@Autowired
private SseClient sseClient;
@CrossOrigin
@GetMapping("/createSse")
public SseEmitter createConnect(String uid) {
return sseClient.createSse(uid);
}
}
现在先将后端代码运行起来,然后运行前端代码,效果如下
可以看到已经连接成功。
接下来我们来模拟数据发送,在controller里面增加个发送消息的接口
@CrossOrigin
@GetMapping("/sendMsg")
public String sseChat(String uid) {
for (int i = 0; i < 10; i++) {
JSONObject jo = new JSONObject();
jo.put("uid", uid);
jo.put("message", "服务器消息" + i + "\n");
sseClient.sendMessage(uid, "no" + i, jo.toString());
try {
//睡眠一下,模拟GPT数据处理
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
}
//告诉前端数据发送完毕
sseClient.sendMessage(uid, "", "[DONE]");
return "ok";
}
然后重新启动后,就可以进行消息发送了,这里我用postman来调用发送消息的接口
可以看到前端只发送一次请求,服务器端的数据通过事件流的方式传输到前端。
总结
SSE 与 WebSocket 作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息。
总体来说,WebSocket 更强大和灵活。因为它是全双工通道,可以双向通信;SSE 是单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求。但是,SSE 也有自己的优点。
- SSE使用 HTTP 协议,现有的服务器软件都支持。WebSocket 是一个独立协议。
- SSE属于轻量级,使用简单;WebSocket 协议相对复杂。
- SSE 默认支持断线重连,WebSocket 需要自己实现。
- SSE 一般只用来传送文本,二进制数据需要编码后传送,WebSocket 默认支持传送二进制数据。
- SSE 支持自定义发送的消息类型。
因此,两者各有特点,可根据自己的使用场景选择。
标签:uid,sseEmitterMap,流式,连接,sseEmitter,SSE,数据传输,ChatGPT,logger From: https://blog.csdn.net/xs_2012/article/details/142378253