首页 > 其他分享 >袋鼠云数栈产品中 AI+ 实现原理剖析

袋鼠云数栈产品中 AI+ 实现原理剖析

时间:2024-01-25 14:11:07浏览次数:35  
标签:function 云数栈 袋鼠 const AI current message data event

我们是袋鼠云数栈 UED 团队,致力于打造优秀的一站式数据中台产品。我们始终保持工匠精神,探索前端道路,为社区积累并传播经验价值。

本文作者:修能

生产力工具 + AI 是不可逆转的趋势,慢慢的大模型能力通过 AI Agent 落地的工程化能力也开始趋于成熟。作为大数据产品的数栈也必然是需要借助 AI 能力提升产品竞争力。
去年 12 月,我们在产品中上线了 AI+ 的功能,借助已经开源的大模型的能力,帮助我们探索和落地更多地应用场景。在初版 AI+ 的功能中,我们实现了基础功能的通话。

SSE

在 ChatGPT 中,我们在等待大模型生成回答的时间通常不需要很久。这是因为 ChatGPT 通过 server-sent events(SSE)来实现将生成的部分回答通过事件流传递到前端。而这就让前端不必等回答全部生成后再获取,也就使得不需要请求等待很久。

SSE 是一种基于 HTTP 协议的单向通信机制,用于服务端向客户端推送数据。

SSE WebSocket
基于 HTTP 协议 基于 TCP 连接,本身是一种协议
单向通信 双向通信
简单易用 复杂

入门使用

// 创建 SSE 的实例
const evtSource = new EventSource("//api.example.com/ssedemo.php", {
  withCredentials: true,
});

// 添加监听事件
evtSource.onmessage = (event) => {
  const newElement = document.createElement("li");
  const eventList = document.getElementById("list");

  newElement.textContent = `message: ${event.data}`;
  eventList.appendChild(newElement);
};

// 错误处理
evtSource.onerror = (err) => {
  console.error("EventSource failed:", err);
};

// 关闭事件流
evtSource.close();

需要注意的是,SSE 请求的服务端响应信息头的 MIME 类型必须是text/event-stream,否则会无法监听到事件。
另外,由于是基于 HTTP 协议的,所以在 HTTP/1.1 或更低的时候,会受浏览器最大连接数的限制。


Fields

收到的消息格式一定是具有以下字段的某种组合,其他字段名都将忽略,每行一个:

  • event
  • data
  • id
  • retry
: this is a test stream // 第一条消息,这会被解析会注释

data: some text // 第二条消息

data: another message // 第三条消息
data: with two lines

event: userconnect // 第四条消息
data: {"username": "bobby", "time": "02:33:48"}

如上所示,默认浏览器的 EventSource API 虽然可用,但是限制比较多。

  1. 只支持 url 和 withCredentials 参数。不支持往 body 里传参数。而通常来说 URL 是有最大长度限制的。
  2. 无法自定义请求头。
  3. 只能发起 GET 请求。

其实,我们也可以通过 Fetch 来实现 SSE 的通信,只不过需要额外自行处理数据流的传递。

实现

首先,我们借助 Fetch 的能力来实现请求。

const response = await fetch(url, options);

通过接受用户提供的 url 和 options 发起一个 fetch 的请求。
然后,我们需要排除掉非 SSE 的请求类型,我们可以直接拿响应的 header 中拿 content-type进行判断。

const contentType = response.headers.get('content-type');
if (!contentType?.startsWith('text/event-stream')) {
    throw new Error('SSE 请求必须设置 content-type 为 text/event-stream');
}

接着,我们业务场景中通常直接通过 response.json()获取 JSON 格式的数据了,但这里我们由于是事件流,所以我们通过 response.body 拿到的是一个 ReadableStream。我们需要借助相关的 API 进行流的读取。

const reader = response.body.getReader();
let result: ReadableStreamDefaultReadResult<Uint8Array>;
while (!(result = await reader.read()).done) {
  	// 假定每一次 read 的 value 都是完整的消息
    onmessage(onChunk(result.value));
}

其中 onChunk 函数就是处理事件流中的每一份数据的。

// 伪代码
function onChunk(arr: Uint8Array){
  const links = seekLinks();
  // 待完善
}

在实现 seekLinks 方法之前,我们需要先知道到什么时候算每一行的结束。


从 Fields 可以知道,每一行是以\n作为区分的。

function seekLinks(arr: Uint8Array){
  const lines = [];
  const buffer = arr;
  const bufLength = buffer.length;
  let position = 0;
  let lineStart = 0;
  while(position < bufLength){
    // '\n'.charCodeAt() === 10;
    if(buffer[position] === 10){
      lines.push(buffer.slice(lineStart, position));
      lineStart = position;
    };
    position += 1;
  }
  return lines;
}

在获取到所有行后,针对每一行做处理。

