第 14 期 | 时光机!使用 SqliteSaver/MemorySaver 构建 Checkpoints

更新于 2026/4/14

🎯 本期学习目标

各位未来的 AI 架构师们,欢迎来到《LangGraph 多智能体专家课》的第 14 期!还记得我们之前构建的那个雄心勃勃的「AI 万能内容创作机构」吗?它已经能规划、研究、写作甚至编辑了。但现在,我们得给它加上一个超级能力——“时光机”,让它能够“穿越”回过去,或者说,从中断的地方恢复。

本期课程结束后,你将能够:

  1. 透彻理解 LangGraph Checkpoints 的核心价值: 为什么它是构建健壮、高效多智能体系统的“杀手锏”,以及它在实际项目中的应用场景。
  2. 熟练掌握 SqliteSaverMemorySaver 的配置与使用: 学习如何在你的 LangGraph 应用中引入这两种持久化存储方案,并理解它们各自的优缺点。
  3. 为你的「AI 内容机构」引入持久化状态存储: 确保即使系统崩溃、网络中断或进程重启,我们的智能体也能从上次中断的地方无缝恢复工作,避免重复劳动和资源浪费。
  4. 掌握从历史状态快照恢复工作流的实战技巧: 学习如何利用 thread_id 机制,精准地定位和恢复特定会话的运行状态。

准备好了吗?让我们一起开启 LangGraph 的时光机之旅!

📖 原理解析

掉线不掉链:为什么我们需要 Checkpoints?

想象一下,你的「AI 万能内容创作机构」正在夜以继日地为你生成一篇万字长文。Planner 刚刚完成了复杂的结构规划,Researcher 正在海量资料中穿梭,Writer 已经写了一半……突然,服务器掉线了!停电了!或者你的程序因为某个小 bug 崩溃了!

如果你没有 Checkpoints,恭喜你,所有智能体之前辛辛苦苦的工作——那些昂贵的 LLM 调用、耗时的数据处理——全部付诸东流。你不得不从头开始,不仅浪费了宝贵的计算资源和 LLM token(这可是真金白银啊!),更浪费了大量时间。这简直是“吃土”的节奏!

这就是 LangGraph Checkpoints 的核心价值所在:它就像游戏的自动存档功能,在你的多智能体工作流的每一步关键节点,都默默地为你保存一份完整的“游戏进度”。 无论发生什么突发状况,你都能随时“读档”,从上次保存的地方继续,真正做到“掉线不掉链”。

Checkpoints 的核心工作机制

LangGraph 的 Checkpoints 机制,是建立在其核心的 State (状态) 抽象之上的。在 LangGraph 中,整个多智能体工作流的运行状态都被封装在一个 State 对象里。每当一个 Agent (节点)执行完毕,它会接收当前 State,根据其逻辑进行处理,然后返回一个新的 State 更新。

Checkpoints 的工作原理就是:每当 State 被更新后,LangGraph 就会自动将这个最新的 State 快照存储起来。 当你的应用需要恢复时,它会从存储中加载指定会话(通过 thread_id 标识)的最新 State,然后工作流就能从这个 State 开始继续执行。

这就像一部电影,每拍完一个镜头(Agent 执行),导演都会把当前的场景、演员位置、道具摆放(State)记录下来(保存快照)。如果中途停拍,下次开拍时,只需要对照记录,就能精确地恢复到上次停拍时的状态。

两种常见的 Saver:内存与持久化

LangGraph 提供了多种 BaseCheckpointSaver 的实现,其中最常用、也最容易上手的有:

  1. MemorySaver (内存存储):
    • 特点: 顾名思义,它将所有的状态快照都存储在程序的内存中。
    • 优点: 速度极快,配置简单,适合开发调试、测试,或者对持久性要求不高的短生命周期任务。
    • 缺点: 非持久化! 一旦程序进程终止,所有存储的状态都会丢失。就像电脑内存条一样,断电即清空。
  2. SqliteSaver (SQLite 文件存储):
    • 特点: 将状态快照存储在一个本地的 SQLite 数据库文件中。SQLite 是一个轻量级的、嵌入式数据库,不需要独立的服务器进程。
    • 优点: 持久化! 状态被写入磁盘文件,即使程序进程终止,下次启动时也能从文件中恢复。配置相对简单,对大多数小型到中型应用来说,性能也足够。
    • 缺点: 对于超大规模、高并发或需要分布式部署的场景,SQLite 可能不是最佳选择。文件 I/O 性能可能会成为瓶颈。

此外,LangGraph 还支持 PostgresSaverRedisSaver 等更强大的持久化方案,它们适用于生产环境中对性能、可扩展性和高可用性有更高要求的场景。但对于我们目前的「AI 内容机构」项目来说,SqliteSaver 已经绰绰有余,并且能很好地演示持久化存储的核心概念。

