第 26 期 | 前端融合:LangGraph 的前后端分离架构规划

更新于 2026/4/17

欢迎回来,各位架构师们!我是你们的老朋友。

在过去的 25 期里,我们像是一群在地下室里闭关修炼的极客。我们精心打磨了「AI 万能内容创作机构 (AI Content Agency)」,亲手赋予了 Planner(主编)运筹帷幄的能力,让 Researcher(研究员)能全网搜刮资料,让 Writer(主笔)妙笔生花,还有 Editor(校对)的火眼金睛。

在终端里看着满屏滚动的绿字,你可能觉得:“哇,这太酷了!”

但是,醒醒吧同学们!你的老板、你的客户、以及最终的用户,他们是绝对不会打开终端去敲命令的! 他们需要的是一个漂亮的网页,一个输入框,一个闪烁的 Loading 动画,最后“叮”的一声,一篇完美的爆款文章出现在屏幕上。

从今天这期开始,我们要把我们的 AI Agency 搬到台面上来。我们要探讨的是:如何为 LangGraph 这种极其耗时的“慢思考”引擎,穿上一件标准、优雅的 Web API 外衣,并让前端(比如 React/Vue)能够舒服地与之交互。

别以为这很简单,如果你敢在 FastAPI 或 Express 里直接 await graph.invoke(),我保证你的前端会因为 HTTP 超时而崩溃,你的用户会盯着白屏怀疑人生。

准备好了吗?今天我们不仅讲“道”,更要手写“术”。


🎯 本期学习目标

  1. 认知升级:理解 LangGraph 状态机与传统 HTTP 请求-响应模型的本质冲突。
  2. 架构设计:掌握“异步任务派发 + 状态轮询 (Heartbeat)”的前后端分离架构。
  3. 后端实操:使用 FastAPI 构建标准的 LangGraph 包装层,将 Task ID 与 LangGraph 的 Thread ID 完美绑定。
  4. 前端对接:编写 React (TypeScript) 端的心跳轮询逻辑,实现丝滑的用户体验。

📖 原理解析

在传统的 Web 开发中,典型的 CRUD 接口是同步的:前端发请求 -> 后端查数据库 -> 返回结果。整个过程通常在 200 毫秒内完成。

但我们的 AI Content Agency 呢? Planner 要思考大纲(3秒),Researcher 要去 Google 搜索并总结(10秒),Writer 要写 2000 字的长文(20秒),Editor 还要审查修改(10秒)。一个完整的 Graph 跑下来,可能需要 40 秒到 1 分钟

如果你用传统的同步 HTTP 请求: 前端发出 POST 请求,然后傻傻地等。浏览器默认的 HTTP 超时时间通常是 30 秒到 60 秒。如果网络稍微抖动,或者大模型 API 抽风,连接就会断开。用户看到的是 504 Gateway Timeout,而你的后端其实还在苦哈哈地跑模型,跑出来的结果最终也无法返回给前端。这就是典型的**“钱花了(Token消耗),事办了,但客户跑了”**。

解决方案:餐厅取餐牌模式 (Asynchronous Polling)

我们要引入的是“异步任务 + 轮询”机制。就像你去肯德基点餐:

  1. 你在前台点了一份“全家桶”(提交创作主题)。
  2. 收银员不会让你一直站在柜台前等,而是给你一张取餐小票(Task ID),然后告诉你:“去旁边坐着看屏幕吧”。(后端立即返回 202 Accepted)。
  3. 你(前端)每隔几秒钟抬头看一眼大屏幕(发送 GET 请求查询状态)。
  4. 当大屏幕显示你的取餐号时,你拿着小票去领餐(获取最终文章)。

结合 LangGraph,我们有一个绝佳的优势:LangGraph 原生支持 Checkpointer(持久化),它的 thread_id 天生就可以作为我们的 Task ID

来看看这套架构的流转图:

