跳转到主要内容

端点

POST http://localhost:8080/api/v1/tasks

描述

将新任务提交给 Shannon 以供执行。任务立即排队,由 Temporal 工作流引擎异步处理。

身份验证

必需:是 在请求头中包含 API 密钥:
X-API-Key: sk_test_123456

请求

请求头

请求头必需描述示例
X-API-Key身份验证密钥sk_test_123456
Content-Type必须是 application/jsonapplication/json
Idempotency-Key幂等性的唯一密钥550e8400-e29b-41d4-a716-446655440000
traceparentW3C 追踪上下文00-4bf92f...

请求体参数

参数类型必需描述
querystring自然语言任务描述
session_idstring多轮对话的会话标识符
contextobject作为键值对的附加上下文数据
modestring工作流路由:simplestandardcomplexsupervisor
model_tierstring模型层级:smallmediumlarge
model_overridestring指定模型名称(规范 ID;例如 gpt-5claude-sonnet-4-5-20250929
provider_overridestring强制指定提供商(如 openaianthropicgoogle

请求体架构

示例 1:通用 AI 驱动执行
{
  "query": "分析 8 月份网站流量趋势",                             // 必需:要执行的任务
  "session_id": "analytics-session-123",                        // 可选:多轮对话的会话 ID(若省略则自动生成)
  "mode": "supervisor",                                         // 可选:工作流路由 - "simple"、"standard"、"complex" 或 "supervisor"(默认:自动检测)
  "model_tier": "large",                                        // 可选:模型大小 - "small"、"medium" 或 "large"(默认:"small")
  "model_override": "gpt-5",                                   // 可选:指定模型(规范 ID)
  "provider_override": "openai",                                // 可选:强制指定提供商
  "context": {                                                  // 可选:执行上下文对象
    "role": "data_analytics",                                   // 可选:角色预设名称(如 "analysis"、"research"、"writer")
    "system_prompt": "你是专注于网站分析的数据分析师。",         // 可选:自定义系统提示(覆盖角色预设)
    "prompt_params": {                                         // 可选:提示/工具/适配器的任意键值对
      "profile_id": "49598h6e",                                // 示例:自定义参数(传递给工具/适配器)
      "aid": "fcb1cd29-9104-47b1-b914-31db6ba30c1a",          // 示例:自定义参数(应用程序 ID)
      "current_date": "2025-10-31"                             // 示例:自定义参数(当前日期)
    },
    "history_window_size": 75,                                 // 可选:最大对话历史消息数(默认:50)
    "primers_count": 3,                                        // 可选:保留的早期消息数(默认:5)
    "recents_count": 20,                                       // 可选:保留的最近消息数(默认:15)
    "compression_trigger_ratio": 0.75,                         // 可选:在窗口的 75% 时触发压缩(默认:0.8)
    "compression_target_ratio": 0.375                          // 可选:压缩到窗口的 37.5%(默认:0.5)
  }
}
示例 2:仅模板执行(无 AI)
{
  "query": "生成每周研究简报",                                    // 必需:任务描述
  "session_id": "research-session-456",                        // 可选:会话 ID
  "context": {                                                  // 可选:上下文对象
    "template": "research_summary",                            // 可选:要使用的模板名称
    "template_version": "1.0.0",                               // 可选:模板版本(默认:最新)
    "disable_ai": true,                                        // 可选:仅模板模式,无 AI 回退(默认:false)
    "prompt_params": {                                         // 可选:模板渲染的参数
      "week": "2025-W44"                                       // 示例:模板的自定义参数
    }
  }
}
避免参数冲突:
  • 不要同时使用 templatetemplate_name(它们是别名 - 仅使用 template
  • 不要将 disable_ai: true 与模型控制参数组合使用 - 网关检测到冲突时会返回 400 错误:
    • disable_ai: true + model_tier → 400
    • disable_ai: true + model_override → 400
    • disable_ai: true + provider_override → 400
  • 顶层参数会覆盖上下文中的等效参数:
    • 顶层 model_tier 会覆盖 context.model_tier
    • 顶层 model_override 会覆盖 context.model_override
    • 顶层 provider_override 会覆盖 context.provider_override

上下文参数 (context.*)

支持的键:
  • role — 角色预设(如 analysisresearchwriter
  • system_prompt — 覆盖角色提示;支持从 prompt_params 引用 ${var}
  • prompt_params — 提示/工具的自定义参数
  • model_tier — 当顶层未提供时作为回退
  • model_override — 指定具体模型(规范 ID;例如 gpt-5claude-sonnet-4-5-20250929
  • provider_override — 强制指定提供商(如 openaianthropicgoogle
  • template — 模板名称(别名:template_name
  • template_version — 模板版本
  • disable_ai — 仅模板模式(不回退到 AI)- 不能与模型控制参数组合使用
  • 窗口控制:history_window_sizeuse_case_presetprimers_countrecents_countcompression_trigger_ratiocompression_target_ratio
规则:
  • 顶层参数会覆盖上下文中的等效参数:model_tiermodel_overrideprovider_override
  • mode 支持:simple|standard|complex|supervisor(默认:自动检测)
  • model_tier 支持:small|medium|large
  • 冲突验证disable_ai: true 不能与 model_tiermodel_overrideprovider_override 组合使用(返回 400)

响应

成功响应

状态200 OK 响应头
  • X-Workflow-ID:Temporal 工作流标识符
  • X-Session-ID:会话标识符(如果未提供则自动生成)
响应体
{
  "task_id": "string",
  "status": "string",
  "message": "string (可选)",
  "created_at": "timestamp"
}

响应字段

字段类型描述
task_idstring唯一任务标识符(也是工作流 ID)
statusstring提交状态(例如 STATUS_CODE_OK
messagestring可选的状态消息
created_attimestamp任务创建时间(ISO 8601)

示例

基本任务提交

curl -X POST http://localhost:8080/api/v1/tasks \\
  -H "X-API-Key: sk_test_123456" \\
  -H "Content-Type: application/json" \\
  -d '{
    "query": "法国的首都是什么?"
  }'
响应
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "status": "STATUS_CODE_OK",
  "message": "任务已成功提交",
  "created_at": "2025-10-22T10:30:00Z"
}

带会话 ID 的任务(多轮对话)

# 第一轮
curl -X POST http://localhost:8080/api/v1/tasks \\
  -H "X-API-Key: sk_test_123456" \\
  -H "Content-Type: application/json" \\
  -d '{
    "query": "Python 是什么?",
    "session_id": "user-123-chat"
  }'

# 第二轮(引用之前的上下文)
curl -X POST http://localhost:8080/api/v1/tasks \\
  -H "X-API-Key: sk_test_123456" \\
  -H "Content-Type: application/json" \\
  -d '{
    "query": "它的主要优点是什么?",
    "session_id": "user-123-chat"
  }'

带上下文的任务

curl -X POST http://localhost:8080/api/v1/tasks \\
  -H "X-API-Key: sk_test_123456" \\
  -H "Content-Type: application/json" \\
  -d '{
    "query": "总结此用户反馈",
    "context": {
      "user_id": "user_12345",
      "feedback_type": "bug_report",
      "severity": "high",
      "product": "mobile_app",
      "role": "analysis",
      "model_override": "gpt-5"
    }
  }'

强制层级(顶层)

curl -X POST http://localhost:8080/api/v1/tasks \\
  -H "X-API-Key: sk_test_123456" \\
  -H "Content-Type: application/json" \\
  -d '{
    "query": "复杂分析",
    "model_tier": "large"
  }'

仅模板执行

curl -X POST http://localhost:8080/api/v1/tasks \\
  -H "X-API-Key: sk_test_123456" \\
  -H "Content-Type: application/json" \\
  -d '{
    "query": "每周研究简报",
    "context": {"template": "research_summary", "template_version": "1.0.0", "disable_ai": true}
  }'

监督者模式(Supervisor)

curl -X POST http://localhost:8080/api/v1/tasks \\
  -H "X-API-Key: sk_test_123456" \\
  -H "Content-Type: application/json" \\
  -d '{
    "query": "评估系统可靠性",
    "mode": "supervisor"
  }'

