第 23 期 | 细粒度 Token 流获取 (stream_mode="messages")
当生成万字长文时,如果实现一边跑 Graph 一边在前端打字机般吐出文章?
各位 AI 架构师们,欢迎回到我们的《LangGraph 多智能体专家课》。我是你们的老朋友。
在上一期中,我们的「AI 万能内容创作机构 (AI Content Agency)」已经初具规模,Planner 运筹帷幄,Researcher 疯狂检索,Writer 奋笔疾书,Editor 严格把关。整个工作流跑通的那一刻,相信大家心里都暗爽了一下。
但是,昨天有同学在群里跟我抱怨:“老师,我的 Graph 跑是跑通了,但是当 Writer 节点开始撰写那篇 1 万字的深度行业报告时,我的前端界面卡在那儿整整 40 秒!老板以为系统死机了,差点把我祭天。”
这不仅是体验问题,这是严重的 UX(用户体验)灾难。
在传统的单体 LLM 调用中,我们早就习惯了用 streaming=True 来实现打字机效果。但在 LangGraph 这种由多个 Agent 节点组成的复杂状态机中,默认的流式输出(stream_mode="values" 或 "updates")是节点级别的——也就是说,它非得等 Writer 把这 1 万字全部憋出来、节点运行结束,才会把最终状态扔给你。
这能忍?当然不能!今天,我们就来扒开 LangGraph 的底层,把大模型的 Token 流从节点内部“劫持”出来,直接推送到前端!我们要用到今天的绝对主角:stream_mode="messages"。
🎯 本期学习目标
上完这节课,你将把以下技能收入囊中:
- 打破节点壁垒:深刻理解 Graph 级状态流与 LLM 级 Token 流的本质区别。
- 掌握
stream_mode="messages":学会拦截并解析 LangGraph 底层抛出的消息块 (Message Chunks) 与元数据 (Metadata)。 - 精准路由分发:在多 Agent 协同中,精准定位并只提取 Writer 节点的 Token 流,过滤掉 Planner 和 Researcher 的内部思考过程。
- 重构 Agency 输出层:为我们的 AI Content Agency 赋予丝滑的实时“打字机”输出能力。
📖 原理解析
在动手写代码之前,我们要先搞懂 LangGraph 的流式输出哲学。敲黑板,这里是面试高频考点!
LangGraph 提供了三种主要的 stream_mode:
"values":每次有节点更新,就把完整的全局状态 (State) 重新抛给你一次。"updates":哪个节点执行完了,就把那个节点更新的那部分状态抛给你。(这是我们前 22 期最常用的)。"messages":细粒度监听模式。它不再等待节点结束,而是直接监听节点内部的 ChatModel。只要大模型吐出一个 Token(AIMessageChunk),它就立刻连同当前属于哪个节点的元数据(Metadata)一起抛出来。
打个比方:
"updates" 就像是去餐厅吃饭,你要等厨师(Writer)把一整盘“万字长文”炒好端上桌,你才能看到菜。
"messages" 就像是你直接搬了个小马扎坐在厨师旁边,他每切一刀洋葱(Token),你都能看得清清楚楚。
让我们用一张 Mermaid 图来看看这背后的工作流走向:
sequenceDiagram
participant User as 前端用户
participant Graph as LangGraph (Agency)
participant Planner as Planner 节点
participant Writer as Writer 节点
participant LLM as 底层大模型
User->>Graph: 提交需求:"写一篇万字AI报告"
Graph->>Planner: 触发规划
Planner-->>Graph: 返回大纲 (节点结束)
Note over User, Graph: 此时如果用 "updates" 模式,
前端只能在此刻收到一次大纲更新
Graph->>Writer: 触发撰写 (耗时极长)
Writer->>LLM: invoke(大纲)
rect rgb(230, 240, 255)
Note over Graph, LLM: stream_mode="messages" 的魔法时刻
LLM-->>Graph: Token 1 ("随")
Graph-->>User: ⚡️ 实时推送 Token 1 (附带 metadata: node="Writer")
LLM-->>Graph: Token 2 ("着")
Graph-->>User: ⚡️ 实时推送 Token 2 (附带 metadata: node="Writer")
LLM-->>Graph: Token 3 ("人")
Graph-->>User: ⚡️ 实时推送 Token 3 (附带 metadata: node="Writer")
end
LLM-->>Writer: 完整生成完毕
Writer-->>Graph: 返回完整文章状态 (节点结束)
Graph-->>User: 最终状态更新结束看懂了吗?在 stream_mode="messages" 模式下,Graph 变成了一个透明的管道,底层 LLM 的每一次呼吸,都被实时传递给了用户。
💻 实战代码演练
废话不多说,Show me the code。
我们将基于我们的「AI Content Agency」项目,截取 Planner -> Writer 这一段核心逻辑,来演示如何实现打字机效果。
第一步:构建基础图与 Agent
(为了让你能直接复制运行,我把 State 和 Node 的定义精简在了一个脚本里。请确保你安装了 langgraph 和 langchain-openai)
import os
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END, START
from langgraph.graph.message import add_messages
# 假设你已经配置了环境变量,如果没有请取消注释并填入
# os.environ["OPENAI_API_KEY"] = "sk-..."
# 1. 定义我们的 Agency 状态
class AgencyState(TypedDict):
# 使用 add_messages 自动合并对话历史
messages: Annotated[list[BaseMessage], add_messages]
outline: str
final_article: str
# 2. 初始化大模型 (注意:这里不需要特意写 streaming=True,LangGraph 底层会自动处理)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
# 3. 定义 Planner 节点 (负责写大纲,不流式输出给用户看)
def planner_node(state: AgencyState):
print("\n[系统日志] Planner 正在思考大纲...")
prompt = f"请根据用户的需求,生成一个文章大纲。用户需求:{state['messages'][-1].content}"
response = llm.invoke(prompt)
return {"outline": response.content}
# 4. 定义 Writer 节点 (负责写长文,这是我们要监听的主角!)
def writer_node(state: AgencyState):
print("\n[系统日志] Writer 开始根据大纲撰写长文...")
prompt = f"你是一个资深作家。请根据以下大纲写一篇详细的文章:\n{state['outline']}"
# 直接 invoke 即可,流式拦截由 Graph 的 stream 方法在外部完成
response = llm.invoke(prompt)
return {"messages": [response], "final_article": response.content}
# 5. 组装 Graph
workflow = StateGraph(AgencyState)
workflow.add_node("planner", planner_node)
workflow.add_node("writer", writer_node)
workflow.add_edge(START, "planner")
workflow.add_edge("planner", "writer")
workflow.add_edge("writer", END)
app = workflow.compile()
第二步:见证奇迹的时刻 (核心提取逻辑)
现在,重头戏来了。我们要用 stream_mode="messages" 来运行这个 Graph,并且只提取 Writer 节点的 Token 流。
请仔细阅读这段代码里的注释,这是 10 年老兵的踩坑精华:
def run_agency_with_streaming(user_input: str):
print(f"👨💻 用户输入: {user_input}\n" + "="*50)
inputs = {"messages": [HumanMessage(content=user_input)]}
# 敲黑板!启用 stream_mode="messages"
# 返回的是一个生成器,每次 yield 一个元组:(message_chunk, metadata)
stream = app.stream(inputs, stream_mode="messages")
print("✍️ 前端打字机效果开始:\n")
for chunk, metadata in stream:
# metadata 是一个字典,包含了当前 Token 来源于哪个节点等极具价值的信息
# 长这个样子:{'langgraph_step': 2, 'langgraph_node': 'writer', 'langgraph_triggering_edges': ['planner']}
node_name = metadata.get("langgraph_node")
# 【过滤策略 1】:我们只关心 Writer 节点的输出
# 因为 Planner 也会调用 LLM,如果不加判断,前端会把大纲也当作文章打印出来!
if node_name == "writer":
# 【过滤策略 2】:确保这是一个 AIMessageChunk (模型生成的文本块)
# 因为 Graph 刚进入节点或结束节点时,也会发送一些非文本块的控制消息
if chunk.__class__.__name__ == "AIMessageChunk":
# 【过滤策略 3】:提取实际的文本内容
# 有时候模型可能在调用工具(Tool Call),这时候 content 是空的
if chunk.content:
# 模拟前端打字机:不换行输出,并立即刷新缓冲区
print(chunk.content, end="", flush=True)
print("\n\n" + "="*50 + "\n✅ 文章生成完毕!")
# 运行测试
if __name__ == "__main__":
run_agency_with_streaming("请写一篇关于2024年人工智能发展趋势的短文,分为三个段落。")
当你运行这段代码时,你会看到控制台先打印出 [系统日志] Planner 正在思考大纲...,此时界面是安静的(因为我们屏蔽了 Planner 的流)。
紧接着打印 [系统日志] Writer 开始根据大纲撰写长文...,然后,文章就像有人在键盘上疯狂敲击一样,一个字一个字地在你的屏幕上流淌出来!
这就是真正的生产级 UX。
坑与避坑指南 (高阶视角的排错经验)
作为你们的导师,我不能只教你们怎么跑通 Demo,更要告诉你们在生产环境(比如给客户部署那个千万级项目时)会遇到什么鬼故事。
🚨 坑 1:Tool Calls(工具调用)导致的前端崩溃
翻车现场:当你的 Writer Agent 具备了调用外部搜索工具的能力(比如它写到一半发现数据不够,去调 Google Search),此时 LLM 吐出的 chunk 里面,chunk.content 可能是空的!它真正在生成的是 chunk.tool_call_chunks。如果你的前端盲目读取 chunk.content 并尝试拼接,可能会报 NoneType 错误或引发前端 React/Vue 组件崩溃。
老司机解法:在提取逻辑中,务必加上防御性编程:
if chunk.content:
# 正常文本,发送给前端
send_to_frontend(chunk.content)
elif chunk.tool_call_chunks:
# 正在生成工具调用的参数,可以选择在前端显示 "正在检索资料..." 的动画
show_loading_animation(chunk.tool_call_chunks[0]['name'])
🚨 坑 2:幽灵般的 "双重输出"
翻车现场:有同学为了既要流式输出,又要获取最终状态,把代码写成了 stream_mode=["messages", "updates"]。结果前端收到了一次打字机输出,最后又瞬间收到了一整段重复的完整文章。
老司机解法:如果你同时使用了多种 stream_mode,app.stream 返回的元组会变成 (stream_mode_name, payload)。你必须通过判断第一个元素来分流:
for event_type, payload in app.stream(inputs, stream_mode=["messages", "updates"]):
if event_type == "messages":
chunk, metadata = payload
# 处理打字机逻辑...
elif event_type == "updates":
# 处理节点状态更新逻辑(比如更新侧边栏的进度条)
pass
🚨 坑 3:试图在流式输出中读取 Graph State
翻车现场:在 stream_mode="messages" 期间,有同学想通过 app.get_state(config) 去获取当前的全局 State,发现根本没有更新!
老司机解法:记住我们开头讲的“道”。messages 模式是拦截底层大模型的实时输出。此时 Writer 节点还没有执行完!LangGraph 的状态更新必须等待节点 return 之后才会发生。所以,在流式输出期间,全局 State 依然是上一个节点(Planner)结束时的状态。不要在 Token 流式传输期间尝试读取当前节点的最终 State。
📝 本期小结
今天,我们解决了一个极具商业价值的痛点:长文本生成的 UX 阻塞问题。
我们没有修改任何 Agent 内部的代码逻辑,仅仅通过在 Graph 调用层切换到 stream_mode="messages",并配合精准的 metadata 路由过滤,就为我们的 AI Content Agency 赋予了无缝的实时反馈能力。
这就是架构设计的魅力:底层逻辑的解耦,带来了顶层表现的极度灵活。
课后作业:
尝试修改今天的代码,让 Planner 节点生成大纲的过程,也在前端以另一种颜色或另一个 UI 组件(比如侧边栏)打字机般显示出来。提示:你需要修改 node_name == "writer" 这个判断逻辑。
在下一期(第 24 期)中,我们将迎来整个 Agency 项目中最具挑战的一环——Human-in-the-loop (人类在环)。如果 Editor 觉得文章写得太烂,如何让 Graph 暂停,等待人类主编(也就是你)手动修改大纲后,再让 Writer 重新生成?
我们下期再见!保持热爱,继续 Coding!