sequenceDiagram
    participant U as User (React UI)
    participant API as FastAPI (Backend Gate)
    participant BG as Background Task / Worker
    participant LG as LangGraph (AI Agency)
    participant DB as Checkpointer (SQLite/Redis)

    U->>API: 1. POST /api/agency/generate {topic: "AI未来"}
    API->>DB: 2. 生成 thread_id (Task ID)
    API->>BG: 3. 触发后台异步执行 Graph
    API-->>U: 4. 返回 202 Accepted {task_id: "1234-5678"}
    
    rect rgb(240, 248, 255)
    Note over U, API: 前端心跳轮询 (Heartbeat Polling)
    loop 每 3 秒执行一次
        U->>API: 5. GET /api/agency/status/1234-5678
        API->>DB: 6. 读取该 thread_id 的最新 State
        alt 任务执行中
            DB-->>API: State: {status: "Researcher 采集中..."}
            API-->>U: 返回 200 {status: "running", data: null}
        else 任务已完成
            DB-->>API: State: {status: "done", final_article: "..."}
            API-->>U: 返回 200 {status: "completed", article: "..."}
        end
    end
    end
    
    %% 后台执行流
    BG->>LG: 异步调用 graph.ainvoke(..., config={"configurable": {"thread_id": "1234-5678"}})
    LG->>DB: 实时更新每一步的 State

看到了吗?FastAPI 只负责接客和查状态,真正的重体力活交给了后台任务,而 LangGraph 的 Checkpointer 成为了前后端状态同步的桥梁。


💻 实战代码演练

为了让大家能直接跑起来,我将提供两部分代码:Python 后端 (FastAPI + LangGraph) 和 TypeScript 前端 (React)。

1. 后端:FastAPI + LangGraph 包装层

首先,我们需要一个模拟的 Agency Graph。为了不让代码太臃肿,我用一个简化版的 StateGraph 来代表我们复杂的 Planner/Researcher/Writer 流程,并引入 MemorySaver 来作为持久化层。

安装依赖:

pip install fastapi uvicorn langgraph langchain-openai pydantic

main.py 核心代码:

import asyncio
import uuid
from typing import Dict, TypedDict, Any
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

# ==========================================
# 1. 定义 LangGraph 状态与节点 (模拟我们的 AI Agency)
# ==========================================
class AgencyState(TypedDict):
    topic: str
    current_agent: str      # 当前正在工作的 Agent
    draft: str              # 草稿
    final_article: str      # 最终成文
    status: str             # 任务状态: "running", "completed", "failed"

# 模拟 Planner 节点
async def planner_node(state: AgencyState):
    print(f"[Planner] 正在为主题 '{state['topic']}' 构思大纲...")
    await asyncio.sleep(2) # 模拟大模型思考时间
    return {"current_agent": "Planner", "draft": "大纲:1. 背景 2. 发展 3. 结论"}

# 模拟 Writer 节点
async def writer_node(state: AgencyState):
    print(f"[Writer] 正在根据大纲撰写初稿...")
    await asyncio.sleep(3) # 模拟大模型写作时间
    return {"current_agent": "Writer", "draft": state["draft"] + "\n正文:AI 正在重塑世界..."}

# 模拟 Editor 节点
async def editor_node(state: AgencyState):
    print(f"[Editor] 正在润色文章...")
    await asyncio.sleep(2)
    final_text = state["draft"] + "\n[校对完毕,可以直接发布]"
    return {"current_agent": "Editor", "final_article": final_text, "status": "completed"}

# 构建工作流图
workflow = StateGraph(AgencyState)
workflow.add_node("planner", planner_node)
workflow.add_node("writer", writer_node)
workflow.add_node("editor", editor_node)

workflow.add_edge(START, "planner")
workflow.add_edge("planner", "writer")
workflow.add_edge("writer", "editor")
workflow.add_edge("editor", END)

# 核心:引入 Checkpointer,让图的每一步状态都能被存下来查询!
memory = MemorySaver()
agency_graph = workflow.compile(checkpointer=memory)

# ==========================================
# 2. FastAPI 包装层 (Web API)
# ==========================================
app = FastAPI(title="AI Content Agency API")

# 允许前端跨域请求
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# 请求模型
class GenerateRequest(BaseModel):
    topic: str

# 异步后台任务:负责真正执行 LangGraph
async def run_agency_graph_background(thread_id: str, topic: str):
    config = {"configurable": {"thread_id": thread_id}}
    initial_state = {
        "topic": topic,
        "current_agent": "Initializing",
        "draft": "",
        "final_article": "",
        "status": "running"
    }
    try:
        # 使用 ainvoke 异步执行,不会阻塞 FastAPI 主线程
        # 注意:这里我们一口气跑完。因为配置了 checkpointer,
        # LangGraph 会自动在后台把每一步的 state 写进 memory 中。
        await agency_graph.ainvoke(initial_state, config=config)
    except Exception as e:
        print(f"Graph execution failed: {e}")
        # 如果出错,我们需要手动更新一下状态为 failed (实际生产中建议用专门的异常处理节点)
        agency_graph.update_state(config, {"status": "failed"})