// 伪代码
function onChunk(arr: Uint8Array){
  const links = seekLinks();
  const decoder = new TextDecoder();
  let message = {
    data: '',
    event: '',
    id: '',
    retry: undefined,
  }:
  links.forEach((line) => {
    // ':'.charCodeAt() === 58;
    const colon = line.findIndex(l => l === 58);
    const fieldArr = line.slice(0, colon);
    const valueArr = line.slice(colon);
    if(colon === -1){
      // 当冒号作为开头的时候,解析成注释
      return;
    }
    const field = decoder.decode(fieldArr);
    const value = decoder.decode(valueArr);
    switch (field) {
      case 'data':
          message.data = message.data
              ? message.data + '\n' + value
              : value;
          break;
      case 'event':
          message.event = value;
          break;
      case 'id':
          message.id = value;
          break;
      case 'retry':
          const retry = parseInt(value, 10);
          message.retry = retry
          break;
  	}
  });
  return message;
}

大致完成了最简单的基础功能的解析,而以上伪代码参考 fetch-event-source 的源码。


借助 fetch-event-source 的能力,在数栈产品中调用的方式和 HTTP 请求基本保持一致。

function sse(url: string, params: any, options: FetchEventSourceInit) {
  const headers = {
    'Content-Type': 'application/json',
    accept: 'text/event-stream',
  };
  fetchEventSource(url, {
    method: 'POST',
    body: JSON.stringify(params),
    headers,
    ...options,
  });
}

打字机效果

接着,我们实现具备科技感的打字机效果:

输出

这里我们不能直接将响应的消息直接打印到屏幕上,因为响应的消息通常是好多字,这样子会导致打字机效果显得非常卡顿,用户体验不佳。
在数栈产品中,我们通过将响应的消息收集到暂存区中,然后通过每秒从暂存区中取出若干个字符打印到屏幕上,优化打字机卡顿的效果。

function AIGC(){
   const typing = useTyping({
      // 暂存区启动后,每个 delay 的时间都会执行该方法将消息打印到屏幕上
      onTyping(val) {
        // ...
      },
  });
	const handleChat = (message: string) => {
      // 标志暂存区需要开始存响应的消息了
      typing.start();
      requestChat(params, {
        onmessage(event: { data: string }) {
           	const { data } = event;
            // 把响应的消息存入暂存区中
            typing.push(data);
        },
        onclose() {
            // 关闭或失败的话,释放暂存区的数据
            typing.close();
        },
        one rror() {
            typing.close();
        },
    });
  };
}

其中,相关暂存区的代码整理成 useTyping 实现。

export default function useTyping({
    onTyping,
    onEnd,
}: {
    onTyping: (val: string) => void;
    onEnd: () => void;
}) {
    const interval = useRef<number>();
    const queue = useRef<string>('');
    const isStart = useRef<boolean>(false);

    function startTyping() {
        if (interval.current) return;
        let index = 0;
        interval.current = window.setInterval(() => {
            if (index < queue.current.length) {
                const str = queue.current;
                onTyping(str.slice(0, index + 1));
                index++;
            } else if (!isStart.current) {
                // 如果发送了全部的消息且信号关闭,则清空队列
                window.clearInterval(interval.current);
                interval.current = 0;
                onEnd();
            }
            // 如果发送了全部的消息,但是信号没有关闭,则什么都不做继续轮训等待新的消息
        }, 50);
    }

    useEffect(() => {
        return () => {
            window.clearInterval(interval.current);
            interval.current = 0;
        };
    }, []);

    function start() {
        isStart.current = true;
        window.clearInterval(interval.current);
        interval.current = 0;
        queue.current = '';
    }

    function push(str: string) {
        if (!isStart.current) return;
        queue.current += str.replace(/\\n/g, '\n');
        startTyping();
    }

    // 关闭的时候不需要清空队列,因为可能还有一些消息没有发送完毕,统一等消息发送完毕后关闭
    function close() {
        isStart.current = false;
    }

    return { start, push, close };
}

光标

在实现了打字机效果后,我们还需要添加一个闪烁的光标。
原理比较简单,就是在消息区域的最后一个元素的末尾添加元素即可。

.markdown {
  >*:last-child::after {
    content: " ";
    width: 2px;
    height: 13px;
    transform: translate(1px, 2px);
    font-family: Menlo, Monaco, "Courier New", monospace;
    font-weight: normal;
    font-size: 0;
    font-feature-settings: "liga" 0, "calt" 0;
    line-height: 13px;
    letter-spacing: 0;
    display: inline-block;
    visibility: hidden;
    animation: blinker 1s step-end infinite;
    background: #000;
  }

  @keyframes blinker {
    0% {
      visibility: inherit;
    }
    50% {
      visibility: hidden;
    }
    100% {
      visibility: inherit;
    }
  }
}

