第 12 期 | Reducers 的黑魔法:如何安全的增量更新数据

更新于 2026/4/14

使用 operator.add 和自定义聚合函数解决多节点并发写入 Array 问题

各位未来的顶尖 AI 架构师们,欢迎回到我们的《LangGraph 多智能体专家课》!我是你们的老朋友。

在上一期中,我们成功让咱们的「AI 万能内容创作机构 (AI Content Agency)」运转了起来。我们的 Planner(主编)可以把任务拆解,交给 Researcher(研究员)去搜集资料,最后丢给 Writer(主笔)成文。

但是,不知道你们在课后自己跑代码的时候,有没有遇到一个让人血压飙升的灵异事件: “明明我派出了 3 个 Researcher 分别去 Google、Twitter 和 Arxiv 搜集资料,为什么最后传给 Writer 的资料,永远只有其中 1 个人的?!另外 2 个人的劳动成果被狗吃了吗?”

恭喜你,你踩到了多智能体并发状态管理中最经典的坑——状态覆盖 (State Overwrite)

今天这节课,我就带你们揭开 LangGraph 中最优雅、但也最容易被新手忽略的黑魔法:Reducers(归约器)。我们将彻底解决多节点并发写入的问题,让你的智能体团队学会“团队协作”,而不是“互相拆台”。


🎯 本期学习目标

在今天这堂课结束时,你将掌握以下硬核技能:

  1. 理解 State 更新的底层逻辑:搞懂 LangGraph 默认的“最后写入者赢 (Last Write Wins)”机制为何会导致数据丢失。
  2. 掌握内建 Reducer 的使用:熟练运用 typing.Annotatedoperator.add 实现列表的线程安全增量追加。
  3. 编写高阶自定义 Reducer:手写聚合函数,不仅能合并数据,还能在合并时完成“去重”、“空值过滤”等数据清洗工作。
  4. 重构 Agency 的并发搜索流:将我们的 Researcher 升级为多线程并发搜索架构,为下一步的复杂成文打下坚实的数据基础。

📖 原理解析

1. 为什么你的数据会丢失?(The Overwrite Problem)

在 LangGraph 中,整个系统的灵魂就是 State(状态)。你可以把 State 想象成一张在各个 Agent 之间传递的“共享白板”。

默认情况下,当一个节点执行完毕并返回一个字典时,LangGraph 会拿着这个字典去更新白板。它的默认更新逻辑极其简单粗暴:直接覆盖 (Overwrite)

想象一下这个场景: Planner 说:“去查一下关于‘量子计算’的最新进展。” 于是系统并发启动了三个 Researcher:

  • Researcher_Google 查到了 A,返回 {"research_materials": ["A"]}
  • Researcher_Arxiv 查到了 B,返回 {"research_materials": ["B"]}
  • Researcher_Twitter 查到了 C,返回 {"research_materials": ["C"]}

因为它们是并发执行的,谁最后把结果拍在白板上,白板上就只剩下谁的结果。如果 Twitter 节点晚了 1 毫秒执行完,那么 research_materials 的值就会被覆盖成 ["C"]。A 和 B 彻底灰飞烟灭。

这就是经典的竞态条件 (Race Condition)

2. Reducer 的黑魔法:从“覆盖”到“合并”

为了解决这个问题,LangGraph 引入了 Reducer 的概念。 Reducer 的本质是一个拦截器/聚合器。它告诉 LangGraph:“嘿,当有节点想要更新这个字段时,不要直接覆盖,请用我提供的方法,把新数据和老数据揉在一起。”

在 Python 中,我们通过 typing.Annotated 来为 State 的字段绑定 Reducer。

我们来看一下我们今天要重构的 AI Agency 核心工作流:

