首页 > 编程语言 >用 Higress AI 网关降低 AI 调用成本 - 阿里云天池云原生编程挑战赛参赛攻略

用 Higress AI 网关降低 AI 调用成本 - 阿里云天池云原生编程挑战赛参赛攻略

时间:2024-08-28 12:03:59浏览次数:16  
标签:embedding 网关 log AI Higress wrapper key query config

作者介绍:杨贝宁,爱丁堡大学博士在读,研究方向为向量数据库

《Higress  AI 网关挑战赛》正在火热进行中,Higress 社区邀请了目前位于排行榜 top5 的选手杨贝宁同学分享他的心得。下面是他整理的参赛攻略:

背景

我们要在 Higress 网关中编写 WebAssembly(wasm)插件,使得在 http 请求的各个阶段(requestHeader,requestBody,responseHeader,responseBody)能够将相应的请求或返回捕获进行业务逻辑的处理。具体到本比赛,主要需要实现的是缓存对大模型的请求(openai 接口的形式)在本地(或云数据库),并设计语义级别的缓存命中逻辑来实现降低响应请求且减少 token 费用的目的。

AI Cache 示例

以上图为例,本比赛主要的问题可以归纳为:(1)如何根据 Query 字符串生成合适的 Query 向量 ⇒ 向量生成器选型。(2)如何根据 Query 向量进行语义级别的查找,快速找到合适的缓存向量 ⇒ 缓存命中逻辑设计。(3)如何管理大量的缓存⇒向量数据库选型及重复初始化逻辑。

实际上 Redis 也具备 Vector Store 能力,这里的 Cache Store 和 Vector Store 是可以合并的。不过本 Demo 将二者分开了,Cache Store 使用 Redis,Vector Store 使用阿里云 DashVector 服务。

网关环境搭建

首先我们需要在线上搭建网关环境以供测试和评测使用,本文也提供了本地搭建网关环境的方法供读者参考。注意这里的线上搭建环境是以赛题介绍为基础展开的,若读者已经搭建好线上的开源网关或企业网关可跳过本节。

1.1 搭建线上企业版 Higress 环境**

赛题支持开源 Higress 和企业版 Higress 两种不同的配置,本文以企业版 Higress 为例进行展示。

1.1.1 申请免费试用并创建相应资源

1.1.2 创建服务 (通义千问的地址、Redis 服务、Dashscope 服务)

1.1.3 创建路由转发给通义千问

1.1.4 开启 Higress 插件市场中的 AI Proxy 插件

网关需要 AI Proxy 插件作为处理 AI 请求的支撑,我们可以采用插件市场中已有的 ai-proxy 插件。从源码编译的命令和上次如下所示。最后,配置环节需要提供大模型服务商的 api 和 token key 等,注意比赛需要使用通义千问的 qwen_long 模型。

1.1.5 编译上传 AI Cache 插件* (注意可能需要修改 “ai-cache” 名称防止和插件市场已有插件重复)

本比赛的核心在于自定义 AI Cache 插件以实现更鲁棒的缓存逻辑。首先还是以 Higress 源码提供的基础代码为例进行编译和上传,配置环节需要提供 redis 服务的名称。注意此步骤会在后续迭代代码中反复使用。

git clone https://github.com/alibaba/higress.git
cd higress/plugins/wasm-go
PLUGIN_NAME=ai-cache EXTRA_TAGS=proxy_wasm_version_0_2_100  make build

1.2 本地测试环境搭建和代码更新逻辑

由于线上环境的测试的成本较高,我们也可以采用 Higress + LobeChat 快速搭建私人 GPT 助理 [ 1] 的方式起两个 Docker 容器进行本地测试,参考 dockerfile 如下:

version: '3.9'

networks:
  higress-net:
    external: false

services:
  higress:
    image: registry.cn-hangzhou.aliyuncs.com/ztygw/aio-redis:1.4.1-rc.1
    environment:
      - GATEWAY_COMPONENT_LOG_LEVEL=misc:error,wasm:debug # 重要,开启日志
      - CONFIG_TEMPLATE=ai-proxy
      - DEFAULT_AI_SERVICE=qwen
      - DASHSCOPE_API_KEY= [YOUR_KEY]
    networks:
      - higress-net
    ports:
      - "9080:8080/tcp"
      - "9001:8001/tcp"
    volumes:
      - 本地data目录:/data
      - 本地log目录:/var/log/higress/ # 重要,方便在容器restrat之后查看日志
    restart: always
  lobechat:
    image: lobehub/lobe-chat
    environment:
      - CODE=123456ed
      - OPENAI_API_KEY=unused
      - OPENAI_PROXY_URL=http://higress:8080/v1
    networks:
      - higress-net
    ports:
      - "3210:3210/tcp"
    restart: always

