第 14 期 | 时光机!使用 SqliteSaver/MemorySaver 构建 Checkpoints
🎯 本期学习目标
各位未来的 AI 架构师们,欢迎来到《LangGraph 多智能体专家课》的第 14 期!还记得我们之前构建的那个雄心勃勃的「AI 万能内容创作机构」吗?它已经能规划、研究、写作甚至编辑了。但现在,我们得给它加上一个超级能力——“时光机”,让它能够“穿越”回过去,或者说,从中断的地方恢复。
本期课程结束后,你将能够:
- 透彻理解 LangGraph Checkpoints 的核心价值: 为什么它是构建健壮、高效多智能体系统的“杀手锏”,以及它在实际项目中的应用场景。
- 熟练掌握
SqliteSaver和MemorySaver的配置与使用: 学习如何在你的 LangGraph 应用中引入这两种持久化存储方案,并理解它们各自的优缺点。 - 为你的「AI 内容机构」引入持久化状态存储: 确保即使系统崩溃、网络中断或进程重启,我们的智能体也能从上次中断的地方无缝恢复工作,避免重复劳动和资源浪费。
- 掌握从历史状态快照恢复工作流的实战技巧: 学习如何利用
thread_id机制,精准地定位和恢复特定会话的运行状态。
准备好了吗?让我们一起开启 LangGraph 的时光机之旅!
📖 原理解析
掉线不掉链:为什么我们需要 Checkpoints?
想象一下,你的「AI 万能内容创作机构」正在夜以继日地为你生成一篇万字长文。Planner 刚刚完成了复杂的结构规划,Researcher 正在海量资料中穿梭,Writer 已经写了一半……突然,服务器掉线了!停电了!或者你的程序因为某个小 bug 崩溃了!
如果你没有 Checkpoints,恭喜你,所有智能体之前辛辛苦苦的工作——那些昂贵的 LLM 调用、耗时的数据处理——全部付诸东流。你不得不从头开始,不仅浪费了宝贵的计算资源和 LLM token(这可是真金白银啊!),更浪费了大量时间。这简直是“吃土”的节奏!
这就是 LangGraph Checkpoints 的核心价值所在:它就像游戏的自动存档功能,在你的多智能体工作流的每一步关键节点,都默默地为你保存一份完整的“游戏进度”。 无论发生什么突发状况,你都能随时“读档”,从上次保存的地方继续,真正做到“掉线不掉链”。
Checkpoints 的核心工作机制
LangGraph 的 Checkpoints 机制,是建立在其核心的 State (状态) 抽象之上的。在 LangGraph 中,整个多智能体工作流的运行状态都被封装在一个 State 对象里。每当一个 Agent (节点)执行完毕,它会接收当前 State,根据其逻辑进行处理,然后返回一个新的 State 更新。
Checkpoints 的工作原理就是:每当 State 被更新后,LangGraph 就会自动将这个最新的 State 快照存储起来。 当你的应用需要恢复时,它会从存储中加载指定会话(通过 thread_id 标识)的最新 State,然后工作流就能从这个 State 开始继续执行。
这就像一部电影,每拍完一个镜头(Agent 执行),导演都会把当前的场景、演员位置、道具摆放(State)记录下来(保存快照)。如果中途停拍,下次开拍时,只需要对照记录,就能精确地恢复到上次停拍时的状态。
两种常见的 Saver:内存与持久化
LangGraph 提供了多种 BaseCheckpointSaver 的实现,其中最常用、也最容易上手的有:
MemorySaver(内存存储):- 特点: 顾名思义,它将所有的状态快照都存储在程序的内存中。
- 优点: 速度极快,配置简单,适合开发调试、测试,或者对持久性要求不高的短生命周期任务。
- 缺点: 非持久化! 一旦程序进程终止,所有存储的状态都会丢失。就像电脑内存条一样,断电即清空。
SqliteSaver(SQLite 文件存储):- 特点: 将状态快照存储在一个本地的 SQLite 数据库文件中。SQLite 是一个轻量级的、嵌入式数据库,不需要独立的服务器进程。
- 优点: 持久化! 状态被写入磁盘文件,即使程序进程终止,下次启动时也能从文件中恢复。配置相对简单,对大多数小型到中型应用来说,性能也足够。
- 缺点: 对于超大规模、高并发或需要分布式部署的场景,SQLite 可能不是最佳选择。文件 I/O 性能可能会成为瓶颈。
此外,LangGraph 还支持 PostgresSaver、RedisSaver 等更强大的持久化方案,它们适用于生产环境中对性能、可扩展性和高可用性有更高要求的场景。但对于我们目前的「AI 内容机构」项目来说,SqliteSaver 已经绰绰有余,并且能很好地演示持久化存储的核心概念。
Mermaid 图解:Checkpoints 工作流
让我们用一张 Mermaid 图来直观地看看 Checkpoints 是如何在我们的「AI 内容机构」中工作的。
graph TD
subgraph AI 内容机构工作流
A[开始任务 (例如: 生成社交媒体内容)] --> B{Agent: Planner 规划师};
B -- 更新 AgencyState --> C(Checkpoint: 保存当前状态快照);
C --> D{Agent: Researcher 研究员};
D -- 更新 AgencyState --> E(Checkpoint: 保存当前状态快照);
E --> F{Agent: Writer 作家};
F -- 更新 AgencyState --> G(Checkpoint: 保存当前状态快照);
G --> H{Agent: Editor 编辑器};
H -- 更新 AgencyState --> I(Checkpoint: 保存当前状态快照);
I --> J[任务完成];
end
subgraph 异常与恢复机制
K[系统中断/错误/进程重启];
K --> L{重新启动应用};
L -- 使用相同的 thread_id --> M[从最新的 Checkpoint 加载 AgencyState];
M --> N[继续执行工作流];
end
style C fill:#bbf,stroke:#333,stroke-width:2px,color:#000;
style E fill:#bbf,stroke:#333,stroke-width:2px,color:#000;
style G fill:#bbf,stroke:#333,stroke-width:2px,color:#000;
style I fill:#bbf,stroke:#333,stroke-width:2px,color:#000;
style M fill:#fcc,stroke:#333,stroke-width:2px,color:#000;
style N fill:#afa,stroke:#333,stroke-width:2px,color:#000;图解说明:
- AI 内容机构工作流: 我们的智能体(Planner, Researcher, Writer, Editor)依次执行任务,每当一个 Agent 完成其工作并更新
AgencyState后,Checkpoint节点就会被触发,将当前的AgencyState完整地保存下来。 - 异常与恢复机制: 如果在任何一个 Agent 执行过程中发生系统中断或错误(K),当应用重新启动(L)时,我们可以通过传入相同的
thread_id,让 LangGraph 自动从Checkpoint中加载(M)最新的AgencyState,然后工作流就能从上次中断的那个 Agent 之后继续(N),而不是从头开始。
看到了吗?Checkpoints 就像是给你的多智能体系统加了一道“不死金身”,大大提升了系统的韧性和可靠性。这对于任何复杂的、长时间运行的 AI 应用来说,都是一个不可或缺的关键特性!
💻 实战代码演练 (Agency项目中的具体应用)
现在,让我们把这些原理应用到我们的「AI 万能内容创作机构」项目中。我们将模拟一个 Planner 代理人进行内容规划的过程,并在中途模拟中断,然后利用 SqliteSaver 从中断处恢复。
1. 定义 AgencyState
首先,我们需要一个能够承载我们机构工作状态的 AgencyState。为了演示 Checkpoints,我们简化它,只包含规划任务和已完成任务。
import operator
from typing import Annotated, TypedDict, List
from langchain_core.messages import BaseMessage # 尽管本期未使用,但保留AgencyState的通用性
from langgraph.graph import StateGraph, END
from langgraph.checkpoint import MemorySaver, SqliteSaver # 引入两种Saver
import os
import time
import json # 用于SqliteSaver的数据序列化
# 1. 定义我们AI内容机构的工作状态
# TypedDict 定义了状态的结构,Annotated 结合 operator.add 表示列表类型是可累加的
class AgencyState(TypedDict):
"""
代表AI内容机构的整体工作状态。
"""
# 待规划的任务列表
planning_tasks: Annotated[List[str], operator.add]
# 已完成的规划任务列表
completed_plans: Annotated[List[str], operator.add]
# 当前正在处理的任务名称
current_task: str
# 代理人执行路径(用于追踪和调试)
agent_path: Annotated[List[str], operator.add]
# 清理可能存在的旧数据库文件,确保每次运行都是全新的开始
# 真实生产环境不应随意删除,这里仅为演示方便
if os.path.exists("agency_checkpoints.sqlite"):
os.remove("agency_checkpoints.sqlite")
print("已清理旧的 agency_checkpoints.sqlite 文件。")
2. 定义 Agent Node (PlannerAgent)
我们创建一个简化的 PlannerAgent,它会模拟规划任务的执行,并通过 time.sleep() 来模拟耗时操作,以便我们能“抓住”它中断的时机。
# 2. 定义一个简化的 Planner Agent 节点
class PlannerAgent:
"""
内容规划师代理,负责将大的内容任务分解为小的规划步骤。
"""
def __init__(self, name: str):
self.name = name
def plan(self, state: AgencyState) -> AgencyState:
"""
Planner Agent 的核心逻辑:接收当前状态,规划任务,并返回更新后的状态。
"""
print(f"\n[{self.name}] 正在处理任务: '{state['current_task']}'...")
time.sleep(1.5) # 模拟规划过程中的思考时间
new_tasks = []
# 根据当前任务,模拟不同的规划输出
if state['current_task'] == "生成社交媒体内容规划":
print(f"[{self.name}] 正在分解 '生成社交媒体内容规划'...")
new_tasks = ["确定目标受众", "构思主题", "撰写初稿大纲"]
elif state['current_task'] == "确定目标受众":
print(f"[{self.name}] 正在分解 '确定目标受众'...")
new_tasks = ["分析用户画像", "定义受众特征"]
elif state['current_task'] == "构思主题":
print(f"[{self.name}] 正在分解 '构思主题'...")
new_tasks = ["头脑风暴热门话题", "市场趋势分析"]
elif state['current_task'] == "撰写初稿大纲":
print(f"[{self.name}] 正在分解 '撰写初稿大纲'...")
new_tasks = ["确定文章结构", "分配章节任务"]
else:
# 如果当前任务没有进一步的子任务,则视为完成
print(f"[{self.name}] 完成任务: '{state['current_task']}'")
return {
"completed_plans": [state['current_task']], # 将当前任务添加到已完成列表
"planning_tasks": [], # 清空待规划任务,因为这个分支只处理单个任务完成
"current_task": "", # 清空当前任务
"agent_path": [self.name]
}
# 如果有新的子任务,更新状态
if new_tasks:
next_task = new_tasks.pop(0) # 取出第一个作为下一个当前任务
return {
"planning_tasks": new_tasks, # 剩余的子任务放入待规划列表
"current_task": next_task, # 更新当前任务
"agent_path": [self.name]
}
else:
# 理论上不会走到这里,因为上面已经处理了完成情况
return {"agent_path": [self.name]}
3. 构建 Graph 并集成 Checkpoints
我们将创建一个简单的线性工作流,由 PlannerAgent 循环执行,直到所有规划任务完成。关键在于,我们将 SqliteSaver 实例传入 StateGraph 的 checkpointer 参数。
# 3. 构建 LangGraph 工作流
def create_agency_workflow(checkpointer=None):
"""
创建AI内容机构的工作流,并根据需要集成Checkpointer。
"""
workflow = StateGraph(AgencyState)
planner = PlannerAgent("内容规划师")
# 添加Planner节点
workflow.add_node("planner", planner.plan)
# 定义路由逻辑:如果还有规划任务,就继续交给Planner;否则,结束。
def route_next_task(state: AgencyState):
"""
根据当前状态决定下一步路由。
"""
# 检查 planning_tasks 和 current_task,确保能正确判断是否完成
remaining_tasks = state.get('planning_tasks', [])
current_task_val = state.get('current_task', '')
# 如果当前任务已完成(且没有新的子任务生成),并且待规划列表为空
if not current_task_val and not remaining_tasks:
print(f"[Router] 所有规划任务已完成,工作流结束。")
return END
# 如果当前任务已完成,但待规划列表还有任务
# 或者当前任务还未完成,但它已经生成了新的子任务(即 current_task 被更新为新的子任务)
# 此时,如果 planning_tasks 还有内容,或者 current_task 还有值,都应该继续
if current_task_val or remaining_tasks:
print(f"[Router] 还有任务,继续交给 Planner 处理。")
return "planner"
else:
print(f"[Router] 未知状态,默认结束。")
return END # 兜底,理论上不会走到
workflow.set_entry_point("planner") # 设置入口点为 planner
workflow.add_conditional_edges(
"planner",
route_next_task,
{"planner": "planner", END: END} # 路由到 planner 或结束
)
# 编译工作流,并传入 checkpointer
app = workflow.compile(checkpointer=checkpointer)
return app
4. 运行与恢复 Demo
现在,我们来演示 SqliteSaver 如何实现断点续传,以及 MemorySaver 的非持久性。
if __name__ == "__main__":
# --- 演示 SqliteSaver (持久化存储) ---
print("--- 演示 SqliteSaver (持久化存储) ---")
db_file = "agency_checkpoints.sqlite"
# 确保每次演示都是从一个干净的数据库文件