跳转到主要内容

端点

GET http://localhost:8080/api/v1/tasks

描述

获取任务的分页列表,支持按状态和会话筛选。

身份验证

必需:是 在请求头中包含 API 密钥:
X-API-Key: sk_test_123456

请求

请求头

请求头必需描述
X-API-Key身份验证密钥

查询参数

参数类型必需描述默认值
limitinteger返回的任务数量(1-100)20
offsetinteger跳过的任务数量0
session_idstring按会话 ID 筛选-
statusstring按任务状态筛选-

状态筛选值

  • QUEUED - 等待执行的任务
  • RUNNING - 正在执行的任务
  • COMPLETED - 成功完成的任务
  • FAILED - 失败的任务
  • CANCELLEDCANCELED - 已取消的任务
  • TIMEOUT - 超时的任务

响应

成功响应

状态200 OK 响应体
{
  "tasks": [
    {
      "task_id": "string",
      "query": "string",
      "status": "string",
      "mode": "string",
      "created_at": "timestamp",
      "completed_at": "timestamp",
      "total_token_usage": {
        "total_tokens": 0,
        "cost_usd": 0.0,
        "prompt_tokens": 0,
        "completion_tokens": 0
      }
    }
  ],
  "total_count": 0
}

响应字段

字段类型描述
tasksarray任务摘要数组
total_countinteger匹配筛选条件的任务总数

任务摘要字段

