首页 > 其他分享 >LangChain表达式LCEL(四)

LangChain表达式LCEL(四)

时间:2024-03-19 22:34:24浏览次数:17  
标签:name countries chunk LangChain Chat LCEL model events 表达式

使用LangChain进行流式处理

流式处理对于基于 LLM 的应用程序对最终用户的响应至关重要,重要的 LangChain 原语,如 LLMs、解析器、提示、检索器和代理实现了 LangChain Runnable 接口。

该接口提供了两种常见的流式内容的方法:

  1. sync stream 和 async astream:流式处理的默认实现,从链中流式传输最终输出
  2. async astream_events 和 async astream_log:这些方法提供了一种从链中流式传输中间步骤最终输出的方式。

Using Stream

所有Runnable对象都实现了一个名为stream的同步方法和一个名为astream的异步实体,这些方法旨在以的形式流式传输最终输出,只要可用就会产生每个块。

只有在程序中的所有步骤都知道如何处理输入流时,即一次处理一个输入块,并生成相应的输出块时,才能进行流式处理。

LLMs和Chat Models

大型语言模型及其聊天变体是基于 LLM 的应用程序的主要瓶颈。

大型语言模型生成对查询的完整响应可能需要几秒钟。这远远慢于应用程序对最终用户响应感觉灵敏的**~200-300 ms**的阈值。

让应用程序感觉响应更快的关键策略是显示中间进度即,通过 token 流式传输模型令牌的输出。

from langchain_openai import ChatOpenAI

model = ChatOpenAI()
chunks =[]
async for chunk in model.astream("你好!"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)
|你|好|!|有|什|么|可以|帮|助|你|的|吗|?||
chunks[0]
AIMessageChunk(content='')

我们得到了一个叫做 AIMessageChunk 的东西。这个块代表了一个 AIMessage 的一部分。

消息块是可以添加的 – 可以简单地将它们相加以获得到目前为止响应的状态!

Chains

事实上,所有LLM申请都涉及更多步骤,而不仅仅是调用语言模型

使用 LangChain 表达式语言(LCEL)构建一个简单的链,它结合了提示、模型和解析器,并验证流式传输是否有效。

我们使用 StrOutputParser 来解析模型的输出。这是一个简单的解析器,它从 AIMessageChunk 中提取内容字段,为我们提供模型返回的token

LCEL 是一种通过将不同的 LangChain 原语链接在一起来指定“程序”的声明方式。使用 LCEL 创建的链受益于streamastream 的自动实现,允许最终输出的流式传输。事实上,用 LCEL 创建的链实现了整个标准 Runnable 接口。

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_template("告诉我一个关于 {topic} 的笑话")
model = ChatOpenAI()
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "鹦鹉"}):
    print(chunk, end="|", flush=True)
|为|什|么|鹦|鹉|会|成|为|最|佳|演|说|家|?

|因|为|它|们|总|是|有|话|可|说|,|而|且|还|可以|模|仿|人|类|的|语|言|!|

不必使用 LangChain 表达语言来使用 LangChain,您可以依赖于标准的 命令式 编程方法,通过在每个组件上分别调用 invokebatchstream,将结果分配给变量,然后根据需要在下游使用它们。

Working with Input Streams

如果想要在生成时从输出中流式传输 JSON 怎么办?

如果您依赖于 json.loads 来解析部分 json,那么解析将失败,因为部分 json 不会是有效的 json。

有一种方法,解析器操作输入流,尝试将部分json“自动完成”为有效状态。

from langchain_core.output_parsers import JsonOutputParser