带幂等性

# 生成幂等性密钥(使用 UUID)
IDEMPOTENCY_KEY=$(uuidgen)

curl -X POST http://localhost:8080/api/v1/tasks \\
  -H "X-API-Key: sk_test_123456" \\
  -H "Idempotency-Key: $IDEMPOTENCY_KEY" \\
  -H "Content-Type: application/json" \\
  -d '{
    "query": "分析 Q4 销售数据"
  }'

带分布式追踪

curl -X POST http://localhost:8080/api/v1/tasks \\
  -H "X-API-Key: sk_test_123456" \\
  -H "traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01" \\
  -H "Content-Type: application/json" \\
  -d '{
    "query": "研究最新的人工智能趋势"
  }'

错误响应

400 错误的请求

缺少查询
{
  "error": "Query is required"
}
无效的 JSON
{
  "error": "Invalid request body: unexpected EOF"
}

401 未授权

缺少 API 密钥
{
  "error": "Unauthorized"
}
无效的 API 密钥
{
  "error": "Unauthorized"
}

429 请求过多

{
  "error": "Rate limit exceeded"
}
响应头
  • X-RateLimit-Limit: 100
  • X-RateLimit-Remaining: 0
  • X-RateLimit-Reset: 1609459200
  • Retry-After: 60

