第 10 期 | 精细流式输出:看着 Graph 怎么把活干完
逐节点输出结果 ( stream_mode="updates" ) 让你实时观察到"主编打回"的每一次内部动态。
欢迎回到我们的《LangGraph 多智能体专家课》。我是你们的导师。
在前面的课程中,我们已经为「AI 万能内容创作机构 (AI Content Agency)」搭建了相当不错的班底:Planner 负责拆解任务,Researcher 负责搜集资料,Writer 负责奋笔疾书,Editor 负责严把质量关。
但是,最近有同学在群里跟我抱怨:“老师,我的多智能体系统跑起来太像一个黑盒了!特别是当 Editor(主编)对 Writer(写手)的文章不满意,打回去重写时,整个 Graph 可能会在后台循环跑上好几分钟。这期间前端页面毫无反应,老板盯着屏幕问我‘是不是死机了’,我只能尴尬地擦汗。”
这是一个非常经典的高级工程问题:可观测性与用户体验 (UX)。
在传统的单次 LLM 调用中,我们可以用 Token 级别的流式输出(打字机效果)来缓解用户的焦虑。但在多智能体工作流(Graph)中,真正的“大动作”发生在节点流转之间。今天,我们就要撕开这个黑盒,利用 LangGraph 强大的 stream_mode="updates" 功能,让 Graph 的每一步执行都暴露在阳光下!
🎯 本期学习目标
上完这节课,你将不再是一个只能干等程序运行结束的“小白”,而是能精准掌控 Graph 运行节奏的架构师。具体收益如下:
- 破除黑盒焦虑:深刻理解多智能体执行过程中“状态切片”的概念。
- 掌握核心 API:彻底搞懂
stream_mode="updates"与values的底层差异。 - 实战 Agency 工作流:在 Writer 和 Editor 的相爱相杀(循环打回)中,精准捕获每一次节点状态更新,为你的前端提供实时反馈的数据源。
- 提升架构品味:学会如何优雅地解析 LangGraph 的生成器 (Generator) 输出。
📖 原理解析
在讲代码之前,我们先来聊聊“道”。
在 LangGraph 中,当我们调用 graph.invoke() 时,系统会憋着一口气,直到整个图走到 END 节点,才把最终的 State 吐给你。这就像你去餐厅点了一道“佛跳墙”,你在大厅等了两个小时,最后服务员端上来一锅成品。
但如果你调用的是 graph.stream(),你就像是坐在了开放式厨房的吧台前。
LangGraph 提供了几种不同的 stream_mode,最核心的有两个:
stream_mode="values":每次状态更新时,把完整的当前状态全量推给你。(相当于每做完一步,把整个厨房的现状拍张全景照片发给你)。stream_mode="updates":每次节点执行完毕,只把该节点对状态做出的增量修改推给你。(相当于厨师大喊:“切菜岗完成,增加了一盘切好的葱花!”)。
在多智能体协同中,updates 模式是我们最常用的利器。因为它能让我们清晰地知道:“刚刚是谁干了活?干了什么活?”
让我们用一张 Mermaid 图来看看,在我们的 AI Content Agency 中,当引入了 stream_mode="updates" 后,数据流是怎么走的:
sequenceDiagram
participant User as 用户 (Client)
participant Graph as LangGraph 引擎
participant Writer as 节点: Writer (写手)
participant Editor as 节点: Editor (主编)
User->>Graph: 发起任务 stream(..., stream_mode="updates")
Graph->>Writer: 1. 执行写作
Writer-->>Graph: 返回增量: {draft: "初稿v1", revision_count: 1}
Graph-->>User: ⚡ yield {"Writer": {draft: "初稿v1", revision_count: 1}}
Graph->>Editor: 2. 检查初稿
Editor-->>Graph: 返回增量: {feedback: "缺乏深度", status: "REJECTED"}
Graph-->>User: ⚡ yield {"Editor": {feedback: "缺乏深度", status: "REJECTED"}}
Note over Graph, Editor: 条件边判断:被打回,流转回 Writer
Graph->>Writer: 3. 根据反馈重写
Writer-->>Graph: 返回增量: {draft: "二稿v2", revision_count: 2}
Graph-->>User: ⚡ yield {"Writer": {draft: "二稿v2", revision_count: 2}}
Graph->>Editor: 4. 再次检查
Editor-->>Graph: 返回增量: {status: "APPROVED"}
Graph-->>User: ⚡ yield {"Editor": {status: "APPROVED"}}
Note over Graph, Editor: 条件边判断:通过,流转至 END
Graph-->>User: 结束 Stream看明白了吗?每一次 ⚡ yield,就是 LangGraph 向外部吐出的一次心跳。外部的业务代码只要监听这些心跳,就可以在前端画出漂亮的进度条或者实时日志:“主编正在审稿...”、“主编打回了文章,写手正在进行第 2 次修改...”。
💻 实战代码演练
废话不多说,Show me the code。
我们将用 Python 构建这段“写手与主编”的循环工作流,并用 stream_mode="updates" 将其捕获。
为了让大家直接能跑通,这里的 LLM 调用我用了模拟函数(Mock),重点展示流式输出的架构。
1. 定义状态与节点逻辑
import time
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator
# ==========================================
# 1. 定义我们 Agency 的全局状态 (State)
# ==========================================
class AgencyState(TypedDict):
topic: str
draft: str
feedback: str
revision_count: Annotated[int, operator.add] # 使用 reducer,每次更新累加
status: str # "PENDING", "REJECTED", "APPROVED"
# ==========================================
# 2. 定义节点 (Nodes)
# ==========================================
def writer_node(state: AgencyState):
"""写手节点:负责根据主题或反馈写文章"""
topic = state.get("topic")
feedback = state.get("feedback", "")
current_count = state.get("revision_count", 0)
# 模拟 LLM 思考与写作的耗时
time.sleep(1.5)
if current_count == 0:
new_draft = f"【初稿】关于 {topic} 的文章。内容比较浅显。"
else:
new_draft = f"【第{current_count + 1}稿】关于 {topic} 的文章。已根据反馈『{feedback}』进行了深度优化。"
# 返回增量更新 (Updates)
return {
"draft": new_draft,
"revision_count": 1 # 因为用了 operator.add,这里返回 1 就会在原基础上 +1
}
def editor_node(state: AgencyState):
"""主编节点:负责审核文章"""
draft = state.get("draft")
current_count = state.get("revision_count", 0)
# 模拟主编审稿耗时
time.sleep(1)
# 业务逻辑:前两次强制打回,第三次才给过 (模拟严苛的主编)
if current_count < 3:
return {
"feedback": f"第 {current_count} 稿深度不够,给我回去重写!",
"status": "REJECTED"
}
else:
return {
"feedback": "这次写得不错,可以发布了。",
"status": "APPROVED"
}
# ==========================================
# 3. 定义条件边 (Conditional Edge)
# ==========================================
def check_approval(state: AgencyState):
"""判断主编是否通过"""
if state.get("status") == "APPROVED":
return "approved"
return "rejected"
2. 组装 Graph 并开启流式观测
接下来是见证奇迹的时刻。我们将图拼装起来,并使用 stream 方法。
# ==========================================
# 4. 组装 Graph
# ==========================================
workflow = StateGraph(AgencyState)
workflow.add_node("Writer", writer_node)
workflow.add_node("Editor", editor_node)
workflow.set_entry_point("Writer")
workflow.add_edge("Writer", "Editor")
# 如果打回,回到 Writer;如果通过,走向 END
workflow.add_conditional_edges(
"Editor",
check_approval,
{
"rejected": "Writer",
"approved": END
}
)
app = workflow.compile()
# ==========================================
# 5. 见证奇迹:使用 stream_mode="updates"
# ==========================================
def run_agency_with_stream():
print("🚀 [Agency 系统启动] 接收到客户需求...")
initial_state = {
"topic": "LangGraph 流式输出原理解析",
"revision_count": 0
}
# 核心在这里!调用 app.stream 并指定模式
# app.stream 返回的是一个 Python 生成器 (Generator)
stream_generator = app.stream(initial_state, stream_mode="updates")
step = 1
for event in stream_generator:
# event 的结构是一个字典:{ "节点名称": { 状态增量 } }
print(f"\n--- ⏳ 步骤 {step} ---")
# 遍历 event 拿到节点名和输出
for node_name, node_update in event.items():
print(f"👀 观测到节点执行完毕: 【{node_name}】")
# 根据不同的节点,我们可以做不同的前端 UI 渲染
if node_name == "Writer":
print(f"✍️ 写手提交了新稿件: {node_update.get('draft')}")
print(f"🔄 当前修改次数: {node_update.get('revision_count')} (这是增量累加后的结果)")
elif node_name == "Editor":
status = node_update.get('status')
if status == "REJECTED":
print(f"😡 主编大发雷霆,打回了稿件!反馈意见: {node_update.get('feedback')}")
else:
print(f"🎉 主编非常满意,审核通过!反馈意见: {node_update.get('feedback')}")
step += 1
time.sleep(0.5) # 稍微停顿一下,方便肉眼观察输出效果
print("\n✅ [Agency 系统完毕] 最终文章已交付客户!")
if __name__ == "__main__":
run_agency_with_stream()
3. 运行效果展示(终端输出模拟)
当你运行上面的代码时,你不再需要死等 5 秒钟才看到最终结果。你会看到终端每隔一两秒就会蹦出一条日志,生动地展示内部的“宫斗”戏码:
🚀 [Agency 系统启动] 接收到客户需求...
--- ⏳ 步骤 1 ---
👀 观测到节点执行完毕: 【Writer】
✍️ 写手提交了新稿件: 【初稿】关于 LangGraph 流式输出原理解析 的文章。内容比较浅显。
🔄 当前修改次数: 1 (这是增量累加后的结果)
--- ⏳ 步骤 2 ---
👀 观测到节点执行完毕: 【Editor】
😡 主编大发雷霆,打回了稿件!反馈意见: 第 1 稿深度不够,给我回去重写!
--- ⏳ 步骤 3 ---
👀 观测到节点执行完毕: 【Writer】
✍️ 写手提交了新稿件: 【第2稿】关于 LangGraph 流式输出原理解析 的文章。已根据反馈『第 1 稿深度不够,给我回去重写!』进行了深度优化。
🔄 当前修改次数: 2 (这是增量累加后的结果)
--- ⏳ 步骤 4 ---
👀 观测到节点执行完毕: 【Editor】
😡 主编大发雷霆,打回了稿件!反馈意见: 第 2 稿深度不够,给我回去重写!
--- ⏳ 步骤 5 ---
👀 观测到节点执行完毕: 【Writer】
✍️ 写手提交了新稿件: 【第3稿】关于 LangGraph 流式输出原理解析 的文章。已根据反馈『第 2 稿深度不够,给我回去重写!』进行了深度优化。
🔄 当前修改次数: 3 (这是增量累加后的结果)
--- ⏳ 步骤 6 ---
👀 观测到节点执行完毕: 【Editor】
🎉 主编非常满意,审核通过!反馈意见: 这次写得不错,可以发布了。
✅ [Agency 系统完毕] 最终文章已交付客户!
试想一下,如果你通过 WebSocket 把这些数据推送到前端,你的用户体验会有多棒?用户可以像看进度条一样,看着 AI 团队一步步为他打磨作品,这种“参与感”和“透明度”是单体 LLM 无法提供的。
坑与避坑指南
作为拥有 10 年架构经验的老兵,我必须在这个环节敲打敲打你们。流式输出虽然爽,但坑也不少:
💣 坑一:混淆 updates 和 values 导致前端数据错乱
现象:前端拿到的状态有时候缺字段,有时候又是全的。
解法:记住,updates 只返回当前节点 return 的字典。如果你在 writer_node 里只 return {"draft": "xxx"},那么在 updates 模式下,你拿到的 event 里就没有 topic 和 status。如果你需要每次都拿到全量状态来渲染页面,请使用 stream_mode="values";如果你只是想做事件触发(比如弹出一个 Toast 提示“主编已审核”),用 updates 更轻量。
💣 坑二:把“节点级流式”和“Token级流式”搞混
现象:老板说想要 ChatGPT 那种一个字一个字蹦出来的效果,你却只做到了一个节点一个节点蹦出来的效果。 解法:这是完全两个维度的东西!
- 节点级流式(本节课讲的):关注的是 Agent 工作流走到哪一步了。
- Token级流式:关注的是某个具体大模型生成文本的过程。
在高级架构中,我们通常会结合两者。即:在图流转时使用
stream_mode="updates"报告进度,同时在writer_node内部调用 LLM 时,开启 LLM 的异步 Token Streaming 推送给前端。这部分我们在后续的《高阶通讯篇》会专门拉一节课讲。
💣 坑三:Reducer 累加器的重复触发
现象:revision_count 莫名其妙变成了 4、8、16。
解法:在使用 Annotated[int, operator.add] 时,你要清楚每一次节点返回数字,都会在原基础上相加。如果你在 editor_node 里不小心也 return {"revision_count": 1},那就会导致双重累加。严格划分每个节点对 State 中特定字段的写权限,是多智能体架构设计的核心原则。
📝 本期小结
今天,我们为 Agency 项目接入了“上帝视角”。
通过 LangGraph 的 stream_mode="updates",我们将一个原本耗时极长、容易让人失去耐心的黑盒系统,改造成了一个步骤清晰、状态透明的白盒系统。这不仅是代码层面的优化,更是产品体验上的降维打击。
在实际的商业落地中,能够把“主编打回重写”这种内部博弈暴露给用户,不仅不会让用户觉得系统笨,反而会让用户觉得你的 AI “很努力”、“很专业”。这,就是技术反哺产品的最佳体现。
下节课,我们将进入一个更刺激的话题:Human-in-the-loop (人类在环)。 如果主编 AI 拿捏不准,怎么让真实的人类老板介入审批?Graph 是如何做到“暂停执行,等待人类输入”的?
敬请期待第 11 期。同学们,下课!记得把今天的代码在本地跑一遍,体会一下生成器吐出数据的快感!