首页 > 编程语言 >Springboot项目基于RxJava + SSE流式调用AI

Springboot项目基于RxJava + SSE流式调用AI

时间:2024-12-07 21:32:58浏览次数:5  
标签:Springboot AI 流式 RxJava SSE 数据流 temperature

目录

什么是RxJava

什么是SSE

RxJava + SSE流式调用AI

导入依赖

封装AI Manager简化传参

传入参数

返回值

Controller层调用

一定要用GET,不要用POST

封装 Prompt

建立SSE对象

AI 生成后SSE流式返回

RxJava处理返回的数据流模板(根据自己的业务需求修改具体逻辑)


什么是RxJava

RxJava 是一个基于事件驱动的、利用可观测序列来实现异步编程的类库,是响应式编程在 Java 语言上的实现。

  • 提供了响应式编程模型,使得代码更具可读性。比如 doOnNext、doOnError。
  • 提供了丰富的操作符,简化异步操作、事件处理、数据转化等。比如 map、filter、flatMap。
  • 简化线程管理,可以很容易地在不同线程中进行数据流的处理。比如observeOn、subscribeOn。

什么是SSE

SSE 服务器发送事件(Server-Sent Events)是一种用于从服务器到客户端的 单向、实时 数据传输技术,基于 HTTP 协议实现。

  • 单向通信:SSE 只支持服务器向客户端的单向通信,客户端不能向服务器发送数据。
  • 文本格式:SSE 使用 纯文本格式 传输数据,使用 HTTP 响应的 text/event-stream MIME 类型。
  • 保持连接:SSE 通过保持一个持久的 HTTP 连接,实现服务器向客户端推送更新,而不需要客户端频繁轮询。
  • 自动重连:如果连接中断,浏览器会自动尝试重新连接,确保数据流的连续性

SSE的单向通信、长时间链接、自动重连等特点天然适合对话AI的场景。

RxJava + SSE流式调用AI

导入依赖

AI(以智谱AI为例)

<dependency>
    <groupId>cn.bigmodel.openapi</groupId>
    <artifactId>oapi-java-sdk</artifactId>
    <version>release-V4-2.3.0</version>
</dependency>

RxJava

<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams</artifactId>
  <version>1.0.4</version>
</dependency>

封装AI Manager简化传参

传入参数
  • systemMessage:系统消息的内容。
  • userMessage:用户消息的内容。
  • temperature:请求的温度参数,用于控制生成文本的随机性
  • messages:一个ChatMessage对象列表,表示请求中的消息。
  • temperature:请求的温度参数。
返回值
  • Flowable响应式对象是 RxJava 中的一个核心类,用于表示一个可以发出零个或多个数据项的数据流。它是响应式编程模型的一个实现,允许以声明式的方式处理异步数据流。

    /**
     * 通用流式请求(简化消息传递)
     *
     * @param systemMessage
     * @param userMessage
     * @param temperature
     * @return
     */
    public Flowable<ModelData> doStreamRequest(String systemMessage, String userMessage, Float temperature) {
        List<ChatMessage> chatMessageList = new ArrayList<>();
        ChatMessage systemChatMessage = new ChatMessage(ChatMessageRole.SYSTEM.value(), systemMessage);
        chatMessageList.add(systemChatMessage);
        ChatMessage userChatMessage = new ChatMessage(ChatMessageRole.USER.value(), userMessage);
        chatMessageList.add(userChatMessage);
        return doStreamRequest(chatMessageList, temperature);
    }


    /**
     * 通用流式请求
     *
     * @param messages
     * @param temperature
     * @return
     */
    public Flowable<ModelData> doStreamRequest(List<ChatMessage> messages, Float temperature) {
        // 构建请求
        ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder()
                .model(Constants.ModelChatGLM4)
                .stream(Boolean.TRUE)
                .temperature(temperature)
                .invokeMethod(Constants.invokeMethod)
                .messages(messages)
                .build();
        try {
            ModelApiResponse invokeModelApiResp = clientV4.invokeModelApi(chatCompletionRequest);
            return invokeModelApiResp.getFlowable();
        } catch (Exception e) {
            e.printStackTrace();
            throw new BusinessException(ErrorCode.SYSTEM_ERROR, e.getMessage());
        }
    }

Controller层调用

一定要用GET,不要用POST
@GetMapping("/ai-sse")
封装 Prompt
        String userMessage = getGenerateQuestionUserMessage(questionNumber, optionNumber);
建立SSE对象
SseEmitter sseEmitter = new SseEmitter(0L);
AI 生成后SSE流式返回
        Flowable<ModelData> modelDataFlowable = aiManager.doStreamRequest(GENERATE_QUESTION_SYSTEM_MESSAGE, userMessage, null);
