第 21 期 | Async 执行:把阻塞留给昨日
🎯 本期学习目标
各位架构师们,欢迎回到我们的 AI 内容创作机构!今天,我们要给我们的智能体们打一针“兴奋剂”,让它们告别慢吞吞的“排队等候”,学会“多线程并行”!在 AI Agent 领域,效率就是生命,尤其是当你的 Agent 需要与外部世界(比如网络搜索、数据库查询、API 调用)频繁交互时。
本期,我们将深入 LangGraph 的异步执行机制,让你的 Agent 像多核处理器一样高效运转。学完本期,你将:
- 透彻理解异步的本质与必要性: 彻底告别对
async/await的模糊认知,明确为何它是构建高性能 AI Agent 的基石。 - 精通 LangGraph 异步节点集成: 掌握如何在 LangGraph 状态图中无缝集成异步函数,让你的节点不再阻塞,而是并行处理任务。
- 实战提升 AI Agency 效率: 将我们的
Researcher智能体从“一根筋”的串行搜索,重构为“多线程并发”的信息检索专家,效率提升百倍不是梦。 - 掌握异步编程的“避坑指南”: 识别并规避 LangGraph 异步开发中的常见陷阱,确保你的系统稳定、可扩展。
准备好了吗?让我们一起把那些恼人的阻塞操作,永远留在昨日!
📖 原理解析
为什么我们需要异步?——“等红灯”的困境
想象一下,你的 Researcher 智能体需要去图书馆(互联网)查三本书(执行三次搜索)。
- 同步(Sync)模式: 它会先走到第一本书的书架,找到书,读完,放回,再走到第二本书的书架,重复。如果每本书的查找和阅读都需要 5 秒,那么三本书总共需要 15 秒。这就是典型的“阻塞式” I/O (Input/Output)。在计算机世界里,当一个程序发起一个网络请求、文件读写或数据库查询时,它往往会“暂停”下来,等待 I/O 操作完成,CPU 在这段时间里无所事事,白白浪费。
- 异步(Async)模式: 聪明的
Researcher会怎么做?它会同时向三个图书管理员(并发请求)发出找书的指令,然后自己可以在等待期间去泡杯咖啡,或者去处理其他不依赖这三本书的任务。当任何一个图书管理员找到书后,它会收到通知并去取书。这样,三本书的查找和阅读几乎是同时进行的,总时间可能就只有最慢那本书的 5 秒多一点。这就是“非阻塞式” I/O,CPU 在等待 I/O 的过程中可以切换去做其他事情,大大提高了资源利用率。
在我们的 AI 内容创作机构中,Researcher 智能体经常需要进行:
- 多源信息检索: 同时查询多个搜索引擎(Google, Bing)、内部知识库、API 接口。
- 并发 API 调用: 向多个外部服务(如图片生成 API, 翻译 API)发起请求。
- 大数据量处理: 当需要处理大量数据时,分批次并行处理。
这些场景都是典型的 I/O 密集型任务。如果采用同步执行,Agent 的响应速度将无法忍受。因此,异步执行是构建高效、响应迅速的 AI Agent 系统的必由之路。
Python 的 async/await 魔法
Python 3.5 引入的 async/await 语法,是实现协程(coroutine)的优雅方式。它让我们可以用接近同步代码的写法,来组织异步逻辑。
async def:定义一个协程函数。当这个函数被调用时,它不会立即执行,而是返回一个协程对象。await:用于暂停当前协程的执行,等待另一个协程或可等待对象(awaitable)完成。当await的操作完成后,当前协程会从暂停的地方继续执行。
核心思想是:当一个 async 函数遇到 await 一个 I/O 密集型操作时,它会“让出” CPU 控制权给事件循环(Event Loop),让事件循环去调度其他准备好的协程执行,而不是傻傻地等待。当 I/O 操作完成后,事件循环会通知这个协程,并将其重新调度执行。
LangGraph 如何拥抱异步?
LangGraph 的设计非常现代化,它天然支持异步执行。这意味着你的 Graph 中的 Node 可以是普通的同步函数,也可以是 async def 定义的异步函数。
当 LangGraph 遇到一个异步节点时,它会:
- 识别异步: 检测到节点函数是
async def。 - 集成事件循环: 如果当前没有运行的 asyncio 事件循环,LangGraph 会自动启动一个(或者使用你提供的)。
- 调度执行: 将异步节点作为协程提交给事件循环执行。
- 等待完成: LangGraph 会
await这个异步节点的完成,并获取其返回结果。 - 更新状态: 节点执行完成后,其输出会用于更新
AgentState。
这种无缝集成使得在 LangGraph 中混合同步和异步节点变得非常简单,你无需关心底层的事件循环管理细节,只需专注于业务逻辑。
让我们通过一个 Mermaid 图来直观地看看,我们的 Researcher 智能体如何在 LangGraph 中利用异步机制,同时进行多项搜索任务:
graph TD
A[用户请求/Planner指令] --> B{Researcher Node (Async)}
B --> |启动并发搜索| C1(搜索任务 1: Web Search API)
B --> |启动并发搜索| C2(搜索任务 2: Knowledge Base Query)
B --> |启动并发搜索| C3(搜索任务 3: External API Call)
C1 -- 完成 --> D{聚合搜索结果}
C2 -- 完成 --> D
C3 -- 完成 --> D
D --> E[Writer Node (处理聚合结果)]
E --> F[Editor Node]
F --> G[最终内容输出]
subgraph 异步执行流
B --- C1
B --- C2
B --- C3
end
style C1 fill:#f9f,stroke:#333,stroke-width:2px;
style C2 fill:#f9f,stroke:#333,stroke-width:2px;
style C3 fill:#f9f,stroke:#333,stroke-width:2px;
style D fill:#bbf,stroke:#333,stroke-width:2px;图解:
Researcher Node (Async):这是我们本期要改造的核心。它是一个异步节点。启动并发搜索:当Researcher节点被调用时,它不是一个接一个地执行搜索任务,而是同时(并发地)启动搜索任务 1, 2, 3。搜索任务 1, 2, 3:这些代表了与外部世界进行 I/O 交互的异步操作,比如调用 Web 搜索 API、查询内部知识库或调用其他外部服务。它们会并行运行。聚合搜索结果:Researcher节点会await所有并发搜索任务完成,然后将它们的结果收集起来进行整合和分析。Writer Node:接收Researcher聚合后的信息,开始撰写内容。
通过这种方式,原本需要 T1 + T2 + T3 时间的串行搜索,现在只需要 max(T1, T2, T3) 的时间,效率得到了指数级提升!
💻 实战代码演练 (Agency项目中的具体应用)
好了,理论说得再多,不如我们直接上手改造我们的 AI Content Agency。我们将聚焦于 Researcher 智能体,让它拥有“三头六臂”,可以同时进行多项信息检索任务。
场景设定
我们的 Researcher 智能体需要针对一个主题,同时查询多个模拟的“外部数据源”。为了模拟 I/O 延迟,我们将使用 asyncio.sleep 来模拟网络请求的耗时。
1. 准备工作:模拟异步搜索工具
首先,我们需要一些异步的“搜索工具”。在真实世界中,这些可能是调用 Google Search API、Perplexity API 或你自己的数据库查询服务。
import asyncio
import time
from typing import List, Dict, Any
# 模拟一个异步的搜索工具
# Simulate an asynchronous search tool
async def _mock_async_search(query: str, source_name: str, delay: float = 2.0) -> Dict[str, Any]:
"""
模拟一个异步的外部搜索请求。
Simulates an asynchronous external search request.
"""
print(f"[{source_name}] 正在开始搜索:'{query}',预计耗时 {delay} 秒...")
# 模拟网络延迟或其他 I/O 阻塞
# Simulate network latency or other I/O blocking
await asyncio.sleep(delay)
result = {
"source": source_name,
"query": query,
"content": f"从 {source_name} 找到关于 '{query}' 的信息。耗时 {delay} 秒。",
"timestamp": time.time()
}
print(f"[{source_name}] 搜索完成:'{query}'")
return result
# 暴露给 LangGraph 节点的异步搜索工具函数
# Asynchronous search tool function exposed to LangGraph nodes
async def async_web_search_tool(query: str) -> Dict[str, Any]:
"""模拟异步的网络搜索"""
return await _mock_async_search(query, "Web Search Engine", delay=2.5)
async def async_knowledge_base_tool(query: str) -> Dict[str, Any]:
"""模拟异步的内部知识库查询"""
return await _mock_async_search(query, "Internal Knowledge Base", delay=1.8)
async def async_api_data_tool(query: str) -> Dict[str, Any]:
"""模拟异步的外部 API 数据获取"""
return await _mock_async_search(query, "External API Service", delay=3.0)
print("模拟异步搜索工具已准备就绪。")
2. 定义我们的 AgentState
我们需要一个共享的状态来在 Agent 之间传递信息。这里我们用一个简单的字典来模拟。
from typing import TypedDict, Annotated, List, Union
import operator
# 定义我们机构的共享状态
# Define our agency's shared state
class AgentState(TypedDict):
"""
代表 Agent 之间共享的当前状态。
Representing the current state shared between agents.
"""
topic: str # 当前内容创作的主题
research_queries: List[str] # 研究者需要执行的查询列表
research_results: Annotated[List[Dict[str, Any]], operator.add] # 研究结果列表,使用 operator.add 合并
final_content: str # 最终生成的内容
# ... 更多状态可以根据需要添加 ...
这里 Annotated[List[Dict[str, Any]], operator.add] 是 LangGraph 的一个高级特性,它告诉 LangGraph 当多个节点尝试更新 research_results 时,应该使用 operator.add(即列表拼接)的方式来合并结果,而不是覆盖。这对于并发写入场景非常有用。
3. 重构 Researcher 智能体为异步节点
现在,重头戏来了!我们将把 Researcher 智能体改造成一个异步节点,它将并发地执行多个搜索任务。
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage
# Researcher 智能体节点 (异步版本)
# Researcher Agent Node (Asynchronous Version)
async def async_researcher_node(state: AgentState) -> AgentState:
"""
异步执行多项研究查询,并聚合结果。
Asynchronously executes multiple research queries and aggregates results.
"""
print("\n--- Researcher 节点 (异步) 启动 ---")
topic = state["topic"]
queries = state.get("research_queries", [topic]) # 如果没有指定查询,则默认以主题作为查询
# 准备并发执行的搜索任务
# Prepare search tasks for concurrent execution
tasks = []
for query in queries:
# 假设我们有多个数据源需要同时查询
# Assume we have multiple data sources to query simultaneously
tasks.append(async_web_search_tool(query))
tasks.append(async_knowledge_base_tool(query))
tasks.append(async_api_data_tool(query))
print(f"Researcher 正在并发执行 {len(tasks)} 个搜索任务,针对主题:'{topic}'...")
# 使用 asyncio.gather 并发运行所有任务,并等待它们全部完成
# Use asyncio.gather to run all tasks concurrently and wait for them all to complete
# 如果其中一个任务失败,gather 默认会等待所有任务完成,然后抛出第一个异常。
# 可以通过 return_exceptions=True 让 gather 返回异常而不是抛出。
all_results = await asyncio.gather(*tasks) # 敲黑板!这里是异步并发的核心!
# 过滤掉可能的 None 结果 (如果某些任务因异常返回 None)
# Filter out possible None results (if some tasks return None due to exceptions)
valid_results = [res for res in all_results if res is not None]
print(f"--- Researcher 节点 (异步) 完成,共收集到 {len(valid_results)} 条结果 ---")
# 将结果添加到状态中
# Add results to the state
return {"research_results": valid_results}
# 模拟 Writer 节点 (同步版本,仅用于演示流程)
# Simulate Writer Node (synchronous version, just for flow demonstration)
def writer_node(state: AgentState) -> AgentState:
"""
根据研究结果撰写初稿。
Writes a draft based on research results.
"""
print("\n--- Writer 节点启动 ---")
research_results = state.get("research_results", [])
topic = state["topic"]
if not research_results:
print("没有研究结果,Writer 无法撰写。")
return {"final_content": f"未能找到关于 '{topic}' 的足够信息。"}
content_parts = [f"基于对 '{topic}' 的研究:\n"]
for i, res in enumerate(research_results):
content_parts.append(f" - [{i+1}] 来自 {res['source']}:{res['content']}")
draft = "\n".join(content_parts) + "\n\n(此为基于异步研究结果的初稿)"
print("Writer 完成初稿撰写。")
return {"final_content": draft}
# 模拟 Planner 节点
def planner_node(state: AgentState) -> AgentState:
"""
规划研究主题和查询。
Plans the research topic and queries.
"""
print("\n--- Planner 节点启动 ---")
topic = state["topic"]
print(f"Planner 正在为主题 '{topic}' 规划研究查询...")
# 模拟 Planner 生成多个查询
# Simulate Planner generating multiple queries
queries = [
f"{topic} 市场趋势",
f"{topic} 核心技术",
f"{topic} 竞争对手分析"
]
print(f"Planner 规划了 {len(queries)} 个查询。")
return {"research_queries": queries}
4. 构建 LangGraph 工作流
现在,我们将这些节点组装成一个 LangGraph 工作流。
# 构建 LangGraph
# Build the LangGraph
workflow = StateGraph(AgentState)
# 添加节点
# Add nodes
workflow.add_node("planner", planner_node)
workflow.add_node("researcher", async_researcher_node) # 注意这里是异步节点
workflow.add_node("writer", writer_node)
# 设置入口和出口
# Set entry and exit points
workflow.set_entry_point("planner")
workflow.add_edge("planner", "researcher")
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", END)
# 编译图
# Compile the graph
app = workflow.compile()
print("\nLangGraph 工作流已编译完成。")
# 运行工作流
# Run the workflow
async def run_agency_async():
print("\n--- 启动 AI 内容创作机构工作流 (异步版) ---")
initial_state = {"topic": "AI 在教育领域的应用", "research_results": []}
start_time = time.time()
# LangGraph 的 .stream() 方法在遇到异步节点时,会自动在内部处理 async/await
# 或者你可以使用 .invoke() 方法,它也会自动处理
# LangGraph's .stream() method automatically handles async/await internally
# when encountering async nodes, or you can use .invoke() which also handles it.
final_state = await app.invoke(initial_state) # 注意这里我们需要 await app.invoke()
end_time = time.time()
print("\n--- AI 内容创作机构工作流 (异步版) 完成 ---")
print(f"总耗时:{end_time - start_time:.2f} 秒")
print("\n最终内容:")
print(final_state["final_content"])
# 在主程序中运行异步函数
if __name__ == "__main__":
# Python 3.7+ 可以直接运行顶层 await,或者使用 asyncio.run()
# For Python 3.7+, you can directly run top-level await, or use asyncio.run()
# 如果你在 Jupyter/Colab 环境,可能已经有一个事件循环在运行,可以直接 await
# If in Jupyter/Colab, an event loop might already be running, you can directly await
# 否则,需要显式调用 asyncio.run()
# Otherwise, you need to explicitly call asyncio.run()
asyncio.run(run_agency_async())
print("\n--- 对比:如果我们用同步方式,会慢多少? ---")
# 假设每个搜索任务平均耗时 2.5s,3个查询 * 3个源 = 9个任务
# 同步耗时大约是 9 * 2.5s = 22.5s
# 异步耗时大约是 max(2.5, 1.8, 3.0) = 3.0s (因为是并发执行,取最长耗时)
# 实际运行可能会略有不同,但数量级上的差距是巨大的!
# This comparison highlights the massive difference in magnitude.
print("如果使用同步方式,3个查询 x 3个数据源 = 9个搜索任务。每个任务平均 2.5 秒。")
print(f"同步总耗时预估:9 * 2.5 = {9 * 2.5:.2f} 秒。")
print("而我们的异步实现,由于是并发执行,理论耗时接近单个最慢任务的耗时,即约 3.0 秒。")
print("效率提升,一目了然!")
运行结果分析:
当你运行这段代码时,你会发现 Researcher 节点内部的三个模拟搜索任务几乎同时开始,并且它们的完成时间会非常接近单个最慢任务的耗时(本例中是 3.0 秒)。而不是像同步版本那样,一个接一个地等待 9 次搜索完成(大约 22.5 秒)。
关键点:
async def async_researcher_node(state: AgentState):函数定义前加上async关键字,使其成为一个协程。await asyncio.gather(*tasks):这是核心!asyncio.gather接收一系列协程对象,并并发地运行它们。它会等待所有这些协程都完成后,才返回一个包含所有结果的列表。await app.invoke(initial_state):由于我们的图包含异步节点,我们调用app.invoke()时也需要await它。LangGraph 的invoke和stream方法都能够智能地处理图中的同步和异步节点。
通过这个实战,我们成功地将 Researcher 智能体升级为并发搜索专家,极大地提升了我们 AI 内容创作机构的数据收集效率!
坑与避坑指南
异步编程虽然强大,但也伴随着一些独特的挑战。作为一名高级架构师,你必须了解这些“坑”,并知道如何避开它们。
1. 事件循环管理:asyncio.run() 的正确姿势
- 坑: 在已经有事件循环运行的环境中(如 Jupyter Notebook、某些 Web 框架)再次调用
asyncio.run()会报错RuntimeError: asyncio.run() cannot be called from a running event loop。 - 避坑:
- 如果你在 Jupyter/Colab 环境,通常可以直接
await你的顶层异步函数(例如await run_agency_async()),因为它们底层已经有一个事件循环。 - 在普通的 Python 脚本中,使用
asyncio.run(your_async_main_function())是启动事件循环并运行异步代码的标准方式。 - 如果你需要在一个已运行的事件循环中调度新的协程,可以使用
asyncio.create_task()或loop.run_until_complete()。LangGraph 的app.invoke()和app.stream()已经为你处理了这些细节,它会检测当前事件循环状态并采取适当行动。
- 如果你在 Jupyter/Colab 环境,通常可以直接
2. 同步与异步的混淆:await 缺失或滥用
- 坑:
- 在一个
async def函数中,调用另一个async def函数却忘记await。这将导致你得到一个协程对象,而不是其执行结果,并且该协程不会被调度执行。 - 在同步函数中直接调用
async def函数,同样会得到一个协程对象,但它不会被执行。
- 在一个
- 避坑:
- 记住黄金法则:
await只能在async def函数内部使用。 - 当你在
async def函数中调用另一个async def函数时,几乎总是需要await它。 - 如果你的同步函数需要调用异步函数,你需要将同步函数包装在一个异步函数中,或者使用
asyncio.run()(如果它是一个顶层调用),或者更复杂地通过loop.run_in_executor()将同步代码放到线程池中执行,但这通常用于将阻塞的同步 I/O 操作变为非阻塞。
- 记住黄金法则:
3. 共享状态与并发写入:LangGraph 的 operator.add
- 坑: 在并发执行的异步节点中,如果多个节点尝试更新同一个列表或字典状态,可能会出现竞争条件或覆盖问题,导致数据丢失或不一致。
- 避坑:
- LangGraph 的救星:
Annotated[Type, operator.add]。 如我们示例所示,对于列表,使用operator.add可以让 LangGraph 自动将所有并发写入的结果列表进行拼接,而不是覆盖。对于字典,operator.add会进行字典合并。 - 如果需要更复杂的合并逻辑,你可以定义自己的合并函数。
- 对于更复杂的共享资源(如数据库连接、缓存),需要考虑使用锁(
asyncio.Lock)来保护共享资源的访问,以避免数据损坏,但这在 LangGraph 的状态管理层面较少直接涉及,更多是在工具函数内部。
- LangGraph 的救星:
4. 错误处理:asyncio.gather 与 return_exceptions
- 坑:
asyncio.gather(*tasks)默认行为是,如果任何一个任务失败并抛出异常,gather会立即抛出该异常,而不会等待其他任务完成。这可能导致你丢失其他已完成任务的结果。 - 避坑:
- 如果你希望即使有任务失败,也能收集到所有成功任务的结果,并统一处理异常,可以使用
asyncio.gather(*tasks, return_exceptions=True)。 - 当
return_exceptions=True时,如果一个任务失败,它的结果将是抛出的异常本身,而不是None。你需要在all_results列表中迭代检查每个结果是否是异常,并进行相应的处理。 - 在我们的示例中,简单地过滤掉了
None结果,这适用于任务成功返回字典,失败可能返回None的情况(例如,在_mock_async_search内部捕获异常并返回None)。更健壮的做法是捕获异常并记录。
- 如果你希望即使有任务失败,也能收集到所有成功任务的结果,并统一处理异常,可以使用
5. 资源限制与背压:不要盲目并发
- 坑: 理论上,你可以启动成千上万个并发任务。但实际上,外部 API 有速率限制,你的服务器有连接限制,过多的并发请求可能会导致:
- 被外部 API 封禁 IP。
- 服务器资源耗尽(内存、CPU、文件描述符)。
- TCP 连接耗尽。
- 避坑:
- 限流 (Rate Limiting): 对于外部 API 调用,务必实现客户端限流,例如使用
asyncio.Semaphore来限制同时进行的任务数量,或者使用aiohttp等库自带的连接池和限流功能。 - 批处理 (Batching): 如果可能,将请求合并成批次发送。
- 合理设置并发度: 根据你的服务器资源和外部服务的限制,找到一个合适的并发任务数量。不要一味追求最大并发。
- 限流 (Rate Limiting): 对于外部 API 调用,务必实现客户端限流,例如使用
6. 调试异步代码
- 坑: 异步代码的堆栈跟踪可能比同步代码更复杂,因为控制流会在不同的协程之间跳转。
- 避坑:
- 使用
asyncio.create_task命名任务:task = asyncio.create_task(coroutine(), name="my_research_task")这样在调试时更容易识别任务。 - 合理使用日志: 在异步函数的入口和出口打印日志,特别是耗时操作的开始和结束,可以帮助你追踪执行流程。
- 逐步调试器: 现代 IDE(如 VS Code, PyCharm)对异步代码的调试支持越来越好,学会使用断点和单步执行。
- 使用
掌握了这些“坑”与“避坑指南”,你就能更自信、更高效地在 LangGraph 中构建强大的异步 AI Agent 了!
📝 本期小结
各位顶级的 AI 架构师们,恭喜你们!本期我们深入探索了 LangGraph 的异步执行机制,这不仅仅是学会了几个 async/await 关键字,更是为你的 AI Agent 注入了“并行处理”的灵魂。
我们理解了:
- 异步的必要性,尤其是在 I/O 密集型任务中,它是突破性能瓶颈的关键。
- Python
async/await的工作原理,以及它如何与 LangGraph 无缝集成。 - 如何将
Researcher智能体重构,使其能够并发执行多项搜索任务,将原本串行的耗时操作变为并行的,效率提升立竿见影。 - 异步编程中的常见陷阱,并掌握了有效的避坑策略,确保你的系统既高效又稳定。
现在,你的 AI Content Agency 的 Researcher 已经不再是那个慢吞吞的“图书管理员”,而是一位能够同时调度多路信息源的“情报专家”了!这将极大地加速我们机构的内容生产流程,让我们的 Agent 更加智能、响应更迅速。
记住,在 AI Agent 的世界里,性能就是用户体验,性能就是成本效益。掌握异步,你就在构建下一代智能系统的道路上迈出了坚实的一步!
下一期,我们将继续深入 LangGraph 的其他高级特性,敬请期待!