首页 > 其他分享 >LangGraph进阶:条件边与工具调用Agent实现

LangGraph进阶:条件边与工具调用Agent实现

时间:2024-11-11 16:20:16浏览次数:1  
标签:status return 进阶 LangGraph tool AgentState Agent state str

在前两篇文章中,我们讨论了LCEL和AgentExecutor的局限性,以及LangGraph的基础概念。今天,我们将深入探讨LangGraph的高级特性,重点关注条件边的使用和如何实现一个完整的工具调用Agent。

条件边的高级用法

条件边是LangGraph中最强大的特性之一,它允许我们基于状态动态决定执行流程。让我们深入了解一些高级用法。

1. 多条件路由

from typing import List, Dict, Literal
from pydantic import BaseModel
from langgraph.graph import StateGraph, END

class AgentState(BaseModel):
    messages: List[Dict[str, str]] = []
    current_input: str = ""
    tools_output: Dict[str, str] = {}
    status: str = "RUNNING"
    error_count: int = 0

def route_by_status(state: AgentState) -> Literal["process", "retry", "error", "end"]:
    """复杂的路由逻辑"""
    if state.status == "SUCCESS":
        return "end"
    elif state.status == "ERROR":
        if state.error_count >= 3:
            return "error"
        return "retry"
    elif state.status == "NEED_TOOL":
        return "process"
    return "process"

# 构建图结构
workflow = StateGraph(AgentState)

# 添加条件边
workflow.add_conditional_edges(
    "check_status",
    route_by_status,
    {
        "process": "execute_tool",
        "retry": "retry_handler",
        "error": "error_handler",
        "end": END
    }
)

2. 并行执行

LangGraph支持并行执行多个节点,这在处理复杂任务时特别有用:

async def parallel_tools_execution(state: AgentState) -> AgentState:
    """并行执行多个工具"""
    tools = identify_required_tools(state.current_input)
    
    async def execute_tool(tool):
        result = await tool.ainvoke(state.current_input)
        return {tool.name: result}
    
    # 并行执行所有工具
    results = await asyncio.gather(*[execute_tool(tool) for tool in tools])
    
    # 合并结果
    tools_output = {}
    for result in results:
        tools_output.update(result)
    
    return AgentState(
        messages=state.messages,
        current_input=state.current_input,
        tools_output=tools_output,
        status="SUCCESS"
    )

实现完整的工具调用Agent

让我们通过实现一个完整的工具调用Agent来展示LangGraph的强大功能。这个Agent能够:

  1. 理解用户输入
  2. 选择合适的工具
  3. 执行工具调用
  4. 生成最终响应

1. 定义状态和工具

from typing import List, Dict, Optional
from pydantic import BaseModel
from langchain.tools import BaseTool
from langchain.tools.calculator import CalculatorTool
from langchain.tools.wikipedia import WikipediaQueryRun
from langchain_core.language_models import ChatOpenAI

class Tool(BaseModel):
    name: str
    description: str
    func: callable

class AgentState(BaseModel):
    messages: List[Dict[str, str]] = []
    current_input: str = ""
    thought: str = ""
    selected_tool: Optional[str] = None
    tool_input: str = ""
    tool_output: str = ""
    final_answer: str = ""
    status: str = "STARTING"

# 定义可用工具
tools = [
    Tool(
        name="calculator",
        description="用于执行数学计算",
        func=CalculatorTool()
    ),
    Tool(
        name="wikipedia",
        description="用于查询维基百科信息",
        func=WikipediaQueryRun()
    )
]

2. 实现核心节点

async def think(state: AgentState) -> AgentState:
    """思考下一步行动"""
    prompt = f"""
    基于用户输入和当前对话历史,思考下一步行动。
    用户输入: {state.current_input}
    可用工具: {[t.name + ': ' + t.description for t in tools]}
    
    请决定:
    1. 是否需要使用工具
    2. 如果需要,选择哪个工具
    3. 使用什么参数调用工具
    
    以JSON格式返回: {{"thought": "思考过程", "need_tool": true/false, "tool": "工具名", "tool_input": "参数"}}
    """
    
    llm = ChatOpenAI(temperature=0)
    response = await llm.ainvoke(prompt)
    result = json.loads(response)
    
    return AgentState(
        **state.dict(),
        thought=result["thought"],
        selected_tool=result.get("tool"),
        tool_input=result.get("tool_input"),
        status="NEED_TOOL" if result["need_tool"] else "GENERATE_RESPONSE"
    )