graph TD
    classDef planner fill:#f9f0ff,stroke:#d8b4e2,stroke-width:2px;
    classDef researcher fill:#e6f7ff,stroke:#91d5ff,stroke-width:2px;
    classDef writer fill:#f6ffed,stroke:#b7eb8f,stroke-width:2px;
    classDef state fill:#fffb8f,stroke:#d4b106,stroke-width:2px,stroke-dasharray: 5 5;

    START((开始)) --> Planner[Planner Agent
生成多个搜索关键词]:::planner Planner --> |Fan-out 并发| R1[Researcher: 谷歌搜索]:::researcher Planner --> |Fan-out 并发| R2[Researcher: 论文检索]:::researcher Planner --> |Fan-out 并发| R3[Researcher: 社交媒体]:::researcher R1 -.-> |返回结果 A| State_Materials[(State: research_materials
Reducer: operator.add)]:::state R2 -.-> |返回结果 B| State_Materials R3 -.-> |返回结果 C| State_Materials State_Materials -.-> |Fan-in 聚合: A+B+C| Writer[Writer Agent
根据所有材料撰写初稿]:::writer Writer --> END((结束))

在上面的架构中,当并发的 Researcher 返回结果时,带有 Reducer 的 research_materials 字段会像一个蓄水池一样,把 A、B、C 全部安全地收集起来,最后作为一个完整的列表喂给 Writer。


💻 实战代码演练

废话不多说,Show me the code。我们将使用 Python 来实现这个带有并发 Researcher 的 AI Agency 流程。

第一步:环境与基础包导入

import operator
from typing import Annotated, TypedDict, List, Any
from langgraph.graph import StateGraph, START, END

# 模拟一下 LLM 的调用延迟
import time
import random

第二步:定义带有 Reducer 的 State(核心重点!)

敲黑板!这里是本节课的核心。我们要定义一个 AgencyState

# --- 自定义 Reducer 函数 ---
# 为什么不直接用 operator.add?
# 因为 operator.add 遇到 None + List 会直接报错 TypeError!
# 在真实业务中,某个 Researcher 可能会失败返回 None,我们需要更鲁棒的合并逻辑。
def safe_merge_materials(left: List[str], right: List[str] | None) -> List[str]:
    """
    自定义的 Reducer:安全合并列表,并附带去重功能。
    left: 状态中原本存在的数据
    right: 节点新返回的数据
    """
    if not left:
        left = []
    if not right:
        return left
    
    # 合并并去重,保持基本顺序(真实的去重可能更复杂,这里做简单演示)
    merged = left + right
    # 使用 dict.fromkeys 去重同时保持顺序
    return list(dict.fromkeys(merged))

# --- 定义图的状态 ---
class AgencyState(TypedDict):
    # 任务主题(普通字段,默认覆盖)
    topic: str 
    
    # 分配给 Researcher 的具体搜索任务(普通字段,覆盖)
    search_tasks: List[str] 
    
    # 收集到的素材(魔法字段!使用 Annotated 绑定自定义的 safe_merge_materials)
    # 当任何节点返回 {"research_materials": ["新数据"]} 时,
    # LangGraph 会自动执行: new_state = safe_merge_materials(old_state, "新数据")
    research_materials: Annotated[List[str], safe_merge_materials]
    
    # 最终成文(普通字段,覆盖)
    final_article: str

讲师锐评: 看到 Annotated 的威力了吗?很多新手直接写 research_materials: List[str],然后死活搞不懂为什么并发时数据会丢。记住,在 LangGraph 中,想增量更新,必用 Annotated!

第三步:编写节点逻辑 (Nodes)

接下来,我们实现 Planner、多个 Researcher 和 Writer。为了让你看清执行顺序,我在代码里加了详细的打印。

def planner_node(state: AgencyState):
    """
    Planner 节点:接收大主题,拆分成多个具体的搜索任务。
    """
    print(f"👨‍💼 [Planner] 正在拆解主题: {state['topic']}")
    
    # 模拟 LLM 思考过程,直接硬编码拆解结果
    tasks = [
        f"{state['topic']} 的历史发展",
        f"{state['topic']} 的最新技术突破",
        f"{state['topic']} 的商业应用案例"
    ]
    
    print(f"👨‍💼 [Planner] 拆解完毕,派发 {len(tasks)} 个并发搜索任务。")
    # 这里返回的是普通的覆盖型字段
    return {"search_tasks": tasks}

