使用LangChain进行流式处理
流式处理对于基于 LLM 的应用程序对最终用户的响应至关重要,重要的 LangChain 原语,如 LLMs、解析器、提示、检索器和代理实现了 LangChain Runnable 接口。
该接口提供了两种常见的流式内容的方法:
- sync
stream
和 asyncastream
:流式处理的默认实现,从链中流式传输最终输出。 - async
astream_events
和 asyncastream_log
:这些方法提供了一种从链中流式传输中间步骤和最终输出的方式。
Using Stream
所有Runnable
对象都实现了一个名为stream
的同步方法和一个名为astream
的异步实体,这些方法旨在以块的形式流式传输最终输出,只要可用就会产生每个块。
只有在程序中的所有步骤都知道如何处理输入流时,即一次处理一个输入块,并生成相应的输出块时,才能进行流式处理。
LLMs和Chat Models
大型语言模型及其聊天变体是基于 LLM 的应用程序的主要瓶颈。
大型语言模型生成对查询的完整响应可能需要几秒钟。这远远慢于应用程序对最终用户响应感觉灵敏的**~200-300 ms**的阈值。
让应用程序感觉响应更快的关键策略是显示中间进度,即,通过 token 流式传输模型令牌的输出。
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
chunks =[]
async for chunk in model.astream("你好!"):
chunks.append(chunk)
print(chunk.content, end="|", flush=True)
|你|好|!|有|什|么|可以|帮|助|你|的|吗|?||
chunks[0]
AIMessageChunk(content='')
我们得到了一个叫做 AIMessageChunk
的东西。这个块代表了一个 AIMessage
的一部分。
消息块是可以添加的 – 可以简单地将它们相加以获得到目前为止响应的状态!
Chains
事实上,所有LLM申请都涉及更多步骤,而不仅仅是调用语言模型
使用 LangChain 表达式语言(LCEL)构建一个简单的链,它结合了提示、模型和解析器,并验证流式传输是否有效。
我们使用 StrOutputParser
来解析模型的输出。这是一个简单的解析器,它从 AIMessageChunk
中提取内容字段,为我们提供模型返回的token
。
LCEL 是一种通过将不同的 LangChain 原语链接在一起来指定“程序”的声明方式。使用 LCEL 创建的链受益于
stream
和astream
的自动实现,允许最终输出的流式传输。事实上,用 LCEL 创建的链实现了整个标准 Runnable 接口。
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_template("告诉我一个关于 {topic} 的笑话")
model = ChatOpenAI()
parser = StrOutputParser()
chain = prompt | model | parser
async for chunk in chain.astream({"topic": "鹦鹉"}):
print(chunk, end="|", flush=True)
|为|什|么|鹦|鹉|会|成|为|最|佳|演|说|家|?
|因|为|它|们|总|是|有|话|可|说|,|而|且|还|可以|模|仿|人|类|的|语|言|!|
不必使用 LangChain 表达语言来使用 LangChain,您可以依赖于标准的 命令式 编程方法,通过在每个组件上分别调用
invoke
、batch
或stream
,将结果分配给变量,然后根据需要在下游使用它们。
Working with Input Streams
如果想要在生成时从输出中流式传输 JSON 怎么办?
如果您依赖于 json.loads
来解析部分 json,那么解析将失败,因为部分 json 不会是有效的 json。
有一种方法,解析器操作输入流,尝试将部分json“自动完成”为有效状态。
from langchain_core.output_parsers import JsonOutputParser
chain = (
model | JsonOutputParser()
) # 由于 Langchain 旧版本中的一个错误,JsonOutputParser 未能从某些模型中流式传输结果
async for text in chain.astream(
'以 JSON 格式输出法国、西班牙和日本的国家及其人口的列表。使用一个带有“countries”外键的字典,其中包含一个国家列表。每个国家应该有“name”和“population”关键字。'
):
print(text, flush=True)
{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': '法'}]}
{'countries': [{'name': '法国'}]}
{'countries': [{'name': '法国', 'population': 670}]}
{'countries': [{'name': '法国', 'population': 670128}]}
{'countries': [{'name': '法国', 'population': 67012883}]}
{'countries': [{'name': '法国', 'population': 67012883}, {}]}
{'countries': [{'name': '法国', 'population': 67012883}, {'name': ''}]}
{'countries': [{'name': '法国', 'population': 67012883}, {'name': '西'}]}
{'countries': [{'name': '法国', 'population': 67012883}, {'name': '西班'}]}
{'countries': [{'name': '法国', 'population': 67012883}, {'name': '西班牙'}]}
使用前面的示例,并在末尾附加一个提取函数,用于从最终的JSON中提取国家/地区名称。
链中的任何步骤,如果操作的是最终输入而不是对输入流,都可能通过
stream
或 破坏流功能astream
。
稍后,我们将讨论
astream_events
流式传输中间步骤结果的 API。即使该链包含仅对最终输入进行操作的步骤,该 API 也会流式传输中间步骤的结果。
from langchain_core.output_parsers import (
JsonOutputParser,
)
# 一个操作最终输入而不是输入流的函数
def _extract_country_names(inputs):
"""A function that does not operates on input streams and breaks streaming."""
if not isinstance(inputs, dict):
return ""
if "countries" not in inputs:
return ""
countries = inputs["countries"]
if not isinstance(countries, list):
return ""
country_names = [
country.get("name") for country in countries if isinstance(country, dict)
]
return country_names
chain = model | JsonOutputParser() | _extract_country_names
async for text in chain.astream(
'以 JSON 格式输出法国、西班牙和日本的国家及其人口的列表。使用一个带有“countries”外键的字典,其中包含一个国家列表。每个国家应该有“name”和“population”关键字。'
):
print(text, end="|", flush=True)
Generator Functions:生成器函数
使用可以操作输入流的生成器函数来修复流式处理
生成器函数(使用
yield
的函数)允许编写对能够操作输入流的代码
from langchain_core.output_parsers import JsonOutputParser
async def _extract_country_names_streaming(input_stream):
"""A function that operates on input streams."""
country_names_so_far = set()
async for input in input_stream:
if not isinstance(input, dict):
continue
if "countries" not in input:
continue
countries = input["countries"]
if not isinstance(countries, list):
continue
for country in countries:
name = country.get("name")
if not name:
continue
if name not in country_names_so_far:
yield name
country_names_so_far.add(name)
chain = model | JsonOutputParser() | _extract_country_names_streaming
async for text in chain.astream(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
print(text, end="|", flush=True)
因为上面的代码依赖于 JSON 自动完成,所以您可能会看到国家/地区的部分名称(例如,
Sp
和Spain
),这不是提取结果所想要的!我们关注的是流媒体概念,而不一定是链条的结果。
Non-streaming components
非流式组件
一些内置组件,如检索器,不提供任何 streaming
。如果我们尝试对它们进行stream
会发生什么?
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings
template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
vectorstore = FAISS.from_texts(
["harrison worked at kensho", "harrison likes spicy food"],
embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()
chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks
只有从该组件产生的最终结果被流式传输了。
并非所有组件都必须实现流式传输——在某些情况下,流式传输要么是不必要的、困难的,要么就是没有意义。
使用非流式组件构建的 LCEL 链在很多情况下仍然能够进行流式传输,部分输出的流式传输在链中最后一个非流式步骤之后开始。
retrieval_chain = (
{
"context": retriever.with_config(run_name="Docs"),
"question": RunnablePassthrough(),
}
| prompt
| model
| StrOutputParser()
)
for chunk in retrieval_chain.stream(
"Where did harrison work? " "Write 3 made up sentences about this place."
):
print(chunk, end="|", flush=True)
Based| on| the| given| context|,| the| only| information| provided| about| where| Harrison| worked| is| that| he| worked| at| Ken|sh|o|.| Since| there| are| no| other| details| provided| about| Ken|sh|o|,| I| do| not| have| enough| information| to| write| 3| additional| made| up| sentences| about| this| place|.| I| can| only| state| that| Harrison| worked| at| Ken|sh|o|.||
Using Stream Events
Event Streaming is a beta API.
为了让 astream_events
API 正常工作:
- 尽可能在代码中使用
async
(例如,异步工具等) - 如果定义自定义函数/可运行对象,请传播回调
- 每当使用非 LCEL 的可运行对象时,请确保在 LLM 上调用
.astream()
而不是.ainvoke
,以强制 LLM 流式传输令牌。
Event Reference
下表是参考表,显示了各种 Runnable 对象可能发出的一些事件。
当流正确实现时,直到输入流完全被消耗之后,才知道可运行的输入。这意味着
inputs
通常仅包含end
事件而不是start
事件。
事件 | 名称 | 块 | 输入 | 输出 |
on_chat_model_start | [model name] | {“messages”: [[SystemMessage, HumanMessage]]} | ||
on_chat_model_stream | [model name] | AIMessageChunk(content=“hello”) | ||
on_chat_model_end | [model name] | {“messages”: [[SystemMessage, HumanMessage]]} | {“generations”: […], “llm_output”: None, …} | |
on_llm_start | [model name] | {‘input’: ‘hello’} | ||
on_llm_stream | [model name] | ‘Hello’ | ||
on_llm_end | [model name] | ‘Hello human!’ |
ChatModel
首先查看聊天模型产生的事件。
events = []
async for event in model.astream_events("hello", version="v1"):
events.append(event)
由于这是一个测试版的API,使用
version=“v1”
参数将使我们能够最大程度地减少对代码的此类破坏性更改。
看一下一些开始事件和一些结束事件
events[:3]
[{'event': 'on_chat_model_start',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {},
'data': {'input': 'hello'}},
{'event': 'on_chat_model_stream',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'tags': [],
'metadata': {},
'name': 'ChatAnthropic',
'data': {'chunk': AIMessageChunk(content=' Hello')}},
{'event': 'on_chat_model_stream',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'tags': [],
'metadata': {},
'name': 'ChatAnthropic',
'data': {'chunk': AIMessageChunk(content='!')}}]
events[-2:]
[{'event': 'on_chat_model_stream',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'tags': [],
'metadata': {},
'name': 'ChatAnthropic',
'data': {'chunk': AIMessageChunk(content='')}},
{'event': 'on_chat_model_end',
'name': 'ChatAnthropic',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'tags': [],
'metadata': {},
'data': {'output': AIMessageChunk(content=' Hello!')}}]
Chain
解析流式JSON的标准链
chain = (
model | JsonOutputParser()
) # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
events = [
event
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
)
]
如果您检查前几个事件,您会发现有 3 个不同的开始事件,而不是2 个开始事件。
三个启动事件对应:
- 链(模型+解析器)
- 该模型
- 解析器
events[:3]
[{'event': 'on_chain_start',
'run_id': 'b1074bff-2a17-458b-9e7b-625211710df4',
'name': 'RunnableSequence',
'tags': [],
'metadata': {},
'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}},
{'event': 'on_chat_model_start',
'name': 'ChatAnthropic',
'run_id': '6072be59-1f43-4f1c-9470-3b92e8406a99',
'tags': ['seq:step:1'],
'metadata': {},
'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}}},
{'event': 'on_parser_start',
'name': 'JsonOutputParser',
'run_id': 'bf978194-0eda-4494-ad15-3a5bfe69cd59',
'tags': ['seq:step:2'],
'metadata': {},
'data': {}}]
使用此 API 从模型和解析器获取流事件的输出,忽略开始事件、结束事件和链中的事件。
num_events = 0
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(
f"Chat model chunk: {repr(event['data']['chunk'].content)}",
flush=True,
)
if kind == "on_parser_stream":
print(f"Parser chunk: {event['data']['chunk']}", flush=True)
num_events += 1
if num_events > 30:
# Truncate the output
print("...")
break
Chat model chunk: ' Here'
Chat model chunk: ' is'
Chat model chunk: ' the'
Chat model chunk: ' JSON'
Chat model chunk: ' with'
Chat model chunk: ' the'
Chat model chunk: ' requested'
Chat model chunk: ' countries'
Chat model chunk: ' and'
Chat model chunk: ' their'
Chat model chunk: ' populations'
Chat model chunk: ':'
Chat model chunk: '\n\n```'
Chat model chunk: 'json'
Parser chunk: {}
Chat model chunk: '\n{'
Chat model chunk: '\n '
Chat model chunk: ' "'
Chat model chunk: 'countries'
Chat model chunk: '":'
Parser chunk: {'countries': []}
Chat model chunk: ' ['
Chat model chunk: '\n '
Parser chunk: {'countries': [{}]}
Chat model chunk: ' {'
...
由于模型和解析器都支持流式传输,因此我们可以实时看到来自两个组件的流式传输事件
Filtering Events
过滤事件
由于此 API 产生如此多的事件,因此能够过滤事件非常有用。
可以按组件名称、组件标签或组件类型进行过滤。
-
By Name
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config( {"run_name": "my_parser"} ) max_events = 0 async for event in chain.astream_events( 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', version="v1", include_names=["my_parser"], # 只输出名为my_parser的事件 ): print(event) max_events += 1 if max_events > 10: # Truncate output print("...") break
-
By Type
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config( {"run_name": "my_parser"} ) max_events = 0 async for event in chain.astream_events( 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', version="v1", include_types=["chat_model"], # 只输出类型为chat_model的事件 ): print(event) max_events += 1 if max_events > 10: # Truncate output print("...") break
-
By Tags
标签由给定可运行对象的子组件继承。 如果您使用标签进行过滤,请确保这是您想要的。
chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]}) max_events = 0 async for event in chain.astream_events( 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', version="v1", include_tags=["my_chain"], # 只输出tags为my_chain的事件 ): print(event) max_events += 1 if max_events > 10: # Truncate output print("...") break
Non-streaming components
非流式组件
某些组件由于不对输入流进行操作而无法很好地进行流传输,虽然这些组件在使用 astream
时可能会中断最终输出的流式传输,但 astream_events
仍然会从支持流式传输的中间步骤中产生流式传输事件!
通过 astream_events 我们仍然可以看到来自模型和解析器的流输出
num_events = 0
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(
f"Chat model chunk: {repr(event['data']['chunk'].content)}",
flush=True,
)
if kind == "on_parser_stream":
print(f"Parser chunk: {event['data']['chunk']}", flush=True)
num_events += 1
if num_events > 30:
# Truncate the output
print("...")
break
Chat model chunk: ' Here'
Chat model chunk: ' is'
Chat model chunk: ' the'
Chat model chunk: ' JSON'
Chat model chunk: ' with'
Chat model chunk: ' the'
Chat model chunk: ' requested'
Chat model chunk: ' countries'
Chat model chunk: ' and'
Chat model chunk: ' their'
Chat model chunk: ' populations'
Chat model chunk: ':'
Chat model chunk: '\n\n```'
Chat model chunk: 'json'
Parser chunk: {}
Chat model chunk: '\n{'
Chat model chunk: '\n '
Chat model chunk: ' "'
Chat model chunk: 'countries'
Chat model chunk: '":'
Parser chunk: {'countries': []}
Chat model chunk: ' ['
Chat model chunk: '\n '
Parser chunk: {'countries': [{}]}
Chat model chunk: ' {'
Chat model chunk: '\n '
Chat model chunk: ' "'
...
Propagating Callbacks
传播回调
如果在工具中使用调用可运行对象,则需要将回调传播到可运行对象;否则,不会生成任何流事件。
标签:name,countries,chunk,LangChain,Chat,LCEL,model,events,表达式 From: https://blog.csdn.net/weixin_43787408/article/details/136857479当使用 RunnableLambdas 或 @chain 装饰器时,回调会在幕后自动传播。