Issue 23 | Fine-Grained Token Stream Retrieval (stream_mode="messages")
When generating a 10,000-word article, how do we achieve a typewriter-like streaming effect on the frontend while the Graph is running?
Welcome back to our "LangGraph Multi-Agent Expert Course", fellow AI Architects. I am your old friend.
In the previous session, our "AI Content Agency" began to take shape: the Planner strategizing, the Researcher frantically retrieving data, the Writer writing furiously, and the Editor strictly gatekeeping. The moment the entire workflow ran successfully, I believe everyone felt a sense of satisfaction.
However, yesterday a student complained in the group: "Teacher, my Graph ran successfully, but when the Writer node started drafting that 10,000-word in-depth industry report, my frontend interface froze for a full 40 seconds! The boss thought the system crashed and almost had my head."
This is not just a minor annoyance; it is a full-blown UX (User Experience) disaster.
In traditional monolithic LLM calls, we have long been accustomed to using streaming=True to achieve the typewriter effect. But in a complex state machine like LangGraph, composed of multiple Agent nodes, the default streaming output (stream_mode="values" or "updates") is at the node level—meaning it insists on waiting for the Writer to squeeze out all 10,000 words and the node to finish executing before throwing the final state to you.
Can we tolerate this? Of course not! Today, we are going to peel back the underlying layers of LangGraph, "hijack" the LLM's Token stream from within the node, and push it directly to the frontend! We will be using today's absolute protagonist: stream_mode="messages".
🎯 Learning Objectives for this Session
After completing this lesson, you will have mastered the following skills:
- Breaking Node Barriers: Deeply understand the essential difference between Graph-level state streams and LLM-level Token streams.
- Mastering stream_mode="messages": Learn to intercept and parse the Message Chunks and Metadata emitted by LangGraph's underlying layers.
- Precise Routing and Distribution: In multi-Agent collaboration, precisely locate and extract only the Token stream of the Writer node, filtering out the internal thought processes of the Planner and Researcher.
- Refactoring the Agency Output Layer: Endow our AI Content Agency with silky-smooth, real-time "typewriter" output capabilities.
📖 Principle Analysis
Before writing code, we must first understand LangGraph's streaming output philosophy. Pay attention, this is a high-frequency interview topic!
LangGraph provides three main stream_modes:
"values": Every time a node updates, it throws the complete global State back to you."updates": When a node finishes executing, it throws the part of the state updated by that node to you. (This is what we used most in the previous 22 sessions)."messages": Fine-grained listening mode. It no longer waits for the node to finish, but directly listens to the ChatModel inside the node. As soon as the LLM spits out a Token (AIMessageChunk), it immediately throws it out along with the Metadata indicating which node it currently belongs to.
For example:
"updates" is like eating at a restaurant; you have to wait for the chef (Writer) to finish cooking the entire plate of the "10,000-word article" and serve it to the table before you can see the dish.
"messages" is like pulling up a stool and sitting right next to the chef; every time he chops an onion (Token), you can see it clearly.
Let's use a Mermaid diagram to see the workflow behind this:
sequenceDiagram
participant User as Frontend User
participant Graph as LangGraph (Agency)
participant Planner as Planner Node
participant Writer as Writer Node
participant LLM as Underlying LLM
User->>Graph: Submit request: "Write a 10,000-word AI report"
Graph->>Planner: Trigger planning
Planner-->>Graph: Return outline (Node ends)
Note over User, Graph: If using "updates" mode here,<br/>the frontend only receives one outline update at this moment
Graph->>Writer: Trigger writing (Extremely time-consuming)
Writer->>LLM: invoke(outline)
rect rgb(230, 240, 255)
Note over Graph, LLM: The magic moment of stream_mode="messages"
LLM-->>Graph: Token 1 ("As")
Graph-->>User: ⚡️ Real-time push Token 1 (with metadata: node="Writer")
LLM-->>Graph: Token 2 (" AI")
Graph-->>User: ⚡️ Real-time push Token 2 (with metadata: node="Writer")
LLM-->>Graph: Token 3 (" de")
Graph-->>User: ⚡️ Real-time push Token 3 (with metadata: node="Writer")
end
LLM-->>Writer: Complete generation finished
Writer-->>Graph: Return complete article state (Node ends)
Graph-->>User: Final state update ends

Understand? Under stream_mode="messages", the Graph becomes a transparent pipeline, and every breath of the underlying LLM is transmitted to the user in real time.
💻 Practical Code Drill
Enough talk, Show me the code.
We will base this on our "AI Content Agency" project, extracting the core logic of Planner -> Writer to demonstrate how to achieve the typewriter effect.
Step 1: Build the Base Graph and Agents
(To allow you to copy and run directly, I have streamlined the definitions of State and Node into a single script. Please ensure you have installed langgraph and langchain-openai.)
import os
from typing import TypedDict, Annotated
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END, START
from langgraph.graph.message import add_messages

# Assuming you have configured environment variables; if not, uncomment and fill in
# os.environ["OPENAI_API_KEY"] = "sk-..."

# 1. Define our Agency State
class AgencyState(TypedDict):
    # Use add_messages to automatically merge conversation history
    messages: Annotated[list[BaseMessage], add_messages]
    outline: str
    final_article: str

# 2. Initialize the LLM (no need to explicitly set streaming=True here; LangGraph handles it internally)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)

# 3. Define the Planner node (writes the outline; not streamed to the user)
def planner_node(state: AgencyState):
    print("\n[System Log] Planner is thinking about the outline...")
    prompt = f"Please generate an article outline based on the user's request. User request: {state['messages'][-1].content}"
    response = llm.invoke(prompt)
    return {"outline": response.content}

# 4. Define the Writer node (writes the long article; this is the protagonist we want to listen to!)
def writer_node(state: AgencyState):
    print("\n[System Log] Writer starts drafting the long article based on the outline...")
    prompt = f"You are a senior writer. Please write a detailed article based on the following outline:\n{state['outline']}"
    # Just invoke directly; streaming interception is done externally by the Graph's stream method
    response = llm.invoke(prompt)
    return {"messages": [response], "final_article": response.content}

# 5. Assemble the Graph
workflow = StateGraph(AgencyState)
workflow.add_node("planner", planner_node)
workflow.add_node("writer", writer_node)
workflow.add_edge(START, "planner")
workflow.add_edge("planner", "writer")
workflow.add_edge("writer", END)
app = workflow.compile()
Step 2: The Moment of Magic (Core Extraction Logic)
Now comes the main event. We will run this Graph using stream_mode="messages" and extract only the Token stream of the Writer node.
Please read the comments in this code carefully; this is the essence of a 10-year veteran's troubleshooting experience:
from langchain_core.messages import AIMessageChunk

def run_agency_with_streaming(user_input: str):
    print(f"👨‍💻 User Input: {user_input}\n" + "=" * 50)
    inputs = {"messages": [HumanMessage(content=user_input)]}
    # Pay attention! Enable stream_mode="messages".
    # This returns a generator that yields a tuple each time: (message_chunk, metadata)
    stream = app.stream(inputs, stream_mode="messages")
    print("✍️ Frontend typewriter effect starts:\n")
    for chunk, metadata in stream:
        # metadata is a dictionary containing highly valuable information,
        # such as which node the current token originates from. It looks roughly like:
        # {'langgraph_step': 2, 'langgraph_node': 'writer', ...}
        node_name = metadata.get("langgraph_node")
        # [Filtering Strategy 1]: We only care about the output of the Writer node.
        # The Planner also calls the LLM; without this check, the frontend would print the outline as part of the article!
        if node_name == "writer":
            # [Filtering Strategy 2]: Ensure this is an AIMessageChunk (a text chunk generated by the model),
            # because the Graph may also emit non-text control messages when entering or leaving a node.
            if isinstance(chunk, AIMessageChunk):
                # [Filtering Strategy 3]: Extract the actual text content.
                # Sometimes the model is making a Tool Call, in which case content is empty.
                if chunk.content:
                    # Simulate the frontend typewriter: print without line breaks and flush the buffer immediately
                    print(chunk.content, end="", flush=True)
    print("\n\n" + "=" * 50 + "\n✅ Article generation complete!")

# Run test
if __name__ == "__main__":
    run_agency_with_streaming("Please write a short article about the development trends of artificial intelligence in 2024, divided into three paragraphs.")
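Before burning tokens on a real model, you can unit-test the routing logic against a hand-built fake stream. The FakeChunk dataclass below is a minimal stand-in for langchain's AIMessageChunk (just enough surface for the filter), and extract_writer_text is an illustrative helper, not part of the course code:

```python
from dataclasses import dataclass, field

@dataclass
class FakeChunk:  # stand-in for AIMessageChunk, just enough surface for the demo
    content: str = ""
    tool_call_chunks: list = field(default_factory=list)

def extract_writer_text(stream):
    """Collect only the writer node's non-empty text tokens from a (chunk, metadata) stream."""
    out = []
    for chunk, metadata in stream:
        if metadata.get("langgraph_node") == "writer" and chunk.content:
            out.append(chunk.content)
    return "".join(out)

fake_stream = [
    (FakeChunk("1. Intro"), {"langgraph_node": "planner"}),  # filtered out: wrong node
    (FakeChunk("As"), {"langgraph_node": "writer"}),
    (FakeChunk(" AI"), {"langgraph_node": "writer"}),
    (FakeChunk(""), {"langgraph_node": "writer"}),           # filtered out: empty content
]
print(extract_writer_text(fake_stream))  # → As AI
```

Driving the filter with fakes like this lets you pin down the routing behavior in a test suite without any network calls.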
When you run this code, you will see the console first print [System Log] Planner is thinking about the outline..., at which point the interface stays quiet (because we filtered out the Planner's stream).
Immediately after, it prints [System Log] Writer starts drafting the long article based on the outline..., and then, the article flows out onto your screen word by word, just like someone typing furiously on a keyboard!
This is true production-grade UX.
⚠️ Pitfalls and Avoidance Guide (Hard-Won Troubleshooting Experience)
As your mentor, I cannot just teach you how to run a Demo; I must also tell you what horror stories you will encounter in production environments (like when deploying that multi-million dollar project for a client).
🚨 Pitfall 1: Frontend Crashes Caused by Tool Calls
Crash Scene: When your Writer Agent can call external tools (e.g., halfway through writing it realizes it lacks data and calls Google Search), the chunks the LLM spits out may have an empty chunk.content! What is actually being generated is chunk.tool_call_chunks. If your frontend blindly reads chunk.content and tries to concatenate it, it may throw a NoneType error or crash your React/Vue components.
Veteran's Solution: In the extraction logic, be sure to add defensive programming:
if chunk.content:
    # Normal text, send to frontend (send_to_frontend is a placeholder for your own transport)
    send_to_frontend(chunk.content)
elif chunk.tool_call_chunks:
    # Generating parameters for a tool call; you can show a "Retrieving data..." animation on the frontend
    show_loading_animation(chunk.tool_call_chunks[0]["name"])
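The branch logic above can be exercised without any LLM at all. In this sketch, FakeChunk and the rendered strings are illustrative stand-ins; real tool_call_chunks entries are dicts carrying the streamed tool name and arguments:

```python
from dataclasses import dataclass, field

@dataclass
class FakeChunk:  # stand-in for AIMessageChunk, covering only the fields we branch on
    content: str = ""
    tool_call_chunks: list = field(default_factory=list)

def render_event(chunk):
    """Decide what the frontend should see for one chunk."""
    if chunk.content:
        return chunk.content                      # normal text: pass straight through
    if chunk.tool_call_chunks:
        name = chunk.tool_call_chunks[0]["name"]  # tool-call arguments streaming in
        return f"[Retrieving data via {name}...]"
    return ""                                     # empty control chunk: show nothing

print(render_event(FakeChunk(content="Hello")))                               # → Hello
print(render_event(FakeChunk(tool_call_chunks=[{"name": "google_search"}])))  # → [Retrieving data via google_search...]
```

Because every branch returns a string, the frontend always has something safe to render and never touches a missing field.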
🚨 Pitfall 2: Ghostly "Double Output"
Crash Scene: Some students, wanting both streaming output and the final state, wrote the code as stream_mode=["messages", "updates"]. As a result, the frontend received a typewriter output, and then instantly received a whole repeated paragraph of the complete article at the end.
Veteran's Solution: If you use multiple stream_modes simultaneously, the tuple returned by app.stream becomes (stream_mode_name, payload). You must route the flow by checking the first element:
for event_type, payload in app.stream(inputs, stream_mode=["messages", "updates"]):
    if event_type == "messages":
        chunk, metadata = payload
        ...  # handle typewriter logic
    elif event_type == "updates":
        # Handle node state update logic (e.g., updating a progress bar in the sidebar)
        pass
🚨 Pitfall 3: Attempting to Read Graph State During Streaming Output
Crash Scene: During stream_mode="messages", some students tried to get the current global State via app.get_state(config), only to find it hadn't updated at all!
Veteran's Solution: Remember the "philosophy" we discussed at the beginning. The messages mode is intercepting the real-time output of the underlying LLM. At this point, the Writer node has not finished executing! LangGraph's state update must wait until after the node returns. Therefore, during streaming output, the global State remains the state from when the previous node (Planner) ended. Do not attempt to read the current node's final State during Token streaming.
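If you need the "article so far" mid-stream, accumulate it yourself from the tokens you already receive instead of polling the Graph state. The fake_stream tuples below are hand-written stand-ins for the real (chunk, metadata) pairs yielded by app.stream(..., stream_mode="messages"):

```python
# Build the in-progress draft from the token stream itself;
# fake_stream stands in for app.stream(inputs, stream_mode="messages").
fake_stream = [
    ("1. Intro", {"langgraph_node": "planner"}),  # ignored: not the writer
    ("As", {"langgraph_node": "writer"}),
    (" AI", {"langgraph_node": "writer"}),
]

partial_article = []
for token, metadata in fake_stream:
    if metadata.get("langgraph_node") == "writer":
        partial_article.append(token)
        # "".join(partial_article) is the live draft that get_state() cannot give you yet

print("".join(partial_article))  # → As AI
```

Once the Writer node actually returns, the checkpointed State catches up and get_state() becomes reliable again.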
📝 Summary of this Session
Today, we solved a pain point with immense commercial value: the UX blocking issue in long-text generation.
We did not modify any internal code logic of the Agents; simply by switching to stream_mode="messages" at the Graph invocation layer and pairing it with precise metadata routing filtering, we endowed our AI Content Agency with seamless real-time feedback capabilities.
This is the charm of architectural design: the decoupling of underlying logic brings extreme flexibility to top-level performance.
Homework:
Try modifying today's code so that the Planner node's outline generation process is also displayed on the frontend like a typewriter in another color or another UI component (like a sidebar). Hint: You need to modify the node_name == "writer" conditional logic.
In the next session (Session 24), we will face the most challenging part of the entire Agency project—Human-in-the-loop. If the Editor feels the article is terribly written, how do we pause the Graph, wait for the human editor-in-chief (that's you) to manually modify the outline, and then have the Writer regenerate it?
See you next session! Stay passionate, and keep Coding!