第 10 期 | 精细流式输出:看着 Graph 怎么把活干完

更新于 2026/4/14

逐节点输出结果 ( stream_mode="updates" ) 让你实时观察到"主编打回"的每一次内部动态。

欢迎回到我们的《LangGraph 多智能体专家课》。我是你们的导师。

在前面的课程中,我们已经为「AI 万能内容创作机构 (AI Content Agency)」搭建了相当不错的班底:Planner 负责拆解任务,Researcher 负责搜集资料,Writer 负责奋笔疾书,Editor 负责严把质量关。

但是,最近有同学在群里跟我抱怨:“老师,我的多智能体系统跑起来太像一个黑盒了!特别是当 Editor(主编)对 Writer(写手)的文章不满意,打回去重写时,整个 Graph 可能会在后台循环跑上好几分钟。这期间前端页面毫无反应,老板盯着屏幕问我‘是不是死机了’,我只能尴尬地擦汗。”

这是一个非常经典的高级工程问题:可观测性与用户体验 (UX)

在传统的单次 LLM 调用中,我们可以用 Token 级别的流式输出(打字机效果)来缓解用户的焦虑。但在多智能体工作流(Graph)中,真正的“大动作”发生在节点流转之间。今天,我们就要撕开这个黑盒,利用 LangGraph 强大的 stream_mode="updates" 功能,让 Graph 的每一步执行都暴露在阳光下!


🎯 本期学习目标

上完这节课,你将不再是一个只能干等程序运行结束的“小白”,而是能精准掌控 Graph 运行节奏的架构师。具体收益如下:

  1. 破除黑盒焦虑:深刻理解多智能体执行过程中“状态切片”的概念。
  2. 掌握核心 API:彻底搞懂 stream_mode="updates"values 的底层差异。
  3. 实战 Agency 工作流:在 Writer 和 Editor 的相爱相杀(循环打回)中,精准捕获每一次节点状态更新,为你的前端提供实时反馈的数据源。
  4. 提升架构品味:学会如何优雅地解析 LangGraph 的生成器 (Generator) 输出。

📖 原理解析

在讲代码之前,我们先来聊聊“道”。

在 LangGraph 中,当我们调用 graph.invoke() 时,系统会憋着一口气,直到整个图走到 END 节点,才把最终的 State 吐给你。这就像你去餐厅点了一道“佛跳墙”,你在大厅等了两个小时,最后服务员端上来一锅成品。

但如果你调用的是 graph.stream(),你就像是坐在了开放式厨房的吧台前。

LangGraph 提供了几种不同的 stream_mode,最核心的有两个:

  • stream_mode="values":每次状态更新时,把完整的当前状态全量推给你。(相当于每做完一步,把整个厨房的现状拍张全景照片发给你)。
  • stream_mode="updates":每次节点执行完毕,只把该节点对状态做出的增量修改推给你。(相当于厨师大喊:“切菜岗完成,增加了一盘切好的葱花!”)。

在多智能体协同中,updates 模式是我们最常用的利器。因为它能让我们清晰地知道:“刚刚是谁干了活?干了什么活?”

让我们用一张 Mermaid 图来看看,在我们的 AI Content Agency 中,当引入了 stream_mode="updates" 后,数据流是怎么走的:

sequenceDiagram
    participant User as 用户 (Client)
    participant Graph as LangGraph 引擎
    participant Writer as 节点: Writer (写手)
    participant Editor as 节点: Editor (主编)

    User->>Graph: 发起任务 stream(..., stream_mode="updates")
    
    Graph->>Writer: 1. 执行写作
    Writer-->>Graph: 返回增量: {draft: "初稿v1", revision_count: 1}
    Graph-->>User: ⚡ yield {"Writer": {draft: "初稿v1", revision_count: 1}}
    
    Graph->>Editor: 2. 检查初稿
    Editor-->>Graph: 返回增量: {feedback: "缺乏深度", status: "REJECTED"}
    Graph-->>User: ⚡ yield {"Editor": {feedback: "缺乏深度", status: "REJECTED"}}
    
    Note over Graph, Editor: 条件边判断:被打回,流转回 Writer
    
    Graph->>Writer: 3. 根据反馈重写
    Writer-->>Graph: 返回增量: {draft: "二稿v2", revision_count: 2}
    Graph-->>User: ⚡ yield {"Writer": {draft: "二稿v2", revision_count: 2}}
    
    Graph->>Editor: 4. 再次检查
    Editor-->>Graph: 返回增量: {status: "APPROVED"}
    Graph-->>User: ⚡ yield {"Editor": {status: "APPROVED"}}
    
    Note over Graph, Editor: 条件边判断:通过,流转至 END
    
    Graph-->>User: 结束 Stream

看明白了吗?每一次 ⚡ yield,就是 LangGraph 向外部吐出的一次心跳。外部的业务代码只要监听这些心跳,就可以在前端画出漂亮的进度条或者实时日志:“主编正在审稿...”、“主编打回了文章,写手正在进行第 2 次修改...”。


