Issue 10 | Fine-Grained Streaming Output: Watching How the Graph Gets the Job Done
Node-by-node output (stream_mode="updates") lets you watch every internal move of the "Editor rejection" loop in real time.
Welcome back to our "LangGraph Multi-Agent Expert Course". I am your instructor.
In previous lessons, we have built a pretty solid team for the "AI Content Agency": the Planner breaks down tasks, the Researcher gathers materials, the Writer works hard on drafting, and the Editor strictly controls quality.
However, recently some students complained in the group: "Teacher, my multi-agent system runs too much like a black box! Especially when the Editor is unsatisfied with the Writer's article and sends it back for a rewrite, the entire Graph might loop in the background for several minutes. During this time, the frontend page is completely unresponsive, and when the boss stares at the screen asking 'Did it crash?', I can only awkwardly wipe my sweat."
This is a very classic advanced engineering problem: Observability and User Experience (UX).
In traditional single LLM calls, we can use Token-level streaming output (typewriter effect) to alleviate user anxiety. But in multi-agent workflows (Graph), the real "big moves" happen between node transitions. Today, we are going to tear open this black box and use LangGraph's powerful stream_mode="updates" feature to expose every execution step of the Graph to the sunlight!
🎯 Learning Objectives for This Episode
After this lesson, you will no longer be a "novice" who can only sit and wait for the program to finish running, but an architect who can precisely control the execution rhythm of the Graph. Specific takeaways are as follows:
- Break the Black Box Anxiety: Deeply understand the concept of "state slicing" during multi-agent execution.
- Master Core APIs: Thoroughly understand the underlying differences between stream_mode="updates" and stream_mode="values".
- Practical Agency Workflow: In the love-hate relationship (looping rejections) between the Writer and Editor, accurately capture every node state update to provide a real-time feedback data source for your frontend.
- Elevate Architectural Taste: Learn how to elegantly parse LangGraph's Generator outputs.
📖 Principle Analysis
Before talking about the code, let's discuss the underlying principles.
In LangGraph, when we call graph.invoke(), the system holds its breath until the entire graph reaches the END node before spitting out the final State to you. It's like ordering "Buddha Jumps Over the Wall" at a restaurant; you wait in the lobby for two hours, and finally, the waiter brings out the finished pot.
But if you call graph.stream(), it's like sitting at the bar of an open kitchen.
LangGraph provides several different stream_modes, with the two most core ones being:
- stream_mode="values": Every time the state updates, it pushes the complete current state to you in full. (Equivalent to taking a panoramic photo of the entire kitchen's current status and sending it to you after every step.)
- stream_mode="updates": Every time a node finishes execution, it pushes only the incremental modifications that node made to the state. (Equivalent to the chef shouting: "Chopping station finished, added a plate of chopped green onions!")
In multi-agent collaboration, the updates mode is our most commonly used weapon. Because it allows us to clearly know: "Who just did the work? What work was done?"
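To make the two event shapes concrete before we touch real LangGraph code, here is a pure-Python mock. The dicts below only mimic the documented shapes (nothing here calls LangGraph, and the node names and values are illustrative); the point is that an "updates" stream carries deltas keyed by node name, while a "values" stream carries full snapshots, and folding the deltas onto the initial state reproduces the last snapshot.

```python
# stream_mode="updates": one {node_name: delta} event per finished node
updates_events = [
    {"Writer": {"draft": "Draft v1", "revision_count": 1}},
    {"Editor": {"feedback": "Lacks depth", "status": "REJECTED"}},
]

# stream_mode="values": the FULL state snapshot after each step
values_events = [
    {"topic": "demo", "draft": "Draft v1", "revision_count": 1},
    {"topic": "demo", "draft": "Draft v1", "revision_count": 1,
     "feedback": "Lacks depth", "status": "REJECTED"},
]

# Folding the deltas onto the initial state reproduces the final snapshot
state = {"topic": "demo"}
for event in updates_events:
    for node_name, delta in event.items():
        state.update(delta)  # naive merge; fields with reducers need special handling

assert state == values_events[-1]
print(state["status"])  # REJECTED
```

This is also why "updates" is so much lighter on the wire: you only ship what changed, and the client can rebuild the full picture whenever it needs to.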
Let's use a Mermaid diagram to see how the data flows in our AI Content Agency when stream_mode="updates" is introduced:
```mermaid
sequenceDiagram
    participant User as User (Client)
    participant Graph as LangGraph Engine
    participant Writer as Node: Writer
    participant Editor as Node: Editor

    User->>Graph: Initiate task stream(..., stream_mode="updates")
    Graph->>Writer: 1. Execute writing
    Writer-->>Graph: Return increment: {draft: "Draft v1", revision_count: 1}
    Graph-->>User: ⚡ yield {"Writer": {draft: "Draft v1", revision_count: 1}}
    Graph->>Editor: 2. Review draft
    Editor-->>Graph: Return increment: {feedback: "Lacks depth", status: "REJECTED"}
    Graph-->>User: ⚡ yield {"Editor": {feedback: "Lacks depth", status: "REJECTED"}}
    Note over Graph, Editor: Conditional edge check: Rejected, route back to Writer
    Graph->>Writer: 3. Rewrite based on feedback
    Writer-->>Graph: Return increment: {draft: "Revision v2", revision_count: 2}
    Graph-->>User: ⚡ yield {"Writer": {draft: "Revision v2", revision_count: 2}}
    Graph->>Editor: 4. Review again
    Editor-->>Graph: Return increment: {status: "APPROVED"}
    Graph-->>User: ⚡ yield {"Editor": {status: "APPROVED"}}
    Note over Graph, Editor: Conditional edge check: Approved, route to END
    Graph-->>User: End Stream
```

Do you see it now? Every ⚡ yield is a heartbeat that LangGraph spits out to the outside world. As long as the external business code listens to these heartbeats, it can draw beautiful progress bars or real-time logs on the frontend: "The Editor is reviewing...", "The Editor rejected the article, the Writer is making the 2nd revision...".
💻 Practical Code Drill
Enough talk. Show me the code.
We will build this "Writer and Editor" looping workflow using Python and capture it using stream_mode="updates".
To allow everyone to run it directly, I used mock functions for the LLM calls here, focusing on demonstrating the streaming output architecture.
1. Define State and Node Logic
```python
import operator
import time
from typing import Annotated, TypedDict

from langgraph.graph import END, StateGraph

# ==========================================
# 1. Define the global State of our Agency
# ==========================================
class AgencyState(TypedDict):
    topic: str
    draft: str
    feedback: str
    # Use a reducer: each value a node returns is ADDED to the running total
    revision_count: Annotated[int, operator.add]
    status: str  # "PENDING", "REJECTED", "APPROVED"

# ==========================================
# 2. Define Nodes
# ==========================================
def writer_node(state: AgencyState):
    """Writer node: writes the article based on the topic or the Editor's feedback."""
    topic = state.get("topic")
    feedback = state.get("feedback", "")
    current_count = state.get("revision_count", 0)

    # Mock the time-consuming process of the LLM thinking and writing
    time.sleep(1.5)

    if current_count == 0:
        new_draft = f"[First Draft] An article about {topic}. The content is relatively superficial."
    else:
        new_draft = f"[Draft {current_count + 1}] An article about {topic}. Deeply optimized based on the feedback: '{feedback}'."

    # Return incremental updates only
    return {
        "draft": new_draft,
        "revision_count": 1  # Because of operator.add, returning 1 adds 1 to the running total
    }

def editor_node(state: AgencyState):
    """Editor node: reviews the draft."""
    current_count = state.get("revision_count", 0)

    # Mock the time-consuming review process
    time.sleep(1)

    # Business logic: reject the first two drafts, approve the third (a strict Editor)
    if current_count < 3:
        return {
            "feedback": f"Draft {current_count} lacks depth, go back and rewrite it!",
            "status": "REJECTED"
        }
    return {
        "feedback": "Good job this time, it can be published.",
        "status": "APPROVED"
    }

# ==========================================
# 3. Define the Conditional Edge
# ==========================================
def check_approval(state: AgencyState):
    """Route based on whether the Editor approved."""
    if state.get("status") == "APPROVED":
        return "approved"
    return "rejected"
```
2. Assemble the Graph and Enable Streaming Observation
Next is the moment to witness the magic. We will assemble the graph and use the stream method.
```python
# ==========================================
# 4. Assemble the Graph
# ==========================================
workflow = StateGraph(AgencyState)
workflow.add_node("Writer", writer_node)
workflow.add_node("Editor", editor_node)

workflow.set_entry_point("Writer")
workflow.add_edge("Writer", "Editor")

# If rejected, loop back to the Writer; if approved, proceed to END
workflow.add_conditional_edges(
    "Editor",
    check_approval,
    {
        "rejected": "Writer",
        "approved": END
    }
)

app = workflow.compile()

# ==========================================
# 5. Witness the magic: stream_mode="updates"
# ==========================================
def run_agency_with_stream():
    print("🚀 [Agency System Started] Received client request...")

    initial_state = {
        "topic": "LangGraph Streaming Output Principle Analysis",
        "revision_count": 0
    }

    # The core is here! app.stream returns a Python generator
    stream_generator = app.stream(initial_state, stream_mode="updates")

    step = 1
    for event in stream_generator:
        # Each event is a dict: { "node_name": { state increment } }
        print(f"\n--- ⏳ Step {step} ---")
        for node_name, node_update in event.items():
            print(f"👀 Observed node execution completed: [{node_name}]")

            # Depending on the node, we can render different frontend UI
            if node_name == "Writer":
                print(f"✍️ Writer submitted a new draft: {node_update.get('draft')}")
                print(f"🔄 Current revision count: {node_update.get('revision_count')} (result after incremental accumulation)")
            elif node_name == "Editor":
                status = node_update.get('status')
                if status == "REJECTED":
                    print(f"😡 Editor is furious and rejected the draft! Feedback: {node_update.get('feedback')}")
                else:
                    print(f"🎉 Editor is very satisfied and approved! Feedback: {node_update.get('feedback')}")
        step += 1
        time.sleep(0.5)  # Brief pause so the output is easier to watch with the naked eye

    print("\n✅ [Agency System Completed] The final article has been delivered to the client!")

if __name__ == "__main__":
    run_agency_with_stream()
```
3. Execution Effect Display (Terminal Output Mock)
When you run the code above, you no longer have to stare at a frozen terminal waiting for the final result. A new log line pops up every second or two, vividly displaying the internal "palace drama":
```
🚀 [Agency System Started] Received client request...

--- ⏳ Step 1 ---
👀 Observed node execution completed: [Writer]
✍️ Writer submitted a new draft: [First Draft] An article about LangGraph Streaming Output Principle Analysis. The content is relatively superficial.
🔄 Current revision count: 1 (result after incremental accumulation)

--- ⏳ Step 2 ---
👀 Observed node execution completed: [Editor]
😡 Editor is furious and rejected the draft! Feedback: Draft 1 lacks depth, go back and rewrite it!

--- ⏳ Step 3 ---
👀 Observed node execution completed: [Writer]
✍️ Writer submitted a new draft: [Draft 2] An article about LangGraph Streaming Output Principle Analysis. Deeply optimized based on the feedback: 'Draft 1 lacks depth, go back and rewrite it!'.
🔄 Current revision count: 2 (result after incremental accumulation)

--- ⏳ Step 4 ---
👀 Observed node execution completed: [Editor]
😡 Editor is furious and rejected the draft! Feedback: Draft 2 lacks depth, go back and rewrite it!

--- ⏳ Step 5 ---
👀 Observed node execution completed: [Writer]
✍️ Writer submitted a new draft: [Draft 3] An article about LangGraph Streaming Output Principle Analysis. Deeply optimized based on the feedback: 'Draft 2 lacks depth, go back and rewrite it!'.
🔄 Current revision count: 3 (result after incremental accumulation)

--- ⏳ Step 6 ---
👀 Observed node execution completed: [Editor]
🎉 Editor is very satisfied and approved! Feedback: Good job this time, it can be published.

✅ [Agency System Completed] The final article has been delivered to the client!
```
Imagine, if you push this data to the frontend via WebSocket, how great would your user experience be? Users can watch the AI team polish their work step by step, just like watching a progress bar. This sense of "participation" and "transparency" is something a monolithic LLM cannot provide.
⚠️ Pitfalls and Avoidance Guide
As a veteran with 10 years of architectural experience, I must give you some warnings in this section. Although streaming output is satisfying, there are quite a few pitfalls:
💣 Pitfall 1: Confusing updates and values leading to frontend data chaos
Symptom: The state received by the frontend sometimes lacks fields, and sometimes it is complete.
Solution: Remember, updates only returns the dictionary returned by the current node. If you only return {"draft": "xxx"} in writer_node, then in updates mode, the event you get will not have topic and status. If you need to get the full state every time to render the page, please use stream_mode="values"; if you just want to do event triggering (like popping up a Toast saying "Editor has reviewed"), updates is more lightweight.
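If your frontend really does need the full state but you still want the lightweight "updates" stream, you can fold the deltas client-side. Below is a hedged sketch of that idea; apply_update and REDUCERS are illustrative names I made up, not LangGraph APIs. The key detail is that a field declared with a reducer (like our revision_count with operator.add) must be merged with that same reducer, not overwritten.

```python
import operator

# Mirror the graph's Annotated reducers on the client side (illustrative)
REDUCERS = {"revision_count": operator.add}

def apply_update(state: dict, delta: dict) -> dict:
    """Merge one 'updates' delta into an accumulated client-side state."""
    merged = dict(state)
    for key, value in delta.items():
        reducer = REDUCERS.get(key)
        if reducer and key in merged:
            merged[key] = reducer(merged[key], value)  # e.g. add, don't overwrite
        else:
            merged[key] = value
    return merged

state = {"topic": "demo", "revision_count": 0}
state = apply_update(state, {"draft": "Draft v1", "revision_count": 1})
state = apply_update(state, {"feedback": "Lacks depth", "status": "REJECTED"})
state = apply_update(state, {"draft": "Revision v2", "revision_count": 1})

print(state["revision_count"])  # 2 -- accumulated, mirroring operator.add
```

With this pattern you get the best of both worlds: "updates"-sized network payloads, plus a full state snapshot whenever the UI needs one.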
💣 Pitfall 2: Confusing "Node-level Streaming" and "Token-level Streaming"
Symptom: The boss wants the ChatGPT effect where words pop out one by one, but you only achieved the effect of nodes popping out one by one.
Solution: These are things of completely different dimensions!
- Node-level Streaming (covered in this lesson): Focuses on which step the Agent workflow has reached.
- Token-level Streaming: Focuses on the process of a specific large model generating text.
In advanced architectures, we usually combine the two: use stream_mode="updates" to report progress during graph transitions, while simultaneously enabling the LLM's asynchronous Token streaming to push text to the frontend when calling the LLM inside writer_node. We will dedicate a specific lesson to this part in the upcoming "Advanced Communication Chapter".
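Until that lesson, here is a purely conceptual sketch of how the two dimensions interleave on one channel. Everything in it (fake_llm_tokens, combined_stream, the ("token", ...) / ("update", ...) tuples) is made up for illustration and is NOT a LangGraph API; it only shows the shape of a merged stream: many fine-grained token events while a node's LLM is generating, then one coarse node event when the node completes.

```python
def fake_llm_tokens(text):
    """Stand-in for an LLM's token stream (one word per 'token')."""
    for word in text.split():
        yield word

def combined_stream():
    # Token-level events while the Writer's LLM is generating...
    for token in fake_llm_tokens("Draft v1 about streaming"):
        yield ("token", {"node": "Writer", "token": token})
    # ...then one node-level event when the Writer node completes,
    # and another when the Editor node completes.
    yield ("update", {"Writer": {"draft": "Draft v1 about streaming"}})
    yield ("update", {"Editor": {"status": "APPROVED"}})

kinds = [kind for kind, _ in combined_stream()]
print(kinds)  # four 'token' events, then two 'update' events
```

The frontend simply branches on the event kind: "token" events feed the typewriter effect, "update" events advance the progress bar.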
💣 Pitfall 3: Repeated Triggering of Reducer Accumulator
Symptom: revision_count inexplicably becomes 4, 8, 16.
Solution: When using Annotated[int, operator.add], you must be clear that every time a node returns a number, it will be added to the original base. If you accidentally also return {"revision_count": 1} in editor_node, it will lead to double accumulation. Strictly dividing the write permissions of each node for specific fields in the State is a core principle of multi-agent architecture design.
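A minimal repro of this pitfall, using plain operator.add to mimic how an Annotated[int, operator.add] reducer folds node returns together (reduce_count is an illustrative helper, not a LangGraph function). If both nodes return revision_count, each return gets added to the running total, so the count drifts upward twice per round instead of once.

```python
import operator

def reduce_count(current, updates):
    """Mimic a reducer folding every node's returned value into the state."""
    for value in updates:
        current = operator.add(current, value)
    return current

# Correct: only writer_node writes revision_count in one Writer->Editor round
one_round_ok = reduce_count(0, [1])        # Writer returns 1

# Buggy: editor_node ALSO returns {"revision_count": 1} in the same round
one_round_bug = reduce_count(0, [1, 1])    # Writer +1, then Editor +1

print(one_round_ok, one_round_bug)  # 1 2 -- double accumulation per round
```

The fix is organizational, not technical: decide which node "owns" each reducer-backed field and make every other node leave it out of its return dict.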
📝 Episode Summary
Today, we integrated a "God's-eye view" into the Agency project.
Through LangGraph's stream_mode="updates", we transformed a black-box system that was originally extremely time-consuming and easy to lose patience with, into a white-box system with clear steps and transparent states. This is not only an optimization at the code level but also a massive leap in product experience.
In actual commercial implementation, being able to expose internal drama like "Editor rejects and rewrites" to users will not make users feel the system is stupid; instead, it will make users feel your AI is "working hard" and "very professional". This is the best manifestation of technology feeding back into the product.
Next lesson, we will enter a more exciting topic: Human-in-the-loop. If the Editor AI is unsure, how do we let a real human boss intervene for approval? How does the Graph achieve "pause execution, wait for human input"?
Please look forward to Episode 11. Class dismissed, students! Remember to run today's code locally and experience the thrill of the Generator spitting out data!