https://www.alang.ai/langchain/101/lc05
一:LCEL入门
LangChain 的设计围绕着让 AI 应用开发者能够方便地将多个流程连缀成一个 AI 应用的业务逻辑,包括 Chain 与 Agent。每个流程都被封装成一个runnable
(langchain_core.runnables
),包括提示语模板、模型调用、输出解析器、工具调用等。
(一)调用链流程
以下面例子为例: 应用逻辑有三个流程:prompt
、chatmodel
、outputparser
。用|
来将它们连接成一个调用链。|
的工作逻辑类似于 Linux 里的管道操作符。前一流程的输出被作为下一流程的输入。然后,通过invoke调用chain得到结果
# 用 Pydantic 定义输出的 JSON 格式
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.output_parsers import JsonOutputParser
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
# Define your desired data structure.
class Joke(BaseModel):
setup: str = Field(description="question to set up a joke")
punchline: str = Field(description="answer to resolve the joke")
chatmodel = ChatOpenAI()
joke_query = "Tell me a joke."
# Set up a parser + inject instructions into the prompt template.
parser = JsonOutputParser(pydantic_object=Joke)
prompt_template = ChatPromptTemplate.from_messages(
[
("system", "Answer the user query.\n{format_instructions}"),
("user", "{query}")
])
chain = prompt_template | chatmodel | parser
chain.invoke(
{"query": joke_query,
"format_instructions": parser.get_format_instructions()
})
整体调用链流程图如下:
1.prompt_template
是一个提示语模板,它接受输入的参数,生成一个 ChatPromptValue
2.当 prompt_value
被作为 ChatModel 的输入时,它将被转换成一个 BaseMessage
。然后模型做出预测,在 LangChain 中,它返回的是 AIMessage
。
3.模型的输出被作为输出解析器的输入,我们这里使用的是StrOutputParser
,它将 AIMessage
解析为 string
(二)chain中各个流程被封装成runnable
1.prompt模板
class BasePromptTemplate(
RunnableSerializable[Dict, PromptValue], Generic[FormatOutputType], ABC
)
/ \
|
|
|
class BaseChatPromptTemplate(BasePromptTemplate, ABC)
/ \
|
|
|
class ChatPromptTemplate(BaseChatPromptTemplate)
2.模型调用
class BaseLanguageModel(
RunnableSerializable[LanguageModelInput, LanguageModelOutputVar], ABC
)
/ \
|
|
|
class BaseChatModel(BaseLanguageModel[BaseMessage], ABC)
/ \
|
|
|
class BaseChatOpenAI(BaseChatModel)
/ \
|
|
|
class ChatOpenAI(BaseChatOpenAI)
3.输出解析器
class BaseOutputParser(
BaseLLMOutputParser, RunnableSerializable[LanguageModelOutput, T]
)
/ \
|
|
|
class BaseTransformOutputParser(BaseOutputParser[T])
/ \
|
|
|
class BaseCumulativeTransformOutputParser(BaseTransformOutputParser[T])
/ \
|
|
|
class JsonOutputParser(BaseCumulativeTransformOutputParser[Any])
其中RunnableSerializable是继承自Runnable:
class RunnableSerializable(Serializable, Runnable[Input, Output]):
所有的runnable
类都可以用管道操作符连接起来!
(三)runnable
原理简析:无非就是重写了“|”方法
在init方法中将传递的列表划分为first、last和middle;在调用steps时,全部返回
重写“|”方法,将所有的流程放入steps中,执行invoke时,循环调用对应流程的invoke,将当前流程的输出作为下一个流程的输入
二:Runnable入门
从前面简析中,我们可以了解Runnable的大概流程。这里以__or__里面的coerce_to_runnable方法入手,看看它做了什么来引入runnable里面的其他部分:1.RunnableLike是一个Union集合,可以是里面的所有类型:
Runnable、Callable(传参是[input]列表,输出是Output)、Mapping(Key是字符串,Val是any);其中Input、Output是任意类型2.在coerce_to_runnable中判断了参数类型
- 是Runnable直接返回;
- 如果是异步生成器函数(定义为async def,函数中包含yield)调用RunnableGenerator;
- 如果是其他的可回调函数,调用RunnableLambda将之转换为Runnable;
- 如果是字典类型会调用RunnableParallel,并行运行字典里面的映射(执行value),返回输出的Runnable;注意:虽然RunnableLike表示map传递的value可以是any,但是在RunnableParallel里面还是限制了的,所以我们如果传递字典作为Runnable,那么value需要符合是Runnable、Callable、Map之一才行
(一)LCEL主要组成单元
1.RunnableSequence的概念
其实我们前面看重写_or_方法里面就是调用的RunnableSequence,顺序执行系列流程,前一个的输出作为下一个的输入。使用“|”运算符或通过构造将可运行项列表传递给RunnableSequence2.RunnableParallel的使用
RunnableParallel并行执行系列流程,为每个流程提供相同的输入。可以通过构造或者使用字典的方式来进行实例化。RunnableParallel原语本质上是一个dict,value是runnables类型的(或可以转换为runnables,如函数-->RunnableLambda)。它并行运行所有的value,每个value都使用RunnableParallel的整体输入(前一个流程的输出)进行调用。最后的返回值是一个dict,每个value的结果都在其相应的key下。from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
def mul_three(x: int) -> int:
return x * 3
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
runnable_3 = RunnableLambda(mul_three)
sequence = runnable_1 | { # this dict is coerced to a RunnableParallel
"mul_two": runnable_2,
"mul_three": runnable_3,
}
# Or equivalently:
# sequence = runnable_1 | RunnableParallel(
# {"mul_two": runnable_2, "mul_three": runnable_3}
# )
# Also equivalently:
# sequence = runnable_1 | RunnableParallel(
# mul_two=runnable_2,
# mul_three=runnable_3,
# )
sequence.invoke(1)
(二)其他Runnable子类补充
1.RunnableLambda的使用
对比RunnableGenerator来看,RunnableLambda不适合支持流式的处理,下面使用一个将模型输入大小写翻转的例子:from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import AIMessage
def parse(ai_message: AIMessage) -> str:
"""Parse the AI message."""
return ai_message.content.swapcase()
chatmodel = ChatOpenAI()
chain = chatmodel | parse
res = chain.invoke("hello")
print(res)
我们既可以通过前面案例的RunnableLambda进行构造,也可以直接传递func来进行实例化
2.RunnableGenerator的使用(更适合流式的处理)
from typing import Iterable
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableGenerator
from langchain_core.messages import AIMessageChunk
def streaming_parse(chunks: Iterable[AIMessageChunk]) -> Iterable[str]:
for chunk in chunks:
yield chunk.content.swapcase()
streaming_parse = RunnableGenerator(streaming_parse)
chatmodel = ChatOpenAI()
chain = chatmodel | streaming_parse
for chunk in chain.stream("tell me about yourself in one sentence"):
print(chunk, end="|", flush=True)
接收Chat model的输出AIMessageChunk
迭代器,遍历对每次Chat model的流式返回进行处理,即streaming_parse
方法是对每一块chunk进行处理,前面的RunnableLambda的parse
方法是对LLM返回的完整数据进行处理。
3.RunnablePassthrough的使用
RunnablePassthrough通常和RunnableParallel一起使用(作为dict的value);RunnablePassthrough通常用于将输入数据不变的传递:runnable = {"pass":RunnablePassthrough(),"modify":lambda x:x["num"]+1} | llm | outputParser
runnable.invoke({"num":1})
第一步就是RunnableParallel调用并行执行,对于"pass"其value就是输入,"modify"其value是修改后的,第一步结果是
{'passed': {'num': 1}, 'modified': 2}
补充1:RunnablePassthrough是传递了所有的invoke参数到RunnableParallel的每一个value中,通过python的itemgetter方法,单独获取某一个值即可实现传递单独参数到RunnableParallel的对应value中去
vectorstore = FAISS.from_texts(
["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
template = """Answer the question based only on the following context:
{context}
Question: {question}
Answer in the following language: {language}
"""
prompt = ChatPromptTemplate.from_template(template)
chain = (
{
"context": itemgetter("question") | retriever, #通过question去检索最符合的片段
"question": itemgetter("question"),
"language": itemgetter("language"),
}
| prompt
| model
| StrOutputParser()
)
chain.invoke({"question": "where did harrison work", "language": "italian"})
补充2:RunnablePassthrough可以通过assign静态方法对输入的数据进行修改
runnable = RunnableParallel(
extra=RunnablePassthrough.assign(mult=lambda x: x["num"] * 3),
modified=lambda x: x["num"] + 1,
)
runnable.invoke({"num": 1})
相当于对extra的value-->{"num": 1},新增了一个key:mult,extra的value变为{"num": 1,"mult":3}
(三)用 Graph 形式查看 Chain
我们可以将 Chain 的调用过程打印出来查看。我们使用grandalf
库完成这一任务,需先用如下命令安装 pip install grandalf
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
vectorstore = FAISS.from_texts(
["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
model = ChatOpenAI()
retrieval_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| model
| StrOutputParser()
)
retrieval_chain.invoke("where did harrison work?")
retrieval_chain.get_graph().print_ascii()
三:Runnable扩展
除了上面的子类之外,作为LCEL最重要的部分,还包括很多其他东西,陆续补充在这里(一)RunnableWithMessageHistory的使用
RunnableWithMessageHistory包装另一个Runnable并为其管理聊天消息历史;它负责读取和更新聊天消息历史记录。
class Runnable(Generic[Input, Output], ABC)
/ \
|
|
|
class RunnableSerializable(Serializable, Runnable[Input, Output])
/ \
|
|
|
class RunnableBindingBase(RunnableSerializable[Input, Output])
/ \
|
|
|
class RunnableWithMessageHistory(RunnableBindingBase)
具体来说,RunnableWithMessageHistory在将消息传递给Runnable之前加载会话中以前的消息,并在调用Runnable之后将生成的响应保存为消息。RunnableWithMessageHistory还通过用session_id保存每个会话来实现启用多个会话,因此它希望在调用Runnable时在配置中传递session_id,并使用它来查找相关的会话历史记录。
在第五期 LangChain学习中有提及其使用方法,实践中主要如下:
from langchain_core.runnables.history import RunnableWithMessageHistory
with_message_history = RunnableWithMessageHistory(
# 要管理上下文的runnable(chain/agent)
runnable,
# 回调函数,传入session id,返回上文
get_session_history,
# 其他的参数,包括I/O参数,历史上下文参数
...
)
with_message_history.invoke(
# I/O参数
{"ability": "math", "input": "What does cosine mean?"},
# 指定“session_id”的配置,它控制要加载的会话
config={"configurable": {"session_id": "abc123"}},
)
因此我们只需要考虑两个方面:1.如何去加载、存储历史消息?2.被管理的Runnable是什么,输入/输出有限制吗?
-
如何去加载、存储历史消息?
class SQLChatMessageHistory(BaseChatMessageHistory)
class InMemoryChatMessageHistory(BaseChatMessageHistory, BaseModel)
要实现我们自定义的BaseChatMessageHistory,前文也有提及:
class InMemoryHistory(BaseChatMessageHistory, BaseModel):
messages: List[BaseMessage] = Field(default_factory=list)
def add_messages(self, messages: List[BaseMessage]) -> None:
self.messages.extend(messages)
print("-------------------")
print(self.messages)
print("-------------------")
def clear(self) -> None:
self.messages = []
#这里全局变量来存储聊天消息历史记录
store = {}
-
被管理的Runnable是什么,Runnable的输入/输出有限制吗(不要太在意)
- 一系列的BaseMessages
- 一个字典,其中一个key对应的value是一系列的BaseMessages
- 一个字典,其中一个key对应的value是最新字符串消息/一系列的BaseMessages,另一个单独的key携带历史消息
就是有输入就行呗
标签:value,Runnable,langchain,runnable,LangChain,LCEL,import,class
From: https://www.cnblogs.com/ssyfj/p/18308244