Issue 11 | Parallel Execution: Divide and Conquer Mechanism
Hello everyone, welcome back to the "LangGraph Multi-Agent Expert Course". I am your mentor.
In the previous episode, we successfully introduced a memory mechanism to our "AI Content Agency", granting our Agent the ability to maintain contextual coherence. However, as the orders our agency receives become increasingly complex and customer demands grow higher, we have encountered a severe performance bottleneck.
Imagine this scenario: A client requests a "2024 AI Industry Development Report for China, the US, and Japan". According to our previous linear workflow: The Planner breaks down three sub-topics (China, US, Japan) $\rightarrow$ The Researcher searches for information on China $\rightarrow$ Waits until finished, then searches for the US $\rightarrow$ Then searches for Japan $\rightarrow$ Finally hands it over to the Writer.
What is this called? This is called Sequential Execution. If data collection for each country takes 10 seconds, the Research phase alone will consume 30 seconds! In a real business environment, your clients will lose patience waiting, and your computing resources are not being fully utilized.
How do top-tier content agencies actually do it? After the Editor-in-Chief (Planner) breaks down the task, they simultaneously dispatch 3 Researchers. Zhang researches China, Li researches the US, and Wang researches Japan. They split into three paths; whoever finishes throws their data into a summary pool, and finally, the Writer takes everything at once to write the draft.
This is our topic for today: Parallel Execution and the Divide and Conquer mechanism. In distributed systems, we call this Fan-out and Fan-in.
Ready to refactor our workflow? Let's go!
🎯 Learning Objectives for this Episode
Through this episode's practical exercise, you will master the following advanced skills:
- Understand the Fan-out & Fan-in Architecture: Master the principles of dynamic distribution and result aggregation in multi-agent collaboration.
- Master the LangGraph `Send` API: Say goodbye to static conditional branches and learn how to dynamically create multiple node instances based on runtime data.
- Master Concurrent State Aggregation (State Reduction): Solve the data overwrite disaster that occurs when multiple nodes write to the State simultaneously.
- Refactor the Core Agency Flow: Equip our content agency with a "multi-threaded" engine, boosting efficiency exponentially.
📖 Principle Analysis
There are two ways to implement parallelism in LangGraph. The first is Static Parallelism: one node connects directly to three fixed nodes. This is simple but lacks flexibility. Because in our Agency, the number of topics broken down by the Planner is dynamic (sometimes 2, sometimes 5).
Therefore, we need the second approach: Dynamic Parallelism.
LangGraph provides an extremely elegant magic weapon for this: the Send object.
When you return a list of [Send("node_name", payload), Send("node_name", payload)] from a node or conditional edge, LangGraph will instantly clone the corresponding number of node_name nodes and feed the different payloads to them concurrently.
This is like the Planner standing in the middle of the office and shouting: "Bring me 3 Researchers!" Instantly, 3 Researchers spring up, taking their respective sub-topics to work.
Pay attention! Look at the Mermaid architecture diagram of our refactored Agency below:
```mermaid
graph TD
    Start((Start)) --> Planner[Planner Node\nPlanner breaks down topics]
    Planner -- "Fan-out\nReturns [Send, Send, Send]" --> Dispatch{Dispatch Router}
    Dispatch -.->|Send payload 1| Researcher1[Researcher Instance 1\nZhang researches Topic A]
    Dispatch -.->|Send payload 2| Researcher2[Researcher Instance 2\nLi researches Topic B]
    Dispatch -.->|Send payload 3| Researcher3[Researcher Instance 3\nWang researches Topic C]
    Researcher1 -.->|Fan-in\nAppend to List| Aggregate((State Aggregation))
    Researcher2 -.->|Fan-in\nAppend to List| Aggregate
    Researcher3 -.->|Fan-in\nAppend to List| Aggregate
    Aggregate --> Writer[Writer Node\nWriter aggregates and writes]
    Writer --> End((End))

    classDef core fill:#2d3436,stroke:#74b9ff,stroke-width:2px,color:#fff;
    classDef agent fill:#0984e3,stroke:#fff,stroke-width:2px,color:#fff;
    classDef router fill:#d63031,stroke:#fff,stroke-width:2px,color:#fff;
    class Planner,Writer core;
    class Researcher1,Researcher2,Researcher3 agent;
    class Dispatch router;
```

Workflow Analysis:
- The Planner determines the article outline and generates a `sub_topics` list.
- The Dispatch Router iterates through this list, generates multiple `Send` objects, and triggers the Fan-out.
- Multiple Researcher instances run in parallel (LangGraph handles concurrency automatically under the hood).
- After each Researcher finishes running, it writes its results to the State. The State field must use a specific Reducer (such as `operator.add`) to achieve Fan-in aggregation; otherwise, the Researcher that finishes later will overwrite the results of the one that finished earlier.
- Only after all Researchers have completed does the flow transition to the Writer.
💻 Practical Code Drill
Enough talk, show me the code.
To allow everyone to run and experience it directly, I will use Mock LLM calls in this episode's Demo, utilizing time.sleep to demonstrate the power of parallelism.
Step 1: Define a State that Supports Aggregation
This is the easiest place to fall into a trap! When defining research_results in the State, we must use Annotated and operator.add. This tells LangGraph: "When multiple nodes write data to this field simultaneously, do not overwrite, but Append them together!"
```python
import operator
import time
from typing import Annotated, TypedDict, List

from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

# ==========================================
# 1. State Definition
# ==========================================
class AgencyState(TypedDict):
    main_topic: str        # Main topic given by the client
    sub_topics: List[str]  # List of sub-topics broken down by the Planner
    # ⚠️ Core point: operator.add ensures parallel writes are appended to the list,
    # rather than overwriting each other
    research_results: Annotated[List[str], operator.add]
    final_article: str     # The final generated article

# To pass specific parameters via Send, we define a dedicated State for the Researcher
class ResearcherState(TypedDict):
    sub_topic: str
```
Step 2: Write Node Logic (Nodes)
Next, let's implement the three core employees of the Agency: the Planner, the Researcher, and the Writer.
```python
# ==========================================
# 2. Node Functions
# ==========================================
def planner_node(state: AgencyState):
    """The Planner: Breaks down the main topic into multiple sub-topics."""
    print(f"👨‍💼 [Planner] Breaking down main topic: '{state['main_topic']}'...")
    # Mock LLM breakdown logic
    mock_sub_topics = [
        f"Historical background of {state['main_topic']}",
        f"Core technologies of {state['main_topic']}",
        f"Future trends of {state['main_topic']}"
    ]
    print(f"👨‍💼 [Planner] Breakdown complete, generated {len(mock_sub_topics)} sub-topics. Ready to dispatch!")
    # Write the broken-down sub-topics into the State
    return {"sub_topics": mock_sub_topics}

def researcher_node(state: ResearcherState):
    """The Researcher: Conducts deep research on a SINGLE sub-topic."""
    topic = state["sub_topic"]
    print(f"  🔍 [Researcher] Task received, starting search for: '{topic}'...")
    # Mock a time-consuming web search and reading process (using sleep)
    # Note: in real asynchronous execution, LangGraph would handle this blocking efficiently
    time.sleep(2)
    mock_result = f"[Research Report on '{topic}']: Here is some highly valuable in-depth information..."
    print(f"  ✅ [Researcher] Search complete: '{topic}'")
    # ⚠️ Core point: the key of the returned dictionary must match the field
    # to be aggregated in AgencyState. Because we used operator.add, the list
    # returned here is appended to the main list.
    return {"research_results": [mock_result]}

def writer_node(state: AgencyState):
    """The Writer: Aggregates all research reports and writes the final article."""
    print("\n✍️ [Writer] Received all research materials, starting to write furiously...")
    results = state.get("research_results", [])
    print(f"✍️ [Writer] Received a total of {len(results)} research reports.")
    # Mock the writing process
    article = f"# In-Depth Analysis of {state['main_topic']}\n\n"
    for i, res in enumerate(results, 1):
        article += f"## Chapter {i}\n{res}\n\n"
    print("✍️ [Writer] Draft complete!")
    return {"final_article": article}
```
Step 3: The Miracle-Witnessing Router (The Dispatch Router)
This is the soul of Fan-out. We do not connect the Planner and Researcher directly with an edge; instead, we use a conditional routing function that returns Send objects.
```python
# ==========================================
# 3. Dynamic Dispatch Router
# ==========================================
def dispatch_researchers(state: AgencyState):
    """The Dispatcher: Dynamically creates one Researcher instance per sub-topic
    generated by the Planner."""
    sub_topics = state.get("sub_topics", [])
    # ⚠️ Core point: create a Send object for each sub-topic.
    # Send("target_node_name", state_payload_passed_to_that_node)
    sends = []
    for topic in sub_topics:
        sends.append(Send("researcher", {"sub_topic": topic}))
    return sends
```
Step 4: Build and Run the Graph (Build & Run)
```python
# ==========================================
# 4. Build the Graph
# ==========================================
builder = StateGraph(AgencyState)

# Add nodes
builder.add_node("planner", planner_node)
builder.add_node("researcher", researcher_node)
builder.add_node("writer", writer_node)

# Set the control flow
builder.add_edge(START, "planner")
# After the Planner finishes, trigger dynamic dispatch (Fan-out)
builder.add_conditional_edges(
    "planner",
    dispatch_researchers,
    # Declare the nodes that Send might route to
    ["researcher"]
)
# After all researcher nodes finish executing, flow uniformly to the writer (Fan-in)
builder.add_edge("researcher", "writer")
builder.add_edge("writer", END)

# Compile the graph
graph = builder.compile()

# ==========================================
# 5. Test Run
# ==========================================
if __name__ == "__main__":
    print("🚀 Starting AI Content Agency parallel workflow...\n")
    initial_state = {
        "main_topic": "Quantum Computing",
        "research_results": []  # Initialize as an empty list
    }
    # Record the start time
    start_time = time.time()
    # Run the graph
    final_state = graph.invoke(initial_state)
    # Record the end time
    end_time = time.time()

    print("\n" + "=" * 40)
    print("🏆 Final Output Article Preview:")
    print("=" * 40)
    print(final_state["final_article"])
    print("=" * 40)
    print(f"⏱️ Total time elapsed: {end_time - start_time:.2f} seconds")
    # If sequential, 3 topics sleeping 2 seconds each would take at least 6 seconds.
    # Because execution is parallel, the total should be just over 2 seconds!
```
When you run this code, you will see the following fascinating output printed in the console:
```text
🚀 Starting AI Content Agency parallel workflow...

👨‍💼 [Planner] Breaking down main topic: 'Quantum Computing'...
👨‍💼 [Planner] Breakdown complete, generated 3 sub-topics. Ready to dispatch!
  🔍 [Researcher] Task received, starting search for: 'Historical background of Quantum Computing'...
  🔍 [Researcher] Task received, starting search for: 'Core technologies of Quantum Computing'...
  🔍 [Researcher] Task received, starting search for: 'Future trends of Quantum Computing'...
  ✅ [Researcher] Search complete: 'Historical background of Quantum Computing'
  ✅ [Researcher] Search complete: 'Future trends of Quantum Computing'
  ✅ [Researcher] Search complete: 'Core technologies of Quantum Computing'

✍️ [Writer] Received all research materials, starting to write furiously...
✍️ [Writer] Received a total of 3 research reports.
✍️ [Writer] Draft complete!
========================================
🏆 Final Output Article Preview:
========================================
# In-Depth Analysis of Quantum Computing

## Chapter 1
[Research Report on 'Historical background of Quantum Computing']: Here is some highly valuable in-depth information...

## Chapter 2
[Research Report on 'Core technologies of Quantum Computing']: Here is some highly valuable in-depth information...

## Chapter 3
[Research Report on 'Future trends of Quantum Computing']: Here is some highly valuable in-depth information...

========================================
⏱️ Total time elapsed: 2.03 seconds
```
Everyone, did you see the final time elapsed? 2.03 seconds! 3 tasks taking 2 seconds each executed concurrently, and the total time is still 2 seconds. This is the absolute power of parallel divide and conquer. Your Agency's throughput has now reached industrial-grade levels.
Pitfalls and Avoidance Guide
As your mentor, I must not only teach you how to write code that runs but also tell you where you might bleed in a production environment. When using the parallel mechanism, there are three "massive pitfalls" you must guard against:
💣 Pitfall 1: Forgetting to use a Reducer, leading to ruthless State Overwrite
Symptom: Dispatched 5 Researchers, but the Writer only received one report in the end.
Diagnosis: When defining research_results: list[str] in TypedDict, Annotated[..., operator.add] was not added. LangGraph's default behavior is Overwrite. 5 parallel nodes simultaneously return {"research_results": [...]}, and the node that finishes last will completely wipe out the hard work of the previous 4.
Cure: Always remember to add a Reducer to the target field of a Fan-in.
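To see concretely why the Reducer matters, here is a stdlib-only sketch of the two merge strategies. This only imitates LangGraph's per-field merge behavior conceptually; `merge_overwrite` is a hypothetical stand-in for the default "last writer wins" semantics, not a LangGraph API:

```python
import operator

def merge_overwrite(old, new):
    # Default behavior when no reducer is declared: the new value replaces the old one
    return new

existing = ["report A"]
update_1, update_2 = ["report B"], ["report C"]

# Without a reducer, parallel updates clobber each other:
state = merge_overwrite(merge_overwrite(existing, update_1), update_2)
assert state == ["report C"]  # reports A and B are gone!

# With operator.add as the reducer, updates are concatenated:
state = operator.add(operator.add(existing, update_1), update_2)
assert state == ["report A", "report B", "report C"]  # everything survives
```

Same three writes, two very different final states: that is the whole difference between forgetting and remembering `Annotated[List[str], operator.add]`.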
💣 Pitfall 2: Concurrency Storm triggering API Rate Limit (Rate Limit 429 Error)
Symptom: The Planner breaks down 20 sub-topics and instantly dispatches 20 Researchers against the OpenAI API, and OpenAI rejects the requests outright with HTTP 429 Too Many Requests.
Diagnosis: The Fan-out is extremely aggressive and lacks concurrency control.
Cure: In a production environment, if you don't know how many tasks the Planner will generate, never run unprotected. You should add a Retry with Exponential Backoff mechanism in the underlying logic calling the LLM, for example, by using the tenacity library; or limit the maximum concurrency (max_concurrency) in LangGraph's RunnableConfig.
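The `tenacity` library handles this declaratively; to show just the idea without extra dependencies, here is a hand-rolled sketch. `call_llm` is a hypothetical stand-in that simulates two 429 failures before succeeding:

```python
import random
import time

def retry_with_backoff(fn, max_retries=5, base_delay=1.0):
    """Retry fn on failure, sleeping base_delay * 2**attempt (plus jitter) between tries."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise  # Out of retries: propagate the error
            # Exponential backoff with jitter so retries don't stampede in lockstep
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.1))

# Hypothetical flaky LLM call: fails twice with a simulated 429, then succeeds
attempts = {"n": 0}
def call_llm():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("429 Too Many Requests")
    return "ok"

result = retry_with_backoff(call_llm, base_delay=0.01)
assert result == "ok" and attempts["n"] == 3
```

Combine this with the `max_concurrency` cap mentioned above so that retries and concurrency limits work together, rather than relying on either one alone.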
💣 Pitfall 3: The Bucket Effect (Weakest Link) and the Risk of Hanging (The Slowest Intern Effect)
Symptom: The total time elapsed is unusually long.
Diagnosis: By design, Fan-in must wait for all dispatched Send nodes to finish executing before moving to the next node (the Writer). If Zhang and Li finish their research in 2 seconds but Wang hits network lag and gets stuck for 60 seconds, the entire system waits 60 seconds for Wang.
Cure: Set strict Timeouts on the LLM calls or search tools inside the Researcher nodes. If a Researcher times out, catch the exception and return a fallback result such as "No data available for this topic at the moment", and absolutely never let the entire Graph hang.
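The timeout-plus-fallback pattern is not LangGraph-specific; one stdlib way to impose such a deadline inside a node is to run the slow call on a worker thread and cut it off. `slow_search` and the timeout values here are illustrative stand-ins for a real search tool:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

def research_with_timeout(search_fn, topic, timeout_s):
    """Run a (possibly slow) search with a hard deadline; return a fallback instead of hanging."""
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        future = pool.submit(search_fn, topic)
        try:
            return future.result(timeout=timeout_s)
        except FuturesTimeout:
            # The slowest intern gets cut off; the Graph keeps moving
            return f"No data available for '{topic}' at the moment."
    finally:
        # Don't block on the stuck worker; let it wind down in the background
        pool.shutdown(wait=False)

def slow_search(topic):
    time.sleep(0.5)  # Simulates a researcher stuck on network lag
    return f"report on {topic}"

result = research_with_timeout(slow_search, "Japan", timeout_s=0.05)
assert result == "No data available for 'Japan' at the moment."
```

Inside a Researcher node you would wrap the search/LLM call this way (or use your HTTP client's own timeout parameter), so one stuck instance degrades gracefully instead of stalling the whole Fan-in.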
📝 Episode Summary
Today, we equipped the "AI Content Agency" with a turbocharged engine for parallel processing.
We learned:
- Fan-out: Dynamically cloning nodes by returning a list of `Send` objects from a conditional edge.
- Fan-in: Safely aggregating data produced by concurrent nodes using `Annotated` and `operator.add`.
- Performance Leap: Compressing the original $O(N)$ wall-clock time to approximately $O(1)$ when resources permit.
Now, our Planner strategizes, the Researchers advance side by side, and the Writer writes like a god. However, if you look closely at the generated article, you will find: if the data the Researchers bring back is garbage, the Writer will produce garbage (Garbage in, garbage out).
Is the agency still missing a gatekeeper? That's right. In the next episode (Episode 12), we will introduce the Human-in-the-loop mechanism. Before the Writer starts writing, or after the draft is submitted, we will let the real-world Editor-in-Chief (you) intervene in the workflow to approve, reject, or modify.
Let the Agents run wild, while keeping the steering wheel in human hands. See you next episode! 👋