Mermaid 图解:Checkpoints 工作流

让我们用一张 Mermaid 图来直观地看看 Checkpoints 是如何在我们的「AI 内容机构」中工作的。

graph TD
    subgraph AI 内容机构工作流
        A[开始任务 (例如: 生成社交媒体内容)] --> B{Agent: Planner 规划师};
        B -- 更新 AgencyState --> C(Checkpoint: 保存当前状态快照);
        C --> D{Agent: Researcher 研究员};
        D -- 更新 AgencyState --> E(Checkpoint: 保存当前状态快照);
        E --> F{Agent: Writer 作家};
        F -- 更新 AgencyState --> G(Checkpoint: 保存当前状态快照);
        G --> H{Agent: Editor 编辑器};
        H -- 更新 AgencyState --> I(Checkpoint: 保存当前状态快照);
        I --> J[任务完成];
    end

    subgraph 异常与恢复机制
        K[系统中断/错误/进程重启];
        K --> L{重新启动应用};
        L -- 使用相同的 thread_id --> M[从最新的 Checkpoint 加载 AgencyState];
        M --> N[继续执行工作流];
    end

    style C fill:#bbf,stroke:#333,stroke-width:2px,color:#000;
    style E fill:#bbf,stroke:#333,stroke-width:2px,color:#000;
    style G fill:#bbf,stroke:#333,stroke-width:2px,color:#000;
    style I fill:#bbf,stroke:#333,stroke-width:2px,color:#000;
    style M fill:#fcc,stroke:#333,stroke-width:2px,color:#000;
    style N fill:#afa,stroke:#333,stroke-width:2px,color:#000;

图解说明:

  • AI 内容机构工作流: 我们的智能体(Planner, Researcher, Writer, Editor)依次执行任务,每当一个 Agent 完成其工作并更新 AgencyState 后,Checkpoint 节点就会被触发,将当前的 AgencyState 完整地保存下来。
  • 异常与恢复机制: 如果在任何一个 Agent 执行过程中发生系统中断或错误(K),当应用重新启动(L)时,我们可以通过传入相同的 thread_id,让 LangGraph 自动从 Checkpoint 中加载(M)最新的 AgencyState,然后工作流就能从上次中断的那个 Agent 之后继续(N),而不是从头开始。

看到了吗?Checkpoints 就像是给你的多智能体系统加了一道“不死金身”,大大提升了系统的韧性和可靠性。这对于任何复杂的、长时间运行的 AI 应用来说,都是一个不可或缺的关键特性!

💻 实战代码演练 (Agency项目中的具体应用)

现在,让我们把这些原理应用到我们的「AI 万能内容创作机构」项目中。我们将模拟一个 Planner 代理人进行内容规划的过程,并在中途模拟中断,然后利用 SqliteSaver 从中断处恢复。

1. 定义 AgencyState

首先,我们需要一个能够承载我们机构工作状态的 AgencyState。为了演示 Checkpoints,我们简化它,只包含规划任务和已完成任务。

import operator
from typing import Annotated, TypedDict, List
from langchain_core.messages import BaseMessage # 尽管本期未使用,但保留AgencyState的通用性
from langgraph.graph import StateGraph, END
from langgraph.checkpoint import MemorySaver, SqliteSaver # 引入两种Saver
import os
import time
import json # 用于SqliteSaver的数据序列化

# 1. 定义我们AI内容机构的工作状态
# TypedDict 定义了状态的结构,Annotated 结合 operator.add 表示列表类型是可累加的
class AgencyState(TypedDict):
    """
    代表AI内容机构的整体工作状态。
    """
    # 待规划的任务列表
    planning_tasks: Annotated[List[str], operator.add]
    # 已完成的规划任务列表
    completed_plans: Annotated[List[str], operator.add]
    # 当前正在处理的任务名称
    current_task: str
    # 代理人执行路径(用于追踪和调试)
    agent_path: Annotated[List[str], operator.add]

# 清理可能存在的旧数据库文件,确保每次运行都是全新的开始
# 真实生产环境不应随意删除,这里仅为演示方便
if os.path.exists("agency_checkpoints.sqlite"):
    os.remove("agency_checkpoints.sqlite")
    print("已清理旧的 agency_checkpoints.sqlite 文件。")

2. 定义 Agent Node (PlannerAgent)

我们创建一个简化的 PlannerAgent,它会模拟规划任务的执行,并通过 time.sleep() 来模拟耗时操作,以便我们能“抓住”它中断的时机。

