首页 > 编程语言 >深入探索Spring AI:源码分析流式回答

深入探索Spring AI:源码分析流式回答

时间:2024-10-11 09:32:41浏览次数:1  
标签:return stream AI Spring WebFlux 阻塞 Flux 源码

在上一章节中,我们深入分析了Spring AI的阻塞式请求与响应机制,并探讨了如何增强其记忆能力。今天,我们将重点讲解流式响应的概念与实现。毕竟,AI的流式回答功能与其交互体验密切相关,是提升用户满意度的重要组成部分。

基本用法

基本用法非常简单,只需增加一个 stream 方法即可实现所需功能。接下来,我们将通过代码示例来展示这一过程,帮助您更清晰地理解如何在实际应用中进行操作。请看以下代码:

@GetMapping(value = "/ai-stream",produces = MediaType.APPLICATION_OCTET_STREAM_VALUE + ";charset=UTF-8")
Flux<String> generationByStream(@RequestParam("userInput") String userInput) {
    Flux<String> output = chatClient.prompt()
            .user(userInput)
            .stream()
            .content();
    return output;
}

在我们增加 stream 方法之后,返回的对象类型将不再是原来的阻塞式 CallResponseSpec,而是转换为非阻塞的 StreamResponseSpec。与此同时,返回的数据类型也由之前的 String 变更为 Flux

在深入探讨其具体应用之前,首先让我来介绍一下 Flux 的概念与特性。

Spring WebFlux的处理器实现

首先,在 WebFlux 中,处理器已经实现了非阻塞式的功能。这意味着,只要我们的代码返回一个 Flux 对象,就能轻松实现响应功能。通过这种方式,应用程序能够高效地处理并发请求,而不会因阻塞操作而影响整体性能。

    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
            return handlePreFlight(exchange);
        }
        return Flux.fromIterable(this.handlerMappings)
                .concatMap(mapping -> mapping.getHandler(exchange))
                .next()
                .switchIfEmpty(createNotFoundError())
                .onErrorResume(ex -> handleResultMono(exchange, Mono.error(ex)))
                .flatMap(handler -> handleRequestWith(exchange, handler));
    }

这里简单介绍一下 Spring WebFlux,虽然这不是我们的重点,但了解其基本概念还是很有帮助的。Spring WebFlux 是 Spring 框架的一部分,专为构建反应式应用而设计。它支持异步和非阻塞的编程模型,使得处理高并发请求变得更加高效。以下是 WebFlux 的几个关键特性:

  1. 反应式编程:WebFlux 基于反应式编程模型,使用 MonoFlux 类型来处理数据流。Mono 表示零或一个元素,而 Flux 则表示零个或多个元素。这种模型使得我们可以轻松处理异步数据流,从而提高代码的可读性和可维护性。
  2. 非阻塞 I/O:WebFlux 通过非阻塞的 I/O 操作(如 Netty 或 Servlet 3.1+ 容器)来实现高效的资源利用。与传统的阻塞 I/O 不同,WebFlux 在等待响应时能够释放线程,这样一来,就可以显著提高应用的并发能力,支持更多的同时请求而不增加线程开销。

了解这些特性将为后续的非阻塞式响应设计奠定基础,帮助我们更好地利用 WebFlux 的能力来提升应用性能。

源码分析

现在我们来详细看看我们的 content 是如何操作的。接下来的代码示例将展示具体的实现方式,帮助我们理解在 WebFlux 中如何处理数据流和响应:

public Flux<String> content() {
    return doGetFluxChatResponse(this.request).map(r -> {
        if (r.getResult() == null || r.getResult().getOutput() == null
                || r.getResult().getOutput().getContent() == null) {
            return "";
        }
        return r.getResult().getOutput().getContent();
    }).filter(StringUtils::hasLength);
}

这里的实现相对简单,主要是传入了一个函数。接下来,我们将深入分析 doGetFluxChatResponse 的代码实现,以便更好地理解其具体逻辑和运作方式:

private Flux<ChatResponse> doGetFluxChatResponse2(DefaultChatClientRequestSpec inputRequest) {
//此处省略重复代码
    var fluxChatResponse = this.chatModel.stream(prompt);
//此处省略重复代码
    return advisedResponse;
}

这里的代码逻辑与阻塞回答基本相同,唯一的不同之处在于它调用了 chatModel.stream(prompt) 方法。接下来,我们将深入探讨 chatModel.stream(prompt) 方法的具体实现和其背后的设计思路:

public Flux<ChatResponse> stream(Prompt prompt) {
        return Flux.deferContextual(contextView -> {
        //此处省略重复代码
            Flux<OpenAiApi.ChatCompletionChunk> completionChunks = this.openAiApi.chatCompletionStream(request,
                    getAdditionalHttpHeaders(prompt));
//此处省略重复代码
            Flux<ChatResponse> chatResponse = completionChunks.map(this::chunkToChatCompletion)
                .switchMap(chatCompletion -> Mono.just(chatCompletion).map(chatCompletion2 -> {
//此处省略重复代码
                        return new ChatResponse(generations, from(chatCompletion2, null));
                    }
                }));
//此处省略重复代码
            return new MessageAggregator().aggregate(flux, observationContext::setResponse);

        });
    }

同样的逻辑在这里就不再赘述,我们将重点关注其中的区别。在这一部分,我们使用了 chatCompletionStream,而且与之前不同的是,这里不再使用 retryTemplate,而是引入了 webClient,这是一个能够接收事件流的工具类。

