第 11 期 | 并行执行 (Parallel Execution):分治机制

更新于 2026/4/14

同学们好,欢迎回到《LangGraph 多智能体专家课》。我是你们的导师。

在上一期中,我们成功给「AI 万能内容创作机构 (AI Content Agency)」引入了记忆机制,让我们的 Agent 拥有了上下文连贯的能力。但是,随着我们机构接到的订单越来越复杂,客户的要求越来越高,我们遇到了一个严重的性能瓶颈

想象一下这个场景:客户要求写一篇《2024 年中美日三国 AI 产业发展报告》。 按照我们之前的线性工作流:Planner 拆解出三个子话题(中国、美国、日本) $\rightarrow$ Researcher 去查中国的资料 $\rightarrow$ 等查完了,再去查美国的 $\rightarrow$ 再去查日本的 $\rightarrow$ 最终交给 Writer。

这叫什么?这叫串行执行(Sequential Execution)。如果每个国家的资料收集需要 10 秒,光是 Research 环节就要耗费 30 秒!在真实的商业环境中,你的客户会等到失去耐心,你的计算资源也没有被充分利用。

真正的顶级内容机构是怎么做的?主编(Planner)把任务拆解后,同时派出 3 个研究员(Researcher),小张查中国,小李查美国,小王查日本。大家兵分三路,谁查完谁把资料扔到汇总池里,最后作家(Writer)一并拿走写稿。

这就是我们今天的主题:并行执行(Parallel Execution)与分治机制(Divide and Conquer)。在分布式系统里,我们称之为 Fan-out(扇出)与 Fan-in(扇入)

准备好重构我们的工作流了吗?发车!


🎯 本期学习目标

通过本期的实战,你将掌握以下高阶技能:

  1. 理解 Fan-out & Fan-in 架构:掌握多智能体协同中的动态分发与结果聚合原理。
  2. 驾驭 LangGraph Send API:告别静态的条件分支,学会如何根据运行时数据动态创建多个节点实例。
  3. 掌握并发状态聚合 (State Reduction):解决多节点同时写入 State 时产生的数据覆盖(Overwrite)灾难。
  4. 重构 Agency 核心流:为我们的内容机构装上“多线程”引擎,效率提升 N 倍。

📖 原理解析

在 LangGraph 中实现并行,有两种方式。 第一种是静态并行:一个节点直接连向固定的三个节点,这很简单,但不够灵活。因为在我们的 Agency 中,Planner 拆解出的话题数量是动态的(有时是 2 个,有时是 5 个)。

因此,我们需要第二种:动态并行(Dynamic Parallelism)

LangGraph 为此提供了一个极其优雅的魔法武器:Send 对象。 当你从一个节点或条件边返回一个 [Send("node_name", payload), Send("node_name", payload)] 列表时,LangGraph 会瞬间克隆出对应数量的 node_name 节点,并将不同的 payload 并发地喂给它们。

这就好比 Planner 站在办公室中央大喊一声:“来 3 个研究员!” 瞬间 3 个 Researcher 拔地而起,拿着各自的子话题去干活。

敲黑板!看下面这张我们 Agency 重构后的 Mermaid 架构图:

graph TD
    Start((Start)) --> Planner[Planner Node\n主编拆解话题]
    
    Planner -- "Fan-out (扇出)\n返回 [Send, Send, Send]" --> Dispatch{Dispatch Router}
    
    Dispatch -.->|Send payload 1| Researcher1[Researcher Instance 1\n小张查话题A]
    Dispatch -.->|Send payload 2| Researcher2[Researcher Instance 2\n小李查话题B]
    Dispatch -.->|Send payload 3| Researcher3[Researcher Instance 3\n小王查话题C]
    
    Researcher1 -.->|Fan-in (扇入)\nAppend to List| Aggregate((State Aggregation))
    Researcher2 -.->|Fan-in (扇入)\nAppend to List| Aggregate
    Researcher3 -.->|Fan-in (扇入)\nAppend to List| Aggregate
    
    Aggregate --> Writer[Writer Node\n作家整合撰写]
    Writer --> End((End))

    classDef core fill:#2d3436,stroke:#74b9ff,stroke-width:2px,color:#fff;
    classDef agent fill:#0984e3,stroke:#fff,stroke-width:2px,color:#fff;
    classDef router fill:#d63031,stroke:#fff,stroke-width:2px,color:#fff;
    
    class Planner,Writer core;
    class Researcher1,Researcher2,Researcher3 agent;
    class Dispatch router;