主要更改了 Higress 的 image,environment 以及 volumes 的配置,启动和重启就是 docker compose up -d docker compose restart。

进一步地,我们需要了解本地代码编写的逻辑如何能反馈到测试环境中。和线上网关环境直接上传编译后的二进制 wasm 插件不同的是,这里需要采用的是:本地编写代码 ⇒ 本地编译 wasm 插件 ⇒ Docker 打包镜像并上传 ⇒ 修改本地测试环境配置中的镜像版本 ⇒ 开始测试并打印日志的流程。具体参考代码如下:

cd ${workspaceFolder}/higress/plugins/wasm-go
PLUGIN_NAME=ai-cache EXTRA_TAGS=proxy_wasm_version_0_2_100 make build 
// 修改版本号(version.txt)
export cur_version=$(cat ${workspaceFolder}/version.txt) && docker build -t [YOUR IMAGE_BASE_URL]:$cur_version -f Dockerfile . && docker push [YOUR_IMAGE_BASE_URL]:$cur_version
// 修改本地测试环境配置中的镜像版本
sudo bash -c \"sed -i 's|oci://registry.cn-hangzhou.aliyuncs.com/XXX:[0-9]*\\\\.[0-9]*\\\\.[0-9]*|oci://registry.cn-hangzhou.aliyuncs.com/XXX:$(cat version.txt)|g' data/wasmplugins/ai-cache-1.0.0.yaml\

文本向量请求逻辑及缓存命中逻辑编写

在本节中,我们将通过一个简单的示例来说明如何在网关中编写请求外部服务的缓存逻辑。

当查询到达时,与 Redis 中存储的键进行匹配(`redisSearchHandler`)。如果完全一致,则直接返回结果(`handleCacheHit`)。
如果不匹配,则请求 `text_embedding` 接口将查询转换为 `query_embedding`(`fetchAndProcessEmbeddings`)。
使用 `query_embedding` 与向量数据库中的向量进行 ANN 搜索,返回最接近的键,并通过阈值进行过滤(`performQueryAndRespond`)。
如果返回结果为空或距离大于阈值,则丢弃结果,本轮缓存未命中,最后将 `query_embedding` 存入向量数据库(`uploadQueryEmbedding`)。
如果距离小于阈值,则再次调用 Redis 对最相似的键进行匹配(`redisSearchHandler`)。
在响应阶段,请求 Redis 新增键值对,键为查询的问题,值为LLM 返回结果。

可以看到,除了 Redis 服务外,我们还需要请求文本向量化服务和向量数据库服务,这里我们分别选取向量生成器:阿里灵积通用文本向量接口 [ 2] 和向量数据库:阿里向量检索服务 DashVector [ 3] 作为服务商。

注意:由于 wasm 插件不支持协程等特性,调用外部服务需遵循:如何在插件中请求外部服务 [ 4]

2.1 外部服务声明和注册

为了实现思路 1-5 的连续外部服务调用。在 Higress 相关的配置上,我们首先需要声明外部服务:

DashVectorClient      wrapper.HttpClient `yaml:"-" json:"-"`
DashScopeClient       wrapper.HttpClient `yaml:"-" json:"-"`
redisClient    wrapper.RedisClient `yaml:"-" json:"-"`

并且在 ParseConfig 函数中注册外部服务:

c.DashVectorInfo.DashVectorClient = wrapper.NewClusterClient(wrapper.DnsCluster{
    ServiceName: c.DashVectorInfo.DashVectorServiceName,
    Port:        443,
    Domain:      c.DashVectorInfo.DashVectorAuthApiEnd,
})
c.DashVectorInfo.DashScopeClient = wrapper.NewClusterClient(wrapper.DnsCluster{
    ServiceName: c.DashVectorInfo.DashScopeServiceName,
    Port:        443,
    Domain:      "dashscope.aliyuncs.com",
})

这里的 ParseConfig 函数是在 http 请求的各个阶段回调函数(requestHeader,requestBody,responseHeader,responseBody)之前的注册函数。

2.2 AI Cache 配置文件

在增加了上述外部服务的基础上,对应的 AI Cache 的配置文件也需要进行修改,此处对应 1.1.5 节的配置。示例配置如下:

Dash:
  dashScopeKey: "YOUR_DASHSCOPE_KEY" # 这个是文本向量的key
  dashScopeServiceName: "qwen" # 重要,需要和scope对应的服务名匹配
  dashVectorCollection: "YOUR_CLUSTER_NAME"
  dashVectorEnd: "YOUR_VECTOR_END" 
  dashVectorKey: "YOUR_DASHVECTOR_KEY" # 这个是DASHVECTOR的key
  dashVectorServiceName: "DashVector.dns" # 重要,需要新建一个vector对应的DNS服务 
  sessionID: "XXX" # 可用可不用,主要用于重复初始化逻辑
redis: # 重要
  serviceName: "redis.static"
  timeout: 2000

2.3 连续 callback 实现连续服务调用

基于上述思路,实现的核心代码如下。其中的核心难点仍在于如何实现服务间的连续调用问题,以 onHttpRequestBody 函数为例,代码需要实现并发逻辑,而不是简单的顺序逻辑。因此,主函数代码必须返回 types.Action,即声明是阻塞还是继续执行。当前逻辑要求在处理完缓存命中逻辑后才能继续操作,因此主函数需要返回 types.Pause。最后,根据我们的处理逻辑,在调用外部服务的回调函数中,根据是否命中缓存执行 proxywasm.ResumeHttpRequest() 或直接返回 proxywasm.SendHttpResponse()。

// ===================== 以下是主要逻辑 =====================
// 主handler函数,根据key从redis中获取value ,如果不命中,则首先调用文本向量化接口向量化query,然后调用向量搜索接口搜索最相似的出现过的key,最后再次调用redis获取结果
// 可以把所有handler单独提取为文件,这里为了方便读者复制就和主逻辑放在一个文件中了
// 
// 1. query 进来和 redis 中存的 key 匹配 (redisSearchHandler) ,若完全一致则直接返回 (handleCacheHit)
// 2. 否则请求 text_embdding 接口将 query 转换为 query_embedding (fetchAndProcessEmbeddings)
// 3. 用 query_embedding 和向量数据库中的向量做 ANN search,返回最接近的 key ,并用阈值过滤 (performQueryAndRespond)
// 4. 若返回结果为空或大于阈值,舍去,本轮 cache 未命中, 最后将 query_embedding 存入向量数据库 (uploadQueryEmbedding)
// 5. 若小于阈值,则再次调用 redis对 most similar key 做匹配。(redisSearchHandler)
// 7. 在 response 阶段请求 redis 新增key/LLM返回结果

func redisSearchHandler(key string, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool, ifUseEmbedding bool) error {
    err := config.redisClient.Get(config.CacheKeyPrefix+key, func(response resp.Value) {
        if err := response.Error(); err == nil && !response.IsNull() {
            log.Warnf("cache hit, key:%s", key)
            handleCacheHit(key, response, stream, ctx, config, log)
        } else {
            log.Warnf("cache miss, key:%s", key)
            if ifUseEmbedding {
                handleCacheMiss(key, err, response, ctx, config, log, key, stream)
            } else {
                proxywasm.ResumeHttpRequest()
                return
            }
        }
    })
    return err
}

// 简单处理缓存命中的情况, 从redis中获取到value后,直接返回
func handleCacheHit(key string, response resp.Value, stream bool, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log) {
    log.Warnf("cache hit, key:%s", key)
    ctx.SetContext(CacheKeyContextKey, nil)
    if !stream {
        proxywasm.SendHttpResponse(200, [][2]string{{"content-type", "application/json; charset=utf-8"}}, []byte(fmt.Sprintf(config.ReturnResponseTemplate, response.String())), -1)
    } else {
        proxywasm.SendHttpResponse(200, [][2]string{{"content-type", "text/event-stream; charset=utf-8"}}, []byte(fmt.Sprintf(config.ReturnStreamResponseTemplate, response.String())), -1)
    }
}

// 处理缓存未命中的情况,调用fetchAndProcessEmbeddings函数向量化query
func handleCacheMiss(key string, err error, response resp.Value, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, queryString string, stream bool) {
    if err != nil {
        log.Warnf("redis get key:%s failed, err:%v", key, err)
    }
    if response.IsNull() {
        log.Warnf("cache miss, key:%s", key)
    }
    fetchAndProcessEmbeddings(key, ctx, config, log, queryString, stream)
}

