多智能体编排模式:从串行到蜂群
AI 导读
多智能体编排模式:从串行到蜂群 串行/并行/层级/蜂群四大编排模式、LangGraph 状态机编排、CrewAI 角色协作与故障处理实战 引言 单一 Agent 的能力上限由其上下文窗口和工具集决定。当任务复杂度超过单 Agent 的能力边界时,需要多个 Agent 协作完成——就像一个团队比一个人能处理更复杂的项目一样。 多智能体编排(Multi-Agent...
多智能体编排模式:从串行到蜂群
串行/并行/层级/蜂群四大编排模式、LangGraph 状态机编排、CrewAI 角色协作与故障处理实战
引言
单一 Agent 的能力上限由其上下文窗口和工具集决定。当任务复杂度超过单 Agent 的能力边界时,需要多个 Agent 协作完成——就像一个团队比一个人能处理更复杂的项目一样。
多智能体编排(Multi-Agent Orchestration)的核心问题是:如何让多个 Agent 高效协作,同时保持可控性和可观测性?本文系统对比四种编排模式,并给出 LangGraph 和 CrewAI 的实战代码。
编排模式全景
四种核心模式
模式 1: 串行 (Sequential / Pipeline)
Agent A ──→ Agent B ──→ Agent C ──→ 结果
特点: 简单可预测, 总延迟 = 各阶段之和
模式 2: 并行 (Parallel / Fan-out Fan-in)
┌──→ Agent B ──┐
Agent A ─┤ ├──→ Agent D (汇总)
└──→ Agent C ──┘
特点: 高吞吐, 总延迟 = 最慢分支
模式 3: 层级 (Hierarchical / Manager-Worker)
┌── Agent B
Agent A ────┤── Agent C
(Manager) └── Agent D
特点: 动态分派, Manager 协调全局
模式 4: 蜂群 (Swarm / Handoff)
Agent A ←──→ Agent B
↕ ↕
Agent C ←──→ Agent D
特点: 去中心化, 任意节点可交接
模式选型对比
| 维度 | 串行 | 并行 | 层级 | 蜂群 |
|---|---|---|---|---|
| 复杂度 | 低 | 中 | 中 | 高 |
| 延迟 | 高(顺序累加) | 低(最慢分支) | 中 | 不确定 |
| 可控性 | 高 | 高 | 中 | 低 |
| 灵活性 | 低 | 中 | 高 | 极高 |
| 调试难度 | 低 | 中 | 中 | 高 |
| 适用场景 | 固定流程 | 独立子任务 | 复杂项目管理 | 对话式协作 |
| 示例 | 翻译流水线 | 多源研究 | 软件开发团队 | 客服路由 |
LangGraph 编排实战
串行流水线
# Multi-agent translation pipeline
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator
class TranslationState(TypedDict):
source_text: str
source_lang: str
target_lang: str
initial_translation: str
review_feedback: str
final_translation: str
quality_score: float
messages: Annotated[list, operator.add]
# Agent 1: Translator
async def translate(state: TranslationState) -> dict:
prompt = f"""Translate the following {state['source_lang']} text to {state['target_lang']}.
Focus on accuracy and naturalness.
Text: {state['source_text']}"""
result = await call_llm(prompt, model="gpt-4o")
return {"initial_translation": result, "messages": [f"Translator: completed"]}
# Agent 2: Reviewer
async def review(state: TranslationState) -> dict:
prompt = f"""Review this translation for accuracy, fluency, and cultural appropriateness.
Source ({state['source_lang']}): {state['source_text']}
Translation ({state['target_lang']}): {state['initial_translation']}
Provide specific feedback on:
1. Accuracy errors
2. Fluency issues
3. Cultural adaptation needs"""
feedback = await call_llm(prompt, model="claude-sonnet-4-20250514")
return {"review_feedback": feedback, "messages": [f"Reviewer: completed"]}
# Agent 3: Editor
async def edit(state: TranslationState) -> dict:
prompt = f"""Improve this translation based on the reviewer's feedback.
Original: {state['source_text']}
Current translation: {state['initial_translation']}
Feedback: {state['review_feedback']}
Produce the final polished translation."""
final = await call_llm(prompt, model="gpt-4o")
return {"final_translation": final, "messages": [f"Editor: completed"]}
# Agent 4: Quality Assessor
async def assess_quality(state: TranslationState) -> dict:
prompt = f"""Rate this translation on a scale of 0-1.
Source: {state['source_text']}
Translation: {state['final_translation']}
Respond with only a number."""
score = float(await call_llm(prompt, model="gpt-4o-mini"))
return {"quality_score": score, "messages": [f"QA: score={score:.2f}"]}
# Build graph
workflow = StateGraph(TranslationState)
workflow.add_node("translate", translate)
workflow.add_node("review", review)
workflow.add_node("edit", edit)
workflow.add_node("assess", assess_quality)
workflow.add_edge(START, "translate")
workflow.add_edge("translate", "review")
workflow.add_edge("review", "edit")
workflow.add_edge("edit", "assess")
# Conditional: re-translate if quality too low
def should_retry(state: TranslationState) -> str:
if state.get("quality_score", 0) < 0.85:
return "translate" # Retry from scratch
return END
workflow.add_conditional_edges("assess", should_retry)
app = workflow.compile()
并行研究
# Parallel research with fan-out/fan-in
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator
class ResearchState(TypedDict):
query: str
web_results: str
academic_results: str
code_results: str
synthesis: str
messages: Annotated[list, operator.add]
# Parallel research agents
async def search_web(state: ResearchState) -> dict:
results = await web_search(state["query"])
summary = await call_llm(
f"Summarize these web results for: {state['query']}\n{results}"
)
return {"web_results": summary, "messages": ["Web researcher: done"]}
async def search_academic(state: ResearchState) -> dict:
papers = await arxiv_search(state["query"])
summary = await call_llm(
f"Summarize these academic papers for: {state['query']}\n{papers}"
)
return {"academic_results": summary, "messages": ["Academic researcher: done"]}
async def search_code(state: ResearchState) -> dict:
repos = await github_search(state["query"])
summary = await call_llm(
f"Summarize these code repositories for: {state['query']}\n{repos}"
)
return {"code_results": summary, "messages": ["Code researcher: done"]}
# Synthesis agent (fan-in)
async def synthesize(state: ResearchState) -> dict:
prompt = f"""Synthesize findings from multiple sources about: {state['query']}
Web findings: {state['web_results']}
Academic findings: {state['academic_results']}
Code examples: {state['code_results']}
Produce a comprehensive analysis."""
result = await call_llm(prompt, model="claude-sonnet-4-20250514")
return {"synthesis": result, "messages": ["Synthesizer: done"]}
# Build parallel graph
workflow = StateGraph(ResearchState)
workflow.add_node("search_web", search_web)
workflow.add_node("search_academic", search_academic)
workflow.add_node("search_code", search_code)
workflow.add_node("synthesize", synthesize)
# Fan-out: all three search agents run in parallel
workflow.add_edge(START, "search_web")
workflow.add_edge(START, "search_academic")
workflow.add_edge(START, "search_code")
# Fan-in: all three must complete before synthesis
workflow.add_edge("search_web", "synthesize")
workflow.add_edge("search_academic", "synthesize")
workflow.add_edge("search_code", "synthesize")
workflow.add_edge("synthesize", END)
app = workflow.compile()
层级管理模式
# Hierarchical: Manager delegates to specialized workers
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Literal
class ProjectState(TypedDict):
task_description: str
plan: str
subtasks: list[dict]
results: dict[str, str]
final_output: str
async def manager_plan(state: ProjectState) -> dict:
"""Manager analyzes task and creates execution plan."""
prompt = f"""You are a project manager. Analyze this task and break it into subtasks.
Assign each subtask to the most appropriate specialist: researcher, coder, writer, reviewer.
Task: {state['task_description']}
Output as JSON: [{{"subtask": "...", "assignee": "researcher|coder|writer|reviewer", "priority": 1}}]"""
plan = await call_llm(prompt, model="claude-sonnet-4-20250514")
subtasks = parse_json(plan)
return {"plan": plan, "subtasks": subtasks}
async def route_to_worker(state: ProjectState) -> str:
"""Route to the next unfinished subtask's assignee."""
for task in state.get("subtasks", []):
if task["subtask"] not in state.get("results", {}):
return task["assignee"]
return "manager_review"
async def researcher(state: ProjectState) -> dict:
task = get_current_task(state, "researcher")
result = await call_llm(f"Research: {task['subtask']}")
results = state.get("results", {})
results[task["subtask"]] = result
return {"results": results}
async def coder(state: ProjectState) -> dict:
task = get_current_task(state, "coder")
result = await call_llm(f"Write code for: {task['subtask']}")
results = state.get("results", {})
results[task["subtask"]] = result
return {"results": results}
async def writer(state: ProjectState) -> dict:
task = get_current_task(state, "writer")
result = await call_llm(f"Write documentation for: {task['subtask']}")
results = state.get("results", {})
results[task["subtask"]] = result
return {"results": results}
async def manager_review(state: ProjectState) -> dict:
"""Manager reviews all results and produces final output."""
prompt = f"""Review all completed subtasks and produce the final deliverable.
Original task: {state['task_description']}
Plan: {state['plan']}
Results: {state['results']}"""
final = await call_llm(prompt, model="claude-sonnet-4-20250514")
return {"final_output": final}
# Build hierarchical graph
workflow = StateGraph(ProjectState)
workflow.add_node("manager_plan", manager_plan)
workflow.add_node("researcher", researcher)
workflow.add_node("coder", coder)
workflow.add_node("writer", writer)
workflow.add_node("manager_review", manager_review)
workflow.add_edge(START, "manager_plan")
workflow.add_conditional_edges("manager_plan", route_to_worker)
workflow.add_conditional_edges("researcher", route_to_worker)
workflow.add_conditional_edges("coder", route_to_worker)
workflow.add_conditional_edges("writer", route_to_worker)
workflow.add_edge("manager_review", END)
app = workflow.compile()
故障处理
Agent 级别容错
# Resilient agent wrapper
from langgraph.graph import StateGraph
import asyncio
class AgentError(Exception):
def __init__(self, agent_name: str, original_error: Exception):
self.agent_name = agent_name
self.original_error = original_error
super().__init__(f"Agent {agent_name} failed: {original_error}")
def resilient_agent(
agent_fn,
name: str,
max_retries: int = 2,
timeout_seconds: int = 60,
fallback_fn=None,
):
"""Wrap an agent function with retry, timeout, and fallback."""
async def wrapper(state):
last_error = None
for attempt in range(max_retries + 1):
try:
result = await asyncio.wait_for(
agent_fn(state),
timeout=timeout_seconds,
)
return result
except asyncio.TimeoutError:
last_error = TimeoutError(f"Timeout after {timeout_seconds}s")
except Exception as e:
last_error = e
if attempt < max_retries:
await asyncio.sleep(2 ** attempt) # Exponential backoff
# All retries exhausted, try fallback
if fallback_fn:
try:
return await fallback_fn(state)
except Exception:
pass
raise AgentError(name, last_error)
wrapper.__name__ = name
return wrapper
全局状态恢复
# Checkpoint and recovery with LangGraph persistence
from langgraph.checkpoint.sqlite import SqliteSaver
# Create checkpointer
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
# Compile graph with checkpointing
app = workflow.compile(checkpointer=checkpointer)
# Run with thread_id for persistence
config = {"configurable": {"thread_id": "project-123"}}
# If interrupted, resume from last checkpoint
try:
result = await app.ainvoke(initial_state, config)
except Exception as e:
print(f"Interrupted at: {e}")
# Resume from checkpoint
state = await app.aget_state(config)
print(f"Last completed step: {state.next}")
result = await app.ainvoke(None, config) # Resume
编排模式选型决策
选型决策树:
任务特征分析
│
├─ 步骤固定,顺序明确?
│ └─ YES → 串行流水线
│
├─ 多个独立子任务?
│ └─ YES → 并行 (Fan-out/Fan-in)
│
├─ 需要动态分派和协调?
│ ├─ 有明确的协调者角色?
│ │ └─ YES → 层级 (Manager-Worker)
│ └─ 任务在 Agent 间流转?
│ └─ YES → 蜂群 (Swarm/Handoff)
│
└─ 混合模式?
└─ 大多数实际系统是多种模式的组合
总结
- 从简单模式开始:串行流水线是最容易理解和调试的模式,除非有明确的并行需求,否则先用串行。
- 并行要有汇总点:Fan-out 必须配合 Fan-in,否则结果散落各处无法整合。
- 层级模式最像人类团队:Manager 负责规划和协调,Worker 负责执行,适合复杂项目。
- 蜂群模式灵活但难调试:去中心化意味着执行路径不确定,需要完善的日志和追踪。
- 容错是必需的:每个 Agent 都可能失败,重试、超时和降级机制必须从一开始就设计进去。
Maurice | [email protected]
深度加工(NotebookLM 生成)
基于本文内容生成的 PPT 大纲、博客摘要、短视频脚本与 Deep Dive 播客,用于多场景复用
PPT 大纲(5-8 张幻灯片) 点击展开
多智能体编排模式:从串行到蜂群 — ppt
这是一份基于您提供的关于“多智能体编排模式”文章的 PPT 大纲,共包含 7 张幻灯片,采用 Markdown 格式输出:
幻灯片 1:多智能体编排模式引言
- 单智能体的局限性:单一 Agent 的能力上限受限于其上下文窗口和可用工具集,面对复杂任务时显得力不从心 [1]。
- 多智能体协作的核心问题:在让多个 Agent 协作完成复杂项目时,最大的挑战是如何保证协作的高效性,同时维持系统的可控性与可观测性 [1]。
- 四大核心编排模式:解决复杂任务的四种主要模式包括串行(Sequential)、并行(Parallel)、层级(Hierarchical)和蜂群(Swarm) [1]。
幻灯片 2:模式一 —— 串行编排 (Sequential)
- 核心特点:架构简单且行为可预测,系统总延迟等于各个处理阶段延迟的总和 [1]。
- 优缺点分析:该模式具有很高的可控性,调试难度低,但灵活性较弱 [1]。
- 最佳适用场景:适用于步骤明确、流程固定的任务,例如包含初译、审查、编辑到质量评估环节的自动翻译流水线 [1-4]。
幻灯片 3:模式二 —— 并行编排 (Parallel)
- 核心特点:拥有高吞吐量,系统的总延迟仅取决于执行最慢的那个分支 [1]。
- 关键设计原则 (Fan-out / Fan-in):必须将任务发散(Fan-out)给不同的 Agent 执行,并且必须设置一个汇总点(Fan-in)来整合散落的结果 [1, 5]。
- 最佳适用场景:适合处理相互独立的子任务,例如同时派出不同 Agent 去搜索网络、学术论文和代码库,最后统一由合成 Agent 输出深度分析报告的研究场景 [4, 6, 7]。
幻灯片 4:模式三 —— 层级编排 (Hierarchical)
- 核心特点:采用“管理者-执行者”结构,由 Manager 负责全局的协调和任务的动态分派,工作模式最接近真实的人类团队协作 [1, 5]。
- 执行流程:管理者首先分析任务并生成拆分计划,随后将子任务路由给专业执行者(如研究员、程序员、作家),最终汇总并审查最终交付物 [8, 9]。
- 最佳适用场景:非常适合复杂的项目管理与统筹,例如由多个专业角色组成的软件开发团队 [1]。
幻灯片 5:模式四 —— 蜂群编排 (Swarm)
- 核心特点:完全去中心化的架构,任意节点(Agent)之间都可以自由进行任务的交接(Handoff) [1]。
- 优缺点分析:提供了极高的系统灵活性,但牺牲了可控性;由于执行路径高度不确定,因此调试难度非常大,需要完善的日志和追踪机制 [1, 5]。
- 最佳适用场景:适合对话式协作以及复杂的客服请求路由分配等场景 [1]。
幻灯片 6:系统的故障处理与容错机制
- 容错的必要性:在多智能体系统中,每个 Agent 都有失败的风险,因此必须从设计初期就引入容错机制 [5]。
- Agent 级别的弹性和重试:需要为 Agent 封装重试(指数退避)、超时限制(Timeout)以及降级备选(Fallback)处理函数 [5, 10]。
- 全局状态恢复与持久化:系统应支持基于数据库(如 SQLite)的检查点(Checkpoint)机制,一旦任务中断,可以通过
thread_id从上一个成功完成的步骤继续执行,避免从头再来 [5]。
幻灯片 7:选型决策与最佳实践总结
- 从简单模式起步:串行流水线是最易于理解和调试的,除非有明确的并行执行需求,否则应优先尝试使用串行模式 [5]。
- 依据任务特征选型:步骤固定选串行,独立任务选并行,需协调全局选层级,依赖节点间流转选蜂群 [5]。
- 模式的混合使用:在大多数实际的复杂系统中,往往不需要拘泥于单一方案,而是会将多种编排模式组合使用以达到最优效果 [5]。
博客摘要 + 核心看点 点击展开
多智能体编排模式:从串行到蜂群 — summary
SEO 友好博客摘要
探索多智能体编排(Multi-Agent Orchestration)的核心秘籍!当单Agent面临能力瓶颈,多智能体协作成为必由之路[1]。本文深度解析串行、并行、层级与蜂群四大核心编排模式的优劣势及适用场景[1]。文章不仅提供了丰富的LangGraph框架实战代码演示[1, 2],更总结了实用的架构选型决策树与Agent容错降级、状态恢复机制指南[3]。为您打造高效、可控且健壮的AI智能体协作团队提供全方位的落地参考。
核心看点
- 四大核心模式对比:全景解析串行、并行、层级与蜂群编排的优劣及适用场景[1]。
- LangGraph代码实战:提供多种协同工作流的构建示例,助您快速上手开发[1, 2]。
- 选型决策与容错指南:内置架构选型树,并详解超时、重试等Agent级容错方案[3]。
60 秒短视频脚本 点击展开
多智能体编排模式:从串行到蜂群 — video
这是一份为您定制的 60 秒短视频脚本,严格按照您的字数和结构要求撰写,并附上了参考的来源标记:
【钩子开场】(13字)
单个AI搞不定复杂任务怎么办?[1]
【核心解说一】(26字)
多智能体专克复杂任务。串行宛如流水线,并行则多路并发处理再汇总。[1, 2]
【核心解说二】(26字)
层级模式最像人类团队运作,由管理者全局规划,动态分派给具体执行者。[1, 3, 4]
【核心解说三】(27字)
蜂群模式去中心化,交接灵活但难调试。此外各模式都必须加容错机制。[1, 4]
【结尾收束】
根据任务选对模式,从最简单的串行开始,打造你的高效AI团队吧![4]
课后巩固
与本文内容匹配的闪卡与测验,帮助巩固所学知识
延伸阅读
根据本文主题,为你推荐相关的学习资料