工作流解析:

  1. Planner 确定文章大纲,生成一个 sub_topics 列表。
  2. Dispatch Router 遍历这个列表,生成多个 Send 对象,触发 Fan-out
  3. 多个 Researcher 实例并行运行(LangGraph 底层会自动处理并发)。
  4. 每个 Researcher 运行结束后,将结果写入 State。这里的 State 必须使用特定的 Reducer(如 operator.add)来实现 Fan-in 聚合,否则后执行完的 Researcher 会把先执行完的结果覆盖掉。
  5. 所有 Researcher 都完成后,流程才会流转到 Writer

💻 实战代码演练

废话不多说,Show me the code。 为了让大家能直接运行体验,本期的 Demo 我使用 Mock(模拟)的 LLM 调用,通过 time.sleep 来展示并行的威力。

第一步:定义支持聚合的 State

这是最容易踩坑的地方!我们在 State 中定义 research_results 时,必须使用 Annotatedoperator.add。这告诉 LangGraph:“当多个节点同时往这个字段写数据时,不要覆盖,而是把它们**拼接(Append)**起来!”

import operator
import time
from typing import Annotated, TypedDict, List
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

# ==========================================
# 1. 定义状态 (State Definition)
# ==========================================
class AgencyState(TypedDict):
    main_topic: str                  # 客户给定的主话题
    sub_topics: List[str]            # Planner 拆解出的子话题列表
    # ⚠️ 核心点:使用 operator.add 确保并行写入时结果被追加到列表中,而不是互相覆盖
    research_results: Annotated[List[str], operator.add] 
    final_article: str               # 最终生成的文章

# 为了给 Send 传递特定参数,我们定义一个 Researcher 专用的 State
class ResearcherState(TypedDict):
    sub_topic: str

第二步:编写节点逻辑 (Nodes)

接下来,我们来实现 Agency 的三个核心员工:主编 (Planner)、研究员 (Researcher) 和作家 (Writer)。

# ==========================================
# 2. 定义节点逻辑 (Node Functions)
# ==========================================

def planner_node(state: AgencyState):
    """
    主编节点:负责将主话题拆解为多个子话题。
    The Planner: Breaks down the main topic into sub-topics.
    """
    print(f"👨‍💼 [Planner] 正在拆解主话题: '{state['main_topic']}'...")
    # 模拟 LLM 拆解逻辑
    mock_sub_topics = [
        f"{state['main_topic']} 的历史背景",
        f"{state['main_topic']} 的核心技术",
        f"{state['main_topic']} 的未来趋势"
    ]
    print(f"👨‍💼 [Planner] 拆解完成,生成了 {len(mock_sub_topics)} 个子话题。准备分发!")
    
    # 将拆解的子话题更新到 State 中
    return {"sub_topics": mock_sub_topics}


def researcher_node(state: ResearcherState):
    """
    研究员节点:负责针对【单个】子话题进行深度研究。
    The Researcher: Conducts deep research on a SINGLE sub-topic.
    """
    topic = state["sub_topic"]
    print(f"   🔍 [Researcher] 收到任务,开始搜索: '{topic}'...")
    
    # 模拟耗时的网络搜索和阅读过程 (用 sleep 模拟)
    # 注意:在真实的 LangGraph 异步执行中,这里的阻塞会被高效处理
    time.sleep(2) 
    
    mock_result = f"【关于 '{topic}' 的研究报告】:这是一些非常有价值的深度信息..."
    print(f"   ✅ [Researcher] 完成搜索: '{topic}'")
    
    # ⚠️ 核心点:返回字典的 key 必须与 AgencyState 中需要聚合的字段名一致
    # 因为我们使用了 operator.add,这里返回的列表会被追加到总列表中
    return {"research_results": [mock_result]}


