第 21 期 | Async 执行:把阻塞留给昨日

更新于 2026/4/16

🎯 本期学习目标

各位架构师们,欢迎回到我们的 AI 内容创作机构!今天,我们要给我们的智能体们打一针“兴奋剂”,让它们告别慢吞吞的“排队等候”,学会“多线程并行”!在 AI Agent 领域,效率就是生命,尤其是当你的 Agent 需要与外部世界(比如网络搜索、数据库查询、API 调用)频繁交互时。

本期,我们将深入 LangGraph 的异步执行机制,让你的 Agent 像多核处理器一样高效运转。学完本期,你将:

  1. 透彻理解异步的本质与必要性: 彻底告别对 async/await 的模糊认知,明确为何它是构建高性能 AI Agent 的基石。
  2. 精通 LangGraph 异步节点集成: 掌握如何在 LangGraph 状态图中无缝集成异步函数,让你的节点不再阻塞,而是并行处理任务。
  3. 实战提升 AI Agency 效率: 将我们的 Researcher 智能体从“一根筋”的串行搜索,重构为“多线程并发”的信息检索专家,效率提升百倍不是梦。
  4. 掌握异步编程的“避坑指南”: 识别并规避 LangGraph 异步开发中的常见陷阱,确保你的系统稳定、可扩展。

准备好了吗?让我们一起把那些恼人的阻塞操作,永远留在昨日!

📖 原理解析

为什么我们需要异步?——“等红灯”的困境

想象一下,你的 Researcher 智能体需要去图书馆(互联网)查三本书(执行三次搜索)。

  • 同步(Sync)模式: 它会先走到第一本书的书架,找到书,读完,放回,再走到第二本书的书架,重复。如果每本书的查找和阅读都需要 5 秒,那么三本书总共需要 15 秒。这就是典型的“阻塞式” I/O (Input/Output)。在计算机世界里,当一个程序发起一个网络请求、文件读写或数据库查询时,它往往会“暂停”下来,等待 I/O 操作完成,CPU 在这段时间里无所事事,白白浪费。
  • 异步(Async)模式: 聪明的 Researcher 会怎么做?它会同时向三个图书管理员(并发请求)发出找书的指令,然后自己可以在等待期间去泡杯咖啡,或者去处理其他不依赖这三本书的任务。当任何一个图书管理员找到书后,它会收到通知并去取书。这样,三本书的查找和阅读几乎是同时进行的,总时间可能就只有最慢那本书的 5 秒多一点。这就是“非阻塞式” I/O,CPU 在等待 I/O 的过程中可以切换去做其他事情,大大提高了资源利用率。

在我们的 AI 内容创作机构中,Researcher 智能体经常需要进行:

  1. 多源信息检索: 同时查询多个搜索引擎(Google, Bing)、内部知识库、API 接口。
  2. 并发 API 调用: 向多个外部服务(如图片生成 API, 翻译 API)发起请求。
  3. 大数据量处理: 当需要处理大量数据时,分批次并行处理。

这些场景都是典型的 I/O 密集型任务。如果采用同步执行,Agent 的响应速度将无法忍受。因此,异步执行是构建高效、响应迅速的 AI Agent 系统的必由之路。

Python 的 async/await 魔法

Python 3.5 引入的 async/await 语法,是实现协程(coroutine)的优雅方式。它让我们可以用接近同步代码的写法,来组织异步逻辑。

  • async def:定义一个协程函数。当这个函数被调用时,它不会立即执行,而是返回一个协程对象。
  • await:用于暂停当前协程的执行,等待另一个协程或可等待对象(awaitable)完成。当 await 的操作完成后,当前协程会从暂停的地方继续执行。

核心思想是:当一个 async 函数遇到 await 一个 I/O 密集型操作时,它会“让出” CPU 控制权给事件循环(Event Loop),让事件循环去调度其他准备好的协程执行,而不是傻傻地等待。当 I/O 操作完成后,事件循环会通知这个协程,并将其重新调度执行。

LangGraph 如何拥抱异步?