字段类型描述
task_idstring唯一任务标识符
querystring原始任务查询
statusstring当前任务状态
modestring执行模式(EXECUTION_MODE_SIMPLEEXECUTION_MODE_STANDARDEXECUTION_MODE_COMPLEX
created_attimestamp任务创建时间(ISO 8601)
completed_attimestamp任务完成时间(如果未完成则为 null)
total_token_usageobject令牌使用量和成本指标

令牌使用字段

字段类型描述
total_tokensinteger使用的总令牌数
cost_usdfloat以美元计的总成本
prompt_tokensinteger提示中的令牌数
completion_tokensinteger完成中的令牌数

示例

列出所有任务

curl -X GET "http://localhost:8080/api/v1/tasks" \
  -H "X-API-Key: sk_test_123456"
响应
{
  "tasks": [
    {
      "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
      "query": "法国的首都是什么?",
      "status": "TASK_STATUS_COMPLETED",
      "mode": "EXECUTION_MODE_SIMPLE",
      "created_at": "2025-10-22T10:30:00Z",
      "completed_at": "2025-10-22T10:30:05Z",
      "total_token_usage": {
        "total_tokens": 150,
        "cost_usd": 0.0023,
        "prompt_tokens": 45,
        "completion_tokens": 105
      }
    },
    {
      "task_id": "task_01HQZX4Y9K8M2P4N5S7T9W2W",
      "query": "分析 Q4 销售趋势",
      "status": "TASK_STATUS_RUNNING",
      "mode": "EXECUTION_MODE_STANDARD",
      "created_at": "2025-10-22T10:31:00Z",
      "completed_at": null,
      "total_token_usage": null
    }
  ],
  "total_count": 2
}

按状态筛选

# 仅获取运行中的任务
curl -X GET "http://localhost:8080/api/v1/tasks?status=RUNNING" \
  -H "X-API-Key: sk_test_123456"
# 获取已完成的任务
curl -X GET "http://localhost:8080/api/v1/tasks?status=COMPLETED" \
  -H "X-API-Key: sk_test_123456"
# 获取失败的任务
curl -X GET "http://localhost:8080/api/v1/tasks?status=FAILED" \
  -H "X-API-Key: sk_test_123456"

按会话筛选

curl -X GET "http://localhost:8080/api/v1/tasks?session_id=user-123-chat" \
  -H "X-API-Key: sk_test_123456"
响应
{
  "tasks": [
    {
      "task_id": "task_abc123",
      "query": "Python 是什么?",
      "status": "TASK_STATUS_COMPLETED",
      "created_at": "2025-10-22T10:00:00Z",
      "completed_at": "2025-10-22T10:00:03Z"
    },
    {
      "task_id": "task_def456",
      "query": "它的主要优点是什么?",
      "status": "TASK_STATUS_COMPLETED",
      "created_at": "2025-10-22T10:01:00Z",
      "completed_at": "2025-10-22T10:01:04Z"
    }
  ],
  "total_count": 2
}

分页

# 第一页(20 个任务)
curl -X GET "http://localhost:8080/api/v1/tasks?limit=20&offset=0" \
  -H "X-API-Key: sk_test_123456"

# 第二页
curl -X GET "http://localhost:8080/api/v1/tasks?limit=20&offset=20" \
  -H "X-API-Key: sk_test_123456"

# 第三页
curl -X GET "http://localhost:8080/api/v1/tasks?limit=20&offset=40" \
  -H "X-API-Key: sk_test_123456"

组合筛选

# 获取会话中已完成的任务,分页显示
curl -X GET "http://localhost:8080/api/v1/tasks?session_id=user-123&status=COMPLETED&limit=10&offset=0" \
  -H "X-API-Key: sk_test_123456"

错误响应

401 未授权

{
  "error": "Unauthorized"
}

429 请求过多

{
  "error": "Rate limit exceeded"
}

500 内部服务器错误

{
  "error": "Failed to list tasks: database error"
}

代码示例

Python with httpx

import httpx

response = httpx.get(
    "http://localhost:8080/api/v1/tasks",
    headers={"X-API-Key": "sk_test_123456"},
    params={
        "status": "COMPLETED",
        "limit": 50,
        "offset": 0
    }
)

data = response.json()
print(f"总计:{data['total_count']} 个任务")

for task in data["tasks"]:
    print(f"- {task['task_id']}{task['status']}")
    if task['total_token_usage']:
        print(f"  成本:${task['total_token_usage']['cost_usd']:.4f}")

Python - 列出所有任务(分页)

import httpx

def list_all_tasks(api_key: str, status: str = None):
    """列出所有任务,带分页。"""
    all_tasks = []
    offset = 0
    limit = 100

    while True:
        response = httpx.get(
            "http://localhost:8080/api/v1/tasks",
            headers={"X-API-Key": api_key},
            params={
                "status": status,
                "limit": limit,
                "offset": offset
            }
        )

        data = response.json()
        tasks = data["tasks"]
        all_tasks.extend(tasks)

        # 检查是否已获取所有任务
        if len(tasks) < limit:
            break

        offset += limit

    return all_tasks

# 用法
completed_tasks = list_all_tasks("sk_test_123456", status="COMPLETED")
print(f"找到 {len(completed_tasks)} 个已完成的任务")

JavaScript/Node.js

const axios = require('axios');

async function listTasks(filters = {}) {
  try {
    const response = await axios.get(
      'http://localhost:8080/api/v1/tasks',
      {
        headers: {
          'X-API-Key': 'sk_test_123456'
        },
        params: {
          status: filters.status,
          session_id: filters.sessionId,
          limit: filters.limit || 20,
          offset: filters.offset || 0
        }
      }
    );

    console.log(`总任务数:${response.data.total_count}`);

    response.data.tasks.forEach(task => {
      console.log(`${task.task_id}${task.status}`);
    });

    return response.data;
  } catch (error) {
    console.error('错误:', error.response?.data || error.message);
    throw error;
  }
}

// 列出运行中的任务
listTasks({ status: 'RUNNING' });

// 列出会话中的任务
listTasks({ sessionId: 'user-123-chat', limit: 50 });

Bash 脚本 - 监控运行中的任务

#!/bin/bash

API_KEY="sk_test_123456"
BASE_URL="http://localhost:8080"

echo "监控运行中的任务..."

while true; do
  clear
  echo "=== 运行中的任务 ==="
  echo ""

  RESPONSE=$(curl -s "$BASE_URL/api/v1/tasks?status=RUNNING" \
    -H "X-API-Key: $API_KEY")

  TOTAL=$(echo $RESPONSE | jq -r '.total_count')
  echo "运行中:$TOTAL"
  echo ""

  echo $RESPONSE | jq -r '.tasks[] | "\(.task_id):\(.query)"'

  sleep 5
done

Go

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "net/url"
)

type ListTasksResponse struct {
    Tasks      []TaskSummary `json:"tasks"`
    TotalCount int32         `json:"total_count"`
}

type TaskSummary struct {
    TaskID      string  `json:"task_id"`
    Query       string  `json:"query"`
    Status      string  `json:"status"`
    Mode        string  `json:"mode"`
    CreatedAt   string  `json:"created_at"`
    CompletedAt *string `json:"completed_at"`
}

