第 24 期:RESTful API 开发实战

更新于 2026/4/6

API 体系概览

graph TB
    subgraph "Dify API"
        Conv[Conversation API] --> CM[chat-messages]
        Conv --> Comp[completion-messages]
        Conv --> ConvList[conversations]
        Conv --> MsgList[messages]
        
        WF[Workflow API] --> Run[workflows/run]
        
        KB[Knowledge API] --> DS[datasets]
        KB --> Doc[documents]
        KB --> Seg[segments]
    end

Conversation API

发送聊天消息

# 基础聊天请求 (阻塞模式)
curl -X POST 'http://localhost/api/chat-messages' \
  -H 'Authorization: Bearer app-xxxxxxxx' \
  -H 'Content-Type: application/json' \
  -d '{
    "inputs": {},
    "query": "帮我解释什么是 Kubernetes",
    "response_mode": "blocking",
    "conversation_id": "",
    "user": "user-001",
    "files": []
  }'

# 响应
# {
#   "message_id": "msg-xxx",
#   "conversation_id": "conv-xxx",
#   "answer": "Kubernetes 是一个开源的容器编排平台...",
#   "metadata": {
#     "usage": {"total_tokens": 256},
#     "retriever_resources": [...]
#   }
# }

流式输出 (SSE)

import requests
import json

def stream_chat(query: str, conversation_id: str = "") -> str:
    """流式聊天,实时打印输出"""
    response = requests.post(
        "http://localhost/api/chat-messages",
        headers={
            "Authorization": "Bearer app-xxxxxxxx",
            "Content-Type": "application/json"
        },
        json={
            "inputs": {},
            "query": query,
            "response_mode": "streaming",
            "conversation_id": conversation_id,
            "user": "demo"
        },
        stream=True
    )
    
    full_text = ""
    new_conv_id = ""
    
    for line in response.iter_lines():
        if line:
            decoded = line.decode("utf-8")
            if decoded.startswith("data: "):
                event = json.loads(decoded[6:])
                
                if event["event"] == "message":
                    chunk = event["answer"]
                    print(chunk, end="", flush=True)
                    full_text += chunk
                    
                elif event["event"] == "message_end":
                    new_conv_id = event["conversation_id"]
                    usage = event["metadata"]["usage"]
                    print(f"\n[Tokens: {usage['total_tokens']}]")
    
    return full_text, new_conv_id

Workflow API

# 运行 Workflow (阻塞模式)
curl -X POST 'http://localhost/api/workflows/run' \
  -H 'Authorization: Bearer app-xxxxxxxx' \
  -H 'Content-Type: application/json' \
  -d '{
    "inputs": {
      "article_url": "https://example.com/article",
      "target_language": "英文"
    },
    "response_mode": "blocking",
    "user": "api-user"
  }'

会话管理

# 获取会话列表
def list_conversations(user: str, limit: int = 20):
    response = requests.get(
        "http://localhost/api/conversations",
        headers={"Authorization": "Bearer app-xxxxxxxx"},
        params={"user": user, "limit": limit}
    )
    return response.json()["data"]

# 获取会话消息历史
def get_messages(conversation_id: str, user: str):
    response = requests.get(
        "http://localhost/api/messages",
        headers={"Authorization": "Bearer app-xxxxxxxx"},
        params={
            "conversation_id": conversation_id,
            "user": user,
            "limit": 100
        }
    )
    return response.json()["data"]

# 删除会话
def delete_conversation(conversation_id: str, user: str):
    response = requests.delete(
        f"http://localhost/api/conversations/{conversation_id}",
        headers={"Authorization": "Bearer app-xxxxxxxx"},
        json={"user": user}
    )
    return response.status_code == 200