当然,这里有一些问题,在 markdown 解析出 Code Block 的时候会导致光标错位,这个问题 ChatGPT 同样也有。


那么到这里,我们就实现了一个具备基础功能的 AI+ 的需求。

最后

欢迎关注【袋鼠云数栈UED团队】~
袋鼠云数栈 UED 团队持续为广大开发者分享技术成果,相继参与开源了欢迎 star

标签:function,云数栈,袋鼠,const,AI,current,message,data,event
From: https://www.cnblogs.com/dtux/p/17987027

相关文章

  • 《幻兽帕鲁》爆火,大厂坐不住了:这游戏是 AI 设计的?丨 RTE 开发者日报 Vol.134
       开发者朋友们大家好: 这里是「RTE开发者日报」,每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享RTE(RealTimeEngagement)领域内「有话题的新闻」、「有态度的观点」、「有意思的数据」、「有思考的文章」、「有看点的会议」,但内容仅代表......
  • Web server failed to start. Port 8080 was already in use.
    场景上午在启动程序时,发现端口被占用,启动失败了***************************APPLICATIONFAILEDTOSTART***************************Description:Webserverfailedtostart.Port8080wasalreadyinuse.Action:Identifyandstoptheprocessthat'slisteni......
  • 媒体声音|PolarDB 再升级:欢迎来到云数据库 x AI 新时代
    以下文章来源于产业家,作者产业媒体 作者|思杭编辑|皮爷出品|产业家 “搭积木”、“自动驾驶”、“三层解耦”,这些形象的标签成了1月17日阿里云开发者生态大会当天最出圈的词汇。 会上,一名小学生受邀上台演示了数据库查询的场景。一种“全民编程”的气氛向现场观众......
  • 《SAIS Supervising and Augmenting Intermediate Steps for Document-Level Relation
    代码 原文地址 预备知识:1.什么是标记索引(tokenindices)?标记索引是一种用于表示文本中的单词或符号的数字编码。它们可以帮助计算机理解和处理自然语言。例如,假如有一个字典{"我":1,"是":2,"Bing":3,".":4},那么文本"我是Bing."的标记索引就是[1,2,3,4]。不同的模......
  • Docker启动Nacos报错:Nacos Server did not start because dumpservice bean construct
    一、表象重启服务器之后Docker运行Nacos容器,启动成功,但是外网无法访问。查看了一下Nacos启动日志(dockerlogsnacos容器名)二、分析很明显是数据库配``置问题。。如果是数据库配置的问题,可以着重检查以下信息尤其是MySQL内网Host,查询方式见Docker安装Nacos三、解决我已......
  • 2024AAAI_SGNet Structure Guided Network via Gradient-Frequency Awareness for Dep
    1.任务描述: 给定输入LR深度图和HRRGB图像,引导DSR目的是在ground-truth深度图监督的条件下,预测HR深度图2.Network本文提出的SGNet主要包括两部分,即梯度校准模块(GCM)和频率感知模块(FAM)。首先将RGB图像和上采样后的LR深度图送入到GCM,利用RGB丰富的梯度信息在梯度域中......
  • 新火种AI|周鸿祎和傅盛“泯恩仇”!这就是AI的力量
    作者:小岩编辑:彩云互联网时期留下的“恩怨情仇”,都将在大模型时代得到解决。没错,这次的主角依然是周鸿祎。与程前互怼的热度刚刚冷却,周鸿祎就立刻参加了傅盛的圆桌论坛。而这次对谈的吸睛程度,绝不逊于上次,因为这是周鸿祎傅盛两师徒,继16年前决裂之后的首次同台。两人此次可以破冰,都得......
  • k8s - Promtail 重写日志标签名
    1.Promtail重写日志标签名默认Promtail会导出Pod中的一些元数据,可以通过访问Promtail的web界面,获取可以拿到的原始标签#获取promtailPod的PodIP地址kubectlgetpo-A-owide|greppromtail#输出#lokipromtail-s2c2x......
  • 中国大模型迎来“95后” 百度奖学金发掘百位“未来AI技术领袖”
    在人工智能掀起的科技革命和产业变革浪潮下,大模型成为最受关注的研究领域。1月22日,第十一届百度奖学金颁奖典礼在北京举行,来自全球顶尖高校及科研机构的10位“未来AI技术领袖”脱颖而出,他们平均年龄仅27岁,其中8人聚焦大模型领域。百度首席技术官王海峰致辞并颁奖。百度首席技术......
  • 脚本exlpain结果与optimizer_trace结果不一致
    先说结论:表数据量太少,使用索引的效率不如全表扫描。表信息:CREATETABLE`w_map_cell`(`id`int(11)NOTNULLAUTO_INCREMENTCOMMENT'主键',`shelf_id`bigint(11)DEFAULTNULLCOMMENT'货架id',`cell_no`varchar(50)DEFAULTNULLCOMMENT'储位编号',`cell_name......