async def execute_tool(state: AgentState) -> AgentState:
    """执行工具调用"""
    tool = next((t for t in tools if t.name == state.selected_tool), None)
    if not tool:
        return AgentState(
            **state.dict(),
            status="ERROR",
            thought="Selected tool not found"
        )
    
    try:
        result = await tool.func.ainvoke(state.tool_input)
        return AgentState(
            **state.dict(),
            tool_output=str(result),
            status="GENERATE_RESPONSE"
        )
    except Exception as e:
        return AgentState(
            **state.dict(),
            status="ERROR",
            thought=f"Tool execution failed: {str(e)}"
        )

async def generate_response(state: AgentState) -> AgentState:
    """生成最终响应"""
    prompt = f"""
    基于以下信息生成对用户的回复:
    用户输入: {state.current_input}
    思考过程: {state.thought}
    工具输出: {state.tool_output}
    
    请生成一个清晰、有帮助的回复。
    """
    
    llm = ChatOpenAI(temperature=0.7)
    response = await llm.ainvoke(prompt)
    
    return AgentState(
        **state.dict(),
        final_answer=response,
        status="SUCCESS"
    )

3. 构建完整的工作流

# 创建图结构
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("think", think)
workflow.add_node("execute_tool", execute_tool)
workflow.add_node("generate_response", generate_response)

# 定义路由函数
def route_next_step(state: AgentState) -> str:
    if state.status == "ERROR":
        return "error"
    elif state.status == "NEED_TOOL":
        return "execute_tool"
    elif state.status == "GENERATE_RESPONSE":
        return "generate_response"
    elif state.status == "SUCCESS":
        return "end"
    return "think"

# 添加条件边
workflow.add_conditional_edges(
    "think",
    route_next_step,
    {
        "execute_tool": "execute_tool",
        "generate_response": "generate_response",
        "error": "error_handler",
        "end": END
    }
)

workflow.add_conditional_edges(
    "execute_tool",
    route_next_step,
    {
        "generate_response": "generate_response",
        "error": "error_handler",
        "end": END
    }
)

workflow.add_edge("generate_response", END)

# 编译图
app = workflow.compile()

4. 使用示例

async def run_agent(user_input: str):
    state = AgentState(current_input=user_input)
    final_state = await app.ainvoke(state)
    return final_state.final_answer

# 使用示例
async def main():
    questions = [
        "计算23乘以45等于多少?",
        "谁发明了相对论?",
        "计算圆周率乘以10等于多少?"
    ]
    
    for question in questions:
        print(f"\n问题: {question}")
        answer = await run_agent(question)
        print(f"回答: {answer}")

# 运行示例
import asyncio
asyncio.run(main())

高级特性和技巧

1. 状态持久化

LangGraph支持将状态持久化到数据库中:

from langgraph.persist import GraphStatePersist

class DBStatePersist(GraphStatePersist):
    async def persist(self, state: AgentState):
        # 实现状态持久化逻辑
        pass
    
    async def load(self) -> AgentState:
        # 实现状态加载逻辑
        pass

# 使用持久化
workflow = StateGraph(AgentState, persist_handler=DBStatePersist())

2. 流式输出

支持实时显示执行过程:

async def stream_handler(state: AgentState):
    """处理流式输出"""
    yield f"思考中: {state.thought}\n"
    if state.selected_tool:
        yield f"使用工具: {state.selected_tool}\n"
    if state.tool_output:
        yield f"工具输出: {state.tool_output}\n"
    yield f"最终答案: {state.final_answer}\n"

# 在图中启用流式输出
workflow.set_stream_handler(stream_handler)

3. 错误处理和重试机制

async def error_handler(state: AgentState) -> AgentState:
    """处理错误情况"""
    if state.error_count < 3:
        return AgentState(
            **state.dict(),
            error_count=state.error_count + 1,
            status="RETRY"
        )
    return AgentState(
        **state.dict(),
        final_answer="抱歉,我无法完成这个任务。",
        status="ERROR"
    )

# 添加错误处理节点
workflow.add_node("error_handler", error_handler)

结语

通过这个完整的工具调用Agent实现,我们可以看到LangGraph在处理复杂LLM应用时的强大能力:

  1. 灵活的流程控制: 通过条件边实现复杂的决策逻辑
  2. 状态管理: 清晰的状态定义和转换
  3. 错误处理: 完善的错误处理和重试机制
  4. 可扩展性: 易于添加新的工具和功能