500 内部服务器错误

{
  "error": "Failed to submit task: database connection failed"
}

代码示例

Python with httpx

import httpx

response = httpx.post(
    "http://localhost:8080/api/v1/tasks",
    headers={
        "X-API-Key": "sk_test_123456",
        "Content-Type": "application/json"
    },
    json={
        "query": "法国的首都是什么?"
    }
)

if response.status_code == 200:
    data = response.json()
    print(f"任务 ID:{data['task_id']}")
    print(f"状态:{data['status']}")
else:
    print(f"错误:{response.status_code}")
    print(response.json())

Python with requests

import requests

response = requests.post(
    "http://localhost:8080/api/v1/tasks",
    headers={
        "X-API-Key": "sk_test_123456"
    },
    json={
        "query": "分析客户情绪",
        "context": {
            "source": "twitter",
            "date_range": "2025-10-01 to 2025-10-22"
        }
    }
)

task = response.json()
print(f"任务已提交:{task['task_id']}")

JavaScript/Node.js

const axios = require('axios');

async function submitTask(query) {
  try {
    const response = await axios.post(
      'http://localhost:8080/api/v1/tasks',
      {
        query: query
      },
      {
        headers: {
          'X-API-Key': 'sk_test_123456',
          'Content-Type': 'application/json'
        }
      }
    );

    console.log('任务 ID:', response.data.task_id);
    console.log('状态:', response.data.status);

    return response.data;
  } catch (error) {
    console.error('错误:', error.response?.data || error.message);
    throw error;
  }
}

submitTask('什么是量子计算?');

cURL with 幂等性

#!/bin/bash

API_KEY="sk_test_123456"
IDEMPOTENCY_KEY=$(uuidgen)

# 提交任务
RESPONSE=$(curl -s -X POST http://localhost:8080/api/v1/tasks \\
  -H "X-API-Key: $API_KEY" \\
  -H "Idempotency-Key: $IDEMPOTENCY_KEY" \\
  -H "Content-Type: application/json" \\
  -d '{
    "query": "分析 Q4 收入趋势"
  }')

echo $RESPONSE | jq

TASK_ID=$(echo $RESPONSE | jq -r '.task_id')
echo "追踪进度:http://localhost:8088/workflows/$TASK_ID"

Go

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

type TaskRequest struct {
    Query     string                 `json:"query"`
    SessionID string                 `json:"session_id,omitempty"`
    Context   map[string]interface{} `json:"context,omitempty"`
}

type TaskResponse struct {
    TaskID    string `json:"task_id"`
    Status    string `json:"status"`
    Message   string `json:"message,omitempty"`
}

func submitTask(query string) (*TaskResponse, error) {
    req := TaskRequest{
        Query: query,
    }

    body, _ := json.Marshal(req)

    httpReq, _ := http.NewRequest(
        "POST",
        "http://localhost:8080/api/v1/tasks",
        bytes.NewBuffer(body),
    )

    httpReq.Header.Set("X-API-Key", "sk_test_123456")
    httpReq.Header.Set("Content-Type", "application/json")

    client := &http.Client{}
    resp, err := client.Do(httpReq)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    var taskResp TaskResponse
    json.NewDecoder(resp.Body).Decode(&taskResp)

    return &taskResp, nil
}

func main() {
    task, err := submitTask("什么是机器学习?")
    if err != nil {
        fmt.Println("错误:", err)
        return
    }

    fmt.Printf("任务 ID:%s\n", task.TaskID)
    fmt.Printf("状态:%s\n", task.Status)
}

实现详情

工作流创建

