目录
RxJava处理返回的数据流模板(根据自己的业务需求修改具体逻辑)
什么是RxJava
RxJava 是一个基于事件驱动的、利用可观测序列来实现异步编程的类库,是响应式编程在 Java 语言上的实现。
- 提供了响应式编程模型,使得代码更具可读性。比如 doOnNext、doOnError。
- 提供了丰富的操作符,简化异步操作、事件处理、数据转化等。比如 map、filter、flatMap。
- 简化线程管理,可以很容易地在不同线程中进行数据流的处理。比如observeOn、subscribeOn。
什么是SSE
SSE 服务器发送事件(Server-Sent Events)是一种用于从服务器到客户端的 单向、实时 数据传输技术,基于 HTTP 协议实现。
- 单向通信:SSE 只支持服务器向客户端的单向通信,客户端不能向服务器发送数据。
- 文本格式:SSE 使用 纯文本格式 传输数据,使用 HTTP 响应的 text/event-stream MIME 类型。
- 保持连接:SSE 通过保持一个持久的 HTTP 连接,实现服务器向客户端推送更新,而不需要客户端频繁轮询。
- 自动重连:如果连接中断,浏览器会自动尝试重新连接,确保数据流的连续性
SSE的单向通信、长时间链接、自动重连等特点天然适合对话AI的场景。
RxJava + SSE流式调用AI
导入依赖
AI(以智谱AI为例)
<dependency>
<groupId>cn.bigmodel.openapi</groupId>
<artifactId>oapi-java-sdk</artifactId>
<version>release-V4-2.3.0</version>
</dependency>
RxJava
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.4</version>
</dependency>
封装AI Manager简化传参
传入参数
systemMessage
:系统消息的内容。userMessage
:用户消息的内容。temperature
:请求的温度参数,用于控制生成文本的随机性messages
:一个ChatMessage
对象列表,表示请求中的消息。temperature
:请求的温度参数。
返回值
-
Flowable响应式对象:
是 RxJava 中的一个核心类,用于表示一个可以发出零个或多个数据项的数据流。它是响应式编程模型的一个实现,允许以声明式的方式处理异步数据流。
/**
* 通用流式请求(简化消息传递)
*
* @param systemMessage
* @param userMessage
* @param temperature
* @return
*/
public Flowable<ModelData> doStreamRequest(String systemMessage, String userMessage, Float temperature) {
List<ChatMessage> chatMessageList = new ArrayList<>();
ChatMessage systemChatMessage = new ChatMessage(ChatMessageRole.SYSTEM.value(), systemMessage);
chatMessageList.add(systemChatMessage);
ChatMessage userChatMessage = new ChatMessage(ChatMessageRole.USER.value(), userMessage);
chatMessageList.add(userChatMessage);
return doStreamRequest(chatMessageList, temperature);
}
/**
* 通用流式请求
*
* @param messages
* @param temperature
* @return
*/
public Flowable<ModelData> doStreamRequest(List<ChatMessage> messages, Float temperature) {
// 构建请求
ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder()
.model(Constants.ModelChatGLM4)
.stream(Boolean.TRUE)
.temperature(temperature)
.invokeMethod(Constants.invokeMethod)
.messages(messages)
.build();
try {
ModelApiResponse invokeModelApiResp = clientV4.invokeModelApi(chatCompletionRequest);
return invokeModelApiResp.getFlowable();
} catch (Exception e) {
e.printStackTrace();
throw new BusinessException(ErrorCode.SYSTEM_ERROR, e.getMessage());
}
}
Controller层调用
一定要用GET,不要用POST
@GetMapping("/ai-sse")
封装 Prompt
String userMessage = getGenerateQuestionUserMessage(questionNumber, optionNumber);
建立SSE对象
SseEmitter sseEmitter = new SseEmitter(0L);
AI 生成后SSE流式返回
Flowable<ModelData> modelDataFlowable = aiManager.doStreamRequest(GENERATE_QUESTION_SYSTEM_MESSAGE, userMessage, null);
RxJava处理返回的数据流模板(根据自己的业务需求修改具体逻辑)
modelDataFlowable
//指定下游操作执行的线程
.observeOn(Schedulers.io())
.map(映射操作)
.filter(过滤操作)
.flatMap(对数据流中的每个项应用一个函数,该函数返回一个新的数据流,然后将所有这
些数据流合并成一个单一的数据流)
.doOnNext(在数据流的每个项被发射时执行一个副作用操作
//通过SSE返回给前端
sseEmitter.send())
.doOnError(在数据流发生错误时执行一个副作用操作)
.doOnComplete(在数据流完成时执行一个副作用操作)
//订阅数据流,开始数据的发射
.subscribe();
标签:Springboot,AI,流式,RxJava,SSE,数据流,temperature
From: https://blog.csdn.net/hrh1234h/article/details/144315640