def writer_node(state: AgencyState):
    """
    作家节点:收集所有研究员的报告,撰写最终文章。
    The Writer: Aggregates all research reports and writes the final article.
    """
    print(f"\n✍️ [Writer] 收到所有研究资料,开始疯狂码字...")
    results = state.get("research_results", [])
    
    print(f"✍️ [Writer] 共收到 {len(results)} 份研究报告。")
    
    # 模拟写作过程
    article = f"# {state['main_topic']} 深度解析\n\n"
    for i, res in enumerate(results, 1):
        article += f"## 章节 {i}\n{res}\n\n"
        
    print(f"✍️ [Writer] 完稿!")
    return {"final_article": article}

第三步:见证奇迹的路由 (The Dispatch Router)

这是 Fan-out 的灵魂所在。我们不直接用边连接 Planner 和 Researcher,而是使用一个条件路由函数,返回 Send 对象。

# ==========================================
# 3. 动态分发路由 (Dynamic Dispatch Router)
# ==========================================

def dispatch_researchers(state: AgencyState):
    """
    分发器:根据 Planner 生成的子话题数量,动态创建对应数量的 Researcher 实例。
    The Dispatcher: Dynamically creates Researcher instances based on sub_topics.
    """
    sub_topics = state.get("sub_topics", [])
    
    # ⚠️ 核心点:为每个子话题创建一个 Send 对象
    # Send("目标节点名称", 传递给该节点的 State Payload)
    sends = []
    for topic in sub_topics:
        sends.append(Send("researcher", {"sub_topic": topic}))
        
    return sends

第四步:组装图并运行 (Build & Run)

# ==========================================
# 4. 构建并编译 Graph (Build the Graph)
# ==========================================
builder = StateGraph(AgencyState)

# 添加节点
builder.add_node("planner", planner_node)
builder.add_node("researcher", researcher_node)
builder.add_node("writer", writer_node)

# 设置控制流
builder.add_edge(START, "planner")

# Planner 结束后,触发动态分发 (Fan-out)
builder.add_conditional_edges(
    "planner", 
    dispatch_researchers, 
    # 这里声明 Send 可能会路由到的节点
    ["researcher"] 
)

# 所有的 researcher 节点执行完毕后,统一流向 writer (Fan-in)
builder.add_edge("researcher", "writer")
builder.add_edge("writer", END)

# 编译图
graph = builder.compile()

# ==========================================
# 5. 测试运行 (Test Run)
# ==========================================
if __name__ == "__main__":
    print("🚀 启动 AI Content Agency 并行工作流...\n")
    
    initial_state = {
        "main_topic": "量子计算",
        "research_results": [] # 初始化为空列表
    }
    
    # 记录开始时间
    start_time = time.time()
    
    # 运行 Graph
    final_state = graph.invoke(initial_state)
    
    # 记录结束时间
    end_time = time.time()
    
    print("\n" + "="*40)
    print("🏆 最终产出文章预览:")
    print("="*40)
    print(final_state["final_article"])
    print("="*40)
    print(f"⏱️ 总耗时: {end_time - start_time:.2f} 秒")
    # 如果是串行,3个话题每个sleep 2秒,至少需要 6 秒。
    # 因为是并行,总耗时应该在 2 秒多一点!

当你运行这段代码时,你会看到控制台打印出如下迷人的输出:

🚀 启动 AI Content Agency 并行工作流...

👨‍💼 [Planner] 正在拆解主话题: '量子计算'...
👨‍💼 [Planner] 拆解完成,生成了 3 个子话题。准备分发!
   🔍 [Researcher] 收到任务,开始搜索: '量子计算 的历史背景'...
   🔍 [Researcher] 收到任务,开始搜索: '量子计算 的核心技术'...
   🔍 [Researcher] 收到任务,开始搜索: '量子计算 的未来趋势'...
   ✅ [Researcher] 完成搜索: '量子计算 的历史背景'
   ✅ [Researcher] 完成搜索: '量子计算 的未来趋势'
   ✅ [Researcher] 完成搜索: '量子计算 的核心技术'