// 调用文本向量化接口向量化query, 向量化成功后调用processFetchedEmbeddings函数处理向量化结果
func fetchAndProcessEmbeddings(key string, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, queryString string, stream bool) {
    Emb_url, Emb_requestBody, Emb_headers := ConstructTextEmbeddingParameters(&config, log, []string{queryString})
    config.DashVectorInfo.DashScopeClient.Post(
        Emb_url,
        Emb_headers,
        Emb_requestBody,
        func(statusCode int, responseHeaders http.Header, responseBody []byte) {
            // log.Infof("statusCode:%d, responseBody:%s", statusCode, string(responseBody))
            log.Infof("Successfully fetched embeddings for key: %s", key)
            if statusCode != 200 {
                log.Errorf("Failed to fetch embeddings, statusCode: %d, responseBody: %s", statusCode, string(responseBody))
                ctx.SetContext(QueryEmbeddingKey, nil)
                proxywasm.ResumeHttpRequest()
            } else {
                processFetchedEmbeddings(key, responseBody, ctx, config, log, stream)
            }
        },
        10000)
}

// 先将向量化的结果存入上下文ctx变量,其次发起向量搜索请求
func processFetchedEmbeddings(key string, responseBody []byte, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool) {
    text_embedding_raw, _ := ParseTextEmbedding(responseBody)
    text_embedding := text_embedding_raw.Output.Embeddings[0].Embedding
    // ctx.SetContext(CacheKeyContextKey, text_embedding)
    ctx.SetContext(QueryEmbeddingKey, text_embedding)
    ctx.SetContext(CacheKeyContextKey, key)
    performQueryAndRespond(key, text_embedding, ctx, config, log, stream)
}

// 调用向量搜索接口搜索最相似的key,搜索成功后调用redisSearchHandler函数获取最相似的key的结果
func performQueryAndRespond(key string, text_embedding []float64, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool) {
    vector_url, vector_request, vector_headers, err := ConstructEmbeddingQueryParameters(config, text_embedding)
    if err != nil {
        log.Errorf("Failed to perform query, err: %v", err)
        proxywasm.ResumeHttpRequest()
        return
    }
    config.DashVectorInfo.DashVectorClient.Post(
        vector_url,
        vector_headers,
        vector_request,
        func(statusCode int, responseHeaders http.Header, responseBody []byte) {
            log.Infof("statusCode:%d, responseBody:%s", statusCode, string(responseBody))
            query_resp, err_query := ParseQueryResponse(responseBody)
            if err_query != nil {
                log.Errorf("Failed to parse response: %v", err)
                proxywasm.ResumeHttpRequest()
                return
            }
            if len(query_resp.Output) < 1 {
                log.Warnf("query response is empty")
                uploadQueryEmbedding(ctx, config, log, key, text_embedding)
                return
            }
            most_similar_key := query_resp.Output[0].Fields["query"].(string)
            log.Infof("most similar key:%s", most_similar_key)
            most_similar_score := query_resp.Output[0].Score
            if most_similar_score < 0.1 {
                ctx.SetContext(CacheKeyContextKey, nil)
                redisSearchHandler(most_similar_key, ctx, config, log, stream, false)
            } else {
                log.Infof("the most similar key's score is too high, key:%s, score:%f", most_similar_key, most_similar_score)
                uploadQueryEmbedding(ctx, config, log, key, text_embedding)
                proxywasm.ResumeHttpRequest()
                return
            }
        },
        100000)
}

// 未命中cache,则将新的query embedding和对应的key存入向量数据库
func uploadQueryEmbedding(ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, key string, text_embedding []float64) error {
    vector_url, vector_body, err := ConsturctEmbeddingInsertParameters(&config, log, text_embedding, key)
    if err != nil {
        log.Errorf("Failed to construct embedding insert parameters: %v", err)
        proxywasm.ResumeHttpRequest()
        return nil
    }
    err = config.DashVectorInfo.DashVectorClient.Post(
        vector_url,
        [][2]string{
            {"Content-Type", "application/json"},
            {"dashvector-auth-token", config.DashVectorInfo.DashVectorKey},
        },
        vector_body,
        func(statusCode int, responseHeaders http.Header, responseBody []byte) {
            if statusCode != 200 {
                log.Errorf("Failed to upload query embedding: %s", responseBody)
            } else {
                log.Infof("Successfully uploaded query embedding for key: %s", key)
            }
            proxywasm.ResumeHttpRequest()
        },
        10000,
    )
    if err != nil {
        log.Errorf("Failed to upload query embedding: %v", err)
        proxywasm.ResumeHttpRequest()
        return nil
    }
    return nil
}

// ===================== 以上是主要逻辑 =====================