chain = (
    model | JsonOutputParser()
)  # 由于 Langchain 旧版本中的一个错误,JsonOutputParser 未能从某些模型中流式传输结果
async for text in chain.astream(
    '以 JSON 格式输出法国、西班牙和日本的国家及其人口的列表。使用一个带有“countries”外键的字典,其中包含一个国家列表。每个国家应该有“name”和“population”关键字。'
):
    print(text, flush=True)
{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': '法'}]}
{'countries': [{'name': '法国'}]}
{'countries': [{'name': '法国', 'population': 670}]}
{'countries': [{'name': '法国', 'population': 670128}]}
{'countries': [{'name': '法国', 'population': 67012883}]}
{'countries': [{'name': '法国', 'population': 67012883}, {}]}
{'countries': [{'name': '法国', 'population': 67012883}, {'name': ''}]}
{'countries': [{'name': '法国', 'population': 67012883}, {'name': '西'}]}
{'countries': [{'name': '法国', 'population': 67012883}, {'name': '西班'}]}
{'countries': [{'name': '法国', 'population': 67012883}, {'name': '西班牙'}]}

使用前面的示例,并在末尾附加一个提取函数,用于从最终的JSON中提取国家/地区名称。

链中的任何步骤,如果操作的是最终输入而不是对输入流,都可能通过stream或 破坏流功能astream

稍后,我们将讨论astream_events流式传输中间步骤结果的 API。

即使该链包含仅对最终输入进行操作的步骤,该 API 也会流式传输中间步骤的结果。

from langchain_core.output_parsers import (
    JsonOutputParser,
)


# 一个操作最终输入而不是输入流的函数
def _extract_country_names(inputs):
    """A function that does not operates on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
    '以 JSON 格式输出法国、西班牙和日本的国家及其人口的列表。使用一个带有“countries”外键的字典,其中包含一个国家列表。每个国家应该有“name”和“population”关键字。'
):
    print(text, end="|", flush=True)

Generator Functions:生成器函数

使用可以操作输入流的生成器函数来修复流式处理

生成器函数(使用yield的函数)允许编写对能够操作输入流的代码

from langchain_core.output_parsers import JsonOutputParser

async def _extract_country_names_streaming(input_stream):
    """A function that operates on input streams."""
    country_names_so_far = set()

    async for input in input_stream:
        if not isinstance(input, dict):
            continue

        if "countries" not in input:
            continue

        countries = input["countries"]

        if not isinstance(countries, list):
            continue

        for country in countries:
            name = country.get("name")
            if not name:
                continue
            if name not in country_names_so_far:
                yield name
                country_names_so_far.add(name)


chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
    print(text, end="|", flush=True)

因为上面的代码依赖于 JSON 自动完成,所以您可能会看到国家/地区的部分名称(例如,SpSpain),这不是提取结果所想要的!

我们关注的是流媒体概念,而不一定是链条的结果。

Non-streaming components

非流式组件

一些内置组件,如检索器,不提供任何 streaming。如果我们尝试对它们进行stream会发生什么?

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho", "harrison likes spicy food"],
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks

只有从该组件产生的最终结果被流式传输了。

并非所有组件都必须实现流式传输——在某些情况下,流式传输要么是不必要的、困难的,要么就是没有意义。

使用非流式组件构建的 LCEL 链在很多情况下仍然能够进行流式传输,部分输出的流式传输在链中最后一个非流式步骤之后开始。

retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)

for chunk in retrieval_chain.stream(
    "Where did harrison work? " "Write 3 made up sentences about this place."
):
    print(chunk, end="|", flush=True)
 Based| on| the| given| context|,| the| only| information| provided| about| where| Harrison| worked| is| that| he| worked| at| Ken|sh|o|.| Since| there| are| no| other| details| provided| about| Ken|sh|o|,| I| do| not| have| enough| information| to| write| 3| additional| made| up| sentences| about| this| place|.| I| can| only| state| that| Harrison| worked| at| Ken|sh|o|.||

Using Stream Events

Event Streaming is a beta API.

为了让 astream_events API 正常工作:

  • 尽可能在代码中使用 async(例如,异步工具等)
  • 如果定义自定义函数/可运行对象,请传播回调
  • 每当使用非 LCEL 的可运行对象时,请确保在 LLM 上调用 .astream() 而不是 .ainvoke,以强制 LLM 流式传输令牌。
Event Reference

下表是参考表,显示了各种 Runnable 对象可能发出的一些事件。

当流正确实现时,直到输入流完全被消耗之后,才知道可运行的输入。这意味着inputs通常仅包含end事件而不是start事件。

事件名称输入输出
on_chat_model_start[model name]{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream[model name]AIMessageChunk(content=“hello”)
on_chat_model_end[model name]{“messages”: [[SystemMessage, HumanMessage]]}{“generations”: […], “llm_output”: None, …}
on_llm_start[model name]{‘input’: ‘hello’}
on_llm_stream[model name]‘Hello’
on_llm_end[model name]‘Hello human!’
ChatModel

首先查看聊天模型产生的事件。

events = []
async for event in model.astream_events("hello", version="v1"):
    events.append(event)

由于这是一个测试版的API,使用version=“v1”参数将使我们能够最大程度地减少对代码的此类破坏性更改。

看一下一些开始事件和一些结束事件

events[:3]
[{'event': 'on_chat_model_start',
  'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
  'name': 'ChatAnthropic',
  'tags': [],
  'metadata': {},
  'data': {'input': 'hello'}},
 {'event': 'on_chat_model_stream',
  'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
  'tags': [],
  'metadata': {},
  'name': 'ChatAnthropic',
  'data': {'chunk': AIMessageChunk(content=' Hello')}},
 {'event': 'on_chat_model_stream',
  'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
  'tags': [],
  'metadata': {},
  'name': 'ChatAnthropic',
  'data': {'chunk': AIMessageChunk(content='!')}}]
events[-2:]
[{'event': 'on_chat_model_stream',
  'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
  'tags': [],
  'metadata': {},
  'name': 'ChatAnthropic',
  'data': {'chunk': AIMessageChunk(content='')}},
 {'event': 'on_chat_model_end',
  'name': 'ChatAnthropic',
  'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
  'tags': [],
  'metadata': {},
  'data': {'output': AIMessageChunk(content=' Hello!')}}]
Chain

解析流式JSON的标准链

chain = (
    model | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

events = [
    event
    async for event in chain.astream_events(
        'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
        version="v1",
    )
]

如果您检查前几个事件,您会发现有 3 个不同的开始事件,而不是2 个开始事件。

三个启动事件对应:

  1. 链(模型+解析器)
  2. 该模型
  3. 解析器
events[:3]
[{'event': 'on_chain_start',
  'run_id': 'b1074bff-2a17-458b-9e7b-625211710df4',
  'name': 'RunnableSequence',
  'tags': [],
  'metadata': {},
  'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}},
 {'event': 'on_chat_model_start',
  'name': 'ChatAnthropic',
  'run_id': '6072be59-1f43-4f1c-9470-3b92e8406a99',
  'tags': ['seq:step:1'],
  'metadata': {},
  'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}}},
 {'event': 'on_parser_start',
  'name': 'JsonOutputParser',
  'run_id': 'bf978194-0eda-4494-ad15-3a5bfe69cd59',
  'tags': ['seq:step:2'],
  'metadata': {},
  'data': {}}]

使用此 API 从模型和解析器获取流事件的输出,忽略开始事件、结束事件和链中的事件。

num_events = 0

async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v1",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break
Chat model chunk: ' Here'
Chat model chunk: ' is'
Chat model chunk: ' the'
Chat model chunk: ' JSON'
Chat model chunk: ' with'
Chat model chunk: ' the'
Chat model chunk: ' requested'
Chat model chunk: ' countries'
Chat model chunk: ' and'
Chat model chunk: ' their'
Chat model chunk: ' populations'
Chat model chunk: ':'
Chat model chunk: '\n\n```'
Chat model chunk: 'json'
Parser chunk: {}
Chat model chunk: '\n{'
Chat model chunk: '\n '
Chat model chunk: ' "'
Chat model chunk: 'countries'
Chat model chunk: '":'
Parser chunk: {'countries': []}
Chat model chunk: ' ['
Chat model chunk: '\n   '
Parser chunk: {'countries': [{}]}
Chat model chunk: ' {'
...

由于模型和解析器都支持流式传输,因此我们可以实时看到来自两个组件的流式传输事件

Filtering Events

过滤事件

由于此 API 产生如此多的事件,因此能够过滤事件非常有用。

可以按组件名称、组件标签或组件类型进行过滤。

  • By Name

    chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
        {"run_name": "my_parser"}
    )
    
    max_events = 0
    async for event in chain.astream_events(
        'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
        version="v1",
        include_names=["my_parser"],	# 只输出名为my_parser的事件
    ):
        print(event)
        max_events += 1
        if max_events > 10:
            # Truncate output
            print("...")
            break
    
  • By Type

    chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
        {"run_name": "my_parser"}
    )
    
    max_events = 0
    async for event in chain.astream_events(
        'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
        version="v1",
        include_types=["chat_model"],	# 只输出类型为chat_model的事件
    ):
        print(event)
        max_events += 1
        if max_events > 10:
            # Truncate output
            print("...")
            break
    
  • By Tags

    标签由给定可运行对象的子组件继承。 如果您使用标签进行过滤,请确保这是您想要的。

    chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})
    
    max_events = 0
    async for event in chain.astream_events(
        'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
        version="v1",
        include_tags=["my_chain"],	# 只输出tags为my_chain的事件
    ):
        print(event)
        max_events += 1
        if max_events > 10:
            # Truncate output
            print("...")
            break
    
Non-streaming components

非流式组件

某些组件由于不对输入流进行操作而无法很好地进行流传输,虽然这些组件在使用 astream 时可能会中断最终输出的流式传输,但 astream_events 仍然会从支持流式传输的中间步骤中产生流式传输事件!

通过 astream_events 我们仍然可以看到来自模型和解析器的流输出

num_events = 0

async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v1",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break
Chat model chunk: ' Here'
Chat model chunk: ' is'
Chat model chunk: ' the'
Chat model chunk: ' JSON'
Chat model chunk: ' with'
Chat model chunk: ' the'
Chat model chunk: ' requested'
Chat model chunk: ' countries'
Chat model chunk: ' and'
Chat model chunk: ' their'
Chat model chunk: ' populations'
Chat model chunk: ':'
Chat model chunk: '\n\n```'
Chat model chunk: 'json'
Parser chunk: {}
Chat model chunk: '\n{'
Chat model chunk: '\n '
Chat model chunk: ' "'
Chat model chunk: 'countries'
Chat model chunk: '":'
Parser chunk: {'countries': []}
Chat model chunk: ' ['
Chat model chunk: '\n   '
Parser chunk: {'countries': [{}]}
Chat model chunk: ' {'
Chat model chunk: '\n     '
Chat model chunk: ' "'
...
Propagating Callbacks

传播回调

如果在工具中使用调用可运行对象,则需要将回调传播到可运行对象;否则,不会生成任何流事件。

当使用 RunnableLambdas 或 @chain 装饰器时,回调会在幕后自动传播。

标签:name,countries,chunk,LangChain,Chat,LCEL,model,events,表达式
From: https://blog.csdn.net/weixin_43787408/article/details/136857479

相关文章

  • langchain chatchat运行机制源码解析
    langchainchatchat的简介就不多说了,大家可以去看github官网介绍,虽然当前版本停止了更新,下个版本还没有出来,但作为学习还是很好的。一、关键启动过程:1、start_main_server入口2、run_controller启动fastchatcontroller端口200013、run_openai_api启动fastchat对外提供的类......
  • 150. 逆波兰表达式求值c
    intf(inta,intb,charc){if(c=='+')returna+b;if(c=='-')returna-b;if(c=='/')returna/b;returna*b;}intevalRPN(char**tokens,inttokensSize){int*stack=(int*)malloc(sizeof(int)*tokensSize);......
  • java核心技术卷1 第六章:接口、lambda表达式与内部类
    接口接口不是类,而是描述了符合这个接口的类应该做什么,描述了一组抽象的需求,而没有指定怎么做接口中的所有方法自动是public,接口中声明方法不需要加public(java规范,减少不必要的冗余声明,即使一些程序员为了清晰习惯加上public)实现接口时,需要加上public,不然默认将权限设为了defa......
  • Python——Regular Expression(正则表达式)RE
    正则表达式是一种强大的文本处理工具,它使用一种特殊的语法来匹配、查找以及替换字符串中的字符组合。在Python中,正则表达式,"re模块"。英文叫做"RegularExpression"。re模块是Python中用于处理正则表达式的标准库。它提供了多个函数来执行正则表达式的匹配、查找、替换和分割操......
  • GPT实战系列-LangChain的Prompt提示模版构建
    GPT实战系列-LangChain的Prompt提示模版构建LangChainGPT实战系列-LangChain如何构建基通义千问的多工具链GPT实战系列-构建多参数的自定义LangChain工具GPT实战系列-通过Basetool构建自定义LangChain工具方法GPT实战系列-一种构建LangChain自定义Tool工具的简单方法G......
  • 洛谷-P1449 后缀表达式
    目录 何为后缀表达式?模拟过程AC代码采用STL的stack题目链接:P1449后缀表达式-洛谷|计算机科学教育新生态(luogu.com.cn) 何为后缀表达式?那后缀表达式是怎么算的呢那显然就需要引用最开始说的栈了因为后缀表表达式本来就是栈的一种应用那么现在来说说后缀表......
  • 正则表达式(java)
    正则表达式多用于字符串匹配,检索。基础符号[]意义:或a[bc]等于ab或ac()意义:和(abc)只等于abc[^]意义:否a[^bc]表示不等于ab或ac*意义:零或更多ab*等于a,ab,abb...+意义:1或更多ab+等于abb,abbb...[0-9]+等于长度大于一的数字序列。[A-Z]+等于长度大......
  • 表达式求值
    #include<iostream>#include<unordered_map>usingnamespacestd;constintN=10010;charop[N];intnum[N];intidx1=-1,idx2=-1;//模拟栈voidcal(){intb=num[idx2--];inta=num[idx2--];//注意a,b顺序charc=op[idx1--];intx=0;i......
  • 实验一 c语言开发环境使用和数据类型、运算符、表达式
    task1`#include<stdio.h>include<stdlib.h>intmain(){printf("o\to\n");printf("<H>\t<H>\n");printf("II\tII\n");system("pause:");return0;}`task2`#include......
  • 实验1 C语言开发环境使用和数据类型、运算符、表达式
    点击查看代码#include<stdio.h>#include<stdlib.h>intmain(){ printf("oo\n"); printf("<H><H>\n"); printf("IIII\n"); system("pause"); return0;}点击查看代码#include<stdio......