LangGraph 工作流编排:从 DAG 到循环 Agent
AI 导读
LangGraph 工作流编排:从 DAG 到循环 Agent 概述 LangGraph 是 LangChain 团队推出的 Agent 工作流编排框架。与传统的 DAG(有向无环图)工作流引擎不同,LangGraph 原生支持循环(Cycles),这使得它能够表达"Agent 反复推理直到满意"的模式。 核心设计理念:将 Agent 的行为建模为一个状态机(State...
LangGraph 工作流编排:从 DAG 到循环 Agent
概述
LangGraph 是 LangChain 团队推出的 Agent 工作流编排框架。与传统的 DAG(有向无环图)工作流引擎不同,LangGraph 原生支持循环(Cycles),这使得它能够表达"Agent 反复推理直到满意"的模式。
核心设计理念:将 Agent 的行为建模为一个状态机(State Machine),每个节点是一个计算步骤,边是条件转移,状态在节点之间流转。
核心概念
LangGraph 核心概念
|
├── State(状态)
│ └── 在节点之间传递的数据容器
|
├── Node(节点)
│ └── 执行一个计算步骤(LLM 调用/工具执行/数据处理)
|
├── Edge(边)
│ ├── Normal Edge - 无条件转移
│ └── Conditional Edge - 基于状态的条件路由
|
├── Graph(图)
│ └── 由节点和边组成的工作流
|
└── Checkpointer(检查点)
└── 持久化状态,支持断点恢复和 Human-in-the-Loop
安装
pip install langgraph langchain-openai langchain-core
基础:构建第一个 Graph
定义状态
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
# 状态定义:使用 TypedDict
class AgentState(TypedDict):
# add_messages 是一个 reducer:新消息追加到列表,而非替换
messages: Annotated[list, add_messages]
# 普通字段:直接覆盖
current_step: str
iteration_count: int
定义节点
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def chatbot_node(state: AgentState) -> dict:
"""聊天节点:调用 LLM 生成回复"""
messages = state["messages"]
response = llm.invoke(messages)
return {
"messages": [response],
"current_step": "chatbot",
}
def tool_executor_node(state: AgentState) -> dict:
"""工具执行节点:执行 LLM 请求的工具调用"""
last_message = state["messages"][-1]
results = []
for tool_call in last_message.tool_calls:
result = execute_tool(tool_call["name"], tool_call["args"])
results.append(ToolMessage(
content=str(result),
tool_call_id=tool_call["id"],
))
return {"messages": results}
构建 Graph
from langgraph.graph import StateGraph, START, END
# 创建 graph builder
builder = StateGraph(AgentState)
# 添加节点
builder.add_node("chatbot", chatbot_node)
builder.add_node("tools", tool_executor_node)
# 添加边
builder.add_edge(START, "chatbot") # 入口 -> 聊天
# 条件边:根据 LLM 输出决定下一步
def should_use_tools(state: AgentState) -> str:
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools" # 有工具调用 -> 执行工具
return END # 无工具调用 -> 结束
builder.add_conditional_edges("chatbot", should_use_tools)
builder.add_edge("tools", "chatbot") # 工具执行后 -> 回到聊天(循环)
# 编译 graph
graph = builder.compile()
# 可视化
print(graph.get_graph().draw_mermaid())
运行 Graph
# 同步执行
result = graph.invoke({
"messages": [HumanMessage(content="北京今天天气怎么样?")],
"current_step": "",
"iteration_count": 0,
})
print(result["messages"][-1].content)
# 流式执行
for event in graph.stream({
"messages": [HumanMessage(content="分析一下特斯拉的股价趋势")],
}):
for node_name, output in event.items():
print(f"[{node_name}]", output)
# 异步执行
import asyncio
async def run():
result = await graph.ainvoke({
"messages": [HumanMessage(content="Hello")],
})
return result
asyncio.run(run())
进阶:带工具的 ReAct Agent
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# 定义工具
@tool
def search(query: str) -> str:
"""搜索网络获取最新信息"""
return f"搜索结果:关于 '{query}' 的最新信息..."
@tool
def calculator(expression: str) -> float:
"""计算数学表达式"""
return eval(expression)
@tool
def get_weather(city: str) -> str:
"""获取城市天气"""
return f"{city}:晴,25度,东南风3级"
# 创建 ReAct Agent(LangGraph 内置)
model = ChatOpenAI(model="gpt-4o")
agent = create_react_agent(
model=model,
tools=[search, calculator, get_weather],
state_modifier="你是一个全能助手,善于使用工具来回答问题。",
)
# 运行
result = agent.invoke({
"messages": [HumanMessage(content="北京今天天气如何?如果温度超过30度,计算一下需要开空调几小时(假设每小时2度电)才能把室温降到25度")]
})
Human-in-the-Loop(人工介入)
中断节点
from langgraph.checkpoint.memory import MemorySaver
# 创建检查点存储
memory = MemorySaver()
# 编译时指定中断点
graph = builder.compile(
checkpointer=memory,
interrupt_before=["tools"], # 在执行工具前中断,等待人工确认
)
# 第一次运行:会在 tools 节点前停下
config = {"configurable": {"thread_id": "user-123"}}
result = graph.invoke(
{"messages": [HumanMessage(content="帮我发送一封邮件给张三")]},
config=config,
)
# 查看 Agent 准备执行的工具调用
print("Agent wants to call:", result["messages"][-1].tool_calls)
# [{"name": "send_email", "args": {"to": "zhangsan@...", "content": "..."}}]
# 人工审核后继续执行
# 方式一:直接继续
graph.invoke(None, config=config)
# 方式二:修改后继续
graph.update_state(
config,
{"messages": [ToolMessage(content="已确认发送", tool_call_id="xxx")]},
as_node="tools",
)
graph.invoke(None, config=config)
时间旅行(回溯)
# 查看所有检查点
checkpoints = list(memory.list(config))
for cp in checkpoints:
print(f"Step: {cp.metadata.get('step')}, "
f"Node: {cp.metadata.get('source')}")
# 回溯到特定检查点
old_config = {
"configurable": {
"thread_id": "user-123",
"checkpoint_id": checkpoints[2].config["configurable"]["checkpoint_id"],
}
}
# 从该检查点重新执行
result = graph.invoke(
{"messages": [HumanMessage(content="换一个方案")]},
config=old_config,
)
多 Agent 工作流
监督者模式(Supervisor)
from typing import Literal
class SupervisorState(TypedDict):
messages: Annotated[list, add_messages]
next_agent: str
def supervisor_node(state: SupervisorState) -> dict:
"""监督者:决定下一个执行的 Agent"""
system_prompt = """你是一个任务调度器。根据当前对话状态,
选择下一个应该执行的 Agent:
- researcher: 需要搜索或调研信息时
- coder: 需要编写或修改代码时
- reviewer: 需要审查代码或方案时
- FINISH: 任务已完成
仅输出 Agent 名称。"""
messages = [
SystemMessage(content=system_prompt),
*state["messages"],
]
response = llm.invoke(messages)
next_agent = response.content.strip()
return {"next_agent": next_agent}
def researcher_node(state: SupervisorState) -> dict:
researcher_llm = llm.bind(system_message="你是研究员...")
response = researcher_llm.invoke(state["messages"])
return {"messages": [AIMessage(content=response.content, name="researcher")]}
def coder_node(state: SupervisorState) -> dict:
coder_llm = llm.bind(system_message="你是开发者...")
response = coder_llm.invoke(state["messages"])
return {"messages": [AIMessage(content=response.content, name="coder")]}
def reviewer_node(state: SupervisorState) -> dict:
reviewer_llm = llm.bind(system_message="你是审查员...")
response = reviewer_llm.invoke(state["messages"])
return {"messages": [AIMessage(content=response.content, name="reviewer")]}
# 构建图
builder = StateGraph(SupervisorState)
builder.add_node("supervisor", supervisor_node)
builder.add_node("researcher", researcher_node)
builder.add_node("coder", coder_node)
builder.add_node("reviewer", reviewer_node)
builder.add_edge(START, "supervisor")
# 监督者条件路由
def route_supervisor(state: SupervisorState) -> str:
return state["next_agent"]
builder.add_conditional_edges(
"supervisor",
route_supervisor,
{
"researcher": "researcher",
"coder": "coder",
"reviewer": "reviewer",
"FINISH": END,
},
)
# 所有 Agent 执行后回到监督者
builder.add_edge("researcher", "supervisor")
builder.add_edge("coder", "supervisor")
builder.add_edge("reviewer", "supervisor")
graph = builder.compile()
层级 Agent(Sub-Graph)
# 子图:研究团队内部的工作流
def build_research_subgraph():
class ResearchState(TypedDict):
messages: Annotated[list, add_messages]
research_notes: list[str]
builder = StateGraph(ResearchState)
builder.add_node("search", search_node)
builder.add_node("analyze", analyze_node)
builder.add_node("summarize", summarize_node)
builder.add_edge(START, "search")
builder.add_edge("search", "analyze")
def needs_more_search(state):
if len(state["research_notes"]) < 3:
return "search" # 信息不足,继续搜索
return "summarize" # 信息充足,生成摘要
builder.add_conditional_edges("analyze", needs_more_search)
builder.add_edge("summarize", END)
return builder.compile()
# 在主图中使用子图
research_subgraph = build_research_subgraph()
main_builder = StateGraph(MainState)
main_builder.add_node("research", research_subgraph) # 子图作为节点
main_builder.add_node("write", write_node)
main_builder.add_edge(START, "research")
main_builder.add_edge("research", "write")
main_builder.add_edge("write", END)
持久化与部署
数据库持久化
# SQLite 持久化
from langgraph.checkpoint.sqlite import SqliteSaver
with SqliteSaver.from_conn_string("checkpoints.db") as memory:
graph = builder.compile(checkpointer=memory)
result = graph.invoke(
{"messages": [HumanMessage(content="Hello")]},
config={"configurable": {"thread_id": "user-456"}},
)
# PostgreSQL 持久化(生产推荐)
from langgraph.checkpoint.postgres import PostgresSaver
with PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost:5432/langgraph"
) as memory:
graph = builder.compile(checkpointer=memory)
LangGraph Platform 部署
# langgraph.json - 部署配置
"""
{
"dependencies": ["langchain-openai", "langchain-community"],
"graphs": {
"agent": "./agent.py:graph"
},
"env": ".env"
}
"""
# 部署到 LangGraph Cloud
# langgraph up # 本地开发服务器
# langgraph deploy # 部署到云端
# 本地开发
pip install langgraph-cli
langgraph dev --config langgraph.json
# API 调用
curl -X POST http://localhost:8123/runs/stream \
-H "Content-Type: application/json" \
-d '{
"assistant_id": "agent",
"input": {"messages": [{"role": "user", "content": "Hello"}]},
"config": {"configurable": {"thread_id": "abc123"}}
}'
与其他框架对比
| 特性 | LangGraph | LangChain LCEL | AutoGen | CrewAI |
|---|---|---|---|---|
| 循环支持 | 原生 | 不支持 | 对话循环 | 不支持 |
| 状态管理 | 显式 TypedDict | 隐式 | 消息历史 | 隐式 |
| 持久化 | Checkpointer | 无 | 无 | 无 |
| Human-in-Loop | 原生支持 | 无 | 有 | 无 |
| 可视化 | Mermaid 图 | 无 | 无 | 无 |
| 部署 | LangGraph Platform | LangServe | 无 | 无 |
| 学习曲线 | 中等 | 低 | 低 | 低 |
最佳实践
- 状态设计:使用 TypedDict 明确定义状态字段,用 Annotated + reducer 处理列表追加
- 节点粒度:每个节点做一件事,保持可测试和可替换
- 条件边路由:基于状态字段路由,避免复杂的嵌套逻辑
- 检查点:生产环境必须配置持久化 checkpointer,否则重启丢失所有状态
- 错误处理:在节点内部处理异常,通过状态字段传递错误信息
- 迭代上限:所有循环必须设置
recursion_limit,防止无限循环
Maurice | [email protected]
深度加工(NotebookLM 生成)
基于本文内容生成的 PPT 大纲、博客摘要、短视频脚本与 Deep Dive 播客,用于多场景复用
PPT 大纲(5-8 张幻灯片) 点击展开
LangGraph 工作流编排:从 DAG 到循环 Agent — ppt
幻灯片 1:LangGraph 概述与核心理念
- LangGraph 是由 LangChain 团队推出的 Agent 工作流编排框架 [1]。
- 打破传统限制:与传统的 DAG(有向无环图)工作流引擎不同,LangGraph 原生支持循环(Cycles),允许 Agent 反复推理直到获得满意的结果 [1]。
- 核心设计理念:将 Agent 的行为抽象并建模为一个状态机(State Machine) [1]。
- 运行机制:在工作流中,每个节点代表一个计算步骤,边负责条件转移,而状态数据则在节点之间不断流转 [1]。
幻灯片 2:LangGraph 的核心概念
- State(状态):在节点之间传递的数据容器,通常使用
TypedDict进行定义和类型管理 [1]。 - Node(节点):执行具体的计算步骤,例如调用大语言模型(LLM)、执行外部工具或处理数据 [1]。
- Edge(边):定义图的工作流向,包含无条件转移的普通边(Normal Edge)以及基于状态路由的条件边(Conditional Edge) [1]。
- Checkpointer(检查点):用于持久化保存执行状态,支持断点恢复机制和人工介入(Human-in-the-Loop) [1]。
幻灯片 3:构建基础 Graph 工作流
- 定义图与节点:通过
StateGraph构建图,并添加聊天节点和工具执行节点等处理逻辑 [1, 2]。 - 配置条件路由:利用条件边(Conditional Edges),可以根据 LLM 的输出(例如是否需要调用工具)自动决定下一步的流向 [2, 3]。
- 编译与可视化:调用
compile()方法编译图后,支持通过draw_mermaid()将工作流导出为可视化的图表 [3]。 - 灵活的执行方式:编译后的图支持同步执行、异步执行(
ainvoke)以及流式输出(stream),适应不同场景的需求 [3]。
幻灯片 4:人工介入与时间旅行(回溯)
- 设置中断点:通过结合内存存储(
MemorySaver)并在编译时指定interrupt_before参数,可以在关键节点(如执行工具前)暂停工作流 [4]。 - 人工审核与继续:系统暂停后,人工可以查看 Agent 准备执行的动作,确认或修改状态后让工作流继续执行 [4, 5]。
- 获取状态快照:系统会为工作流的每一个步骤保存检查点(Checkpoints),可以通过配置文件查看所有历史节点 [5]。
- 时间旅行(回溯):允许提取特定的历史检查点配置,并从该检查点状态重新启动执行,方便切换方案或纠正错误 [5]。
幻灯片 5:多 Agent 高级工作流模式
- 监督者模式(Supervisor):通过引入一个“调度器”节点,根据当前对话状态动态选择并路由到下一个该执行的专业 Agent(如研究员、开发者或审查员) [5-7]。
- 状态共享:在监督者模式中,所有子 Agent 执行完毕后会将控制权交回给监督者,实现多角色的循环协作 [7]。
- 层级 Agent(Sub-Graph):LangGraph 支持将某一部分复杂的业务逻辑(例如内部研究团队的信息搜索与摘要生成)独立封装为子图 [7]。
- 模块化集成:构建好的子图可以作为单个 Node 节点无缝添加到主图(Main Graph)中,极大地提升了大型工作流的可维护性 [8]。
幻灯片 6:状态持久化与平台部署
- 本地与生产数据库:支持使用
SqliteSaver进行本地存储,生产环境中推荐使用PostgresSaver连接 PostgreSQL 数据库实现持久化 [8]。 - 部署配置:通过编写
langgraph.json文件,可以统一管理项目的依赖、环境变量及图的入口文件 [9]。 - LangGraph Platform:支持将应用快速部署到 LangGraph Cloud,或使用 LangGraph CLI 启动本地开发服务器 [9]。
- API 交互:部署后,能够通过标准的 REST API 发起调用请求并获取流式响应 [9]。
幻灯片 7:框架对比与最佳实践
- 核心优势对比:相比于 LangChain LCEL、AutoGen 或 CrewAI,LangGraph 提供显式状态管理,并且原生支持循环机制、人工介入及可视化管理 [9]。
- 状态与节点设计:强烈建议使用
TypedDict定义状态字段,利用reducer处理列表数据追加;确保每个节点粒度细化,只做一件事 [1, 9]。 - 安全性与防范无限循环:在执行循环工作流时,所有循环都必须设置
recursion_limit上限,以防系统陷入死循环 [9, 10]。 - 异常处理与持久化:必须在节点内部处理异常并通过状态字段传递;生产环境中必须配置持久化 Checkpointer,否则重启会丢失所有状态 [9, 10]。
博客摘要 + 核心看点 点击展开
LangGraph 工作流编排:从 DAG 到循环 Agent — summary
SEO 友好博客摘要
本文全面解析 LangGraph 工作流编排框架,探讨其如何打破传统 DAG 限制,通过原生支持循环(Cycles)实现 Agent 的反复推理与复杂交互 [1]。文章深入讲解了基于状态机的核心架构,包括节点、条件边与状态传递 [1],并展示了从构建基础 Graph 到进阶的 Human-in-the-Loop(人工介入)与时间旅行回溯机制 [1-3]。此外,博文还详细解析了多 Agent 协作(监督者模式与层级子图)以及 PostgreSQL 等数据库的持久化部署方案 [3-5]。想掌握新一代大模型复杂业务编排,这篇文章不容错过!
核心看点
- 突破传统 DAG 局限:原生支持循环与状态机建模,轻松实现 Agent 的反复推理与复杂逻辑流转 [1]。
- 人工介入与时间旅行:基于检查点持久化机制,支持工作流断点暂停、人工审核确认及状态回溯 [1-3]。
- 多 Agent 协作编排:内置监督者模式与层级子图架构,高效构建并持久化复杂的智能体工作流 [3-5]。
60 秒短视频脚本 点击展开
LangGraph 工作流编排:从 DAG 到循环 Agent — video
这是一份为您定制的 60 秒短视频脚本,严格按照您的字数与结构要求编写:
【钩子开场】(13字)
大模型如何像人一样循环思考?
【核心解说 1】(28字)
它突破单向执行,采用状态机原生支持循环,让 AI 反复推理直至满意。[1]
【核心解说 2】(30字)
借检查点机制,它原生支持人工介入审核,甚至能时间旅行回溯历史状态。[1-3]
【核心解说 3】(30字)
完美支持多智能体协同,借监督者路由或层级子图编排轻松驾驭复杂任务。[3-5]
【收束结尾】
LangGraph,构建复杂 Agent 工作流的终极利器![1, 6]
课后巩固
与本文内容匹配的闪卡与测验,帮助巩固所学知识
延伸阅读
根据本文主题,为你推荐相关的学习资料