✍️ [Writer] 收到所有研究资料,开始疯狂码字...
✍️ [Writer] 共收到 3 份研究报告。
✍️ [Writer] 完稿!

========================================
🏆 最终产出文章预览:
========================================
# 量子计算 深度解析

## 章节 1
【关于 '量子计算 的历史背景' 的研究报告】:这是一些非常有价值的深度信息...

## 章节 2
【关于 '量子计算 的核心技术' 的研究报告】:这是一些非常有价值的深度信息...

## 章节 3
【关于 '量子计算 的未来趋势' 的研究报告】:这是一些非常有价值的深度信息...

========================================
⏱️ 总耗时: 2.03 秒

同学们,看到最后的耗时了吗?2.03 秒! 3 个耗时 2 秒的任务并发执行,总时间依然是 2 秒。这就是并行分治的绝对力量。你的 Agency 现在的吞吐量已经达到了工业级水准。


坑与避坑指南

作为你们的导师,我不仅要教你们怎么写出能跑的代码,更要告诉你们在生产环境里会流哪些血。使用 并行机制 时,有三个“天坑”必须防范:

💣 坑一:忘记使用 Reducer 导致状态被无情覆写 (State Overwrite)

症状:派出了 5 个研究员,最后 Writer 只收到了一份报告。 诊断:在 TypedDict 中定义 research_results: list[str] 时,没有加 Annotated[..., operator.add]。LangGraph 默认行为是覆盖(Overwrite)。5 个并行节点同时返回 {"research_results": [...]},最后执行完的那个节点会把前面 4 个的心血全部抹杀。 解药:永远记得给 Fan-in 的目标字段加上 Reducer。

💣 坑二:并发风暴引发 API 限流 (Rate Limit 429 Error)

症状:Planner 拆解出了 20 个子话题,瞬间派出了 20 个 Researcher 请求 OpenAI API,直接被 OpenAI 报 HTTP 429 Too Many Requests 拒绝服务。 诊断:Fan-out 极其凶猛,缺乏并发控制。 解药:在生产环境中,如果你不知道 Planner 会生成多少个任务,千万不要裸奔。你应该在调用 LLM 的底层逻辑中加入重试机制(Retry with Exponential Backoff),比如使用 tenacity 库;或者在 LangGraph 的 RunnableConfig 中限制最大并发数(max_concurrency)。

💣 坑三:木桶效应与死锁风险 (The Slowest Intern Effect)

症状:总耗时异常长。 诊断:Fan-in 机制的特点是必须等待所有派出的 Send 节点都执行完毕,才会走向下一个节点(Writer)。如果小张、小李 2 秒查完了,但小王遇到了网络卡顿,卡了 60 秒,那么整个系统都要等小王 60 秒。 解药:给 Researcher 节点的 LLM 调用或搜索工具设置严格的 Timeout(超时限制)。如果某个 Researcher 超时,应捕获异常并返回一个“该话题暂无数据”的 fallback 结果,决不能让整个 Graph 挂起。


📝 本期小结

今天,我们为「AI 万能内容创作机构」装上了并行处理的涡轮增压引擎

我们学习了:

  1. Fan-out(扇出):通过条件边返回 Send 对象列表,动态克隆节点。
  2. Fan-in(扇入):通过 Annotatedoperator.add,安全地聚合并发节点产生的数据。
  3. 性能飞跃:将原本 $O(N)$ 的时间复杂度,在资源允许的情况下压缩到了近似 $O(1)$。

现在,我们的 Planner 运筹帷幄,Researcher 们并驾齐驱,Writer 下笔如有神。但是,如果你仔细看生成的文章,你会发现:如果 Researcher 查回来的资料是垃圾,Writer 就会写出垃圾代码(Garbage in, garbage out)。

机构里是不是还少了一个把关的人? 没错,下一期(第 12 期),我们将引入 Human-in-the-loop (人类在环) 机制。在 Writer 动笔之前,或者交稿之后,我们将让**真实世界的主编(你)**介入工作流,进行审批、打回或修改。

让 Agent 狂奔的同时,把方向盘握在人类手里。我们下期见!👋