@app.post("/api/v1/agency/generate", status_code=202)
async def start_generation(req: GenerateRequest, background_tasks: BackgroundTasks):
    """
    前端调用此接口提交任务,获取取餐牌 (task_id)
    """
    # 1. 生成唯一的 Task ID (等同于 LangGraph 的 Thread ID)
    task_id = str(uuid.uuid4())
    
    # 2. 将耗时的 Graph 执行丢入后台任务
    background_tasks.add_task(run_agency_graph_background, task_id, req.topic)
    
    # 3. 立即返回 Task ID 给前端
    return {
        "message": "Task accepted. Please poll the status.",
        "task_id": task_id
    }

@app.get("/api/v1/agency/status/{task_id}")
async def get_status(task_id: str):
    """
    前端通过心跳轮询此接口,获取最新进度
    """
    config = {"configurable": {"thread_id": task_id}}
    
    # 从 Checkpointer 中读取最新的 State
    state_snapshot = agency_graph.get_state(config)
    
    # 如果没有找到快照,说明任务还没开始或者 ID 错误
    if not state_snapshot or not state_snapshot.values:
        raise HTTPException(status_code=404, detail="Task not found or not started yet.")
    
    current_state = state_snapshot.values
    
    # 构造返回给前端的友好数据
    response = {
        "task_id": task_id,
        "status": current_state.get("status", "running"),
        "current_agent": current_state.get("current_agent", "Unknown"),
    }
    
    # 如果完成了,把最终文章返回
    if current_state.get("status") == "completed":
        response["final_article"] = current_state.get("final_article")
        
    return response

if __name__ == "__main__":
    import uvicorn
    # 运行:python main.py
    uvicorn.run(app, host="0.0.0.0", port=8000)

2. 前端:React 端的心跳轮询 (Heartbeat)

现在,后端已经准备好了。前端不需要再傻等 1 分钟了。我们用 TypeScript + React hooks 来写一个优雅的轮询机制。

// Frontend: React Component (AgencyClient.tsx)
import React, { useState, useEffect, useRef } from 'react';

// 定义接口类型
interface TaskResponse {
  task_id: string;
  status: 'running' | 'completed' | 'failed';
  current_agent: string;
  final_article?: string;
}

export const AgencyClient: React.FC = () => {
  const [topic, setTopic] = useState<string>('');
  const [taskId, setTaskId] = useState<string | null>(null);
  const [status, setStatus] = useState<string>('idle');
  const [agentMsg, setAgentMsg] = useState<string>('');
  const [article, setArticle] = useState<string>('');
  
  // 使用 ref 存储定时器 ID,方便清理
  const pollingIntervalRef = useRef<NodeJS.Timeout | null>(null);

  // 1. 提交任务 (点餐)
  const handleGenerate = async () => {
    setStatus('submitting');
    setArticle('');
    try {
      const res = await fetch('http://localhost:8000/api/v1/agency/generate', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ topic })
      });
      const data = await res.json();
      setTaskId(data.task_id); // 拿到取餐牌!
      setStatus('running');
    } catch (error) {
      console.error("提交失败", error);
      setStatus('failed');
    }
  };

  // 2. 监听 taskId 变化,启动心跳轮询
  useEffect(() => {
    if (!taskId || status !== 'running') return;

    // 定义轮询函数
    const pollStatus = async () => {
      try {
        const res = await fetch(`http://localhost:8000/api/v1/agency/status/${taskId}`);
        if (!res.ok) return;
        
        const data: TaskResponse = await res.json();
        
        // 更新 UI 展示当前是哪个 Agent 在干活
        setAgentMsg(`当前处理节点: ${data.current_agent}...`);

        if (data.status === 'completed') {
          // 任务完成,停止轮询,展示文章
          setStatus('completed');
          setArticle(data.final_article || '');
          if (pollingIntervalRef.current) clearInterval(pollingIntervalRef.current);
        } else if (data.status === 'failed') {
          setStatus('failed');
          if (pollingIntervalRef.current) clearInterval(pollingIntervalRef.current);
        }
      } catch (error) {
        console.error("轮询失败", error);
      }
    };

    // 立即查一次,然后每 2 秒查一次 (Heartbeat: 2000ms)
    pollStatus();
    pollingIntervalRef.current = setInterval(pollStatus, 2000);

    // 组件卸载时清理定时器
    return () => {
      if (pollingIntervalRef.current) clearInterval(pollingIntervalRef.current);
    };
  }, [taskId, status]);

  return (
    <div style={{ padding: '20px', maxWidth: '600px', margin: '0 auto' }}>
      <h2>AI Content Agency 客户端</h2>
      
      <div style={{ display: 'flex', gap: '10px', marginBottom: '20px' }}>
        <input 
          type="text" 
          value={topic} 
          onChange={(e) => setTopic(e.target.value)} 
          placeholder="输入文章主题,如:AI未来"
          disabled={status === 'running'}
          style={{ flex: 1, padding: '8px' }}
        />
        <button 
          onClick={handleGenerate} 
          disabled={!topic || status === 'running'}
        >
          {status === 'running' ? '创作中...' : '开始创作'}
        </button>
      </div>

      {/* 状态展示区 */}
      {status === 'running' && (
        <div style={{ color: 'blue', fontStyle: 'italic' }}>
          <p>⏳ 正在为您疯狂肝稿中,请稍候...</p>
          <p>🤖 {agentMsg}</p>
        </div>
      )}

      {/* 结果展示区 */}
      {status === 'completed' && (
        <div style={{ border: '1px solid #ccc', padding: '15px', borderRadius: '8px', backgroundColor: '#f9f9f9' }}>
          <h3>🎉 创作完成:</h3>
          <pre style={{ whiteSpace: 'pre-wrap', fontFamily: 'inherit' }}>{article}</pre>
        </div>
      )}
    </div>
  );
};