func listTasks(status string, limit, offset int) (*ListTasksResponse, error) {
    baseURL := "http://localhost:8080/api/v1/tasks"

    // 构建查询参数
    params := url.Values{}
    if status != "" {
        params.Add("status", status)
    }
    params.Add("limit", fmt.Sprintf("%d", limit))
    params.Add("offset", fmt.Sprintf("%d", offset))

    reqURL := baseURL + "?" + params.Encode()

    req, _ := http.NewRequest("GET", reqURL, nil)
    req.Header.Set("X-API-Key", "sk_test_123456")

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    var result ListTasksResponse
    json.NewDecoder(resp.Body).Decode(&result)

    return &result, nil
}

func main() {
    tasks, err := listTasks("COMPLETED", 10, 0)
    if err != nil {
        fmt.Println("错误:", err)
        return
    }

    fmt.Printf("总任务数:%d\n", tasks.TotalCount)
    for _, task := range tasks.Tasks {
        fmt.Printf("- %s%s\n", task.TaskID, task.Status)
    }
}

用例

1. 仪表板 - 显示最近的任务

def get_recent_tasks(api_key: str, limit: int = 10):
    """获取用于仪表板的最近任务。"""
    response = httpx.get(
        "http://localhost:8080/api/v1/tasks",
        headers={"X-API-Key": api_key},
        params={"limit": limit, "offset": 0}
    )
    return response.json()["tasks"]

recent = get_recent_tasks("sk_test_123456", limit=5)
for task in recent:
    print(f"{task['created_at']}{task['query'][:50]}...")

2. 成本追踪

def calculate_session_cost(api_key: str, session_id: str):
    """计算会话的总成本。"""
    response = httpx.get(
        "http://localhost:8080/api/v1/tasks",
        headers={"X-API-Key": api_key},
        params={"session_id": session_id, "limit": 100}
    )

    total_cost = 0
    for task in response.json()["tasks"]:
        if task["total_token_usage"]:
            total_cost += task["total_token_usage"]["cost_usd"]

    return total_cost

cost = calculate_session_cost("sk_test_123456", "user-123-chat")
print(f"会话成本:${cost:.4f}")

3. 失败监控

import time

def monitor_failures(api_key: str, interval: int = 60):
    """监控失败的任务。"""
    while True:
        response = httpx.get(
            "http://localhost:8080/api/v1/tasks",
            headers={"X-API-Key": api_key},
            params={"status": "FAILED", "limit": 10}
        )

        failed_tasks = response.json()["tasks"]

        if failed_tasks:
            print(f"⚠️  {len(failed_tasks)} 个失败任务:")
            for task in failed_tasks:
                print(f"  - {task['task_id']}{task['query']}")

                # 获取详情
                details = httpx.get(
                    f"http://localhost:8080/api/v1/tasks/{task['task_id']}",
                    headers={"X-API-Key": api_key}
                )
                error = details.json().get("error", "未知错误")
                print(f"    错误:{error}")

        time.sleep(interval)

monitor_failures("sk_test_123456")

实现注意事项

速率限制

此端点受速率限制。默认限制:
  • 每分钟 100 个请求 每个 API 密钥
  • 每秒 20 个请求 突发

性能

  • 索引查询:按 statussession_id 筛选很快
  • 分页:使用 limitoffset 避免大型响应
  • 总计数:包括所有匹配的任务,不仅是返回的页面

排序

任务按反向时间顺序返回(最新优先)。

最佳实践

1. 使用分页

总是对大型结果集进行分页:
def fetch_page(offset, limit=20):
    return httpx.get(
        "http://localhost:8080/api/v1/tasks",
        headers={"X-API-Key": "sk_test_123456"},
        params={"limit": limit, "offset": offset}
    ).json()

# 获取前 3 页
for page in range(3):
    tasks = fetch_page(offset=page * 20, limit=20)
    process_tasks(tasks)

2. 按状态筛选

如果只需要特定状态,不要获取所有任务:
# ✅ 好 - 服务器端筛选
running = httpx.get(...)

# ❌ 不好 - 客户端筛选
all_tasks = httpx.get(...)
running = [t for t in all_tasks if t["status"] == "RUNNING"]

3. 缓存结果

缓存任务列表用于仪表板:
import time

cache = {"data": None, "timestamp": 0}

def get_cached_tasks():
    now = time.time()
    if cache["data"] is None or now - cache["timestamp"] > 60:
        # 每 60 秒刷新一次
        response = httpx.get(...)
        cache["data"] = response.json()
        cache["timestamp"] = now

    return cache["data"]

相关端点

注意

SDK 替代方案:Python SDK 不公开 list_tasks() 方法。此端点仅限 REST。对于 SDK 使用,存储任务 ID 并使用 client.get_status(task_id) 单独查询状态。