# 2. 定义一个简化的 Planner Agent 节点
class PlannerAgent:
    """
    内容规划师代理,负责将大的内容任务分解为小的规划步骤。
    """
    def __init__(self, name: str):
        self.name = name

    def plan(self, state: AgencyState) -> AgencyState:
        """
        Planner Agent 的核心逻辑:接收当前状态,规划任务,并返回更新后的状态。
        """
        print(f"\n[{self.name}] 正在处理任务: '{state['current_task']}'...")
        time.sleep(1.5) # 模拟规划过程中的思考时间

        new_tasks = []
        # 根据当前任务,模拟不同的规划输出
        if state['current_task'] == "生成社交媒体内容规划":
            print(f"[{self.name}] 正在分解 '生成社交媒体内容规划'...")
            new_tasks = ["确定目标受众", "构思主题", "撰写初稿大纲"]
        elif state['current_task'] == "确定目标受众":
            print(f"[{self.name}] 正在分解 '确定目标受众'...")
            new_tasks = ["分析用户画像", "定义受众特征"]
        elif state['current_task'] == "构思主题":
            print(f"[{self.name}] 正在分解 '构思主题'...")
            new_tasks = ["头脑风暴热门话题", "市场趋势分析"]
        elif state['current_task'] == "撰写初稿大纲":
            print(f"[{self.name}] 正在分解 '撰写初稿大纲'...")
            new_tasks = ["确定文章结构", "分配章节任务"]
        else:
            # 如果当前任务没有进一步的子任务,则视为完成
            print(f"[{self.name}] 完成任务: '{state['current_task']}'")
            return {
                "completed_plans": [state['current_task']], # 将当前任务添加到已完成列表
                "planning_tasks": [], # 清空待规划任务,因为这个分支只处理单个任务完成
                "current_task": "", # 清空当前任务
                "agent_path": [self.name]
            }

        # 如果有新的子任务,更新状态
        if new_tasks:
            next_task = new_tasks.pop(0) # 取出第一个作为下一个当前任务
            return {
                "planning_tasks": new_tasks, # 剩余的子任务放入待规划列表
                "current_task": next_task, # 更新当前任务
                "agent_path": [self.name]
            }
        else:
            # 理论上不会走到这里,因为上面已经处理了完成情况
            return {"agent_path": [self.name]}

3. 构建 Graph 并集成 Checkpoints

我们将创建一个简单的线性工作流,由 PlannerAgent 循环执行,直到所有规划任务完成。关键在于,我们将 SqliteSaver 实例传入 StateGraphcheckpointer 参数。

# 3. 构建 LangGraph 工作流
def create_agency_workflow(checkpointer=None):
    """
    创建AI内容机构的工作流,并根据需要集成Checkpointer。
    """
    workflow = StateGraph(AgencyState)
    planner = PlannerAgent("内容规划师")

    # 添加Planner节点
    workflow.add_node("planner", planner.plan)

    # 定义路由逻辑:如果还有规划任务,就继续交给Planner;否则,结束。
    def route_next_task(state: AgencyState):
        """
        根据当前状态决定下一步路由。
        """
        # 检查 planning_tasks 和 current_task,确保能正确判断是否完成
        remaining_tasks = state.get('planning_tasks', [])
        current_task_val = state.get('current_task', '')

        # 如果当前任务已完成(且没有新的子任务生成),并且待规划列表为空
        if not current_task_val and not remaining_tasks:
            print(f"[Router] 所有规划任务已完成,工作流结束。")
            return END
        
        # 如果当前任务已完成,但待规划列表还有任务
        # 或者当前任务还未完成,但它已经生成了新的子任务(即 current_task 被更新为新的子任务)
        # 此时,如果 planning_tasks 还有内容,或者 current_task 还有值,都应该继续
        if current_task_val or remaining_tasks:
             print(f"[Router] 还有任务,继续交给 Planner 处理。")
             return "planner"
        else:
            print(f"[Router] 未知状态,默认结束。")
            return END # 兜底,理论上不会走到

    workflow.set_entry_point("planner") # 设置入口点为 planner
    workflow.add_conditional_edges(
        "planner",
        route_next_task,
        {"planner": "planner", END: END} # 路由到 planner 或结束
    )

    # 编译工作流,并传入 checkpointer
    app = workflow.compile(checkpointer=checkpointer)
    return app

4. 运行与恢复 Demo

现在,我们来演示 SqliteSaver 如何实现断点续传,以及 MemorySaver 的非持久性。

if __name__ == "__main__":
    # --- 演示 SqliteSaver (持久化存储) ---
    print("--- 演示 SqliteSaver (持久化存储) ---")
    db_file = "agency_checkpoints.sqlite"
    # 确保每次演示都是从一个干净的数据库文件