LangGraph 的设计非常现代化,它天然支持异步执行。这意味着你的 Graph 中的 Node 可以是普通的同步函数,也可以是 async def 定义的异步函数。

当 LangGraph 遇到一个异步节点时,它会:

  1. 识别异步: 检测到节点函数是 async def
  2. 集成事件循环: 如果当前没有运行的 asyncio 事件循环,LangGraph 会自动启动一个(或者使用你提供的)。
  3. 调度执行: 将异步节点作为协程提交给事件循环执行。
  4. 等待完成: LangGraph 会 await 这个异步节点的完成,并获取其返回结果。
  5. 更新状态: 节点执行完成后,其输出会用于更新 AgentState

这种无缝集成使得在 LangGraph 中混合同步和异步节点变得非常简单,你无需关心底层的事件循环管理细节,只需专注于业务逻辑。

让我们通过一个 Mermaid 图来直观地看看,我们的 Researcher 智能体如何在 LangGraph 中利用异步机制,同时进行多项搜索任务:

graph TD
    A[用户请求/Planner指令] --> B{Researcher Node (Async)}
    B --> |启动并发搜索| C1(搜索任务 1: Web Search API)
    B --> |启动并发搜索| C2(搜索任务 2: Knowledge Base Query)
    B --> |启动并发搜索| C3(搜索任务 3: External API Call)

    C1 -- 完成 --> D{聚合搜索结果}
    C2 -- 完成 --> D
    C3 -- 完成 --> D

    D --> E[Writer Node (处理聚合结果)]
    E --> F[Editor Node]
    F --> G[最终内容输出]

    subgraph 异步执行流
        B --- C1
        B --- C2
        B --- C3
    end
    style C1 fill:#f9f,stroke:#333,stroke-width:2px;
    style C2 fill:#f9f,stroke:#333,stroke-width:2px;
    style C3 fill:#f9f,stroke:#333,stroke-width:2px;
    style D fill:#bbf,stroke:#333,stroke-width:2px;

图解:

  • Researcher Node (Async):这是我们本期要改造的核心。它是一个异步节点。
  • 启动并发搜索:当 Researcher 节点被调用时,它不是一个接一个地执行搜索任务,而是同时(并发地)启动 搜索任务 1, 2, 3
  • 搜索任务 1, 2, 3:这些代表了与外部世界进行 I/O 交互的异步操作,比如调用 Web 搜索 API、查询内部知识库或调用其他外部服务。它们会并行运行。
  • 聚合搜索结果Researcher 节点会 await 所有并发搜索任务完成,然后将它们的结果收集起来进行整合和分析。
  • Writer Node:接收 Researcher 聚合后的信息,开始撰写内容。

通过这种方式,原本需要 T1 + T2 + T3 时间的串行搜索,现在只需要 max(T1, T2, T3) 的时间,效率得到了指数级提升!

💻 实战代码演练 (Agency项目中的具体应用)

好了,理论说得再多,不如我们直接上手改造我们的 AI Content Agency。我们将聚焦于 Researcher 智能体,让它拥有“三头六臂”,可以同时进行多项信息检索任务。

场景设定

我们的 Researcher 智能体需要针对一个主题,同时查询多个模拟的“外部数据源”。为了模拟 I/O 延迟,我们将使用 asyncio.sleep 来模拟网络请求的耗时。

1. 准备工作:模拟异步搜索工具

首先,我们需要一些异步的“搜索工具”。在真实世界中,这些可能是调用 Google Search API、Perplexity API 或你自己的数据库查询服务。

import asyncio
import time
from typing import List, Dict, Any

# 模拟一个异步的搜索工具
# Simulate an asynchronous search tool
async def _mock_async_search(query: str, source_name: str, delay: float = 2.0) -> Dict[str, Any]:
    """
    模拟一个异步的外部搜索请求。
    Simulates an asynchronous external search request.
    """
    print(f"[{source_name}] 正在开始搜索:'{query}',预计耗时 {delay} 秒...")
    # 模拟网络延迟或其他 I/O 阻塞
    # Simulate network latency or other I/O blocking
    await asyncio.sleep(delay)
    result = {
        "source": source_name,
        "query": query,
        "content": f"从 {source_name} 找到关于 '{query}' 的信息。耗时 {delay} 秒。",
        "timestamp": time.time()
    }
    print(f"[{source_name}] 搜索完成:'{query}'")
    return result

