SseEmitter 详解
在现代的Web应用程序中,实时数据是非常重要的。为了实现这一点,许多框架和技术被开发出来,其中包括SseEmitter。SseEmitter是一个Spring框架中的类,它允许服务器向客户端发送实时数据。
SseEmitter是Spring框架中的一个类,用于实现服务器向客户端推送消息的功能。SSE代表Server-Sent Events,是一种基于HTTP协议的服务器推送技术。SseEmitter可以在服务器端创建一个SSE连接,然后通过该连接向客户端发送消息。
优缺点
- 优点:开发简单,无需引入其他包,可以相当于流式输出
- 缺点: 单向,只能服务端往客户端发,且占用一个连接,连接数有限制,传输只支持utf-8,不支持二进制;
SseEmitter正常测试,就是在一个方法里面开启,在另一个方法里面关闭
SseEmitter算作异步返回,在springmvc中有个设置超时参数: spring.mvc.async.request-timeout ,默认是10s,所以如果你返回的SseEmitter 10s内没关闭可能会自动关闭;
/**
* Amount of time before asynchronous request handling times out. If this value is
* not set, the default timeout of the underlying implementation is used, e.g. 10
* seconds on Tomcat with Servlet 3.
* 设定async请求的超时时间,以毫秒为单位,如果没有设置的话,以具体实现的超时时间为准,比如tomcat的servlet3的话是10秒.
*/
private Duration requestTimeout;
SseEmitter的用法非常简单。首先,我们需要在服务器端创建一个controller对象。有两个方法,一个创建SseEmitter对象,一个往SseEmitter对象发送数据: 代码如下:
@Api(tags = "sse", description = "sse")
@Controller
@Slf4j
public class SseController {
private final Map<String, SseEmitter> sseCache = new HashMap<>();
@GetMapping(value = "/sse")
@ApiOperation("/sse")
@ResponseBody
public SseEmitter sse(String sseKey) {
SseEmitter emitter = new SseEmitter();
emitter.onCompletion(() -> {
log.error("SseEmitter结束:" + sseKey);
});
sseCache.put(sseKey, emitter);
return emitter;
}
@GetMapping(value = "/send")
@ApiOperation("send")
@ResponseBody
public void send(String sseKey, String data) throws IOException {
SseEmitter sseEmitter = sseCache.get(sseKey);
if (sseEmitter != null) {
sseEmitter.send(data);
}
sseEmitter.complete();
}
}
然后,我们可以先调用/sse返回SseEmitter,然后调用/send使用SseEmitter对象的send方法向客户端发送消息。 自己测试一下:
http://localhost:8080/sse?sseKey=111
http://localhost:8080/send?data=aaa&sseKey=111
这会向客户端发送一条消息,消息内容为"aaa"。
除了文本消息,我们还可以向客户端发送事件。事件是一个JSON对象,包含一个名称和一些数据。客户端可以根据事件名称来处理不同类型的事件。代码如下:
Map<String, Object> data = new HashMap<>();
data.put("name", "John Doe");
data.put("age", 30);
emitter.send(SseEmitter.event().name("user").data(data));
这会向客户端发送一个名为"user"的事件,数据为一个包含"name"和"age"两个属性的JSON对象。 在第一个浏览器返回data:aaa 控制台打印
2023-08-14 15:20:31.449 INFO 19916 [io-8080-exec-13] c.s.s.controller.LoginInterceptor [16] : request uri : /sse
2023-08-14 15:20:38.230 INFO 19916 [io-8080-exec-14] c.s.s.controller.LoginInterceptor [16] : request uri : /send
2023-08-14 15:20:38.233 INFO 19916 [io-8080-exec-15] c.s.s.controller.LoginInterceptor [16] : request uri : /sse
2023-08-14 15:20:38.234 ERROR 19916 [io-8080-exec-15] c.s.s.controller.sse.SseController [34] : SseEmitter结束:111
在一个方法里面返回并关闭
场景: 这一种是在一个方法里面生成,然后异步处理,发送数据,然后进行关闭.举个栗子,对接chatgpt流式返回数据: 然后在完成里面将SseEmitter 关闭;
https://github.com/PlexPt/chatgpt-java
@GetMapping("/chat/sse")
@CrossOrigin
public SseEmitter sseEmitter(String prompt) {
//国内需要代理 国外不需要
Proxy proxy = Proxys.http("127.0.0.1", 1080);
ChatGPTStream chatGPTStream = ChatGPTStream.builder()
.timeout(600)
.apiKey("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")
.proxy(proxy)
.apiHost("https://api.openai.com/")
.build()
.init();
SseEmitter sseEmitter = new SseEmitter(-1L);
SseStreamListener listener = new SseStreamListener(sseEmitter);
Message message = Message.of(prompt);
listener.setOnComplate(msg -> {
//回答完成,可以做一些事情
});
chatGPTStream.streamChatCompletion(Arrays.asList(message), listener);
return sseEmitter;
}
注意事项
在使用SseEmitter时,我们需要注意以下几点:
- SseEmitter对象在使用完毕后必须关闭,否则会导致资源泄漏。可以在finally块中关闭SseEmitter对象。
- 如果客户端断开了连接,服务器应该捕获到异常并关闭SseEmitter对象。
- SseEmitter对象有一个默认的超时时间,如果在指定时间内没有发送任何消息,连接会自动关闭。可以通过setTimeout方法修改超时时间。
总之,SseEmitter是一个非常有用的类,可以方便地实现服务器向客户端推送消息的功能。在实际应用中,我们可以将其用于实时通知、实时数据展示等场景。
标签:深入浅出,send,sseEmitter,sse,SseEmitter,data,客户端 From: https://blog.51cto.com/yida/7077901