Issue 07 | Conditional Edges: The "Branch Decider" of Workflows
🎯 Learning Objectives
Hello everyone! I am your AI technical mentor. Today, we're skipping the fluff and diving straight into the hardcore stuff—LangGraph's Conditional Edges. This is a crucial step for our "AI Universal Content Creation Agency" to achieve truly intelligent, dynamic workflows. After completing this issue, you will:
- Deeply understand the core mechanism of conditional edges: Master how LangGraph uses conditional logic to make your multi-agent workflow act like a traffic hub with a brain, making decisions based on real-time situations.
- Master LLM intent recognition and dynamic branch design: Learn how to use the output of Large Language Models (LLMs) as a basis for decision-making to achieve intelligent workflow branching, avoiding unnecessary computation and resource consumption.
- Introduce "intelligent judgment" capabilities to the Agency project: We will specifically revamp our `Planner` agent so that it is no longer a simple "waterfall" commander, but can intelligently assign tasks based on content needs. For example: only calling tools when necessary, and proceeding directly to the next step when not.
- Improve workflow efficiency and flexibility: Through practical exercises, you will build a system capable of dynamically adjusting execution paths based on input, making your AI content agency more efficient and responsive.
📖 Concept Explanation
In previous lessons, our LangGraph workflows were mostly linear or jumped between fixed nodes. This is like a one-way street or having only a few fixed intersections. But in the real world, especially in complex systems like our "AI Universal Content Creation Agency", requirements are constantly changing. Would a short tweet and an in-depth research report require the same workflow path? Obviously not!
This is where Conditional Edges come into play!
What are conditional edges? Simply put, conditional edges allow you to dynamically determine which node to jump to next after a node finishes executing, based on that node's output or the current global state. It is not a fixed "if-else", but a "switch-case" that can have multiple branches, and the judgment logic of its branches is completely under your control.
Imagine your Planner agent receives a content creation request.
- If the request is "Write an in-depth report on AI ethics", the `Planner` might decide: "Hmm, this requires `Research` first, then `Write`, and finally `Edit`."
- If the request is "Generate 5 social media titles about summer outfits", the `Planner` might decide: "This doesn't need `Research`, just call the `TitleGeneratorTool` directly, and then the `Writer` can make minor modifications."
- If the request is "I just want to ask AI what the latest news is", the `Planner` might decide: "This is a simple `Query`, just go straight to `__end__`."
See that? The same Planner makes different "forking decisions" based on different inputs. This is the magic of conditional edges!
Breakdown of how it works:
- Node Execution: A node (e.g., `Planner`) completes its task and produces an output. This output updates our `GraphState`.
- Decision Function: This is the core of the conditional edge. You provide a Python function (in LangGraph) that takes the current `GraphState` as input. Its responsibility is to return a string based on the information in the `GraphState` (especially the output of the previous node). This string is the name of the next node to jump to, or the special `__end__` (indicating the end of the workflow).
- Edge Mapping: You also need to provide a dictionary that maps the string returned by the decision function to the actual node. For example, if the decision function returns `"research"`, it jumps to the `researcher` node; if it returns `"tool_call"`, it jumps to the `tool_executor` node.
Implementation in LangGraph: add_conditional_edges()
This method is the key to building dynamic branches. Its signature looks roughly like this:
graph.add_conditional_edges(source_node, decision_function, edge_mapping)
- `source_node`: The node that triggers the conditional judgment.
- `decision_function`: A Python function that receives the `GraphState` and returns the next node name.
- `edge_mapping`: A dictionary mapping the return value of the `decision_function` to the actual node name.
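To see how these three pieces fit together, here is a toy dispatcher that mimics the *semantics* of `add_conditional_edges` in plain Python (this is a sketch, not LangGraph's actual implementation; the node names and state shape are invented for illustration):

```python
# Toy emulation of add_conditional_edges(source, decision_function, edge_mapping).
def run_graph(state, nodes, conditional_edges, entry):
    """nodes: {name: fn(state) -> state updates}
    conditional_edges: {source: (decision_fn, edge_mapping)}"""
    current = entry
    while current != "__end__":
        state.update(nodes[current](state))
        if current in conditional_edges:
            decision_fn, edge_mapping = conditional_edges[current]
            current = edge_mapping[decision_fn(state)]  # branch key -> node name
        else:
            current = "__end__"
    return state

# Hypothetical nodes: long requests get research, short ones finish immediately.
nodes = {
    "planner": lambda s: {"route": "research" if len(s["input"]) > 20 else "done"},
    "researcher": lambda s: {"notes": f"findings on {s['input']}"},
}
conditional_edges = {
    "planner": (lambda s: s["route"], {"research": "researcher", "done": "__end__"}),
}
final_state = run_graph({"input": "short"}, nodes, conditional_edges, "planner")
```

The key point: the decision function returns a *key*, and the mapping owns the key-to-node translation, so you can rename nodes without touching decision logic.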
Mermaid Diagram: The "Forking Decider" Workflow of the AI Content Agency
Alright, enough talk, let's see it in action. We'll use a Mermaid diagram to visually see how our Planner agent uses conditional edges to become the "intelligent traffic commander" of this agency.
graph TD
A[User Request] --> B(Planner Agent)
B -- LLM Intent Recognition --> C{Decision Point: What needs to be done?}
C -- "Needs Tool Call" --> D[Tool Executor]
D -- "Tool Result" --> B
C -- "Needs In-depth Research" --> E[Researcher Agent]
E -- "Research Result" --> F[Writer Agent]
C -- "Direct Writing" --> F[Writer Agent]
C -- "Task Completed / No Further Processing Needed" --> G[__END__]
F --> H[Editor Agent]
H --> G

Diagram Explanation:
- User Request (A): The starting point of everything. The user submits a content creation request to our AI content agency.
- Planner Agent (B): This is our core decision-maker. It receives the user request and uses its internal LLM for intent recognition and planning.
- Decision Point: What needs to be done? (C): This is where conditional edges come into play. The `Planner` agent determines the next step based on the LLM's output.
  - "Needs Tool Call": If the `Planner` determines that an external tool needs to be called (e.g., keyword generator, content template generator), the workflow jumps to the `Tool Executor` (D).
  - "Needs In-depth Research": If the `Planner` determines that this is content requiring extensive fact-checking or background knowledge (e.g., an in-depth report), the workflow jumps to the `Researcher` (E).
  - "Direct Writing": If the `Planner` determines that this is a task that can be handled directly by the `Writer` (e.g., simple social media copy), the workflow jumps directly to the `Writer` (F).
  - "Task Completed / No Further Processing Needed": If the `Planner` determines that the current request has been satisfied, or it is a query rather than a creation task, the workflow goes straight to `__END__` (G).
- Tool Executor (D): Responsible for executing the tools specified by the `Planner`. After execution, the tool's output is returned to the `Planner` (B), forming an Agentic Loop, allowing the `Planner` to make the next judgment based on the tool results.
- Researcher Agent (E): Executes research tasks. Once the research is complete, it passes the results to the `Writer` (F).
- Writer Agent (F): Creates content based on the research results or direct instructions from the `Planner`.
- Editor Agent (H): Proofreads and polishes the output of the `Writer`.
- END (G): The end point of the workflow, indicating task completion.
Through this structure, our Planner agent is no longer a simple "forwarder", but a true "forking decider" capable of intelligently scheduling resources based on actual situations, greatly improving the flexibility and efficiency of the entire system.
💻 Practical Code Drill (Specific Application in the Agency Project)
Now, let's put the theory into code. We will revamp the Planner so that it can dynamically decide whether to make tool calls, conduct research, or write directly based on the LLM's output.
Core Idea:
- Define GraphState: Extend our state to store LLM decisions and tool call information.
- Simulate Tools: For demonstration purposes, we will define some simple tools first.
- Revamp Planner Node: Make the `Planner` not only generate plans but also indicate the next action (`next_action`).
- Implement Decision Function: Decide the next node based on the `Planner`'s output.
- Build LangGraph: Use `add_conditional_edges` to build a dynamic workflow.
We will use Python and LangChain/LangGraph.
import operator
from typing import Annotated, List, Tuple, Union, Literal, TypedDict
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.tools import Tool
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
# Ensure you have set the OPENAI_API_KEY environment variable
# import os
# os.environ["OPENAI_API_KEY"] = "YOUR_API_KEY"
# --- 1. Define GraphState ---
# GraphState is the "blackboard" shared by all our nodes
class AgentState(TypedDict):
"""
Represents the state of our content agency's workflow.
"""
input: str # The original user input request
chat_history: Annotated[List[BaseMessage], operator.add] # Chat history for context
agent_outcome: Union[AgentAction, AgentFinish, None] # Agent's decision, could be tool call or final answer
intermediate_steps: Annotated[List[Tuple[AgentAction, str]], operator.add] # Tool calls and their results
    next_action: Literal["tool_call", "research", "write", "edit", "end"] # Decision on the next action ("edit" is set by the writer node)
research_result: str # Result from the Researcher agent
writer_output: str # Content produced by the Writer agent
editor_output: str # Content produced by the Editor agent
# --- 2. Simulate Tools ---
# For demonstration, we create a few simple tools
def get_keywords(topic: str) -> str:
"""
Generates a list of relevant keywords for a given topic.
"""
print(f"\n--- Calling Tool: get_keywords for '{topic}' ---")
return f"Keywords for '{topic}': AI, Machine Learning, Deep Learning, Generative AI, LLMs"
def get_content_template(topic: str) -> str:
"""
Provides a basic content structure template for a given topic.
"""
print(f"\n--- Calling Tool: get_content_template for '{topic}' ---")
return f"Template for '{topic}': Introduction, Main Points (3-5), Conclusion, Call to Action."
tools = [
Tool(name="get_keywords", func=get_keywords, description="Useful for generating keywords related to a topic."),
Tool(name="get_content_template", func=get_content_template, description="Useful for getting a content structure template for a topic."),
]
# --- 3. Define LLM Model ---
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# --- 4. Define Agent Nodes ---
# 4.1 Planner Agent (Agent Node)
# Planner now not only plans but also determines the next direction based on needs
class PlannerAgent:
def __init__(self, llm: ChatOpenAI, tools: List[Tool]):
self.llm = llm
self.tools = tools
self.prompt = ChatPromptTemplate.from_messages([
("system", """
You are an experienced planner at a content creation agency. Your task is to determine the best next action based on the user's request.
Possible actions include:
- `tool_call`: If external tools need to be called to obtain information or assist in content generation.
- `research`: If in-depth background research is required.
- `write`: If writing can start directly.
- `end`: If the task is completed, or the request is a simple question that does not require further creation.
Please strictly output your decision in JSON format, including 'plan' (your plan) and 'next_action' (the next action).
If 'next_action' is 'tool_call', you also need to include the tool call details in the 'tool_calls' field.
Available tools: {tool_names}
Example Output (Needs Tool):
{{
"plan": "User requested keyword generation, need to call the get_keywords tool.",
"next_action": "tool_call",
"tool_calls": [
{{
"tool_name": "get_keywords",
"args": {{"topic": "Application of AI in Education"}}
}}
]
}}
Example Output (Needs Research):
{{
"plan": "User requested an article on quantum computing, in-depth research is needed.",
"next_action": "research"
}}
Example Output (Direct Writing):
{{
"plan": "User requested a short social media copy, can start writing directly.",
"next_action": "write"
}}
Example Output (End):
{{
"plan": "User is just greeting, task completed.",
"next_action": "end"
}}
"""),
MessagesPlaceholder(variable_name="chat_history"),
("user", "{input}"),
MessagesPlaceholder(variable_name="agent_outcome"), # Used to pass tool results back
MessagesPlaceholder(variable_name="intermediate_steps")
])
# Bind tools
self.runnable = self.prompt.partial(tool_names=", ".join([tool.name for tool in tools])) | llm.bind_tools(tools)
def __call__(self, state: AgentState):
print("\n--- Entering Planner Agent ---")
current_input = state["input"]
chat_history = state.get("chat_history", [])
intermediate_steps = state.get("intermediate_steps", [])
# If there are tool results, add them to the chat history so the LLM knows
if intermediate_steps:
for action, observation in intermediate_steps:
chat_history.append(AIMessage(content=f"Tool Call: {action.tool} with args {action.tool_input}"))
chat_history.append(AIMessage(content=f"Tool Output: {observation}"))
response = self.runnable.invoke({
"input": current_input,
"chat_history": chat_history,
"intermediate_steps": intermediate_steps,
"agent_outcome": state["agent_outcome"] # Used to pass the decision of the previous agent, such as AgentFinish returned by the tool executor
})
        # Parse the LLM's output to determine next_action
        # Note: production code needs more robust JSON parsing and error handling
        try:
            import json
            if response.tool_calls:
                # The LLM emitted structured tool calls, so the next step must be a tool call
                parsed_response = {"plan": "Execute the requested tool calls.", "next_action": "tool_call"}
            elif response.content:
                # Otherwise the LLM should have replied with the JSON format from the prompt
                parsed_response = json.loads(response.content)
            else:
                parsed_response = {}
            next_action = parsed_response.get("next_action")
            if not next_action:
                next_action = "end"  # Default to end; a dedicated "chat" node could also be designed
print(f"Planner Decision: {next_action}")
print(f"Planner Plan: {parsed_response.get('plan', 'No specific plan.')}")
# Update state
            new_state = {
                # chat_history uses operator.add as a reducer, so return only the *new* messages
                "chat_history": [HumanMessage(content=current_input), response],
                "next_action": next_action,
                "agent_outcome": response  # Store the raw LLM output so the tool executor can read its tool_calls
            }
return new_state
except Exception as e:
print(f"Error parsing Planner output: {e}")
print(f"LLM raw response: {response.content}")
# If parsing fails, default to end or enter an error handling flow
return {"next_action": "end", "chat_history": chat_history + [HumanMessage(content=current_input), AIMessage(content=f"Error: {e}")]}
planner_agent = PlannerAgent(llm, tools)
# 4.2 Tool Executor Node
# Responsible for executing the tools indicated by the Planner agent
class ToolExecutorAgent:
def __init__(self, tools: List[Tool]):
self.tools_map = {tool.name: tool for tool in tools}
def __call__(self, state: AgentState):
print("\n--- Entering Tool Executor Agent ---")
tool_calls = state["agent_outcome"].tool_calls # Extract tool calls from the Planner's output
intermediate_steps = []
        for tool_call in tool_calls:
            # Entries in AIMessage.tool_calls are dicts with "name" and "args" keys
            tool_name = tool_call["name"]
            tool_args = tool_call["args"]
if tool_name in self.tools_map:
try:
tool_output = self.tools_map[tool_name].func(**tool_args)
intermediate_steps.append((AgentAction(tool=tool_name, tool_input=tool_args, log=""), tool_output))
print(f"Executed tool '{tool_name}' with args {tool_args}. Output: {tool_output}")
except Exception as e:
error_msg = f"Error executing tool '{tool_name}': {e}"
intermediate_steps.append((AgentAction(tool=tool_name, tool_input=tool_args, log=""), error_msg))
print(error_msg)
else:
error_msg = f"Tool '{tool_name}' not found."
intermediate_steps.append((AgentAction(tool=tool_name, tool_input=tool_args, log=""), error_msg))
print(error_msg)
# Clear agent_outcome because the tool executor is not the final result
return {"intermediate_steps": intermediate_steps, "agent_outcome": None}
tool_executor_agent = ToolExecutorAgent(tools)
# 4.3 Researcher Agent (Simplified Version)
def researcher_node(state: AgentState):
print("\n--- Entering Researcher Agent ---")
current_input = state["input"]
# Simulate the research process
research_content = f"Research on '{current_input}': Detailed findings and insights. This would typically involve web searches, database queries, etc."
print(f"Research completed for: {current_input}")
return {"research_result": research_content, "next_action": "write"} # After research is complete, indicate writing as the next step
# 4.4 Writer Agent (Simplified Version)
def writer_node(state: AgentState):
print("\n--- Entering Writer Agent ---")
current_input = state["input"]
research_result = state.get("research_result", "No specific research provided.")
# Simulate the writing process
writing_content = f"Article Title: {current_input}\n\n" \
f"Based on research: {research_result}\n\n" \
f"Content: This is a beautifully written piece about {current_input}, incorporating all key findings and creative flair. " \
f"It aims to engage the audience and fulfill the content brief."
print(f"Writing completed for: {current_input}")
return {"writer_output": writing_content, "next_action": "edit"} # After writing is complete, indicate editing as the next step
# 4.5 Editor Agent (Simplified Version)
def editor_node(state: AgentState):
print("\n--- Entering Editor Agent ---")
writer_output = state["writer_output"]
# Simulate the editing process
edited_content = f"--- Edited Version ---\n{writer_output}\n\n" \
f"Editor's notes: Checked grammar, improved flow, added a stronger call to action. Content is now polished and ready."
print(f"Editing completed for the written content.")
return {"editor_output": edited_content, "next_action": "end"} # After editing is complete, indicate end
# --- 5. Decision Function: LangGraph's "Forking Decider" ---
# This function maps the `next_action` stored in the state to a branch key
def decide_next_step(state: AgentState) -> str:
    """
    Decides the next branch based on `next_action` in the current state.
    The returned string must match a key in the edge mapping below.
    """
    next_action = state["next_action"]
    print(f"\n--- Decision Point: recommended action is '{next_action}' ---")
    if next_action in ("tool_call", "research", "write", "edit", "end"):
        return next_action
    # Default handling, e.g., for errors or unknown actions
    print(f"Warning: Unknown next_action '{next_action}'. Ending workflow.")
    return "end"
# --- 6. Build LangGraph ---
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node("planner", planner_agent)
workflow.add_node("tool_executor", tool_executor_agent)
workflow.add_node("researcher", researcher_node)
workflow.add_node("writer", writer_node)
workflow.add_node("editor", editor_node)
# Set entry point
workflow.set_entry_point("planner")
# Add conditional edges - This is the core of this issue!
workflow.add_conditional_edges(
    "planner",          # After the planner node finishes executing, call decide_next_step
    decide_next_step,
    {
        "tool_call": "tool_executor",  # If decide_next_step returns "tool_call", jump to tool_executor
        "research": "researcher",      # If decide_next_step returns "research", jump to researcher
        "write": "writer",             # If decide_next_step returns "write", jump to writer
        "end": END                     # If decide_next_step returns "end", finish the workflow
    }
)
# Add normal edges
# After tool_executor finishes executing tools, it returns to planner for re-evaluation (Agentic Loop)
workflow.add_edge("tool_executor", "planner")
# After researcher finishes, it goes to writer
workflow.add_edge("researcher", "writer")
# writer_node sets next_action to "edit", so we reuse decide_next_step after the writer as well;
# a more general design would route every node back through one shared decision point
workflow.add_conditional_edges(
    "writer",
    decide_next_step,
    {
        "edit": "editor",
        "end": END
    }
)
# After editor finishes, end
workflow.add_edge("editor", END)
# Compile workflow
app = workflow.compile()
# --- 7. Run Workflow Demo ---
print("\n--- Demo 1: Request requiring tool call (Get keywords) ---")
inputs_1 = {"input": "Please help me generate keywords about 'Application of AI in Healthcare'.", "chat_history": []}
for s in app.stream(inputs_1):
print(s)
# Expected flow: Planner -> Tool Executor -> Planner (agentic loop) -> END (because after the tool call, Planner might consider the task completed)
print("\n\n--- Demo 2: Request requiring in-depth research (Write an in-depth article) ---")
inputs_2 = {"input": "Please write an in-depth article about 'The Future Development of Quantum Computing'.", "chat_history": []}
for s in app.stream(inputs_2):
print(s)
# Expected flow: Planner -> Researcher -> Writer -> Editor -> END
print("\n\n--- Demo 3: Request for direct writing (Short social media copy) ---")
inputs_3 = {"input": "Write a short social media copy for a summer promotional campaign.", "chat_history": []}
for s in app.stream(inputs_3):
print(s)
# Expected flow: Planner -> Writer -> Editor -> END
print("\n\n--- Demo 4: Simple inquiry (End directly) ---")
inputs_4 = {"input": "Hello, AI Content Agency!", "chat_history": []}
for s in app.stream(inputs_4):
print(s)
# Expected flow: Planner -> END (Planner determines no creation is needed, ends directly)
Code Analysis:
- `AgentState` Extension: We added fields like `next_action`, `research_result`, `writer_output`, and `editor_output`. These are key for passing information and decisions between agents.
- `PlannerAgent` Revamp:
  - Its prompt has been redesigned to explicitly require the LLM to output in JSON format, which must include the `next_action` field to guide the workflow's direction.
  - If `next_action` is `tool_call`, it will also expect the `tool_calls` field.
  - We use `llm.bind_tools(tools)` to let the LLM know the available tools, so it can directly generate tool calls in `response.tool_calls`.
  - The `__call__` method is now responsible for parsing the LLM's output and updating `state["next_action"]`.
- `ToolExecutorAgent`: Responsible for executing the tools specified by the `Planner` agent. The execution results are returned via `intermediate_steps`, and the workflow returns to the `Planner`, forming an Agentic Loop, which is a common design in advanced agent patterns.
- `researcher_node`, `writer_node`, `editor_node`: These are simplified agent nodes that simulate their respective functions and, upon completing their tasks, recommend the next action by updating `state["next_action"]`.
- `decide_next_step` Function: This is the core decision function for conditional edges. It receives the current `AgentState` and, based on the value of `state["next_action"]`, returns the branch key for the next jump.
- `add_conditional_edges()`:
  - We added conditional edges to the `planner` node, so every time the `planner` node finishes executing, the `decide_next_step` function is called to determine where to go next.
  - We also added conditional edges to the `writer` node, so that after it finishes writing, the workflow can decide, based on its returned `next_action`, whether to proceed to the `editor` or end.
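The parsing step in `PlannerAgent.__call__` was flagged above as needing more robust JSON handling. One possible hardening is sketched below; the fence-stripping heuristic and the fallback values are assumptions for illustration, not part of the original code:

```python
import json

VALID_ACTIONS = {"tool_call", "research", "write", "end"}

def parse_planner_output(content: str) -> dict:
    """Best-effort extraction of the Planner's JSON decision from an LLM reply.
    Falls back to 'end' whenever the reply cannot be parsed or validated."""
    text = content.strip()
    try:
        # LLMs often wrap JSON in markdown fences; strip them before parsing
        if text.startswith("```"):
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]
        parsed = json.loads(text)
    except (json.JSONDecodeError, IndexError):
        return {"plan": "", "next_action": "end"}
    # Reject unknown actions instead of passing them downstream
    if parsed.get("next_action") not in VALID_ACTIONS:
        parsed["next_action"] = "end"
    return parsed
```

Centralizing validation like this keeps `decide_next_step` simple: by the time the state reaches the decision function, `next_action` is guaranteed to be one of the expected branch keys.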