第 11 期 | 并行执行 (Parallel Execution):分治机制
同学们好,欢迎回到《LangGraph 多智能体专家课》。我是你们的导师。
在上一期中,我们成功给「AI 万能内容创作机构 (AI Content Agency)」引入了记忆机制,让我们的 Agent 拥有了上下文连贯的能力。但是,随着我们机构接到的订单越来越复杂,客户的要求越来越高,我们遇到了一个严重的性能瓶颈。
想象一下这个场景:客户要求写一篇《2024 年中美日三国 AI 产业发展报告》。 按照我们之前的线性工作流:Planner 拆解出三个子话题(中国、美国、日本) $\rightarrow$ Researcher 去查中国的资料 $\rightarrow$ 等查完了,再去查美国的 $\rightarrow$ 再去查日本的 $\rightarrow$ 最终交给 Writer。
这叫什么?这叫串行执行(Sequential Execution)。如果每个国家的资料收集需要 10 秒,光是 Research 环节就要耗费 30 秒!在真实的商业环境中,你的客户会等到失去耐心,你的计算资源也没有被充分利用。
真正的顶级内容机构是怎么做的?主编(Planner)把任务拆解后,同时派出 3 个研究员(Researcher),小张查中国,小李查美国,小王查日本。大家兵分三路,谁查完谁把资料扔到汇总池里,最后作家(Writer)一并拿走写稿。
这就是我们今天的主题:并行执行(Parallel Execution)与分治机制(Divide and Conquer)。在分布式系统里,我们称之为 Fan-out(扇出)与 Fan-in(扇入)。
准备好重构我们的工作流了吗?发车!
🎯 本期学习目标
通过本期的实战,你将掌握以下高阶技能:
- 理解 Fan-out & Fan-in 架构:掌握多智能体协同中的动态分发与结果聚合原理。
- 驾驭 LangGraph
SendAPI:告别静态的条件分支,学会如何根据运行时数据动态创建多个节点实例。 - 掌握并发状态聚合 (State Reduction):解决多节点同时写入 State 时产生的数据覆盖(Overwrite)灾难。
- 重构 Agency 核心流:为我们的内容机构装上“多线程”引擎,效率提升 N 倍。
📖 原理解析
在 LangGraph 中实现并行,有两种方式。 第一种是静态并行:一个节点直接连向固定的三个节点,这很简单,但不够灵活。因为在我们的 Agency 中,Planner 拆解出的话题数量是动态的(有时是 2 个,有时是 5 个)。
因此,我们需要第二种:动态并行(Dynamic Parallelism)。
LangGraph 为此提供了一个极其优雅的魔法武器:Send 对象。
当你从一个节点或条件边返回一个 [Send("node_name", payload), Send("node_name", payload)] 列表时,LangGraph 会瞬间克隆出对应数量的 node_name 节点,并将不同的 payload 并发地喂给它们。
这就好比 Planner 站在办公室中央大喊一声:“来 3 个研究员!” 瞬间 3 个 Researcher 拔地而起,拿着各自的子话题去干活。
敲黑板!看下面这张我们 Agency 重构后的 Mermaid 架构图:
graph TD
Start((Start)) --> Planner[Planner Node\n主编拆解话题]
Planner -- "Fan-out (扇出)\n返回 [Send, Send, Send]" --> Dispatch{Dispatch Router}
Dispatch -.->|Send payload 1| Researcher1[Researcher Instance 1\n小张查话题A]
Dispatch -.->|Send payload 2| Researcher2[Researcher Instance 2\n小李查话题B]
Dispatch -.->|Send payload 3| Researcher3[Researcher Instance 3\n小王查话题C]
Researcher1 -.->|Fan-in (扇入)\nAppend to List| Aggregate((State Aggregation))
Researcher2 -.->|Fan-in (扇入)\nAppend to List| Aggregate
Researcher3 -.->|Fan-in (扇入)\nAppend to List| Aggregate
Aggregate --> Writer[Writer Node\n作家整合撰写]
Writer --> End((End))
classDef core fill:#2d3436,stroke:#74b9ff,stroke-width:2px,color:#fff;
classDef agent fill:#0984e3,stroke:#fff,stroke-width:2px,color:#fff;
classDef router fill:#d63031,stroke:#fff,stroke-width:2px,color:#fff;
class Planner,Writer core;
class Researcher1,Researcher2,Researcher3 agent;
class Dispatch router;工作流解析:
- Planner 确定文章大纲,生成一个
sub_topics列表。 - Dispatch Router 遍历这个列表,生成多个
Send对象,触发 Fan-out。 - 多个 Researcher 实例并行运行(LangGraph 底层会自动处理并发)。
- 每个 Researcher 运行结束后,将结果写入 State。这里的 State 必须使用特定的 Reducer(如
operator.add)来实现 Fan-in 聚合,否则后执行完的 Researcher 会把先执行完的结果覆盖掉。 - 所有 Researcher 都完成后,流程才会流转到 Writer。
💻 实战代码演练
废话不多说,Show me the code。
为了让大家能直接运行体验,本期的 Demo 我使用 Mock(模拟)的 LLM 调用,通过 time.sleep 来展示并行的威力。
第一步:定义支持聚合的 State
这是最容易踩坑的地方!我们在 State 中定义 research_results 时,必须使用 Annotated 和 operator.add。这告诉 LangGraph:“当多个节点同时往这个字段写数据时,不要覆盖,而是把它们**拼接(Append)**起来!”
import operator
import time
from typing import Annotated, TypedDict, List
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
# ==========================================
# 1. 定义状态 (State Definition)
# ==========================================
class AgencyState(TypedDict):
main_topic: str # 客户给定的主话题
sub_topics: List[str] # Planner 拆解出的子话题列表
# ⚠️ 核心点:使用 operator.add 确保并行写入时结果被追加到列表中,而不是互相覆盖
research_results: Annotated[List[str], operator.add]
final_article: str # 最终生成的文章
# 为了给 Send 传递特定参数,我们定义一个 Researcher 专用的 State
class ResearcherState(TypedDict):
sub_topic: str
第二步:编写节点逻辑 (Nodes)
接下来,我们来实现 Agency 的三个核心员工:主编 (Planner)、研究员 (Researcher) 和作家 (Writer)。
# ==========================================
# 2. 定义节点逻辑 (Node Functions)
# ==========================================
def planner_node(state: AgencyState):
"""
主编节点:负责将主话题拆解为多个子话题。
The Planner: Breaks down the main topic into sub-topics.
"""
print(f"👨💼 [Planner] 正在拆解主话题: '{state['main_topic']}'...")
# 模拟 LLM 拆解逻辑
mock_sub_topics = [
f"{state['main_topic']} 的历史背景",
f"{state['main_topic']} 的核心技术",
f"{state['main_topic']} 的未来趋势"
]
print(f"👨💼 [Planner] 拆解完成,生成了 {len(mock_sub_topics)} 个子话题。准备分发!")
# 将拆解的子话题更新到 State 中
return {"sub_topics": mock_sub_topics}
def researcher_node(state: ResearcherState):
"""
研究员节点:负责针对【单个】子话题进行深度研究。
The Researcher: Conducts deep research on a SINGLE sub-topic.
"""
topic = state["sub_topic"]
print(f" 🔍 [Researcher] 收到任务,开始搜索: '{topic}'...")
# 模拟耗时的网络搜索和阅读过程 (用 sleep 模拟)
# 注意:在真实的 LangGraph 异步执行中,这里的阻塞会被高效处理
time.sleep(2)
mock_result = f"【关于 '{topic}' 的研究报告】:这是一些非常有价值的深度信息..."
print(f" ✅ [Researcher] 完成搜索: '{topic}'")
# ⚠️ 核心点:返回字典的 key 必须与 AgencyState 中需要聚合的字段名一致
# 因为我们使用了 operator.add,这里返回的列表会被追加到总列表中
return {"research_results": [mock_result]}
def writer_node(state: AgencyState):
"""
作家节点:收集所有研究员的报告,撰写最终文章。
The Writer: Aggregates all research reports and writes the final article.
"""
print(f"\n✍️ [Writer] 收到所有研究资料,开始疯狂码字...")
results = state.get("research_results", [])
print(f"✍️ [Writer] 共收到 {len(results)} 份研究报告。")
# 模拟写作过程
article = f"# {state['main_topic']} 深度解析\n\n"
for i, res in enumerate(results, 1):
article += f"## 章节 {i}\n{res}\n\n"
print(f"✍️ [Writer] 完稿!")
return {"final_article": article}
第三步:见证奇迹的路由 (The Dispatch Router)
这是 Fan-out 的灵魂所在。我们不直接用边连接 Planner 和 Researcher,而是使用一个条件路由函数,返回 Send 对象。
# ==========================================
# 3. 动态分发路由 (Dynamic Dispatch Router)
# ==========================================
def dispatch_researchers(state: AgencyState):
"""
分发器:根据 Planner 生成的子话题数量,动态创建对应数量的 Researcher 实例。
The Dispatcher: Dynamically creates Researcher instances based on sub_topics.
"""
sub_topics = state.get("sub_topics", [])
# ⚠️ 核心点:为每个子话题创建一个 Send 对象
# Send("目标节点名称", 传递给该节点的 State Payload)
sends = []
for topic in sub_topics:
sends.append(Send("researcher", {"sub_topic": topic}))
return sends
第四步:组装图并运行 (Build & Run)
# ==========================================
# 4. 构建并编译 Graph (Build the Graph)
# ==========================================
builder = StateGraph(AgencyState)
# 添加节点
builder.add_node("planner", planner_node)
builder.add_node("researcher", researcher_node)
builder.add_node("writer", writer_node)
# 设置控制流
builder.add_edge(START, "planner")
# Planner 结束后,触发动态分发 (Fan-out)
builder.add_conditional_edges(
"planner",
dispatch_researchers,
# 这里声明 Send 可能会路由到的节点
["researcher"]
)
# 所有的 researcher 节点执行完毕后,统一流向 writer (Fan-in)
builder.add_edge("researcher", "writer")
builder.add_edge("writer", END)
# 编译图
graph = builder.compile()
# ==========================================
# 5. 测试运行 (Test Run)
# ==========================================
if __name__ == "__main__":
print("🚀 启动 AI Content Agency 并行工作流...\n")
initial_state = {
"main_topic": "量子计算",
"research_results": [] # 初始化为空列表
}
# 记录开始时间
start_time = time.time()
# 运行 Graph
final_state = graph.invoke(initial_state)
# 记录结束时间
end_time = time.time()
print("\n" + "="*40)
print("🏆 最终产出文章预览:")
print("="*40)
print(final_state["final_article"])
print("="*40)
print(f"⏱️ 总耗时: {end_time - start_time:.2f} 秒")
# 如果是串行,3个话题每个sleep 2秒,至少需要 6 秒。
# 因为是并行,总耗时应该在 2 秒多一点!
当你运行这段代码时,你会看到控制台打印出如下迷人的输出:
🚀 启动 AI Content Agency 并行工作流...
👨💼 [Planner] 正在拆解主话题: '量子计算'...
👨💼 [Planner] 拆解完成,生成了 3 个子话题。准备分发!
🔍 [Researcher] 收到任务,开始搜索: '量子计算 的历史背景'...
🔍 [Researcher] 收到任务,开始搜索: '量子计算 的核心技术'...
🔍 [Researcher] 收到任务,开始搜索: '量子计算 的未来趋势'...
✅ [Researcher] 完成搜索: '量子计算 的历史背景'
✅ [Researcher] 完成搜索: '量子计算 的未来趋势'
✅ [Researcher] 完成搜索: '量子计算 的核心技术'
✍️ [Writer] 收到所有研究资料,开始疯狂码字...
✍️ [Writer] 共收到 3 份研究报告。
✍️ [Writer] 完稿!
========================================
🏆 最终产出文章预览:
========================================
# 量子计算 深度解析
## 章节 1
【关于 '量子计算 的历史背景' 的研究报告】:这是一些非常有价值的深度信息...
## 章节 2
【关于 '量子计算 的核心技术' 的研究报告】:这是一些非常有价值的深度信息...
## 章节 3
【关于 '量子计算 的未来趋势' 的研究报告】:这是一些非常有价值的深度信息...
========================================
⏱️ 总耗时: 2.03 秒
同学们,看到最后的耗时了吗?2.03 秒! 3 个耗时 2 秒的任务并发执行,总时间依然是 2 秒。这就是并行分治的绝对力量。你的 Agency 现在的吞吐量已经达到了工业级水准。
坑与避坑指南
作为你们的导师,我不仅要教你们怎么写出能跑的代码,更要告诉你们在生产环境里会流哪些血。使用 并行机制 时,有三个“天坑”必须防范:
💣 坑一:忘记使用 Reducer 导致状态被无情覆写 (State Overwrite)
症状:派出了 5 个研究员,最后 Writer 只收到了一份报告。
诊断:在 TypedDict 中定义 research_results: list[str] 时,没有加 Annotated[..., operator.add]。LangGraph 默认行为是覆盖(Overwrite)。5 个并行节点同时返回 {"research_results": [...]},最后执行完的那个节点会把前面 4 个的心血全部抹杀。
解药:永远记得给 Fan-in 的目标字段加上 Reducer。
💣 坑二:并发风暴引发 API 限流 (Rate Limit 429 Error)
症状:Planner 拆解出了 20 个子话题,瞬间派出了 20 个 Researcher 请求 OpenAI API,直接被 OpenAI 报 HTTP 429 Too Many Requests 拒绝服务。
诊断:Fan-out 极其凶猛,缺乏并发控制。
解药:在生产环境中,如果你不知道 Planner 会生成多少个任务,千万不要裸奔。你应该在调用 LLM 的底层逻辑中加入重试机制(Retry with Exponential Backoff),比如使用 tenacity 库;或者在 LangGraph 的 RunnableConfig 中限制最大并发数(max_concurrency)。
💣 坑三:木桶效应与死锁风险 (The Slowest Intern Effect)
症状:总耗时异常长。 诊断:Fan-in 机制的特点是必须等待所有派出的 Send 节点都执行完毕,才会走向下一个节点(Writer)。如果小张、小李 2 秒查完了,但小王遇到了网络卡顿,卡了 60 秒,那么整个系统都要等小王 60 秒。 解药:给 Researcher 节点的 LLM 调用或搜索工具设置严格的 Timeout(超时限制)。如果某个 Researcher 超时,应捕获异常并返回一个“该话题暂无数据”的 fallback 结果,决不能让整个 Graph 挂起。
📝 本期小结
今天,我们为「AI 万能内容创作机构」装上了并行处理的涡轮增压引擎。
我们学习了:
- Fan-out(扇出):通过条件边返回
Send对象列表,动态克隆节点。 - Fan-in(扇入):通过
Annotated和operator.add,安全地聚合并发节点产生的数据。 - 性能飞跃:将原本 $O(N)$ 的时间复杂度,在资源允许的情况下压缩到了近似 $O(1)$。
现在,我们的 Planner 运筹帷幄,Researcher 们并驾齐驱,Writer 下笔如有神。但是,如果你仔细看生成的文章,你会发现:如果 Researcher 查回来的资料是垃圾,Writer 就会写出垃圾代码(Garbage in, garbage out)。
机构里是不是还少了一个把关的人? 没错,下一期(第 12 期),我们将引入 Human-in-the-loop (人类在环) 机制。在 Writer 动笔之前,或者交稿之后,我们将让**真实世界的主编(你)**介入工作流,进行审批、打回或修改。
让 Agent 狂奔的同时,把方向盘握在人类手里。我们下期见!👋