Issue 12 | The Black Magic of Reducers: How to Safely Update Data Incrementally

Updated on 4/14/2026

Solving the Multi-Node Concurrent Array Writing Problem Using operator.add and Custom Aggregation Functions

Welcome back to our "LangGraph Multi-Agent Expert Course", future top AI architects! I am your old friend.

In the previous episode, we successfully got our "AI Content Agency" up and running. Our Planner (Editor-in-Chief) can break down tasks, hand them over to the Researcher to gather materials, and finally throw them to the Writer to draft the article.

However, I wonder if you encountered a blood-pressure-spiking paranormal event when running the code yourselves after class: "I clearly sent out 3 Researchers to gather materials from Google, Twitter, and Arxiv respectively, so why is the data finally passed to the Writer always from just 1 of them?! Were the fruits of the other 2's labor eaten by dogs?"

Congratulations, you've stepped into the most classic pitfall in multi-agent concurrent state management—State Overwrite.

In today's lesson, I will take you through the most elegant piece of black magic in LangGraph, and the one beginners most easily overlook: Reducers. We will completely solve the problem of multi-node concurrent writes, so that your agent team learns "team collaboration" instead of "sabotaging each other."


🎯 Learning Objectives for this Episode

By the end of today's lesson, you will master the following hardcore skills:

  1. Understand the underlying logic of State updates: Figure out why LangGraph's default "Last Write Wins" mechanism leads to data loss.
  2. Master the use of built-in Reducers: Proficiently use typing.Annotated and operator.add to implement thread-safe incremental appending of lists.
  3. Write advanced custom Reducers: Hand-code aggregation functions that not only merge data but also perform data cleaning tasks like "deduplication" and "null value filtering" during the merge.
  4. Refactor the Agency's concurrent search flow: Upgrade our Researcher to a multi-threaded concurrent search architecture, laying a solid data foundation for the complex drafting in the next step.

📖 Principle Analysis

1. Why is your data lost? (The Overwrite Problem)

In LangGraph, the soul of the entire system is the State. You can imagine the State as a "shared whiteboard" passed among various Agents.

By default, when a node finishes executing and returns a dictionary, LangGraph takes this dictionary to update the whiteboard. Its default update logic is extremely simple and brutal: Overwrite.

Imagine this scenario: The Planner says: "Go check the latest progress on 'quantum computing'." So the system concurrently starts three Researchers:

  • Researcher_Google finds A, returning {"research_materials": ["A"]}
  • Researcher_Arxiv finds B, returning {"research_materials": ["B"]}
  • Researcher_Twitter finds C, returning {"research_materials": ["C"]}

Because they execute concurrently, whichever node writes its result to the whiteboard last is the only one whose result survives. If the Twitter node finishes 1 millisecond after the others, research_materials ends up overwritten as ["C"]. A and B are completely wiped out.

This is the classic Race Condition.
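The overwrite problem above can be simulated in a few lines of plain Python. This is only a conceptual sketch, not LangGraph's real internals, but the default update semantics amount to a dict merge, so the last writer wins:

```python
# Conceptual sketch of LangGraph's default "overwrite" update (not the real internals).
state = {"research_materials": []}

# Three concurrent Researchers each return their own partial update;
# suppose they land on the shared state in this arrival order:
node_outputs = [
    {"research_materials": ["A"]},  # Researcher_Google
    {"research_materials": ["B"]},  # Researcher_Arxiv
    {"research_materials": ["C"]},  # Researcher_Twitter (finishes last)
]

for output in node_outputs:
    state.update(output)  # default semantics: plain overwrite

print(state["research_materials"])  # ['C'] — A and B are gone
```

Run it yourself: no matter how you shuffle the arrival order, exactly one result survives.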

2. The Black Magic of Reducers: From "Overwrite" to "Merge"

To solve this problem, LangGraph introduced the concept of Reducers. The essence of a Reducer is an interceptor/aggregator. It tells LangGraph: "Hey, when a node wants to update this field, don't overwrite it directly. Please use the method I provide to blend the new data and the old data together."

In Python, we use typing.Annotated to bind a Reducer to a State field.
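Here is a minimal, pure-stdlib sketch of what that binding looks like, and of where the reducer actually lives: in the `Annotated` metadata, which a framework can read back out via `get_type_hints`. (LangGraph does this internally; the introspection below is just to make the mechanism visible.)

```python
import operator
from typing import Annotated, TypedDict, get_args, get_type_hints

class DemoState(TypedDict):
    topic: str                                          # plain field: overwrite
    research_materials: Annotated[list, operator.add]   # reducer-bound field: merge

# A framework can recover the reducer from the annotation metadata:
hints = get_type_hints(DemoState, include_extras=True)
_, reducer = get_args(hints["research_materials"])

old, new = ["A"], ["B"]
print(reducer(old, new))  # ['A', 'B'] — appended, not overwritten
```

The field's type is still `list`; the reducer rides along as metadata and only changes *how* updates are applied, not what the field holds.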

Let's take a look at the core workflow of the AI Agency we are going to refactor today:

graph TD
    classDef planner fill:#f9f0ff,stroke:#d8b4e2,stroke-width:2px;
    classDef researcher fill:#e6f7ff,stroke:#91d5ff,stroke-width:2px;
    classDef writer fill:#f6ffed,stroke:#b7eb8f,stroke-width:2px;
    classDef state fill:#fffb8f,stroke:#d4b106,stroke-width:2px,stroke-dasharray: 5 5;

    START((Start)) --> Planner[Planner Agent<br/>Generates multiple search keywords]:::planner
    Planner --> |Fan-out Concurrent| R1[Researcher: Google Search]:::researcher
    Planner --> |Fan-out Concurrent| R2[Researcher: Paper Retrieval]:::researcher
    Planner --> |Fan-out Concurrent| R3[Researcher: Social Media]:::researcher
    R1 -.-> |Returns Result A| State_Materials[(State: research_materials<br/>Reducer: operator.add)]:::state
    R2 -.-> |Returns Result B| State_Materials
    R3 -.-> |Returns Result C| State_Materials
    State_Materials -.-> |Fan-in Aggregation: A+B+C| Writer[Writer Agent<br/>Drafts article based on all materials]:::writer
    Writer --> END((End))

In the architecture above, when the concurrent Researchers return their results, the research_materials field with the Reducer acts like a reservoir, safely collecting A, B, and C all together, and finally feeding them as a complete list to the Writer.


💻 Practical Code Drill

Enough talk, show me the code. We will use Python to implement this AI Agency flow with concurrent Researchers.

Step 1: Environment and Basic Package Imports

import operator
from typing import Annotated, TypedDict, List
from langgraph.graph import StateGraph, START, END

# Simulate LLM call latency
import time
import random

Step 2: Define the State with a Reducer (Core Focus!)

Pay attention! This is the core of this lesson. We are going to define an AgencyState.

# --- Custom Reducer Function ---
# Why not just use operator.add?
# Because operator.add will throw a TypeError when encountering None + List!
# In real business scenarios, a Researcher might fail and return None, so we need more robust merge logic.
def safe_merge_materials(left: List[str], right: List[str] | None) -> List[str]:
    """
    Custom Reducer: Safely merges lists with built-in deduplication.
    left: The existing data in the state
    right: The new data returned by the node
    """
    if not left:
        left = []
    if not right:
        return left
    
    # Merge and deduplicate, maintaining basic order (real deduplication might be more complex, this is a simple demonstration)
    merged = left + right
    # Use dict.fromkeys to deduplicate while maintaining order
    return list(dict.fromkeys(merged))

# --- Define the Graph State ---
class AgencyState(TypedDict):
    # Task topic (normal field, default overwrite)
    topic: str 
    
    # Specific search tasks assigned to Researchers (normal field, overwrite)
    search_tasks: List[str] 
    
    # Collected materials (Magic field! Use Annotated to bind the custom safe_merge_materials)
    # When any node returns {"research_materials": ["new data"]},
    # LangGraph automatically runs: new_value = safe_merge_materials(old_value, ["new data"])
    research_materials: Annotated[List[str], safe_merge_materials]
    
    # Final article (normal field, overwrite)
    final_article: str

Instructor's Commentary: See the power of Annotated? Many beginners just write research_materials: List[str] and can't figure out for the life of them why data is lost during concurrency. Remember, in LangGraph, if you want incremental updates, you must use Annotated!
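A quick way to convince yourself the reducer behaves correctly is to call it directly with the edge cases it was designed for. The snippet below uses a condensed copy of safe_merge_materials so it runs standalone:

```python
def safe_merge_materials(left, right):
    # Condensed copy of the reducer defined above, for a standalone check.
    left = left or []
    if not right:
        return left
    return list(dict.fromkeys(left + right))

# First write: the state field may still be None/empty
assert safe_merge_materials(None, ["A"]) == ["A"]
# A failed Researcher returns None — must not crash or lose existing data
assert safe_merge_materials(["A"], None) == ["A"]
# Duplicate results are collapsed while preserving order
assert safe_merge_materials(["A", "B"], ["B", "C"]) == ["A", "B", "C"]
print("safe_merge_materials passes all edge cases")
```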

Step 3: Write Node Logic (Nodes)

Next, we implement the Planner, multiple Researchers, and the Writer. To let you see the execution order clearly, I've added detailed print statements in the code.

def planner_node(state: AgencyState):
    """
    Planner Node: Receives the main topic and breaks it down into multiple specific search tasks.
    """
    print(f"👨‍💼 [Planner] Breaking down topic: {state['topic']}")
    
    # Simulate LLM thinking process, hardcoding the breakdown results directly
    tasks = [
        f"Historical development of {state['topic']}",
        f"Latest technological breakthroughs in {state['topic']}",
        f"Commercial application cases of {state['topic']}"
    ]
    
    print(f"👨‍💼 [Planner] Breakdown complete, dispatching {len(tasks)} concurrent search tasks.")
    # Returning a normal overwrite-type field here
    return {"search_tasks": tasks}

def researcher_node(state: AgencyState, task: str):
    """
    Researcher Node: a shared implementation that we wrap in lambdas below
    to create three static node instances.
    Note: in real LangGraph projects, concurrency is usually driven by the Send API
    for dynamic routing. For clarity of the Reducer concept, this episode uses
    static concurrency for the demonstration.
    """
    print(f"🔍 [Researcher] Starting search for subtask: {task}...")
    # Simulate network request latency
    time.sleep(random.uniform(0.5, 1.5))
    
    # Simulate searched data
    mock_result = f"In-depth report snippet about [{task}]"
    
    # Sometimes the search might fail (simulate empty data to test our safe_merge_materials)
    if random.random() < 0.1: 
        print(f"⚠️ [Researcher] Search for {task} failed, no materials found.")
        return {"research_materials": None}
        
    print(f"✅ [Researcher] Search complete: {task}")
    
    # Key point: Writing to the research_materials field.
    # Because a Reducer is bound, the list here will be appended to the original list, not overwritten!
    return {"research_materials": [mock_result]}

def writer_node(state: AgencyState):
    """
    Writer Node: Collects all materials and writes the final article.
    """
    materials = state.get("research_materials", [])
    print(f"\n✍️ [Writer] Received {len(materials)} research materials, starting to draft...")
    
    if not materials:
        article = "Sorry, unable to draft the article due to lack of materials."
    else:
        # Simulate drafting process
        content = "\n".join([f"- {m}" for m in materials])
        article = f"《In-depth Analysis of {state['topic']}》\n\nSynthesizing various sources, we draw the following conclusions:\n{content}\n\n(End of Article)"
        
    print(f"✍️ [Writer] Drafting complete!")
    return {"final_article": article}

Step 4: Assemble the Graph and Handle Concurrency

Here is an advanced LangGraph trick: How do you make nodes execute in parallel? The answer is: Start from the same node, connect multiple edges to different nodes, and then aggregate them back into one node.

def build_agency_graph():
    builder = StateGraph(AgencyState)
    
    # Add nodes
    builder.add_node("planner", planner_node)
    
    # To demonstrate concurrency, we manually create 3 fixed Researcher node instances
    # In real scenarios, you would use the Send() API for dynamic concurrency (we will cover this in later lessons)
    builder.add_node("researcher_1", lambda state: researcher_node(state, state["search_tasks"][0]))
    builder.add_node("researcher_2", lambda state: researcher_node(state, state["search_tasks"][1]))
    builder.add_node("researcher_3", lambda state: researcher_node(state, state["search_tasks"][2]))
    
    builder.add_node("writer", writer_node)
    
    # Define control flow (Edges)
    builder.add_edge(START, "planner")
    
    # Fan-out: After Planner finishes, trigger 3 Researchers simultaneously
    builder.add_edge("planner", "researcher_1")
    builder.add_edge("planner", "researcher_2")
    builder.add_edge("planner", "researcher_3")
    
    # Fan-in: Only trigger Writer after all 3 Researchers have finished
    # Note: LangGraph will automatically wait for all predecessor nodes to complete before executing the target node
    builder.add_edge(["researcher_1", "researcher_2", "researcher_3"], "writer")
    
    builder.add_edge("writer", END)
    
    return builder.compile()

# --- Simulate Run ---
if __name__ == "__main__":
    app = build_agency_graph()
    
    print("🚀 Starting AI Content Agency workflow...\n" + "="*40)
    
    initial_state = {"topic": "2024 Room-Temperature Superconductivity"}
    
    # Run the entire graph
    final_state = app.invoke(initial_state)
    
    print("\n" + "="*40)
    print("🎉 Final Delivered Result:\n")
    print(final_state["final_article"])
    print("="*40)

When you run this code, you will see output similar to this:

🚀 Starting AI Content Agency workflow...
========================================
👨‍💼 [Planner] Breaking down topic: 2024 Room-Temperature Superconductivity
👨‍💼 [Planner] Breakdown complete, dispatching 3 concurrent search tasks.
🔍 [Researcher] Starting search for subtask: Historical development of 2024 Room-Temperature Superconductivity...
🔍 [Researcher] Starting search for subtask: Latest technological breakthroughs in 2024 Room-Temperature Superconductivity...
🔍 [Researcher] Starting search for subtask: Commercial application cases of 2024 Room-Temperature Superconductivity...
✅ [Researcher] Search complete: Historical development of 2024 Room-Temperature Superconductivity
✅ [Researcher] Search complete: Commercial application cases of 2024 Room-Temperature Superconductivity
✅ [Researcher] Search complete: Latest technological breakthroughs in 2024 Room-Temperature Superconductivity

✍️ [Writer] Received 3 research materials, starting to draft...
✍️ [Writer] Drafting complete!

========================================
🎉 Final Delivered Result:

《In-depth Analysis of 2024 Room-Temperature Superconductivity》

Synthesizing various sources, we draw the following conclusions:
- In-depth report snippet about [Historical development of 2024 Room-Temperature Superconductivity]
- In-depth report snippet about [Commercial application cases of 2024 Room-Temperature Superconductivity]
- In-depth report snippet about [Latest technological breakthroughs in 2024 Room-Temperature Superconductivity]

(End of Article)
========================================

Look! 3 pieces of material, not a single one missing! This is the magic of Reducers.


Pitfalls and Avoidance Guide

As your mentor, I absolutely cannot just teach you how to write Demos. In actual production environments, Reducers are the most likely places to cause bugs. Here are 3 high-frequency pitfalls I've summarized; be sure to note them down:

💣 Pitfall 1: Blindly Trusting operator.add

Many tutorials (including early official documentation) will teach you to write Annotated[list, operator.add] directly. Danger! If a node returns {"research_materials": None} due to a network anomaly, an LLM hallucination, or any other failure, operator.add(existing_list, None) throws a TypeError and crashes the entire graph. Avoidance Rule: always wrap the merge in a safe_merge-style function, like the code above, that handles None explicitly.
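You can verify the crash in a couple of lines — `list + None` is simply not defined in Python:

```python
import operator

try:
    # What the framework would effectively run if a node returns None
    # for a field bound to operator.add:
    operator.add(["A", "B"], None)
except TypeError as e:
    print(f"Boom: {e}")  # can only concatenate list (not "NoneType") to list
```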

💣 Pitfall 2: In-place Mutation in Reducers

If you write code like left.append(right) or left.extend(right) in a custom Reducer and then return left, you will encounter extremely bizarre phantom bugs in certain concurrent or Time Travel scenarios. Avoidance Rule: A Reducer must be a Pure Function! Always return a new object. Using left + right creates a new list, which is safe; using append is dangerous.
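Here is a minimal demonstration of why in-place mutation is dangerous. Imagine a checkpointer (or Time Travel) holds a reference to an old list: an impure reducer silently rewrites that history, while a pure one leaves it intact.

```python
def impure_reducer(left, right):
    left.extend(right)       # mutates the caller's list in place — dangerous!
    return left

def pure_reducer(left, right):
    return left + right      # builds a brand-new list — safe

checkpoint = ["A"]           # imagine this is a saved historical snapshot
impure_reducer(checkpoint, ["B"])
print(checkpoint)            # ['A', 'B'] — the "historical" snapshot was corrupted

checkpoint = ["A"]
new_state = pure_reducer(checkpoint, ["B"])
print(checkpoint)            # ['A'] — history preserved
print(new_state)             # ['A', 'B']
```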

💣 Pitfall 3: Thinking Reducers are Only Effective for Concurrency

This is a common misconception. Reducers do not exist solely for concurrency. Even if you have a linear flow (Node A -> Node B -> Node C), if you want each node to append logs or message records (like a messages list) to a list, you must also use a Reducer. Without a Reducer, the data from the subsequent node will always overwrite the previous one.
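Even with zero concurrency, an accumulating field needs a reducer. Here is a sketch of a strictly linear three-step flow in plain Python, applying the reducer by hand the way the framework would between nodes:

```python
import operator

state = {"logs": []}
node_outputs = [["planner: done"], ["researcher: done"], ["writer: done"]]

for output in node_outputs:                       # strictly sequential: A -> B -> C
    # With a reducer: append. Without one, this line would be
    # state["logs"] = output, and only the last node's log would survive.
    state["logs"] = operator.add(state["logs"], output)

print(state["logs"])  # all three log entries survive
```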


📝 Episode Summary

Today, we delved into the underlying logic of State updates in LangGraph.

  1. We understood that a State without magic is just a ruthless "overwrite machine."
  2. We introduced Annotated and Reducers, endowing the State with the ability to update incrementally.
  3. We hand-coded a safe list merge function, perfectly solving the data loss problem caused by multiple Researchers concurrently gathering materials in the AI Agency.

Now, our AI Content Agency possesses powerful multi-threaded concurrent information gathering capabilities.

However, observant students might have noticed: our concurrency today is "static," meaning the Planner must fixedly assign 3 tasks to 3 hardcoded nodes. What if the Planner dynamically decides to gather information in 5 or 10 directions?

In the next episode, "Episode 13 | The Dimensional Strike of the Send API: How to Implement True Dynamic Concurrent Routing", I will take you to unlock LangGraph's ultimate concurrency weapon, giving your Agent true dynamic scaling capabilities!

Architects, remember to type out the code. See you in the next episode! 🚀