💻 实战代码演练

废话不多说,Show me the code。 我们将用 Python 构建这段“写手与主编”的循环工作流,并用 stream_mode="updates" 将其捕获。

为了让大家直接能跑通,这里的 LLM 调用我用了模拟函数(Mock),重点展示流式输出的架构

1. 定义状态与节点逻辑

import time
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator

# ==========================================
# 1. 定义我们 Agency 的全局状态 (State)
# ==========================================
class AgencyState(TypedDict):
    topic: str
    draft: str
    feedback: str
    revision_count: Annotated[int, operator.add] # 使用 reducer,每次更新累加
    status: str # "PENDING", "REJECTED", "APPROVED"

# ==========================================
# 2. 定义节点 (Nodes)
# ==========================================
def writer_node(state: AgencyState):
    """写手节点:负责根据主题或反馈写文章"""
    topic = state.get("topic")
    feedback = state.get("feedback", "")
    current_count = state.get("revision_count", 0)
    
    # 模拟 LLM 思考与写作的耗时
    time.sleep(1.5) 
    
    if current_count == 0:
        new_draft = f"【初稿】关于 {topic} 的文章。内容比较浅显。"
    else:
        new_draft = f"【第{current_count + 1}稿】关于 {topic} 的文章。已根据反馈『{feedback}』进行了深度优化。"
        
    # 返回增量更新 (Updates)
    return {
        "draft": new_draft,
        "revision_count": 1 # 因为用了 operator.add,这里返回 1 就会在原基础上 +1
    }

def editor_node(state: AgencyState):
    """主编节点:负责审核文章"""
    draft = state.get("draft")
    current_count = state.get("revision_count", 0)
    
    # 模拟主编审稿耗时
    time.sleep(1)
    
    # 业务逻辑:前两次强制打回,第三次才给过 (模拟严苛的主编)
    if current_count < 3:
        return {
            "feedback": f"第 {current_count} 稿深度不够,给我回去重写!",
            "status": "REJECTED"
        }
    else:
        return {
            "feedback": "这次写得不错,可以发布了。",
            "status": "APPROVED"
        }

# ==========================================
# 3. 定义条件边 (Conditional Edge)
# ==========================================
def check_approval(state: AgencyState):
    """判断主编是否通过"""
    if state.get("status") == "APPROVED":
        return "approved"
    return "rejected"

2. 组装 Graph 并开启流式观测

接下来是见证奇迹的时刻。我们将图拼装起来,并使用 stream 方法。

# ==========================================
# 4. 组装 Graph
# ==========================================
workflow = StateGraph(AgencyState)

workflow.add_node("Writer", writer_node)
workflow.add_node("Editor", editor_node)

workflow.set_entry_point("Writer")

workflow.add_edge("Writer", "Editor")

# 如果打回,回到 Writer;如果通过,走向 END
workflow.add_conditional_edges(
    "Editor",
    check_approval,
    {
        "rejected": "Writer",
        "approved": END
    }
)

app = workflow.compile()

# ==========================================
# 5. 见证奇迹:使用 stream_mode="updates"
# ==========================================
def run_agency_with_stream():
    print("🚀 [Agency 系统启动] 接收到客户需求...")
    initial_state = {
        "topic": "LangGraph 流式输出原理解析",
        "revision_count": 0
    }
    
    # 核心在这里!调用 app.stream 并指定模式
    # app.stream 返回的是一个 Python 生成器 (Generator)
    stream_generator = app.stream(initial_state, stream_mode="updates")
    
    step = 1
    for event in stream_generator:
        # event 的结构是一个字典:{ "节点名称": { 状态增量 } }
        print(f"\n--- ⏳ 步骤 {step} ---")
        
        # 遍历 event 拿到节点名和输出
        for node_name, node_update in event.items():
            print(f"👀 观测到节点执行完毕: 【{node_name}】")
            
            # 根据不同的节点,我们可以做不同的前端 UI 渲染
            if node_name == "Writer":
                print(f"✍️ 写手提交了新稿件: {node_update.get('draft')}")
                print(f"🔄 当前修改次数: {node_update.get('revision_count')} (这是增量累加后的结果)")
                
            elif node_name == "Editor":
                status = node_update.get('status')
                if status == "REJECTED":
                    print(f"😡 主编大发雷霆,打回了稿件!反馈意见: {node_update.get('feedback')}")
                else:
                    print(f"🎉 主编非常满意,审核通过!反馈意见: {node_update.get('feedback')}")
                    
        step += 1
        time.sleep(0.5) # 稍微停顿一下,方便肉眼观察输出效果

    print("\n✅ [Agency 系统完毕] 最终文章已交付客户!")

if __name__ == "__main__":
    run_agency_with_stream()

3. 运行效果展示(终端输出模拟)

