第 07 期 | 条件边缘 (Conditional Edges):工作流的"分叉判决器"

更新于 2026/4/14

🎯 本期学习目标

同学们好!我是你们的 AI 技术导师。今天,我们不聊虚的,直接来点硬核的——LangGraph 的条件边缘 (Conditional Edges)。这是我们「AI 万能内容创作机构」实现真正智能、动态工作流的关键一步。学完这一期,你将:

  1. 深入理解条件边缘的核心机制:掌握 LangGraph 如何通过条件逻辑,让你的多智能体工作流像一个有大脑的交通枢纽,根据实时情况做出决策。
  2. 掌握 LLM 意图识别与动态分支设计:学会如何利用大型语言模型(LLM)的输出作为决策依据,实现工作流的智能分叉,避免无谓的计算和资源消耗。
  3. 为 Agency 项目引入“智能判断”能力:我们将具体改造我们的 Planner 智能体,让它不再是简单的“瀑布流”指挥官,而是能根据内容需求智能地分派任务,比如:只有当需要工具时才调用,不需要时直接进入下一步。
  4. 提升工作流的效率与灵活性:通过实战演练,你将亲手构建一个能够根据输入动态调整执行路径的系统,让你的 AI 内容机构更加高效、响应迅速。

📖 原理解析

在之前的课程中,我们的 LangGraph 工作流大多是线性的,或者在固定的节点之间跳转。这就像一条单行道,或者只有几个固定的岔路口。但在现实世界中,尤其是像我们「AI 万能内容创作机构」这样的复杂系统,需求是千变万化的。一篇短微博和一篇深度研究报告,它们所需的工作流路径会一样吗?显然不能!

这就是 条件边缘 (Conditional Edges) 出场的时刻!

什么是条件边缘? 简单来说,条件边缘允许你在一个节点执行完毕后,根据该节点的输出或者当前的全局状态,动态地决定接下来应该跳转到哪个节点。它不是一个固定的“if-else”,而是一个可以有多个分支的“switch-case”,其分支的判断逻辑完全由你掌控。

想象一下,你的 Planner 智能体收到一个内容创作请求。

  • 如果请求是“写一篇关于 AI 伦理的深度报告”,Planner 可能会判断:“嗯,这需要先 Research,然后 Write,最后 Edit。”
  • 如果请求是“生成 5 个关于夏季穿搭的社交媒体标题”,Planner 可能会判断:“这不需要 Research,直接调用 TitleGeneratorTool,然后 Writer 稍作修改即可。”
  • 如果请求是“我只是想问问 AI 最近有什么新闻”,Planner 可能会判断:“这是一个简单的 Query,直接 __end__ 即可。”

看到没?同一个 Planner,根据不同的输入,做出了不同的“分叉判决”。这就是条件边缘的魔力!

工作原理拆解:

  1. 节点执行 (Node Execution):一个节点(例如 Planner)完成其任务,并产生一个输出。这个输出会更新我们的 GraphState
  2. 决策函数 (Decision Function):这是条件边缘的核心。你提供一个 Python 函数(在 LangGraph 中),这个函数会接收当前的 GraphState 作为输入。它的职责是根据 GraphState 中的信息(特别是上一个节点的输出),返回一个字符串,这个字符串就是下一个要跳转的节点名称,或者是特殊的 __end__ (表示工作流结束)。
  3. 边缘映射 (Edge Mapping):你还需要提供一个字典,将决策函数返回的字符串映射到实际的节点。例如,如果决策函数返回 "research",那么它就跳转到 researcher 节点;如果返回 "tool_call",就跳转到 tool_executor 节点。

LangGraph 中的实现:add_conditional_edges()

这个方法是构建动态分支的关键。它的签名大致是这样的: graph.add_conditional_edges(source_node, decision_function, edge_mapping)

  • source_node: 触发条件判断的节点。
  • decision_function: 一个 Python 函数,它接收 GraphState 并返回下一个节点名。
  • edge_mapping: 一个字典,将 decision_function 的返回值映射到实际的节点名。

Mermaid 图解:AI 内容机构的“分叉判决器”工作流

好了,光说不练假把式。我们用一个 Mermaid 图来直观地看看,我们的 Planner 智能体如何利用条件边缘,成为这个机构的“智能交通指挥官”。

graph TD
    A[用户请求] --> B(Planner 智能体)

    B -- LLM 意图识别 --> C{决策点:需要做什么?}

    C -- "需要工具调用" --> D[Tool Executor 工具执行器]
    D -- "工具结果" --> B

    C -- "需要深入研究" --> E[Researcher 智能体]
    E -- "研究结果" --> F[Writer 智能体]

    C -- "直接写作" --> F[Writer 智能体]

    C -- "任务完成/无需更多处理" --> G[__END__]

    F --> H[Editor 智能体]
    H --> G

图解说明:

  1. 用户请求 (A):一切的起点,用户向我们的 AI 内容机构提交一个内容创作需求。
  2. Planner 智能体 (B):这是我们的核心决策者。它接收用户请求,并利用其内部的 LLM 进行意图识别和规划。
  3. 决策点:需要做什么? (C):这是条件边缘发挥作用的地方。Planner 智能体根据 LLM 的输出,判断下一步应该怎么走。
    • "需要工具调用":如果 Planner 判断需要调用某个外部工具(比如关键词生成器、内容模板生成器),工作流会跳转到 Tool Executor (D)。
    • "需要深入研究":如果 Planner 判断这是一个需要大量事实核查或背景知识的内容(例如深度报告),工作流会跳转到 Researcher (E)。
    • "直接写作":如果 Planner 判断这是一个可以直接由 Writer 处理的任务(例如简单的社交媒体文案),工作流会直接跳转到 Writer (F)。
    • "任务完成/无需更多处理":如果 Planner 判断当前请求已经满足,或者是一个查询而非创作任务,工作流直接 __END__ (G)。
  4. Tool Executor 工具执行器 (D):负责执行 Planner 指定的工具。执行完毕后,工具的输出会返回给 Planner (B),形成一个 Agentic Loop,让 Planner 基于工具结果进行下一步判断。
  5. Researcher 智能体 (E):执行研究任务。研究完成后,将结果传递给 Writer (F)。
  6. Writer 智能体 (F):根据研究结果或 Planner 的直接指示,进行内容创作。
  7. Editor 智能体 (H):对 Writer 的产出进行校对、润色。
  8. END (G):工作流的终点,表示任务完成。

通过这个结构,我们的 Planner 智能体不再是一个简单的“转发器”,而是一个真正的“分叉判决器”,能够根据实际情况,智能地调度资源,大大提升了整个系统的灵活性和效率。

💻 实战代码演练 (Agency项目中的具体应用)

现在,我们把理论落地到代码。我们将改造 Planner,让它能够根据 LLM 的输出,动态地决定是进行工具调用、研究,还是直接写作。

核心思路:

  1. 定义 GraphState:扩展我们的状态,以存储 LLM 的决策和工具调用信息。
  2. 模拟工具:为了演示,我们先定义一些简单的工具。
  3. 改造 Planner 节点:让 Planner 不仅生成计划,还能指示下一步的动作(next_action)。
  4. 实现决策函数:根据 Planner 的输出,决定下一个节点。
  5. 构建 LangGraph:使用 add_conditional_edges 搭建动态工作流。

我们将使用 Python 和 LangChain/LangGraph。

import operator
from typing import Annotated, List, Tuple, Union, Literal, TypedDict
from langchain_core.agents import AgentAction, AgentFinish, Tool
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END

# 确保你已经设置了 OPENAI_API_KEY 环境变量
# import os
# os.environ["OPENAI_API_KEY"] = "YOUR_API_KEY"

# --- 1. 定义 GraphState ---
# GraphState 是我们所有节点共享的“黑板”
class AgentState(TypedDict):
    """
    Represents the state of our content agency's workflow.
    代表我们内容机构工作流的状态。
    """
    input: str # 用户输入的原始请求 / The original user input request
    chat_history: Annotated[List[BaseMessage], operator.add] # 聊天历史,用于上下文 / Chat history for context
    agent_outcome: Union[AgentAction, AgentFinish, None] # 智能体的决策结果,可能是工具调用或最终答案 / Agent's decision, could be tool call or final answer
    intermediate_steps: Annotated[List[Tuple[AgentAction, str]], operator.add] # 工具调用及其结果 / Tool calls and their results
    next_action: Literal["tool_call", "research", "write", "end", "continue"] # Planner 决定下一步要做的动作 / Planner's decision on the next action
    research_result: str # 研究智能体产出的结果 / Result from the Researcher agent
    writer_output: str # 写作智能体产出的内容 / Content produced by the Writer agent
    editor_output: str # 编辑智能体产出的内容 / Content produced by the Editor agent

# --- 2. 模拟工具 ---
# 为了演示,我们创建几个简单的工具
def get_keywords(topic: str) -> str:
    """
    Generates a list of relevant keywords for a given topic.
    为给定主题生成相关关键词列表。
    """
    print(f"\n--- Calling Tool: get_keywords for '{topic}' ---")
    return f"Keywords for '{topic}': AI, Machine Learning, Deep Learning, Generative AI, LLMs"

def get_content_template(topic: str) -> str:
    """
    Provides a basic content structure template for a given topic.
    为给定主题提供一个基本内容结构模板。
    """
    print(f"\n--- Calling Tool: get_content_template for '{topic}' ---")
    return f"Template for '{topic}': Introduction, Main Points (3-5), Conclusion, Call to Action."

tools = [
    Tool(name="get_keywords", func=get_keywords, description="Useful for generating keywords related to a topic."),
    Tool(name="get_content_template", func=get_content_template, description="Useful for getting a content structure template for a topic."),
]

# --- 3. 定义 LLM 模型 ---
llm = ChatOpenAI(model="gpt-4o", temperature=0)

# --- 4. 定义智能体节点 ---

# 4.1 Planner 智能体 (Agent Node)
# Planner 现在不仅会规划,还会根据需求判断下一步的走向
class PlannerAgent:
    def __init__(self, llm: ChatOpenAI, tools: List[Tool]):
        self.llm = llm
        self.tools = tools
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """
            你是一个经验丰富的内容创作机构的规划师。你的任务是根据用户请求,决定下一步的最佳动作。
            可以采取的动作包括:
            - `tool_call`: 如果需要调用外部工具来获取信息或生成内容辅助。
            - `research`: 如果需要进行深入的背景研究。
            - `write`: 如果可以直接开始写作。
            - `end`: 如果任务已经完成,或者请求是一个简单的问题,不需要进一步创作。
            
            请严格以 JSON 格式输出你的决策,包含 'plan' (你的规划) 和 'next_action' (下一步动作)。
            如果 'next_action' 是 'tool_call',你还需要在 'tool_calls' 字段中包含工具调用的详细信息。
            
            可用的工具: {tool_names}
            
            示例输出 (需要工具):
            {{
                "plan": "用户请求生成关键词,需要调用 get_keywords 工具。",
                "next_action": "tool_call",
                "tool_calls": [
                    {{
                        "tool_name": "get_keywords",
                        "args": {{"topic": "AI在教育中的应用"}}
                    }}
                ]
            }}
            
            示例输出 (需要研究):
            {{
                "plan": "用户请求一篇关于量子计算的文章,需要进行深入研究。",
                "next_action": "research"
            }}
            
            示例输出 (直接写作):
            {{
                "plan": "用户请求一篇简短的社交媒体文案,可以直接开始写作。",
                "next_action": "write"
            }}
            
            示例输出 (结束):
            {{
                "plan": "用户只是问候,任务完成。",
                "next_action": "end"
            }}
            """),
            MessagesPlaceholder(variable_name="chat_history"),
            ("user", "{input}"),
            MessagesPlaceholder(variable_name="agent_outcome"), # 用于传递工具结果回来
            MessagesPlaceholder(variable_name="intermediate_steps")
        ])
        
        # 绑定工具
        self.runnable = self.prompt.partial(tool_names=", ".join([tool.name for tool in tools])) | llm.bind_tools(tools)

    def __call__(self, state: AgentState):
        print("\n--- Entering Planner Agent ---")
        current_input = state["input"]
        chat_history = state.get("chat_history", [])
        intermediate_steps = state.get("intermediate_steps", [])

        # 如果有工具结果,将其添加到聊天历史,让LLM知道
        if intermediate_steps:
            for action, observation in intermediate_steps:
                chat_history.append(AIMessage(content=f"Tool Call: {action.tool} with args {action.tool_input}"))
                chat_history.append(AIMessage(content=f"Tool Output: {observation}"))
            
        response = self.runnable.invoke({
            "input": current_input,
            "chat_history": chat_history,
            "intermediate_steps": intermediate_steps,
            "agent_outcome": state["agent_outcome"] # 用于传递上一个智能体的决策,比如工具执行器返回的AgentFinish
        })
        
        # 解析 LLM 的输出,决定 next_action
        # 注意:这里需要更健壮的JSON解析和错误处理
        try:
            parsed_response = response.tool_calls[0] if response.tool_calls else {}
            # Fallback for when LLM doesn't use tool_calls but direct JSON
            if not parsed_response and response.content:
                import json
                parsed_response = json.loads(response.content)

            next_action = parsed_response.get("next_action")
            tool_calls = parsed_response.get("tool_calls", [])

            # 如果LLM直接输出了工具调用,但next_action不是tool_call,我们纠正它
            if response.tool_calls and next_action != "tool_call":
                 next_action = "tool_call"
            elif not response.tool_calls and not next_action: # LLM可能直接回复文本
                next_action = "end" # 默认结束,或者可以设计一个"chat"节点
            
            print(f"Planner Decision: {next_action}")
            print(f"Planner Plan: {parsed_response.get('plan', 'No specific plan.')}")

            # 更新状态
            new_state = {
                "chat_history": chat_history + [HumanMessage(content=current_input), response],
                "next_action": next_action,
                "agent_outcome": response # 存储LLM的原始输出,方便后续处理工具调用
            }
            return new_state
        except Exception as e:
            print(f"Error parsing Planner output: {e}")
            print(f"LLM raw response: {response.content}")
            # 如果解析失败,默认结束或者进入错误处理流程
            return {"next_action": "end", "chat_history": chat_history + [HumanMessage(content=current_input), AIMessage(content=f"Error: {e}")]}

planner_agent = PlannerAgent(llm, tools)

# 4.2 Tool Executor 节点
# 负责执行 Planner 智能体指示的工具
class ToolExecutorAgent:
    def __init__(self, tools: List[Tool]):
        self.tools_map = {tool.name: tool for tool in tools}

    def __call__(self, state: AgentState):
        print("\n--- Entering Tool Executor Agent ---")
        tool_calls = state["agent_outcome"].tool_calls # 从 Planner 的输出中提取工具调用

        intermediate_steps = []
        for tool_call in tool_calls:
            tool_name = tool_call.name
            tool_args = tool_call.args
            
            if tool_name in self.tools_map:
                try:
                    tool_output = self.tools_map[tool_name].func(**tool_args)
                    intermediate_steps.append((AgentAction(tool=tool_name, tool_input=tool_args, log=""), tool_output))
                    print(f"Executed tool '{tool_name}' with args {tool_args}. Output: {tool_output}")
                except Exception as e:
                    error_msg = f"Error executing tool '{tool_name}': {e}"
                    intermediate_steps.append((AgentAction(tool=tool_name, tool_input=tool_args, log=""), error_msg))
                    print(error_msg)
            else:
                error_msg = f"Tool '{tool_name}' not found."
                intermediate_steps.append((AgentAction(tool=tool_name, tool_input=tool_args, log=""), error_msg))
                print(error_msg)
        
        # 清除 agent_outcome,因为工具执行器不是最终结果
        return {"intermediate_steps": intermediate_steps, "agent_outcome": None} 