def researcher_node(state: AgencyState, task: str):
    """
    Researcher 节点:这其实是一个节点工厂,我们会根据任务动态生成节点。
    注意:在实际的 LangGraph 中,并发通常结合 Send API 来做动态路由。
    为了本期 Reducer 概念的清晰,我们使用静态并发演示。
    """
    print(f"🔍 [Researcher] 开始搜索子任务: {task}...")
    # 模拟网络请求延迟
    time.sleep(random.uniform(0.5, 1.5))
    
    # 模拟搜索到的数据
    mock_result = f"关于【{task}】的深度报告片段"
    
    # 有时候搜索可能会失败(模拟空数据,测试我们的 safe_merge_materials)
    if random.random() < 0.1: 
        print(f"⚠️ [Researcher] 搜索 {task} 失败,未找到资料。")
        return {"research_materials": None}
        
    print(f"✅ [Researcher] 搜索完成: {task}")
    
    # 重点:往 research_materials 字段写入。
    # 因为绑了 Reducer,这里的列表会被追加到原有列表中,而不是覆盖!
    return {"research_materials": [mock_result]}

def writer_node(state: AgencyState):
    """
    Writer 节点:收集所有的素材,写出最终文章。
    """
    materials = state.get("research_materials", [])
    print(f"\n✍️ [Writer] 收到 {len(materials)} 份研究素材,开始撰写...")
    
    if not materials:
        article = "抱歉,由于缺乏素材,无法成文。"
    else:
        # 模拟成文过程
        content = "\n".join([f"- {m}" for m in materials])
        article = f"《关于 {state['topic']} 的深度解析》\n\n综合多方资料,我们得出以下结论:\n{content}\n\n(全文完)"
        
    print(f"✍️ [Writer] 撰写完毕!")
    return {"final_article": article}

第四步:组装 Graph 并处理并发

这里有一个 LangGraph 的高阶技巧:如何让节点并行执行?答案是:从同一个节点出发,连多条边到不同的节点,然后再汇总到一个节点。

def build_agency_graph():
    builder = StateGraph(AgencyState)
    
    # 添加节点
    builder.add_node("planner", planner_node)
    
    # 为了演示并发,我们手动创建 3 个固定的 Researcher 节点实例
    # 真实场景中,你会使用 Send() API 进行动态并发(我们会在后续课程讲)
    builder.add_node("researcher_1", lambda state: researcher_node(state, state["search_tasks"][0]))
    builder.add_node("researcher_2", lambda state: researcher_node(state, state["search_tasks"][1]))
    builder.add_node("researcher_3", lambda state: researcher_node(state, state["search_tasks"][2]))
    
    builder.add_node("writer", writer_node)
    
    # 定义控制流 (Edges)
    builder.add_edge(START, "planner")
    
    # Fan-out: Planner 结束后,同时触发 3 个 Researcher
    builder.add_edge("planner", "researcher_1")
    builder.add_edge("planner", "researcher_2")
    builder.add_edge("planner", "researcher_3")
    
    # Fan-in: 3 个 Researcher 都结束后,才触发 Writer
    # 注意:LangGraph 会自动等待所有前置节点完成才会执行目标节点
    builder.add_edge(["researcher_1", "researcher_2", "researcher_3"], "writer")
    
    builder.add_edge("writer", END)
    
    return builder.compile()

# --- 模拟运行 ---
if __name__ == "__main__":
    app = build_agency_graph()
    
    print("🚀 启动 AI Content Agency 工作流...\n" + "="*40)
    
    initial_state = {"topic": "2024年室温超导"}
    
    # 运行整个图
    final_state = app.invoke(initial_state)
    
    print("\n" + "="*40)
    print("🎉 最终交付结果:\n")
    print(final_state["final_article"])
    print("="*40)

当你运行这段代码时,你会看到类似这样的输出:

🚀 启动 AI Content Agency 工作流...
========================================
👨‍💼 [Planner] 正在拆解主题: 2024年室温超导
👨‍💼 [Planner] 拆解完毕,派发 3 个并发搜索任务。
🔍 [Researcher] 开始搜索子任务: 2024年室温超导 的历史发展...
🔍 [Researcher] 开始搜索子任务: 2024年室温超导 的最新技术突破...
🔍 [Researcher] 开始搜索子任务: 2024年室温超导 的商业应用案例...
✅ [Researcher] 搜索完成: 2024年室温超导 的历史发展
✅ [Researcher] 搜索完成: 2024年室温超导 的商业应用案例
✅ [Researcher] 搜索完成: 2024年室温超导 的最新技术突破

✍️ [Writer] 收到 3 份研究素材,开始撰写...
✍️ [Writer] 撰写完毕!

========================================
🎉 最终交付结果:

《关于 2024年室温超导 的深度解析》

综合多方资料,我们得出以下结论:
- 关于【2024年室温超导 的历史发展】的深度报告片段
- 关于【2024年室温超导 的商业应用案例】的深度报告片段
- 关于【2024年室温超导 的最新技术突破】的深度报告片段

(全文完)
========================================

看!3 份资料,一份都没少!这就是 Reducer 的魔力。


坑与避坑指南

作为你们的导师,我绝不能只教你们写 Demo。在实际的生产环境中,Reducer 是最容易引发 Bug 的地方。以下是我总结的 3 个高频踩坑点,务必记在你的小本本上

💣 坑一:盲目信任 operator.add

很多教程(包括官方的早期文档)会教你直接写 Annotated[list, operator.add]危险! 如果某个节点因为网络异常、LLM 幻觉等原因返回了 {"research_materials": None}operator.add(list, None) 会直接抛出 TypeError,导致整个图崩溃。 避坑法则:永远像我上面的代码一样,写一个 safe_merge 包装函数,处理好 None 的情况。

💣 坑二:在 Reducer 中进行原地修改 (In-place Mutation)

如果你在自定义 Reducer 里写了类似 left.append(right) 或者 left.extend(right) 的代码,然后 return left。在某些并发或带有历史回溯 (Time Travel) 的场景下,你会遇到极其诡异的幽灵 Bug。 避坑法则:Reducer 必须是纯函数 (Pure Function)!永远返回一个新的对象。用 left + right 会创建一个新列表,这是安全的;用 append 是危险的。

💣 坑三:以为 Reducer 只对并发有效

这是一个常见的认知误区。Reducer 不仅仅是为了并发存在的。即使你是线性的流程(Node A -> Node B -> Node C),如果你想让每个节点都往一个列表中追加日志或消息记录(比如 messages 列表),你也必须使用 Reducer。没有 Reducer,后一个节点的数据永远会覆盖前一个。


📝 本期小结

今天,我们深入探讨了 LangGraph 中 State 更新的底层逻辑。

  1. 我们明白了,没有魔法的 State 只是一个无情的“覆盖机器”。
  2. 我们引入了 AnnotatedReducer,赋予了 State 增量更新的能力。
  3. 我们手写了安全的列表合并函数,完美解决了 AI Agency 中多个 Researcher 并发搜集资料导致的数据丢失问题。

现在,我们的 AI Content Agency 已经具备了强大的多线程并发信息采集能力

但是,细心的同学可能发现了:我们今天的并发是“静态”的,也就是 Planner 必须固定分配 3 个任务给 3 个写死的节点。如果 Planner 动态决定要搜集 5 个或者 10 个方向呢?

在下一期 《第 13 期 | Send API 的降维打击:如何实现真正的动态并发路由》 中,我将带你们解锁 LangGraph 的终极并发武器,让你的 Agent 具备真正的动态伸缩能力!

各位架构师,代码记得敲,我们下期见!🚀