# 暴露给 LangGraph 节点的异步搜索工具函数
# Asynchronous search tool function exposed to LangGraph nodes
async def async_web_search_tool(query: str) -> Dict[str, Any]:
    """模拟异步的网络搜索"""
    return await _mock_async_search(query, "Web Search Engine", delay=2.5)

async def async_knowledge_base_tool(query: str) -> Dict[str, Any]:
    """模拟异步的内部知识库查询"""
    return await _mock_async_search(query, "Internal Knowledge Base", delay=1.8)

async def async_api_data_tool(query: str) -> Dict[str, Any]:
    """模拟异步的外部 API 数据获取"""
    return await _mock_async_search(query, "External API Service", delay=3.0)

print("模拟异步搜索工具已准备就绪。")

2. 定义我们的 AgentState

我们需要一个共享的状态来在 Agent 之间传递信息。这里我们用一个简单的字典来模拟。

from typing import TypedDict, Annotated, List, Union
import operator

# 定义我们机构的共享状态
# Define our agency's shared state
class AgentState(TypedDict):
    """
    代表 Agent 之间共享的当前状态。
    Representing the current state shared between agents.
    """
    topic: str  # 当前内容创作的主题
    research_queries: List[str]  # 研究者需要执行的查询列表
    research_results: Annotated[List[Dict[str, Any]], operator.add] # 研究结果列表,使用 operator.add 合并
    final_content: str # 最终生成的内容
    # ... 更多状态可以根据需要添加 ...

这里 Annotated[List[Dict[str, Any]], operator.add] 是 LangGraph 的一个高级特性,它告诉 LangGraph 当多个节点尝试更新 research_results 时,应该使用 operator.add(即列表拼接)的方式来合并结果,而不是覆盖。这对于并发写入场景非常有用。

3. 重构 Researcher 智能体为异步节点

现在,重头戏来了!我们将把 Researcher 智能体改造成一个异步节点,它将并发地执行多个搜索任务。

from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage

# Researcher 智能体节点 (异步版本)
# Researcher Agent Node (Asynchronous Version)
async def async_researcher_node(state: AgentState) -> AgentState:
    """
    异步执行多项研究查询,并聚合结果。
    Asynchronously executes multiple research queries and aggregates results.
    """
    print("\n--- Researcher 节点 (异步) 启动 ---")
    topic = state["topic"]
    queries = state.get("research_queries", [topic]) # 如果没有指定查询,则默认以主题作为查询

    # 准备并发执行的搜索任务
    # Prepare search tasks for concurrent execution
    tasks = []
    for query in queries:
        # 假设我们有多个数据源需要同时查询
        # Assume we have multiple data sources to query simultaneously
        tasks.append(async_web_search_tool(query))
        tasks.append(async_knowledge_base_tool(query))
        tasks.append(async_api_data_tool(query))
    
    print(f"Researcher 正在并发执行 {len(tasks)} 个搜索任务,针对主题:'{topic}'...")
    
    # 使用 asyncio.gather 并发运行所有任务,并等待它们全部完成
    # Use asyncio.gather to run all tasks concurrently and wait for them all to complete
    # 如果其中一个任务失败,gather 默认会等待所有任务完成,然后抛出第一个异常。
    # 可以通过 return_exceptions=True 让 gather 返回异常而不是抛出。
    all_results = await asyncio.gather(*tasks) # 敲黑板!这里是异步并发的核心!
    
    # 过滤掉可能的 None 结果 (如果某些任务因异常返回 None)
    # Filter out possible None results (if some tasks return None due to exceptions)
    valid_results = [res for res in all_results if res is not None]

    print(f"--- Researcher 节点 (异步) 完成,共收集到 {len(valid_results)} 条结果 ---")
    
    # 将结果添加到状态中
    # Add results to the state
    return {"research_results": valid_results}

# 模拟 Writer 节点 (同步版本,仅用于演示流程)
# Simulate Writer Node (synchronous version, just for flow demonstration)
def writer_node(state: AgentState) -> AgentState:
    """
    根据研究结果撰写初稿。
    Writes a draft based on research results.
    """
    print("\n--- Writer 节点启动 ---")
    research_results = state.get("research_results", [])
    topic = state["topic"]
    
    if not research_results:
        print("没有研究结果,Writer 无法撰写。")
        return {"final_content": f"未能找到关于 '{topic}' 的足够信息。"}

    content_parts = [f"基于对 '{topic}' 的研究:\n"]
    for i, res in enumerate(research_results):
        content_parts.append(f"  - [{i+1}] 来自 {res['source']}:{res['content']}")
    
    draft = "\n".join(content_parts) + "\n\n(此为基于异步研究结果的初稿)"
    print("Writer 完成初稿撰写。")
    return {"final_content": draft}

# 模拟 Planner 节点
def planner_node(state: AgentState) -> AgentState:
    """
    规划研究主题和查询。
    Plans the research topic and queries.
    """
    print("\n--- Planner 节点启动 ---")
    topic = state["topic"]
    print(f"Planner 正在为主题 '{topic}' 规划研究查询...")
    # 模拟 Planner 生成多个查询
    # Simulate Planner generating multiple queries
    queries = [
        f"{topic} 市场趋势",
        f"{topic} 核心技术",
        f"{topic} 竞争对手分析"
    ]
    print(f"Planner 规划了 {len(queries)} 个查询。")
    return {"research_queries": queries}

4. 构建 LangGraph 工作流

现在,我们将这些节点组装成一个 LangGraph 工作流。

# 构建 LangGraph
# Build the LangGraph
workflow = StateGraph(AgentState)

# 添加节点
# Add nodes
workflow.add_node("planner", planner_node)
workflow.add_node("researcher", async_researcher_node) # 注意这里是异步节点
workflow.add_node("writer", writer_node)

# 设置入口和出口
# Set entry and exit points
workflow.set_entry_point("planner")
workflow.add_edge("planner", "researcher")
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", END)

# 编译图
# Compile the graph
app = workflow.compile()

print("\nLangGraph 工作流已编译完成。")

# 运行工作流
# Run the workflow
async def run_agency_async():
    print("\n--- 启动 AI 内容创作机构工作流 (异步版) ---")
    initial_state = {"topic": "AI 在教育领域的应用", "research_results": []}
    
    start_time = time.time()
    
    # LangGraph 的 .stream() 方法在遇到异步节点时,会自动在内部处理 async/await
    # 或者你可以使用 .invoke() 方法,它也会自动处理
    # LangGraph's .stream() method automatically handles async/await internally
    # when encountering async nodes, or you can use .invoke() which also handles it.
    final_state = await app.invoke(initial_state) # 注意这里我们需要 await app.invoke()
    
    end_time = time.time()
    
    print("\n--- AI 内容创作机构工作流 (异步版) 完成 ---")
    print(f"总耗时:{end_time - start_time:.2f} 秒")
    print("\n最终内容:")
    print(final_state["final_content"])

# 在主程序中运行异步函数
if __name__ == "__main__":
    # Python 3.7+ 可以直接运行顶层 await,或者使用 asyncio.run()
    # For Python 3.7+, you can directly run top-level await, or use asyncio.run()
    # 如果你在 Jupyter/Colab 环境,可能已经有一个事件循环在运行,可以直接 await
    # If in Jupyter/Colab, an event loop might already be running, you can directly await
    # 否则,需要显式调用 asyncio.run()
    # Otherwise, you need to explicitly call asyncio.run()
    asyncio.run(run_agency_async())

    print("\n--- 对比:如果我们用同步方式,会慢多少? ---")
    # 假设每个搜索任务平均耗时 2.5s,3个查询 * 3个源 = 9个任务
    # 同步耗时大约是 9 * 2.5s = 22.5s
    # 异步耗时大约是 max(2.5, 1.8, 3.0) = 3.0s (因为是并发执行,取最长耗时)
    # 实际运行可能会略有不同,但数量级上的差距是巨大的!
    # This comparison highlights the massive difference in magnitude.
    print("如果使用同步方式,3个查询 x 3个数据源 = 9个搜索任务。每个任务平均 2.5 秒。")
    print(f"同步总耗时预估:9 * 2.5 = {9 * 2.5:.2f} 秒。")
    print("而我们的异步实现,由于是并发执行,理论耗时接近单个最慢任务的耗时,即约 3.0 秒。")
    print("效率提升,一目了然!")

运行结果分析:

当你运行这段代码时,你会发现 Researcher 节点内部的三个模拟搜索任务几乎同时开始,并且它们的完成时间会非常接近单个最慢任务的耗时(本例中是 3.0 秒)。而不是像同步版本那样,一个接一个地等待 9 次搜索完成(大约 22.5 秒)。

关键点:

  • async def async_researcher_node(state: AgentState):函数定义前加上 async 关键字,使其成为一个协程。
  • await asyncio.gather(*tasks):这是核心!asyncio.gather 接收一系列协程对象,并并发地运行它们。它会等待所有这些协程都完成后,才返回一个包含所有结果的列表。
  • await app.invoke(initial_state):由于我们的图包含异步节点,我们调用 app.invoke() 时也需要 await 它。LangGraph 的 invokestream 方法都能够智能地处理图中的同步和异步节点。

通过这个实战,我们成功地将 Researcher 智能体升级为并发搜索专家,极大地提升了我们 AI 内容创作机构的数据收集效率!

坑与避坑指南

异步编程虽然强大,但也伴随着一些独特的挑战。作为一名高级架构师,你必须了解这些“坑”,并知道如何避开它们。

1. 事件循环管理:asyncio.run() 的正确姿势

  • 坑: 在已经有事件循环运行的环境中(如 Jupyter Notebook、某些 Web 框架)再次调用 asyncio.run() 会报错 RuntimeError: asyncio.run() cannot be called from a running event loop
  • 避坑:
    • 如果你在 Jupyter/Colab 环境,通常可以直接 await 你的顶层异步函数(例如 await run_agency_async()),因为它们底层已经有一个事件循环。
    • 在普通的 Python 脚本中,使用 asyncio.run(your_async_main_function()) 是启动事件循环并运行异步代码的标准方式。
    • 如果你需要在一个已运行的事件循环中调度新的协程,可以使用 asyncio.create_task()loop.run_until_complete()。LangGraph 的 app.invoke()app.stream() 已经为你处理了这些细节,它会检测当前事件循环状态并采取适当行动。

2. 同步与异步的混淆:await 缺失或滥用

  • 坑:
    • 在一个 async def 函数中,调用另一个 async def 函数却忘记 await。这将导致你得到一个协程对象,而不是其执行结果,并且该协程不会被调度执行。
    • 在同步函数中直接调用 async def 函数,同样会得到一个协程对象,但它不会被执行。
  • 避坑:
    • 记住黄金法则: await 只能在 async def 函数内部使用。
    • 当你在 async def 函数中调用另一个 async def 函数时,几乎总是需要 await 它。
    • 如果你的同步函数需要调用异步函数,你需要将同步函数包装在一个异步函数中,或者使用 asyncio.run()(如果它是一个顶层调用),或者更复杂地通过 loop.run_in_executor() 将同步代码放到线程池中执行,但这通常用于将阻塞的同步 I/O 操作变为非阻塞。

3. 共享状态与并发写入:LangGraph 的 operator.add

  • 坑: 在并发执行的异步节点中,如果多个节点尝试更新同一个列表或字典状态,可能会出现竞争条件或覆盖问题,导致数据丢失或不一致。
  • 避坑:
    • LangGraph 的救星:Annotated[Type, operator.add] 如我们示例所示,对于列表,使用 operator.add 可以让 LangGraph 自动将所有并发写入的结果列表进行拼接,而不是覆盖。对于字典,operator.add 会进行字典合并。
    • 如果需要更复杂的合并逻辑,你可以定义自己的合并函数。
    • 对于更复杂的共享资源(如数据库连接、缓存),需要考虑使用锁(asyncio.Lock)来保护共享资源的访问,以避免数据损坏,但这在 LangGraph 的状态管理层面较少直接涉及,更多是在工具函数内部。

4. 错误处理:asyncio.gatherreturn_exceptions

  • 坑: asyncio.gather(*tasks) 默认行为是,如果任何一个任务失败并抛出异常,gather 会立即抛出该异常,而不会等待其他任务完成。这可能导致你丢失其他已完成任务的结果。
  • 避坑:
    • 如果你希望即使有任务失败,也能收集到所有成功任务的结果,并统一处理异常,可以使用 asyncio.gather(*tasks, return_exceptions=True)
    • return_exceptions=True 时,如果一个任务失败,它的结果将是抛出的异常本身,而不是 None。你需要在 all_results 列表中迭代检查每个结果是否是异常,并进行相应的处理。
    • 在我们的示例中,简单地过滤掉了 None 结果,这适用于任务成功返回字典,失败可能返回 None 的情况(例如,在 _mock_async_search 内部捕获异常并返回 None)。更健壮的做法是捕获异常并记录。

5. 资源限制与背压:不要盲目并发

  • 坑: 理论上,你可以启动成千上万个并发任务。但实际上,外部 API 有速率限制,你的服务器有连接限制,过多的并发请求可能会导致:
    • 被外部 API 封禁 IP。
    • 服务器资源耗尽(内存、CPU、文件描述符)。
    • TCP 连接耗尽。
  • 避坑:
    • 限流 (Rate Limiting): 对于外部 API 调用,务必实现客户端限流,例如使用 asyncio.Semaphore 来限制同时进行的任务数量,或者使用 aiohttp 等库自带的连接池和限流功能。
    • 批处理 (Batching): 如果可能,将请求合并成批次发送。
    • 合理设置并发度: 根据你的服务器资源和外部服务的限制,找到一个合适的并发任务数量。不要一味追求最大并发。

6. 调试异步代码

  • 坑: 异步代码的堆栈跟踪可能比同步代码更复杂,因为控制流会在不同的协程之间跳转。
  • 避坑:
    • 使用 asyncio.create_task 命名任务: task = asyncio.create_task(coroutine(), name="my_research_task") 这样在调试时更容易识别任务。
    • 合理使用日志: 在异步函数的入口和出口打印日志,特别是耗时操作的开始和结束,可以帮助你追踪执行流程。
    • 逐步调试器: 现代 IDE(如 VS Code, PyCharm)对异步代码的调试支持越来越好,学会使用断点和单步执行。

掌握了这些“坑”与“避坑指南”,你就能更自信、更高效地在 LangGraph 中构建强大的异步 AI Agent 了!

📝 本期小结

各位顶级的 AI 架构师们,恭喜你们!本期我们深入探索了 LangGraph 的异步执行机制,这不仅仅是学会了几个 async/await 关键字,更是为你的 AI Agent 注入了“并行处理”的灵魂。

我们理解了:

  • 异步的必要性,尤其是在 I/O 密集型任务中,它是突破性能瓶颈的关键。
  • Python async/await 的工作原理,以及它如何与 LangGraph 无缝集成。
  • 如何将 Researcher 智能体重构,使其能够并发执行多项搜索任务,将原本串行的耗时操作变为并行的,效率提升立竿见影。
  • 异步编程中的常见陷阱,并掌握了有效的避坑策略,确保你的系统既高效又稳定。

现在,你的 AI Content AgencyResearcher 已经不再是那个慢吞吞的“图书管理员”,而是一位能够同时调度多路信息源的“情报专家”了!这将极大地加速我们机构的内容生产流程,让我们的 Agent 更加智能、响应更迅速。

记住,在 AI Agent 的世界里,性能就是用户体验,性能就是成本效益。掌握异步,你就在构建下一代智能系统的道路上迈出了坚实的一步!

下一期,我们将继续深入 LangGraph 的其他高级特性,敬请期待!