public Flux<ChatCompletionChunk> chatCompletionStream(ChatCompletionRequest chatRequest,
        MultiValueMap<String, String> additionalHttpHeader) {

    Assert.notNull(chatRequest, "The request body can not be null.");
    Assert.isTrue(chatRequest.stream(), "Request must set the stream property to true.");

    AtomicBoolean isInsideTool = new AtomicBoolean(false);

    return this.webClient.post()
        .uri(this.completionsPath)
        .headers(headers -> headers.addAll(additionalHttpHeader))
        .body(Mono.just(chatRequest), ChatCompletionRequest.class)
        .retrieve()
        .bodyToFlux(String.class)
        // cancels the flux stream after the "[DONE]" is received.
        .takeUntil(SSE_DONE_PREDICATE)
        // filters out the "[DONE]" message.
        .filter(SSE_DONE_PREDICATE.negate())
        .map(content -> ModelOptionsUtils.jsonToObject(content, ChatCompletionChunk.class))
//此处省略一堆代码

这段代码的主要目的是通过 webClient 向指定路径发起一个 POST 请求,同时设置合适的请求头和请求体。在获取响应数据时,使用了事件流的方式(通过 bodyToFlux 方法)来接收响应内容,并对数据进行过滤和转换,最终将其转化为 ChatCompletionChunk 对象。

尽管其余的业务逻辑与之前相似,但有一点显著的区别,即整个流程的返回类型以及与 OpenAI API 的调用方式都是非阻塞式的。

总结

在当今的数字时代,流式响应机制不仅提升了系统的性能,还在用户体验上扮演了关键角色。通过引入 Flux 类型,Spring WebFlux 的设计理念使得应用能够以非阻塞的方式处理并发请求,从而有效利用资源并减少响应延迟。

我们终于全面讲解了Spring AI的基本操作,包括阻塞式回答、流式回答以及记忆增强功能。这些内容为我们深入理解其工作机制奠定了基础。接下来,我们将继续深入探索源码,重点分析回调函数、实体类映射等重要功能。

这将帮助我们更好地理解Spring AI的内部运作原理,并为进一步的优化和定制化提供指导。


我是努力的小雨,一名 Java 服务端码农,潜心研究着 AI 技术的奥秘。我热爱技术交流与分享,对开源社区充满热情。同时也是一位腾讯云创作之星、阿里云专家博主、华为云云享专家、掘金优秀作者。

标签:return,stream,AI,Spring,WebFlux,阻塞,Flux,源码
From: https://www.cnblogs.com/guoxiaoyu/p/18440684

相关文章

  • ai课堂行为分析系统
    ai课堂行为分析系统利用图像识别算法和数据分析技术,ai课堂行为分析系统对学生在课堂上的表情状态、课堂表现和互动行为进行实时监测和评估。ai课堂行为分析系统通过摄像头采集学生的图像,并通过算法分析学生的表情、姿态和互动行为,从而评估学生的参与度、专注度和互动质量。ai课堂......
  • 聊聊git push到远程服务器出现RPC failed问题
    前言最近小组成员跟我说,他git提交不了代码。我问了下原因,他说他代码一提交就会报代码语言:txt复制error:RPCfailed;HTTP413curl22TherequestedURLreturnederror:413fatal:theremoteendhungupunexpectedlyfatal:theremoteendhungupunexpected......
  • Result Maps collection already contains value for xxx.xxx.dao.BaseResultMap错误
    重复引入jar包问题解决方法,在pom文件中排除这个jar包原:<dependency><groupId>com.hedu</groupId><artifactId>sweet-template-webapp</artifactId><version>1.0</version></dependency>排除后:&......
  • 【Kubernets】容器网络基础二:通讲CNI(Container Network Interface)容器网络接口实现方
    文章目录背景知识Underlay网络Overlay网络一、基本概念二、工作原理三、实现方案四、应用场景两者对比示意图CNI实现有哪些?FlannelFlannel的工作原理Flannel的主要组件数据传输机制总结Calico一、架构基础二、核心组件与功能三、路由与数据包转发四、安全策略五、数......
  • 前端大模型入门:Langchain的不同文本分割器对比和效果展示-教你根据场景选出最合适的方
    在前端开发大模型应用的时候,处理和分割文本是常见需求,毕竟现在的大模型输入输出都有限-嵌入等也是有token限制的,合理的文本分割能显著提高模型的表现。Langchain提供了多种文本分割方式,本文将对比五种文本分割器:CharacterTextSplitter、RecursiveCharacterTextSplitter、Tok......
  • Linux !ko/5.17-BBRplus AMD64(X86_64)内核致命的 futex_wait 函数死锁问题。
    !ko表示系统内核(system-kernel)致命:在CentOS(RedHat)、Ubuntu、Debian等多个发行版本Linux操作系统上,若人们升级 5.17-BBRplus版本内核,那么在应用程式频繁的futex_wait(syscall)等待唤醒时,或会存在futex_wait函数发生死锁的疑难问题。LMP:futex(2)-Linuxmanualpa......
  • ORB-SLAM2源码学习:ORBextractor.cc 逻辑梳理
    前言 温馨提示:请在看过源代码的情况下看此文章学习效果更好。ORBextractor.cc这个源文件定义函数众多,要是按照源码的顺序进行学习会抓不住重点,下面是我梳理的四个核心函数。一、主入口函数:operator()说明:主入口函数直接调用了ComputePyramid()、ComputeKeyPointsOctTree()、......
  • SpringBoot增删该查
    SpringBoot+Mybatis增删该查()1、xml基础配置<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation......
  • 基于SpringBoot + Vue的毕业设计选题系统的设计与实现 (角色:学生、教师、管理员)
    文章目录前言一、详细操作演示视频二、具体实现截图三、技术栈1.前端-Vue.js2.后端-SpringBoot3.数据库-MySQL4.系统架构-B/S四、系统测试1.系统测试概述2.系统功能测试3.系统测试结论五、项目代码参考六、数据库代码参考七、项目论文示例结语前言......
  • 计算机毕业设计 医院预约挂号系统的设计与实现 Python毕业设计 Python毕业设计选题【
    博主介绍:✌从事软件开发10年之余,专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌......