第 07 期 | 条件边缘 (Conditional Edges):工作流的"分叉判决器"
🎯 本期学习目标
同学们好!我是你们的 AI 技术导师。今天,我们不聊虚的,直接来点硬核的——LangGraph 的条件边缘 (Conditional Edges)。这是我们「AI 万能内容创作机构」实现真正智能、动态工作流的关键一步。学完这一期,你将:
- 深入理解条件边缘的核心机制:掌握 LangGraph 如何通过条件逻辑,让你的多智能体工作流像一个有大脑的交通枢纽,根据实时情况做出决策。
- 掌握 LLM 意图识别与动态分支设计:学会如何利用大型语言模型(LLM)的输出作为决策依据,实现工作流的智能分叉,避免无谓的计算和资源消耗。
- 为 Agency 项目引入“智能判断”能力:我们将具体改造我们的
Planner智能体,让它不再是简单的“瀑布流”指挥官,而是能根据内容需求智能地分派任务,比如:只有当需要工具时才调用,不需要时直接进入下一步。 - 提升工作流的效率与灵活性:通过实战演练,你将亲手构建一个能够根据输入动态调整执行路径的系统,让你的 AI 内容机构更加高效、响应迅速。
📖 原理解析
在之前的课程中,我们的 LangGraph 工作流大多是线性的,或者在固定的节点之间跳转。这就像一条单行道,或者只有几个固定的岔路口。但在现实世界中,尤其是像我们「AI 万能内容创作机构」这样的复杂系统,需求是千变万化的。一篇短微博和一篇深度研究报告,它们所需的工作流路径会一样吗?显然不能!
这就是 条件边缘 (Conditional Edges) 出场的时刻!
什么是条件边缘? 简单来说,条件边缘允许你在一个节点执行完毕后,根据该节点的输出或者当前的全局状态,动态地决定接下来应该跳转到哪个节点。它不是一个固定的“if-else”,而是一个可以有多个分支的“switch-case”,其分支的判断逻辑完全由你掌控。
想象一下,你的 Planner 智能体收到一个内容创作请求。
- 如果请求是“写一篇关于 AI 伦理的深度报告”,
Planner可能会判断:“嗯,这需要先Research,然后Write,最后Edit。” - 如果请求是“生成 5 个关于夏季穿搭的社交媒体标题”,
Planner可能会判断:“这不需要Research,直接调用TitleGeneratorTool,然后Writer稍作修改即可。” - 如果请求是“我只是想问问 AI 最近有什么新闻”,
Planner可能会判断:“这是一个简单的Query,直接__end__即可。”
看到没?同一个 Planner,根据不同的输入,做出了不同的“分叉判决”。这就是条件边缘的魔力!
工作原理拆解:
- 节点执行 (Node Execution):一个节点(例如
Planner)完成其任务,并产生一个输出。这个输出会更新我们的GraphState。 - 决策函数 (Decision Function):这是条件边缘的核心。你提供一个 Python 函数(在 LangGraph 中),这个函数会接收当前的
GraphState作为输入。它的职责是根据GraphState中的信息(特别是上一个节点的输出),返回一个字符串,这个字符串就是下一个要跳转的节点名称,或者是特殊的__end__(表示工作流结束)。 - 边缘映射 (Edge Mapping):你还需要提供一个字典,将决策函数返回的字符串映射到实际的节点。例如,如果决策函数返回
"research",那么它就跳转到researcher节点;如果返回"tool_call",就跳转到tool_executor节点。
LangGraph 中的实现:add_conditional_edges()
这个方法是构建动态分支的关键。它的签名大致是这样的:
graph.add_conditional_edges(source_node, decision_function, edge_mapping)
source_node: 触发条件判断的节点。decision_function: 一个 Python 函数,它接收GraphState并返回下一个节点名。edge_mapping: 一个字典,将decision_function的返回值映射到实际的节点名。
Mermaid 图解:AI 内容机构的“分叉判决器”工作流
好了,光说不练假把式。我们用一个 Mermaid 图来直观地看看,我们的 Planner 智能体如何利用条件边缘,成为这个机构的“智能交通指挥官”。
graph TD
A[用户请求] --> B(Planner 智能体)
B -- LLM 意图识别 --> C{决策点:需要做什么?}
C -- "需要工具调用" --> D[Tool Executor 工具执行器]
D -- "工具结果" --> B
C -- "需要深入研究" --> E[Researcher 智能体]
E -- "研究结果" --> F[Writer 智能体]
C -- "直接写作" --> F[Writer 智能体]
C -- "任务完成/无需更多处理" --> G[__END__]
F --> H[Editor 智能体]
H --> G图解说明:
- 用户请求 (A):一切的起点,用户向我们的 AI 内容机构提交一个内容创作需求。
- Planner 智能体 (B):这是我们的核心决策者。它接收用户请求,并利用其内部的 LLM 进行意图识别和规划。
- 决策点:需要做什么? (C):这是条件边缘发挥作用的地方。
Planner智能体根据 LLM 的输出,判断下一步应该怎么走。- "需要工具调用":如果
Planner判断需要调用某个外部工具(比如关键词生成器、内容模板生成器),工作流会跳转到Tool Executor(D)。 - "需要深入研究":如果
Planner判断这是一个需要大量事实核查或背景知识的内容(例如深度报告),工作流会跳转到Researcher(E)。 - "直接写作":如果
Planner判断这是一个可以直接由Writer处理的任务(例如简单的社交媒体文案),工作流会直接跳转到Writer(F)。 - "任务完成/无需更多处理":如果
Planner判断当前请求已经满足,或者是一个查询而非创作任务,工作流直接__END__(G)。
- "需要工具调用":如果
- Tool Executor 工具执行器 (D):负责执行
Planner指定的工具。执行完毕后,工具的输出会返回给Planner(B),形成一个 Agentic Loop,让Planner基于工具结果进行下一步判断。 - Researcher 智能体 (E):执行研究任务。研究完成后,将结果传递给
Writer(F)。 - Writer 智能体 (F):根据研究结果或
Planner的直接指示,进行内容创作。 - Editor 智能体 (H):对
Writer的产出进行校对、润色。 - END (G):工作流的终点,表示任务完成。
通过这个结构,我们的 Planner 智能体不再是一个简单的“转发器”,而是一个真正的“分叉判决器”,能够根据实际情况,智能地调度资源,大大提升了整个系统的灵活性和效率。
💻 实战代码演练 (Agency项目中的具体应用)
现在,我们把理论落地到代码。我们将改造 Planner,让它能够根据 LLM 的输出,动态地决定是进行工具调用、研究,还是直接写作。
核心思路:
- 定义 GraphState:扩展我们的状态,以存储 LLM 的决策和工具调用信息。
- 模拟工具:为了演示,我们先定义一些简单的工具。
- 改造 Planner 节点:让
Planner不仅生成计划,还能指示下一步的动作(next_action)。 - 实现决策函数:根据
Planner的输出,决定下一个节点。 - 构建 LangGraph:使用
add_conditional_edges搭建动态工作流。
我们将使用 Python 和 LangChain/LangGraph。
import operator
from typing import Annotated, List, Tuple, Union, Literal, TypedDict
from langchain_core.agents import AgentAction, AgentFinish, Tool
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
# 确保你已经设置了 OPENAI_API_KEY 环境变量
# import os
# os.environ["OPENAI_API_KEY"] = "YOUR_API_KEY"
# --- 1. 定义 GraphState ---
# GraphState 是我们所有节点共享的“黑板”
class AgentState(TypedDict):
"""
Represents the state of our content agency's workflow.
代表我们内容机构工作流的状态。
"""
input: str # 用户输入的原始请求 / The original user input request
chat_history: Annotated[List[BaseMessage], operator.add] # 聊天历史,用于上下文 / Chat history for context
agent_outcome: Union[AgentAction, AgentFinish, None] # 智能体的决策结果,可能是工具调用或最终答案 / Agent's decision, could be tool call or final answer
intermediate_steps: Annotated[List[Tuple[AgentAction, str]], operator.add] # 工具调用及其结果 / Tool calls and their results
next_action: Literal["tool_call", "research", "write", "end", "continue"] # Planner 决定下一步要做的动作 / Planner's decision on the next action
research_result: str # 研究智能体产出的结果 / Result from the Researcher agent
writer_output: str # 写作智能体产出的内容 / Content produced by the Writer agent
editor_output: str # 编辑智能体产出的内容 / Content produced by the Editor agent
# --- 2. 模拟工具 ---
# 为了演示,我们创建几个简单的工具
def get_keywords(topic: str) -> str:
"""
Generates a list of relevant keywords for a given topic.
为给定主题生成相关关键词列表。
"""
print(f"\n--- Calling Tool: get_keywords for '{topic}' ---")
return f"Keywords for '{topic}': AI, Machine Learning, Deep Learning, Generative AI, LLMs"
def get_content_template(topic: str) -> str:
"""
Provides a basic content structure template for a given topic.
为给定主题提供一个基本内容结构模板。
"""
print(f"\n--- Calling Tool: get_content_template for '{topic}' ---")
return f"Template for '{topic}': Introduction, Main Points (3-5), Conclusion, Call to Action."
tools = [
Tool(name="get_keywords", func=get_keywords, description="Useful for generating keywords related to a topic."),
Tool(name="get_content_template", func=get_content_template, description="Useful for getting a content structure template for a topic."),
]
# --- 3. 定义 LLM 模型 ---
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# --- 4. 定义智能体节点 ---
# 4.1 Planner 智能体 (Agent Node)
# Planner 现在不仅会规划,还会根据需求判断下一步的走向
class PlannerAgent:
def __init__(self, llm: ChatOpenAI, tools: List[Tool]):
self.llm = llm
self.tools = tools
self.prompt = ChatPromptTemplate.from_messages([
("system", """
你是一个经验丰富的内容创作机构的规划师。你的任务是根据用户请求,决定下一步的最佳动作。
可以采取的动作包括:
- `tool_call`: 如果需要调用外部工具来获取信息或生成内容辅助。
- `research`: 如果需要进行深入的背景研究。
- `write`: 如果可以直接开始写作。
- `end`: 如果任务已经完成,或者请求是一个简单的问题,不需要进一步创作。
请严格以 JSON 格式输出你的决策,包含 'plan' (你的规划) 和 'next_action' (下一步动作)。
如果 'next_action' 是 'tool_call',你还需要在 'tool_calls' 字段中包含工具调用的详细信息。
可用的工具: {tool_names}
示例输出 (需要工具):
{{
"plan": "用户请求生成关键词,需要调用 get_keywords 工具。",
"next_action": "tool_call",
"tool_calls": [
{{
"tool_name": "get_keywords",
"args": {{"topic": "AI在教育中的应用"}}
}}
]
}}
示例输出 (需要研究):
{{
"plan": "用户请求一篇关于量子计算的文章,需要进行深入研究。",
"next_action": "research"
}}
示例输出 (直接写作):
{{
"plan": "用户请求一篇简短的社交媒体文案,可以直接开始写作。",
"next_action": "write"
}}
示例输出 (结束):
{{
"plan": "用户只是问候,任务完成。",
"next_action": "end"
}}
"""),
MessagesPlaceholder(variable_name="chat_history"),
("user", "{input}"),
MessagesPlaceholder(variable_name="agent_outcome"), # 用于传递工具结果回来
MessagesPlaceholder(variable_name="intermediate_steps")
])
# 绑定工具
self.runnable = self.prompt.partial(tool_names=", ".join([tool.name for tool in tools])) | llm.bind_tools(tools)
def __call__(self, state: AgentState):
print("\n--- Entering Planner Agent ---")
current_input = state["input"]
chat_history = state.get("chat_history", [])
intermediate_steps = state.get("intermediate_steps", [])
# 如果有工具结果,将其添加到聊天历史,让LLM知道
if intermediate_steps:
for action, observation in intermediate_steps:
chat_history.append(AIMessage(content=f"Tool Call: {action.tool} with args {action.tool_input}"))
chat_history.append(AIMessage(content=f"Tool Output: {observation}"))
response = self.runnable.invoke({
"input": current_input,
"chat_history": chat_history,
"intermediate_steps": intermediate_steps,
"agent_outcome": state["agent_outcome"] # 用于传递上一个智能体的决策,比如工具执行器返回的AgentFinish
})
# 解析 LLM 的输出,决定 next_action
# 注意:这里需要更健壮的JSON解析和错误处理
try:
parsed_response = response.tool_calls[0] if response.tool_calls else {}
# Fallback for when LLM doesn't use tool_calls but direct JSON
if not parsed_response and response.content:
import json
parsed_response = json.loads(response.content)
next_action = parsed_response.get("next_action")
tool_calls = parsed_response.get("tool_calls", [])
# 如果LLM直接输出了工具调用,但next_action不是tool_call,我们纠正它
if response.tool_calls and next_action != "tool_call":
next_action = "tool_call"
elif not response.tool_calls and not next_action: # LLM可能直接回复文本
next_action = "end" # 默认结束,或者可以设计一个"chat"节点
print(f"Planner Decision: {next_action}")
print(f"Planner Plan: {parsed_response.get('plan', 'No specific plan.')}")
# 更新状态
new_state = {
"chat_history": chat_history + [HumanMessage(content=current_input), response],
"next_action": next_action,
"agent_outcome": response # 存储LLM的原始输出,方便后续处理工具调用
}
return new_state
except Exception as e:
print(f"Error parsing Planner output: {e}")
print(f"LLM raw response: {response.content}")
# 如果解析失败,默认结束或者进入错误处理流程
return {"next_action": "end", "chat_history": chat_history + [HumanMessage(content=current_input), AIMessage(content=f"Error: {e}")]}
planner_agent = PlannerAgent(llm, tools)
# 4.2 Tool Executor 节点
# 负责执行 Planner 智能体指示的工具
class ToolExecutorAgent:
def __init__(self, tools: List[Tool]):
self.tools_map = {tool.name: tool for tool in tools}
def __call__(self, state: AgentState):
print("\n--- Entering Tool Executor Agent ---")
tool_calls = state["agent_outcome"].tool_calls # 从 Planner 的输出中提取工具调用
intermediate_steps = []
for tool_call in tool_calls:
tool_name = tool_call.name
tool_args = tool_call.args
if tool_name in self.tools_map:
try:
tool_output = self.tools_map[tool_name].func(**tool_args)
intermediate_steps.append((AgentAction(tool=tool_name, tool_input=tool_args, log=""), tool_output))
print(f"Executed tool '{tool_name}' with args {tool_args}. Output: {tool_output}")
except Exception as e:
error_msg = f"Error executing tool '{tool_name}': {e}"
intermediate_steps.append((AgentAction(tool=tool_name, tool_input=tool_args, log=""), error_msg))
print(error_msg)
else:
error_msg = f"Tool '{tool_name}' not found."
intermediate_steps.append((AgentAction(tool=tool_name, tool_input=tool_args, log=""), error_msg))
print(error_msg)
# 清除 agent_outcome,因为工具执行器不是最终结果
return {"intermediate_steps": intermediate_steps, "agent_outcome": None}
tool_executor_agent = ToolExecutorAgent(tools)
# 4.3 Researcher 智能体 (简化版)
def researcher_node(state: AgentState):
print("\n--- Entering Researcher Agent ---")
current_input = state["input"]
# 模拟研究过程
research_content = f"Research on '{current_input}': Detailed findings and insights. This would typically involve web searches, database queries, etc."
print(f"Research completed for: {current_input}")
return {"research_result": research_content, "next_action": "write"} # 研究完成后,指示下一步写作
# 4.4 Writer 智能体 (简化版)
def writer_node(state: AgentState):
print("\n--- Entering Writer Agent ---")
current_input = state["input"]
research_result = state.get("research_result", "No specific research provided.")
# 模拟写作过程
writing_content = f"Article Title: {current_input}\n\n" \
f"Based on research: {research_result}\n\n" \
f"Content: This is a beautifully written piece about {current_input}, incorporating all key findings and creative flair. " \
f"It aims to engage the audience and fulfill the content brief."
print(f"Writing completed for: {current_input}")
return {"writer_output": writing_content, "next_action": "edit"} # 写作完成后,指示下一步编辑
# 4.5 Editor 智能体 (简化版)
def editor_node(state: AgentState):
print("\n--- Entering Editor Agent ---")
writer_output = state["writer_output"]
# 模拟编辑过程
edited_content = f"--- Edited Version ---\n{writer_output}\n\n" \
f"Editor's notes: Checked grammar, improved flow, added a stronger call to action. Content is now polished and ready."
print(f"Editing completed for the written content.")
return {"editor_output": edited_content, "next_action": "end"} # 编辑完成后,指示结束
# --- 5. 决策函数:LangGraph 的“分叉判决器” ---
# 这个函数根据 Planner 智能体输出的 `next_action` 来决定下一个节点
def decide_next_step(state: AgentState) -> str:
"""
Decides the next node based on the Planner's `next_action` or the current state.
根据 Planner 的 `next_action` 或当前状态决定下一个节点。
"""
next_action = state["next_action"]
print(f"\n--- Decision Point: Planner recommended '{next_action}' ---")
if next_action == "tool_call":
return "tool_executor"
elif next_action == "research":
return "researcher"
elif next_action == "write":
return "writer"
elif next_action == "edit": # 当 Writer 完成后,会设置 next_action 为 'edit'
return "editor"
elif next_action == "end":
return END
else:
# 默认处理,例如错误或未知动作
print(f"Warning: Unknown next_action '{next_action}'. Ending workflow.")
return END
# --- 6. 构建 LangGraph ---
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("planner", planner_agent)
workflow.add_node("tool_executor", tool_executor_agent)
workflow.add_node("researcher", researcher_node)
workflow.add_node("writer", writer_node)
workflow.add_node("editor", editor_node)
# 设置入口点
workflow.set_entry_point("planner")
# 添加条件边缘 - 这是本期的核心!
workflow.add_conditional_edges(
"planner", # 当 planner 节点执行完毕后,调用 decide_next_step 函数
decide_next_step,
{
"tool_call": "tool_executor", # 如果 decide_next_step 返回 "tool_call",跳转到 tool_executor
"research": "researcher", # 如果 decide_next_step 返回 "research",跳转到 researcher
"write": "writer", # 如果 decide_next_step 返回 "write",跳转到 writer
END: END # 如果 decide_next_step 返回 END,则结束
}
)
# 添加普通边缘
# tool_executor 执行完工具后,通常需要回到 planner 重新评估(Agentic Loop)
workflow.add_edge("tool_executor", "planner")
# researcher 完成后,通常会进入 writer
workflow.add_edge("researcher", "writer")
# writer 完成后,通常会进入 editor (通过 writer_node 内部的 next_action='edit' 触发)
# 这里我们手动添加,因为 writer_node 只是一个函数,它不能直接设置条件边缘
# 而是它的返回值会更新 state,然后由另一个节点(例如一个统一的决策节点)来判断
# 为了简化,我们让 writer 节点直接返回 "next_action": "edit",然后我们在这里添加一个从 writer 到 editor 的普通边缘
# 更严谨的做法是 writer 节点也回到一个通用决策点
workflow.add_conditional_edges(
"writer",
decide_next_step, # writer 节点执行完毕后,调用 decide_next_step 函数
{
"edit": "editor",
END: END
}
)
# editor 完成后,结束
workflow.add_edge("editor", END)
# 编译工作流
app = workflow.compile()
# --- 7. 运行工作流 Demo ---
print("\n--- Demo 1: 需要工具调用的请求 (获取关键词) ---")
inputs_1 = {"input": "请帮我生成关于'AI在医疗领域的应用'的关键词。", "chat_history": []}
for s in app.stream(inputs_1):
print(s)
# 预期流程: Planner -> Tool Executor -> Planner (agentic loop) -> END (因为工具调用后 Planner 可能会认为任务完成)
print("\n\n--- Demo 2: 需要深入研究的请求 (撰写深度文章) ---")
inputs_2 = {"input": "请撰写一篇关于'量子计算的未来发展'的深度文章。", "chat_history": []}
for s in app.stream(inputs_2):
print(s)
# 预期流程: Planner -> Researcher -> Writer -> Editor -> END
print("\n\n--- Demo 3: 直接写作的请求 (简短社交媒体文案) ---")
inputs_3 = {"input": "为夏季促销活动写一个简短的社交媒体文案。", "chat_history": []}
for s in app.stream(inputs_3):
print(s)
# 预期流程: Planner -> Writer -> Editor -> END
print("\n\n--- Demo 4: 简单问询 (直接结束) ---")
inputs_4 = {"input": "你好,AI内容机构!", "chat_history": []}
for s in app.stream(inputs_4):
print(s)
# 预期流程: Planner -> END (Planner 判断无需创作,直接结束)
代码解析:
AgentState扩展:我们增加了next_action,research_result,writer_output,editor_output等字段,这些是智能体之间传递信息和决策的关键。PlannerAgent改造:- 它的 Prompt 被重新设计,明确要求 LLM 输出 JSON 格式,其中必须包含
next_action字段,用于指导工作流的走向。 - 如果
next_action是tool_call,它还会期望tool_calls字段。 - 我们使用
llm.bind_tools(tools)让 LLM 知道可用的工具,这样它就能在response.tool_calls中直接生成工具调用。 __call__方法现在负责解析 LLM 的输出,并更新state["next_action"]。
- 它的 Prompt 被重新设计,明确要求 LLM 输出 JSON 格式,其中必须包含
ToolExecutorAgent:负责执行Planner智能体指定的工具。执行结果会通过intermediate_steps返回,并让工作流回到Planner,形成一个Agentic Loop,这是高级智能体模式的常见设计。researcher_node,writer_node,editor_node:这些是简化的智能体节点,它们模拟了各自的功能,并在完成任务后,通过更新state["next_action"]来暗示下一步的推荐动作。decide_next_step函数:这是条件边缘的核心决策函数。它接收当前的AgentState,并根据state["next_action"]的值,返回下一个要跳转的节点名称(或END)。add_conditional_edges():- 我们为
planner节点添加了条件边缘。这意味着每次planner节点执行完毕,都会调用decide_next_step函数来决定下一步去哪里。 - 注意,我们还为
writer节点添加了条件边缘,使其完成写作后,也能根据其返回的
- 我们为