# Issue 17: Knowledge Base Maintenance and Performance Optimization

Updated on 4/6/2026

## Knowledge Base Lifecycle

```mermaid
graph LR
    Create[Create knowledge base] --> Import[Import documents]
    Import --> Index[Build index]
    Index --> Use[Serve queries]
    Use --> Monitor[Monitor recall quality]
    Monitor --> Update[Incremental updates]
    Update --> Reindex[Rebuild index when needed]
    Reindex --> Use
```
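
The "Monitor recall quality" step can be automated with a small evaluation harness. A minimal sketch, assuming a Dify-style `POST /datasets/{dataset_id}/retrieve` hit-testing endpoint and a `records[].segment.content` response shape; `TEST_CASES` is a hypothetical hand-maintained test set:

```python
import requests

BASE_URL = "https://api.dify.ai/v1"   # adjust for self-hosted deployments
TOKEN = "your-dataset-api-key"

# Hypothetical test set: query -> a keyword the top results should contain
TEST_CASES = {
    "How do I reset my password?": "password",
    "What is the refund policy?": "refund",
}

def measure_recall(dataset_id: str) -> float:
    """Fraction of test queries whose top-3 chunks contain the expected keyword."""
    hits = 0
    for query, keyword in TEST_CASES.items():
        resp = requests.post(
            f"{BASE_URL}/datasets/{dataset_id}/retrieve",  # assumed hit-testing endpoint
            headers={"Authorization": f"Bearer {TOKEN}"},
            json={"query": query},
        ).json()
        chunks = [rec["segment"]["content"] for rec in resp.get("records", [])[:3]]
        if any(keyword in c for c in chunks):
            hits += 1
    return hits / len(TEST_CASES)
```

Tracking this number over time tells you when an incremental update has degraded retrieval and a re-index is due.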

## Incremental Update Strategy

```python
import os
import requests
from datetime import datetime, timedelta

BASE_URL = "https://api.dify.ai/v1"   # adjust for self-hosted deployments
TOKEN = "your-dataset-api-key"
DOCS_DIR = "./docs"                   # local directory being mirrored

def sync_updated_docs(dataset_id: str, since_hours: int = 24):
    """Sync recently updated local documents into the knowledge base."""
    cutoff = datetime.now() - timedelta(hours=since_hours)

    # Fetch the documents already in the knowledge base (first page only; see note below)
    existing = requests.get(
        f"{BASE_URL}/datasets/{dataset_id}/documents",
        headers={"Authorization": f"Bearer {TOKEN}"},
        params={"page": 1, "limit": 100}
    ).json()

    existing_names = {doc["name"]: doc["id"] for doc in existing["data"]}

    # Scan local files and pick up anything added or modified since the cutoff
    for filename in os.listdir(DOCS_DIR):
        filepath = os.path.join(DOCS_DIR, filename)
        mod_time = datetime.fromtimestamp(os.path.getmtime(filepath))

        if mod_time > cutoff:
            if filename in existing_names:
                # Already exists -> update (delete the old version, then re-upload)
                doc_id = existing_names[filename]
                requests.delete(
                    f"{BASE_URL}/datasets/{dataset_id}/documents/{doc_id}",
                    headers={"Authorization": f"Bearer {TOKEN}"}
                )
                print(f"🔄 Updated: {filename}")
            else:
                print(f"➕ Added: {filename}")

            # Upload the document
            upload_document(dataset_id, filepath)

# Scheduled job: sync every night at 2 a.m.
# crontab: 0 2 * * * python sync_docs.py
```
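
The script assumes an `upload_document` helper. A minimal sketch, assuming Dify's create-by-file endpoint with automatic processing rules (verify the exact path and payload against your deployment's API reference):

```python
import json
import os
import requests

def upload_document(dataset_id: str, filepath: str):
    """Upload a local file to the dataset (assumed create-by-file endpoint)."""
    # "automatic" lets the server apply its default segmentation rules
    config = {"indexing_technique": "high_quality", "process_rule": {"mode": "automatic"}}
    with open(filepath, "rb") as f:
        resp = requests.post(
            f"{BASE_URL}/datasets/{dataset_id}/document/create-by-file",
            headers={"Authorization": f"Bearer {TOKEN}"},
            data={"data": json.dumps(config)},
            files={"file": (os.path.basename(filepath), f)},
        )
    resp.raise_for_status()
    return resp.json()
```

One caveat: the sync only reads the first page of 100 documents. If your dataset is larger, loop over `page` until the response indicates no more results (e.g. a `has_more` flag, if your API version returns one).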

## Segment Quality Check

```python
def check_segment_quality(dataset_id: str) -> dict:
    """Check segment quality across the knowledge base."""
    segments = requests.get(
        f"{BASE_URL}/datasets/{dataset_id}/segments",
        headers={"Authorization": f"Bearer {TOKEN}"},
        params={"page": 1, "limit": 500}
    ).json()

    issues = {
        "too_short": [],     # < 50 chars
        "too_long": [],      # > 1000 chars
        "duplicates": [],    # duplicate content
    }

    seen = set()
    for seg in segments["data"]:
        content = seg["content"]
        char_count = len(content)

        if char_count < 50:
            issues["too_short"].append(seg["id"])
        elif char_count > 1000:
            issues["too_long"].append(seg["id"])

        # Exact-duplicate detection; a set keeps this O(n) instead of O(n^2)
        if content in seen:
            issues["duplicates"].append(seg["id"])
        seen.add(content)

    return {
        "total_segments": len(segments["data"]),
        "issues": issues,
        "healthy": not any(issues.values())
    }
```
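
To turn the report into an actionable summary, a plain usage example of the function above (`DATASET_ID` is a placeholder):

```python
DATASET_ID = "your-dataset-id"  # placeholder

report = check_segment_quality(DATASET_ID)
print(f"Total segments: {report['total_segments']}")
for issue, ids in report["issues"].items():
    if ids:
        print(f"⚠️ {issue}: {len(ids)} segment(s), e.g. {ids[:3]}")
if report["healthy"]:
    print("✅ No segment issues found")
```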

## Performance Optimization Checklist

```mermaid
graph TB
    Perf[Performance optimization] --> Embed[Embedding optimization]
    Perf --> Search[Retrieval optimization]
    Perf --> Infra[Infrastructure optimization]

    Embed --> E1[Choose a lightweight embedding model]
    Embed --> E2[Batch embeddings instead of one-by-one]

    Search --> S1[Keep Top-K ≤ 10]
    Search --> S2[Enable Rerank second-stage sorting]
    Search --> S3[Raise the score threshold to filter noise]

    Infra --> I1[Store the vector DB on SSDs]
    Infra --> I2[Tune the PostgreSQL connection pool]
    Infra --> I3[Cache hot queries in Redis]
```
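
On the embedding side (E2 above), batching matters because the HTTP round trip dominates per-item latency. A minimal sketch using the OpenAI embeddings API, which accepts a list of inputs in a single call (model name and batch size are illustrative):

```python
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def embed_batched(texts: list[str], batch_size: int = 64) -> list[list[float]]:
    """Embed texts in batches: one HTTP call per batch instead of per text."""
    vectors = []
    for i in range(0, len(texts), batch_size):
        resp = client.embeddings.create(
            model="text-embedding-3-small",
            input=texts[i:i + batch_size],
        )
        vectors.extend(item.embedding for item in resp.data)
    return vectors
```

Each call amortizes the round-trip cost across the whole batch, which is where most of the indexing speedup comes from.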
Concrete, high-leverage changes from this checklist:

| Optimization | Impact | Action |
|---|---|---|
| Use text-embedding-3-small instead of large | 2x faster retrieval | Re-index |
| Lower Top-K from 10 to 5 | Lower LLM token consumption | Adjust retrieval settings |
| Enable Rerank | 15-30% accuracy gain | Add a Cohere Rerank model |
| Keep segment length at 300-500 chars | Better recall precision | Re-segment |
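
For I3, caching hot retrieval queries in Redis skips both the embedding call and the vector search on repeats. A minimal sketch with redis-py; the key scheme and the 10-minute TTL are assumptions, and `retrieve_chunks` stands in for whatever retrieval call you use:

```python
import hashlib
import json
import redis

r = redis.Redis(host="localhost", port=6379, db=0)

def cached_retrieve(dataset_id: str, query: str, ttl: int = 600):
    """Return cached retrieval results for repeated queries, else fetch and cache."""
    key = f"kb:{dataset_id}:{hashlib.sha256(query.encode()).hexdigest()}"
    cached = r.get(key)
    if cached is not None:
        return json.loads(cached)
    results = retrieve_chunks(dataset_id, query)  # hypothetical retrieval helper
    r.setex(key, ttl, json.dumps(results))
    return results
```

Keep the TTL short enough that cached answers don't outlive an incremental update, or flush the `kb:{dataset_id}:*` keys as part of the sync job.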