当你运行上面的代码时,你不再需要死等 5 秒钟才看到最终结果。你会看到终端每隔一两秒就会蹦出一条日志,生动地展示内部的“宫斗”戏码:

🚀 [Agency 系统启动] 接收到客户需求...

--- ⏳ 步骤 1 ---
👀 观测到节点执行完毕: 【Writer】
✍️ 写手提交了新稿件: 【初稿】关于 LangGraph 流式输出原理解析 的文章。内容比较浅显。
🔄 当前修改次数: 1 (这是增量累加后的结果)

--- ⏳ 步骤 2 ---
👀 观测到节点执行完毕: 【Editor】
😡 主编大发雷霆,打回了稿件!反馈意见: 第 1 稿深度不够,给我回去重写!

--- ⏳ 步骤 3 ---
👀 观测到节点执行完毕: 【Writer】
✍️ 写手提交了新稿件: 【第2稿】关于 LangGraph 流式输出原理解析 的文章。已根据反馈『第 1 稿深度不够,给我回去重写!』进行了深度优化。
🔄 当前修改次数: 2 (这是增量累加后的结果)

--- ⏳ 步骤 4 ---
👀 观测到节点执行完毕: 【Editor】
😡 主编大发雷霆,打回了稿件!反馈意见: 第 2 稿深度不够,给我回去重写!

--- ⏳ 步骤 5 ---
👀 观测到节点执行完毕: 【Writer】
✍️ 写手提交了新稿件: 【第3稿】关于 LangGraph 流式输出原理解析 的文章。已根据反馈『第 2 稿深度不够,给我回去重写!』进行了深度优化。
🔄 当前修改次数: 3 (这是增量累加后的结果)

--- ⏳ 步骤 6 ---
👀 观测到节点执行完毕: 【Editor】
🎉 主编非常满意,审核通过!反馈意见: 这次写得不错,可以发布了。

✅ [Agency 系统完毕] 最终文章已交付客户!

试想一下,如果你通过 WebSocket 把这些数据推送到前端,你的用户体验会有多棒?用户可以像看进度条一样,看着 AI 团队一步步为他打磨作品,这种“参与感”和“透明度”是单体 LLM 无法提供的。


坑与避坑指南

作为拥有 10 年架构经验的老兵,我必须在这个环节敲打敲打你们。流式输出虽然爽,但坑也不少:

💣 坑一:混淆 updatesvalues 导致前端数据错乱

现象:前端拿到的状态有时候缺字段,有时候又是全的。 解法:记住,updates 只返回当前节点 return 的字典。如果你在 writer_node 里只 return {"draft": "xxx"},那么在 updates 模式下,你拿到的 event 里就没有 topicstatus。如果你需要每次都拿到全量状态来渲染页面,请使用 stream_mode="values";如果你只是想做事件触发(比如弹出一个 Toast 提示“主编已审核”),用 updates 更轻量。

💣 坑二:把“节点级流式”和“Token级流式”搞混

现象:老板说想要 ChatGPT 那种一个字一个字蹦出来的效果,你却只做到了一个节点一个节点蹦出来的效果。 解法:这是完全两个维度的东西!

  • 节点级流式(本节课讲的):关注的是 Agent 工作流走到哪一步了。
  • Token级流式:关注的是某个具体大模型生成文本的过程。 在高级架构中,我们通常会结合两者。即:在图流转时使用 stream_mode="updates" 报告进度,同时在 writer_node 内部调用 LLM 时,开启 LLM 的异步 Token Streaming 推送给前端。这部分我们在后续的《高阶通讯篇》会专门拉一节课讲。

💣 坑三:Reducer 累加器的重复触发

现象revision_count 莫名其妙变成了 4、8、16。 解法:在使用 Annotated[int, operator.add] 时,你要清楚每一次节点返回数字,都会在原基础上相加。如果你在 editor_node 里不小心也 return {"revision_count": 1},那就会导致双重累加。严格划分每个节点对 State 中特定字段的写权限,是多智能体架构设计的核心原则。


📝 本期小结

今天,我们为 Agency 项目接入了“上帝视角”。

通过 LangGraph 的 stream_mode="updates",我们将一个原本耗时极长、容易让人失去耐心的黑盒系统,改造成了一个步骤清晰、状态透明的白盒系统。这不仅是代码层面的优化,更是产品体验上的降维打击

在实际的商业落地中,能够把“主编打回重写”这种内部博弈暴露给用户,不仅不会让用户觉得系统笨,反而会让用户觉得你的 AI “很努力”、“很专业”。这,就是技术反哺产品的最佳体现。

下节课,我们将进入一个更刺激的话题:Human-in-the-loop (人类在环)。 如果主编 AI 拿捏不准,怎么让真实的人类老板介入审批?Graph 是如何做到“暂停执行,等待人类输入”的?

敬请期待第 11 期。同学们,下课!记得把今天的代码在本地跑一遍,体会一下生成器吐出数据的快感!