tool_executor_agent = ToolExecutorAgent(tools)

# 4.3 Researcher 智能体 (简化版)
def researcher_node(state: AgentState):
    print("\n--- Entering Researcher Agent ---")
    current_input = state["input"]
    # 模拟研究过程
    research_content = f"Research on '{current_input}': Detailed findings and insights. This would typically involve web searches, database queries, etc."
    print(f"Research completed for: {current_input}")
    return {"research_result": research_content, "next_action": "write"} # 研究完成后,指示下一步写作

# 4.4 Writer 智能体 (简化版)
def writer_node(state: AgentState):
    print("\n--- Entering Writer Agent ---")
    current_input = state["input"]
    research_result = state.get("research_result", "No specific research provided.")
    # 模拟写作过程
    writing_content = f"Article Title: {current_input}\n\n" \
                      f"Based on research: {research_result}\n\n" \
                      f"Content: This is a beautifully written piece about {current_input}, incorporating all key findings and creative flair. " \
                      f"It aims to engage the audience and fulfill the content brief."
    print(f"Writing completed for: {current_input}")
    return {"writer_output": writing_content, "next_action": "edit"} # 写作完成后,指示下一步编辑

# 4.5 Editor 智能体 (简化版)
def editor_node(state: AgentState):
    print("\n--- Entering Editor Agent ---")
    writer_output = state["writer_output"]
    # 模拟编辑过程
    edited_content = f"--- Edited Version ---\n{writer_output}\n\n" \
                     f"Editor's notes: Checked grammar, improved flow, added a stronger call to action. Content is now polished and ready."
    print(f"Editing completed for the written content.")
    return {"editor_output": edited_content, "next_action": "end"} # 编辑完成后,指示结束

# --- 5. 决策函数:LangGraph 的“分叉判决器” ---
# 这个函数根据 Planner 智能体输出的 `next_action` 来决定下一个节点
def decide_next_step(state: AgentState) -> str:
    """
    Decides the next node based on the Planner's `next_action` or the current state.
    根据 Planner 的 `next_action` 或当前状态决定下一个节点。
    """
    next_action = state["next_action"]
    print(f"\n--- Decision Point: Planner recommended '{next_action}' ---")
    
    if next_action == "tool_call":
        return "tool_executor"
    elif next_action == "research":
        return "researcher"
    elif next_action == "write":
        return "writer"
    elif next_action == "edit": # 当 Writer 完成后,会设置 next_action 为 'edit'
        return "editor"
    elif next_action == "end":
        return END
    else:
        # 默认处理,例如错误或未知动作
        print(f"Warning: Unknown next_action '{next_action}'. Ending workflow.")
        return END

# --- 6. 构建 LangGraph ---
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("planner", planner_agent)
workflow.add_node("tool_executor", tool_executor_agent)
workflow.add_node("researcher", researcher_node)
workflow.add_node("writer", writer_node)
workflow.add_node("editor", editor_node)

# 设置入口点
workflow.set_entry_point("planner")

# 添加条件边缘 - 这是本期的核心!
workflow.add_conditional_edges(
    "planner", # 当 planner 节点执行完毕后,调用 decide_next_step 函数
    decide_next_step,
    {
        "tool_call": "tool_executor", # 如果 decide_next_step 返回 "tool_call",跳转到 tool_executor
        "research": "researcher",     # 如果 decide_next_step 返回 "research",跳转到 researcher
        "write": "writer",            # 如果 decide_next_step 返回 "write",跳转到 writer
        END: END                      # 如果 decide_next_step 返回 END,则结束
    }
)

# 添加普通边缘
# tool_executor 执行完工具后,通常需要回到 planner 重新评估(Agentic Loop)
workflow.add_edge("tool_executor", "planner") 