提交任务时:
  1. 网关接收请求 → 验证身份验证、速率限制
  2. 生成会话 ID → 如果未提供,自动生成 UUID
  3. 调用 Orchestrator gRPCSubmitTask(metadata, query, context)
  4. Orchestrator 启动 Temporal 工作流 → 持久执行
  5. 返回响应 → 任务 ID、初始状态
  6. 任务异步执行 → 独立于 HTTP 连接

幂等性行为

幂等性密钥用于确保网络重试或重复调用不会生成重复任务。
  1. 首次请求(携带 Idempotency-Key):
    • Shannon 创建任务
    • 将响应缓存到 Redis,TTL 默认为 24 小时
    • 返回任务 ID 和状态
  2. 重复请求(相同 Idempotency-Key,请求体完全一致):
    • Shannon 命中缓存
    • 返回与首次请求相同的任务 ID
    • 响应内容完全一致
  3. 24 小时后
    • 缓存过期
    • 再次提交同一密钥会创建一个全新的任务
缓存详情
  • 存储:Redis
  • TTL:24 小时(86400 秒)
  • 键格式idempotency:<16-char-hash>(对幂等性密钥、用户 ID、请求路径及请求体进行 SHA-256 计算后取前 16 位)
  • 作用域:按用户隔离(哈希包含用户 ID;关闭鉴权时则退化为基于头部、路径和请求体的哈希)
  • 缓存条件:仅缓存 2xx 成功响应;命中缓存时会额外返回 X-Idempotency-Cached: trueX-Idempotency-Key: <your-key> 头部
请求体行为: 若请求体发生变化,生成的哈希也会不同,网关会视为新请求并重新执行;只有头部、用户、路径与请求体完全一致时才会命中缓存。

会话管理

  • 无 session_id:自动生成 UUID、新鲜上下文
  • 带 session_id:从 Redis 加载以前的对话历史
  • 会话持久性:默认 TTL 30 天
  • 多轮对话:具有相同 session_id 的所有任务共享上下文

上下文对象

context 对象存储为元数据并传递给:
  • 代理执行环境
  • 工具调用(可通过 ctx.get("key") 访问)
  • 会话内存(供将来轮次参考)
示例使用场景
  • 用户偏好:{"language": "spanish", "format": "markdown"}
  • 业务上下文:{"company_id": "acme", "department": "sales"}
  • 约束:{"max_length": 500, "tone": "formal"}

最佳实践

1. 始终为关键任务使用幂等性密钥

import uuid

idempotency_key = str(uuid.uuid4())

response = httpx.post(
    "http://localhost:8080/api/v1/tasks",
    headers={
        "X-API-Key": "sk_test_123456",
        "Idempotency-Key": idempotency_key
    },
    json={"query": "处理订单 #12345 的付款"}
)

2. 对对话使用会话

session_id = "user-456-chat"

# 轮次 1
httpx.post(..., json={
    "query": "加载 Q4 销售数据",
    "session_id": session_id
})

# 轮次 2(引用第一轮中的 Q4 数据)
httpx.post(..., json={
    "query": "将其与 Q3 比较",
    "session_id": session_id
})

3. 提供丰富的上下文

httpx.post(..., json={
    "query": "分析此客户反馈",
    "context": {
        "customer_id": "cust_789",
        "subscription_tier": "enterprise",
        "account_age_days": 365,
        "previous_tickets": 3,
        "sentiment_history": ["positive", "neutral", "negative"]
    }
})

4. 妥善处理错误

try:
    response = httpx.post(..., timeout=30.0)
    response.raise_for_status()
    task = response.json()
except httpx.TimeoutException:
    print("请求超时")
except httpx.HTTPStatusError as e:
    if e.response.status_code == 429:
        retry_after = int(e.response.headers.get("Retry-After", 60))
        time.sleep(retry_after)
        # 重试...
    else:
        print(f"错误:{e.response.json()}")

5. 存储任务 ID 以供追踪

response = httpx.post(...)
task_id = response.json()["task_id"]
workflow_id = response.headers["X-Workflow-ID"]

# 保存到数据库
db.tasks.insert({
    "task_id": task_id,
    "workflow_id": workflow_id,
    "user_id": "user_123",
    "query": "...",
    "created_at": datetime.now()
})

# 稍后:检查状态
status = httpx.get(f"http://localhost:8080/api/v1/tasks/{task_id}")

一次调用提交 + 流式传输

需要实时更新? 使用 POST /api/v1/tasks/stream 在一次调用中提交任务并获取流 URL。非常适合需要立即进度更新的前端应用。查看统一提交 + 流式传输以获取示例。

相关端点