此外,该逻辑只能在返回值为 types.Action 的函数中使用,例如 onHttpResponseBody 这样的流式处理函数无法以类似方式处理。尽管可以确保请求被发送出去,但由于没有阻塞操作,无法调用回调函数。如果有需要,可以参考 wasm-go/pkg/wrapper/http_wrapper.go,添加信号变量进行修改。

总结

本文的完整代码已发布在 GitHub [ 5] 。本文提供的思路仅为抛砖引玉,如何在实际场景中解决复杂的缓存需求仍需各位的智慧。祝大家在比赛中取得理想的成绩!

相关链接:

[1] Higress + LobeChat 快速搭建私人 GPT 助理

https://github.com/alibaba/higress/issues/1023

[2] 向量生成器:阿里灵积通用文本向量接口

https://help.aliyun.com/zh/dashscope/developer-reference/text-embedding-quick-start

[3] 向量数据库:阿里向量检索服务 DashVector

https://www.aliyun.com/product/ai/dashvector

[4] 如何在插件中请求外部服务

https://higress.io/docs/latest/user/wasm-go/

[5] GitHub

https://github.com/Suchun-sv/ai-cache-Demo

标签:embedding,网关,log,AI,Higress,wrapper,key,query,config
From: https://www.cnblogs.com/alisystemsoftware/p/18384381

相关文章

  • 所以你被要求对AI内容进行“人性化处理”
    过去12个月里,“如何让AI内容更人性化”的搜索量增长了943%。越来越多的人试图“通过人为检查”,无论是为了让AI内容不那么糟糕,还是为了蒙混过AI内容检测工具。“使AI内容人性化”这一过程已成为那些渴望增长的公司的默认内容策略。我看到到处都是被压抑的作家,他们询问如何......
  • 用AI图像与Filmora提升你的视觉效果
    发现Filmora的AI图像这一革命性功能如何转变视觉效果的呈现。作为WondershareFilmora广泛AI工具套件的一部分,AI图像脱颖而出,让用户只需点击几下即可增强图像质量并创造引人入胜的视觉效果。除了AI图像,Filmora还提供AI语音增强、AI肖像、AI视频翻译和AI帧插补,确保了满足各种编......
  • 界面控件Telerik UI for ASP.NET Core 2024 Q2亮点 - AI与UI的融合
    TelerikUIforASP.NETCore是用于跨平台响应式Web和云开发的最完整的UI工具集,拥有超过60个由KendoUI支持的ASP.NET核心组件。它的响应式和自适应的HTML5网格,提供从过滤、排序数据到分页和分层数据分组等100多项高级功能。本文将介绍界面组件TelerikUIforASP.NETCore在今年......
  • 初步学习async/await,Task.GetAwaiter,Task.Result
    初步学习async/await,Task.GetAwaiter,Task.Result   网上关于async/await的知识有很多,看了很多但不如自己实践一遍来得快,所以这里记录下我的理解和大家学习下。  首先以最简单的同步方法来开始如下privatestaticvoidTest(){Console.Wr......
  • 秃姐学AI系列之:残差网络 ResNet
    目录残差网络——ResNet残差块思想ResNet块细节ResNet架构总结代码实现残差块两种ResNet块的情况 ResNet模型QA由上图发现,只有当较复杂的函数类包含较小的函数类时,才能确保提高它们的性能。对于深度神经网络,如果我们能将新添加的层训练成恒等映射(identityfu......
  • 秃姐学AI系列之:批量归一化 + 代码实现
    目录批量归一化核心想法批归一化在做什么总结代码实现从零实现创建一个正确的BatchNorm层应用BatchNorm于LeNet模型简单实现QA批量归一化训练深层神经网络是十分困难的,特别是在较短的时间内使他们收敛更加棘手。因为数据在网络最开始,而损失在结尾。训练的过程是......
  • ai取名生成器在哪?让你的命名过程更轻松
    创建产品或企业时,一个响亮的名字往往能让你一秒记住。但寻找一个完美的名字却是一项艰巨的任务,足以让人头疼不已。不过别担心,今天就要向你介绍一些ai取名生成器在线免费工具。只需一句话,它们就能为你提供一系列充满创意的名称。还在为取名而烦恼的小伙伴,快看过来吧~▌ai取......
  • 掀起社交娱乐新浪潮!AI如何应用到短视频APP?
    随着人工智能技术的迅速发展和全球社交媒体用户的增长,AI视频生成应用正逐渐成为短视频社交媒体中的关键工具。AI工具不仅可以提高内容的创造效率,还能为用户带来全新的互动体验。人工智能(AI)已经成为我们日常生活和工作中不可或缺的一部分,随着数据的爆炸性增长和计算能力的提......