# researcher 完成后,通常会进入 writer
workflow.add_edge("researcher", "writer")

# writer 完成后,通常会进入 editor (通过 writer_node 内部的 next_action='edit' 触发)
# 这里我们手动添加,因为 writer_node 只是一个函数,它不能直接设置条件边缘
# 而是它的返回值会更新 state,然后由另一个节点(例如一个统一的决策节点)来判断
# 为了简化,我们让 writer 节点直接返回 "next_action": "edit",然后我们在这里添加一个从 writer 到 editor 的普通边缘
# 更严谨的做法是 writer 节点也回到一个通用决策点
workflow.add_conditional_edges(
    "writer",
    decide_next_step, # writer 节点执行完毕后,调用 decide_next_step 函数
    {
        "edit": "editor",
        END: END
    }
)

# editor 完成后,结束
workflow.add_edge("editor", END)


# 编译工作流
app = workflow.compile()

# --- 7. 运行工作流 Demo ---

print("\n--- Demo 1: 需要工具调用的请求 (获取关键词) ---")
inputs_1 = {"input": "请帮我生成关于'AI在医疗领域的应用'的关键词。", "chat_history": []}
for s in app.stream(inputs_1):
    print(s)
# 预期流程: Planner -> Tool Executor -> Planner (agentic loop) -> END (因为工具调用后 Planner 可能会认为任务完成)


print("\n\n--- Demo 2: 需要深入研究的请求 (撰写深度文章) ---")
inputs_2 = {"input": "请撰写一篇关于'量子计算的未来发展'的深度文章。", "chat_history": []}
for s in app.stream(inputs_2):
    print(s)
# 预期流程: Planner -> Researcher -> Writer -> Editor -> END


print("\n\n--- Demo 3: 直接写作的请求 (简短社交媒体文案) ---")
inputs_3 = {"input": "为夏季促销活动写一个简短的社交媒体文案。", "chat_history": []}
for s in app.stream(inputs_3):
    print(s)
# 预期流程: Planner -> Writer -> Editor -> END

print("\n\n--- Demo 4: 简单问询 (直接结束) ---")
inputs_4 = {"input": "你好,AI内容机构!", "chat_history": []}
for s in app.stream(inputs_4):
    print(s)
# 预期流程: Planner -> END (Planner 判断无需创作,直接结束)

代码解析:

  1. AgentState 扩展:我们增加了 next_action, research_result, writer_output, editor_output 等字段,这些是智能体之间传递信息和决策的关键。
  2. PlannerAgent 改造
    • 它的 Prompt 被重新设计,明确要求 LLM 输出 JSON 格式,其中必须包含 next_action 字段,用于指导工作流的走向。
    • 如果 next_actiontool_call,它还会期望 tool_calls 字段。
    • 我们使用 llm.bind_tools(tools) 让 LLM 知道可用的工具,这样它就能在 response.tool_calls 中直接生成工具调用。
    • __call__ 方法现在负责解析 LLM 的输出,并更新 state["next_action"]
  3. ToolExecutorAgent:负责执行 Planner 智能体指定的工具。执行结果会通过 intermediate_steps 返回,并让工作流回到 Planner,形成一个Agentic Loop,这是高级智能体模式的常见设计。
  4. researcher_node, writer_node, editor_node:这些是简化的智能体节点,它们模拟了各自的功能,并在完成任务后,通过更新 state["next_action"] 来暗示下一步的推荐动作。
  5. decide_next_step 函数:这是条件边缘的核心决策函数。它接收当前的 AgentState,并根据 state["next_action"] 的值,返回下一个要跳转的节点名称(或 END)。
  6. add_conditional_edges()
    • 我们为 planner 节点添加了条件边缘。这意味着每次 planner 节点执行完毕,都会调用 decide_next_step 函数来决定下一步去哪里。
    • 注意,我们还为 writer 节点添加了条件边缘,使其完成写作后,也能根据其返回的