RxJava处理返回的数据流模板(根据自己的业务需求修改具体逻辑)
        modelDataFlowable
                //指定下游操作执行的线程
                .observeOn(Schedulers.io())
                .map(映射操作)
                .filter(过滤操作)
                .flatMap(对数据流中的每个项应用一个函数,该函数返回一个新的数据流,然后将所有这 
                 些数据流合并成一个单一的数据流)
                .doOnNext(在数据流的每个项被发射时执行一个副作用操作
                 //通过SSE返回给前端
                 sseEmitter.send())
                .doOnError(在数据流发生错误时执行一个副作用操作)
                .doOnComplete(在数据流完成时执行一个副作用操作)
                //订阅数据流,开始数据的发射
                .subscribe();

标签:Springboot,AI,流式,RxJava,SSE,数据流,temperature
From: https://blog.csdn.net/hrh1234h/article/details/144315640

相关文章

  • 文章提示疑拟AI创作,请注意甄别怎么办?去除ai写作痕迹这么做!
      文章提示疑拟AI创作怎么办?去除ai写作痕迹这么做。“TTAI检测”有一键降低AI率的功能!先总结:AI生成的文章是完美的,但缺少情绪,注意力完全不分散,有固定的格式可寻!人类写的文章多少都会!有错误,但是有情绪,注意力会分散,有困惑度!原因:确实用了AI。发表前没有进行人工处理。(......
  • 8条笔记,30天涨粉1.8w!用AI做林黛玉怼人视频,涨粉太香了!(附完整教程)
    大家好,我是程序员X小鹿,前互联网大厂程序员,自由职业2年+,也一名AIGC爱好者,持续分享更多前沿的「AI工具」和「AI副业玩法」,欢迎一起交流~最近「林黛玉怼人」系列的账号太火了!而且内容太太太让人上瘾了!在小红书刷到了林黛玉怼人的视频,愣是被硬控了10多分钟,才「依依不舍......
  • 清华华子哥:AI科研入门教程(二)
    哈喽,各位童靴们好啊,上次华子哥和大家分享了找论文部分,这次来和大家分享一下读论文部分第一层:看山是山对于survey,全部看一遍,可以再借助gpt整理一下主要内容;对于**research,**最好的方法就是把筛选出来最有价值的论文原文进行逐句的精读,但是我们时间不够用,那就直接上翻译器......
  • 【springboot开发】Spring Boot 3 中的日志框架详解(含源码分析)
    一、引言二、spring-boot-starter-logging介绍四、日志框架加载源码分析五、结论一、引言SpringBoot3在日志处理方面提供了一套灵活且强大的解决方案。默认情况下,SpringBoot3使用SLF4J(SimpleLoggingFacadeforJava)作为日志门面,而Logback作为日志的实现框架。SLF4......
  • AI辅助数据库设计评审
    场景我们之前已经基于之前大模型同一会话进行过需求评审,在研发设计完成数据模型后,导出数据库设计DDL文件,上传到AI平台进行下一步评审。实践groupbuy.txt文件是我们数据库设计SQL脚本文件提示词1您是软件工程专家,我刚刚上传数据库设计DDL文件{groupbuy.txt},请结合以上需求与原型......
  • springboot毕设教学网站设计与实现论文+程序+部署
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容一、研究背景随着信息技术的飞速发展,互联网在教育领域的应用日益广泛。传统教学模式受到时间和空间的限制,难以满足现代教育对于灵活性、个性化和资源共享的需......
  • springboot毕设健身工作室膳食健康与身材管理系统论文+程序+部署
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容一、研究背景随着现代生活节奏的加快,人们对健康和身材管理的关注度日益提高。健身工作室成为许多人追求健康生活方式的重要场所。然而,在健身过程中,膳食健康与......
  • CMake学习2024.12.7问AI的问题记录
    iwtbf:target_include_directories(&{PROJECT_BINARY_DIR})是什么GitHubCopilot:target_include_directories是CMake中的一个命令,用于为目标添加包含目录。&{PROJECT_BINARY_DIR}是一个变量,表示项目的二进制目录。语法如下:target_include_directories(<target>[SYSTEM......
  • springboot毕设 外文学术期刊遴选服务平台 程序+论文
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景在全球化背景下,外文学术期刊作为科研交流的重要载体,承载着最前沿的学术研究成果和理论创新。随着学科领域的不断细分和交叉融合,科研人员面临着海量期......
  • springboot毕设 网络书城项目 程序+论文
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着互联网技术的飞速发展和电子商务的日益普及,人们的购物方式正经历着前所未有的变革。传统的实体书店虽然依然承载着文化与知识的传递,但其运营模式......