坑与避坑指南

同学们,上面的代码虽然跑得通,但在生产环境里,有几个大坑你必须知道。作为有着 10 年踩坑经验的导师,我得给你们打个预防针:

🚨 坑一:内存爆炸(MemorySaver 的隐患)

在 Demo 中,我们用了 MemorySaver() 作为 Checkpointer。这意味着所有任务的状态都保存在 FastAPI 进程的内存里。 后果:如果你的网站火了,一天有 1 万个任务,你的服务器内存会迅速被撑爆。一旦你重启 FastAPI 服务,所有还在 running 的任务状态全部丢失,前端永远只能轮询到 404! 避坑指南:在生产环境中,绝对不要用 MemorySaver。请使用 AsyncSqliteSaverAsyncPostgresSaver 或者 RedisSaver。把状态持久化到数据库里,这样不仅不怕重启,还能横向扩展多个 FastAPI 节点。

🚨 坑二:BackgroundTasks 的阻塞危机

FastAPI 的 BackgroundTasks 默认是在同一个事件循环(Event Loop)里跑的。如果你的 LangGraph 里面某个 Node 包含了同步阻塞的代码(比如使用了没有 asyncrequests.get 或者 CPU 密集型的文本处理),它会卡死整个 FastAPI,导致前端的轮询请求(GET status)也排队超时。 避坑指南

  1. 确保你 Graph 里的每一个节点都是真正的异步(使用 async def 和异步的 HTTP 客户端如 httpx)。
  2. 如果必须运行同步的、耗时的老代码,请使用 asyncio.to_thread() 将其放到线程池中执行,或者干脆引入 Celery / Redis Queue (RQ) 将任务彻底分离到独立的 Worker 进程中。

🚨 坑三:DDoS 你自己(轮询频率过高)

前端的 setInterval 如果设置成 100 毫秒,100 个用户同时点餐,你的后端一秒钟就要承受 1000 次查询。 避坑指南: 引入指数退避 (Exponential Backoff) 策略。刚开始 2 秒查一次,如果过了 10 秒还没好,变成 5 秒查一次,再过 30 秒变成 10 秒查一次。不要傻傻地一直以高频请求服务器。


📝 本期小结

今天,我们成功地带领 AI Content Agency 走出了终端的地下室,穿上了 FastAPI 的西装,并与 React 前端完成了第一次“握手”。

我们学到了:

  1. LangGraph 的长耗时特性决定了我们必须采用前后端分离的异步流
  2. 取餐牌模式:POST 触发任务并返回 Task ID,GET 轮询状态。
  3. LangGraph 的杀手锏:直接利用 checkpointerthread_id,完美实现了业务 Task ID 与底层状态图的绑定,后端完全不需要自己去维护一个额外的状态字典!

下期预告: 轮询虽然好用,但不够“性感”。你有没有发现 ChatGPT 生成内容时,是一个字一个字往外蹦的(打字机效果)? 在第 27 期,我们将进阶挑战:抛弃轮询,引入 SSE (Server-Sent Events) 与 Streaming 流式输出,让你的前端能够实时看到 Writer Agent 敲下的每一个字!

各位架构师,把今天的代码跑通,我们下期见!别忘了给你们的 API 加上 Postgres 持久化哦!