LangGraph不仅解决了传统方法的局限性,还提供了一个更加强大和灵活的框架来构建复杂的LLM应用。通过合理使用这些高级特性,我们可以构建出更加智能和可靠的AI应用。

标签:status,return,进阶,LangGraph,tool,AgentState,Agent,state,str
From: https://www.cnblogs.com/muzinan110/p/18540008

相关文章

  • Intro to LLM Agents with Langchain: When RAG is Not Enough
    https://towardsdatascience.com/intro-to-llm-agents-with-langchain-when-rag-is-not-enough-7d8c08145834Asalways,youcanfindthecodeonGitHub,andhereareseparateColabNotebooks:PlanningandreasoningDifferenttypesofmemoriesVarioustypesof......
  • JavaScript语法进阶:事件监听与处理
    一、概述事件定义了用户与网页交互时产生的各种操作。例如,单击按钮时,就产生一个事件,告诉浏览器发生了需要进行处理的单击操作。为了使对象能够对某一事件做出响应,就必须编写事件处理函数。事件处理函数是一段独立的程序代码,它在对象检测到某个特定事件时执行(响应该事件)。一个......
  • 如果你搞不懂排序,看这篇文章就对了,初学者也看得懂,其三:进阶插入排序——希尔排序
    目录一.希尔排序1.1希尔排序的原理1.2希尔排序的代码思路1.3希尔排序的代码实现1.1希尔排序的原理希尔排序法又称缩小增量法。希尔排序法的基本思想是:先选定一个整数,把待排序文件中所有记录分成个组,所有距离为的记录分在同一组内,并对每一组内的记录进行排序。然后,取,重......
  • 【大模型应用开发 动手做AI Agent】Agent的感知力:语言交互能力和多模态能力
    AIAgent,语言交互,多模态感知,大模型应用,自然语言处理,计算机视觉1.背景介绍在人工智能领域,AIAgent(智能代理)作为一种能够感知环境、做出决策并与环境交互的智能体,扮演着越来越重要的角色。一个强大的AIAgent需要具备敏锐的感知能力,才能有效地理解和响应周围世......
  • 【大模型应用开发 动手做AI Agent】Agent带来新的商业模式和变革
    大模型、AIAgent、应用开发、商业模式、变革、智能化、自动化1.背景介绍近年来,人工智能(AI)技术取得了飞速发展,特别是大模型的涌现,为AI应用带来了前所未有的机遇。大模型,是指参数规模庞大、训练数据海量的人工智能模型,具备强大的泛化能力和学习能力,能够在自然语言处理、......
  • ArkTS的进阶语法-1(泛型,工具类型,空安全)
     目录ArkTS的进阶语法1.泛型......
  • JVM 进阶:深入理解与高级调优
    在学习了JVM的基础知识后,接下来我们将深入了解JVM的内部工作原理、高级优化方法和性能调优技巧,这些内容将帮助你更好地管理Java应用的性能,尤其是在面对大规模应用和高并发场景时。一、深入了解JVM内存结构JVM内存结构的划分和管理直接关系到Java程序的运行效率,深......
  • 【模块一】kubernetes容器编排进阶实战之kubeadm部署kubernetes
    kubeadm部署kubernetes准备环境主机名IP地址k8s-master1        10.0.0.121k8s-node110.0.0.101k8s-node210.0.0.102k8s-node310.0.0.103注:提前安装好docker或者containerd环境安装kubeadm、kubectl、kubelet#分别在所有主机依次执行一下命令apt-getupdate&&......
  • 给Agent加上数据库功能会怎样?
    基于LLM大语言人工智能模型在自然语义理解、代码编写、结构化与非结构化数据处理、对话式人机交互等方面的优势,可以看到这一技术在传统数据分析(BI)场景的巨大应用潜力。AI+数据库,无疑是让Agent的能力更上一层楼,无论你是需要实时掌握业务进展数据的管理者;还是需要处理繁琐的数......
  • WPF 集合操作进阶:提取特定字段、转换 ObservableCollection 和过滤数据
    文章目录1.引言2.从List<T>提取特定字段3.将List<T>转换为observableCollection<T>4.过滤List<T>集合5.总结6.完整示例代码1.引言在C#开发中,集合操作是非常常见的任务,特别是在数据处理和用户界面设计中。本文将介绍如何从List<T>中提取......