端点
Copy
GET http://localhost:8080/api/v1/tasks
描述
获取任务的分页列表,支持按状态和会话筛选。身份验证
必需:是 在请求头中包含 API 密钥:Copy
X-API-Key: sk_test_123456
请求
请求头
| 请求头 | 必需 | 描述 |
|---|---|---|
X-API-Key | 是 | 身份验证密钥 |
查询参数
| 参数 | 类型 | 必需 | 描述 | 默认值 |
|---|---|---|---|---|
limit | integer | 否 | 返回的任务数量(1-100) | 20 |
offset | integer | 否 | 跳过的任务数量 | 0 |
session_id | string | 否 | 按会话 ID 筛选 | - |
status | string | 否 | 按任务状态筛选 | - |
状态筛选值
QUEUED- 等待执行的任务RUNNING- 正在执行的任务COMPLETED- 成功完成的任务FAILED- 失败的任务CANCELLED或CANCELED- 已取消的任务TIMEOUT- 超时的任务
响应
成功响应
状态:200 OK
响应体:
Copy
{
"tasks": [
{
"task_id": "string",
"query": "string",
"status": "string",
"mode": "string",
"created_at": "timestamp",
"completed_at": "timestamp",
"total_token_usage": {
"total_tokens": 0,
"cost_usd": 0.0,
"prompt_tokens": 0,
"completion_tokens": 0
}
}
],
"total_count": 0
}
响应字段
| 字段 | 类型 | 描述 |
|---|---|---|
tasks | array | 任务摘要数组 |
total_count | integer | 匹配筛选条件的任务总数 |
任务摘要字段
| 字段 | 类型 | 描述 |
|---|---|---|
task_id | string | 唯一任务标识符 |
query | string | 原始任务查询 |
status | string | 当前任务状态 |
mode | string | 执行模式(EXECUTION_MODE_SIMPLE、EXECUTION_MODE_STANDARD、EXECUTION_MODE_COMPLEX) |
created_at | timestamp | 任务创建时间(ISO 8601) |
completed_at | timestamp | 任务完成时间(如果未完成则为 null) |
total_token_usage | object | 令牌使用量和成本指标 |
令牌使用字段
| 字段 | 类型 | 描述 |
|---|---|---|
total_tokens | integer | 使用的总令牌数 |
cost_usd | float | 以美元计的总成本 |
prompt_tokens | integer | 提示中的令牌数 |
completion_tokens | integer | 完成中的令牌数 |
示例
列出所有任务
Copy
curl -X GET "http://localhost:8080/api/v1/tasks" \
-H "X-API-Key: sk_test_123456"
Copy
{
"tasks": [
{
"task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
"query": "法国的首都是什么?",
"status": "TASK_STATUS_COMPLETED",
"mode": "EXECUTION_MODE_SIMPLE",
"created_at": "2025-10-22T10:30:00Z",
"completed_at": "2025-10-22T10:30:05Z",
"total_token_usage": {
"total_tokens": 150,
"cost_usd": 0.0023,
"prompt_tokens": 45,
"completion_tokens": 105
}
},
{
"task_id": "task_01HQZX4Y9K8M2P4N5S7T9W2W",
"query": "分析 Q4 销售趋势",
"status": "TASK_STATUS_RUNNING",
"mode": "EXECUTION_MODE_STANDARD",
"created_at": "2025-10-22T10:31:00Z",
"completed_at": null,
"total_token_usage": null
}
],
"total_count": 2
}
按状态筛选
Copy
# 仅获取运行中的任务
curl -X GET "http://localhost:8080/api/v1/tasks?status=RUNNING" \
-H "X-API-Key: sk_test_123456"
Copy
# 获取已完成的任务
curl -X GET "http://localhost:8080/api/v1/tasks?status=COMPLETED" \
-H "X-API-Key: sk_test_123456"
Copy
# 获取失败的任务
curl -X GET "http://localhost:8080/api/v1/tasks?status=FAILED" \
-H "X-API-Key: sk_test_123456"
按会话筛选
Copy
curl -X GET "http://localhost:8080/api/v1/tasks?session_id=user-123-chat" \
-H "X-API-Key: sk_test_123456"
Copy
{
"tasks": [
{
"task_id": "task_abc123",
"query": "Python 是什么?",
"status": "TASK_STATUS_COMPLETED",
"created_at": "2025-10-22T10:00:00Z",
"completed_at": "2025-10-22T10:00:03Z"
},
{
"task_id": "task_def456",
"query": "它的主要优点是什么?",
"status": "TASK_STATUS_COMPLETED",
"created_at": "2025-10-22T10:01:00Z",
"completed_at": "2025-10-22T10:01:04Z"
}
],
"total_count": 2
}
分页
Copy
# 第一页(20 个任务)
curl -X GET "http://localhost:8080/api/v1/tasks?limit=20&offset=0" \
-H "X-API-Key: sk_test_123456"
# 第二页
curl -X GET "http://localhost:8080/api/v1/tasks?limit=20&offset=20" \
-H "X-API-Key: sk_test_123456"
# 第三页
curl -X GET "http://localhost:8080/api/v1/tasks?limit=20&offset=40" \
-H "X-API-Key: sk_test_123456"
组合筛选
Copy
# 获取会话中已完成的任务,分页显示
curl -X GET "http://localhost:8080/api/v1/tasks?session_id=user-123&status=COMPLETED&limit=10&offset=0" \
-H "X-API-Key: sk_test_123456"
错误响应
401 未授权
Copy
{
"error": "Unauthorized"
}
429 请求过多
Copy
{
"error": "Rate limit exceeded"
}
500 内部服务器错误
Copy
{
"error": "Failed to list tasks: database error"
}
代码示例
Python with httpx
Copy
import httpx
response = httpx.get(
"http://localhost:8080/api/v1/tasks",
headers={"X-API-Key": "sk_test_123456"},
params={
"status": "COMPLETED",
"limit": 50,
"offset": 0
}
)
data = response.json()
print(f"总计:{data['total_count']} 个任务")
for task in data["tasks"]:
print(f"- {task['task_id']}:{task['status']}")
if task['total_token_usage']:
print(f" 成本:${task['total_token_usage']['cost_usd']:.4f}")
Python - 列出所有任务(分页)
Copy
import httpx
def list_all_tasks(api_key: str, status: str = None):
"""列出所有任务,带分页。"""
all_tasks = []
offset = 0
limit = 100
while True:
response = httpx.get(
"http://localhost:8080/api/v1/tasks",
headers={"X-API-Key": api_key},
params={
"status": status,
"limit": limit,
"offset": offset
}
)
data = response.json()
tasks = data["tasks"]
all_tasks.extend(tasks)
# 检查是否已获取所有任务
if len(tasks) < limit:
break
offset += limit
return all_tasks
# 用法
completed_tasks = list_all_tasks("sk_test_123456", status="COMPLETED")
print(f"找到 {len(completed_tasks)} 个已完成的任务")
JavaScript/Node.js
Copy
const axios = require('axios');
async function listTasks(filters = {}) {
try {
const response = await axios.get(
'http://localhost:8080/api/v1/tasks',
{
headers: {
'X-API-Key': 'sk_test_123456'
},
params: {
status: filters.status,
session_id: filters.sessionId,
limit: filters.limit || 20,
offset: filters.offset || 0
}
}
);
console.log(`总任务数:${response.data.total_count}`);
response.data.tasks.forEach(task => {
console.log(`${task.task_id}:${task.status}`);
});
return response.data;
} catch (error) {
console.error('错误:', error.response?.data || error.message);
throw error;
}
}
// 列出运行中的任务
listTasks({ status: 'RUNNING' });
// 列出会话中的任务
listTasks({ sessionId: 'user-123-chat', limit: 50 });
Bash 脚本 - 监控运行中的任务
Copy
#!/bin/bash
API_KEY="sk_test_123456"
BASE_URL="http://localhost:8080"
echo "监控运行中的任务..."
while true; do
clear
echo "=== 运行中的任务 ==="
echo ""
RESPONSE=$(curl -s "$BASE_URL/api/v1/tasks?status=RUNNING" \
-H "X-API-Key: $API_KEY")
TOTAL=$(echo $RESPONSE | jq -r '.total_count')
echo "运行中:$TOTAL"
echo ""
echo $RESPONSE | jq -r '.tasks[] | "\(.task_id):\(.query)"'
sleep 5
done
Go
Copy
package main
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
)
type ListTasksResponse struct {
Tasks []TaskSummary `json:"tasks"`
TotalCount int32 `json:"total_count"`
}
type TaskSummary struct {
TaskID string `json:"task_id"`
Query string `json:"query"`
Status string `json:"status"`
Mode string `json:"mode"`
CreatedAt string `json:"created_at"`
CompletedAt *string `json:"completed_at"`
}
func listTasks(status string, limit, offset int) (*ListTasksResponse, error) {
baseURL := "http://localhost:8080/api/v1/tasks"
// 构建查询参数
params := url.Values{}
if status != "" {
params.Add("status", status)
}
params.Add("limit", fmt.Sprintf("%d", limit))
params.Add("offset", fmt.Sprintf("%d", offset))
reqURL := baseURL + "?" + params.Encode()
req, _ := http.NewRequest("GET", reqURL, nil)
req.Header.Set("X-API-Key", "sk_test_123456")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var result ListTasksResponse
json.NewDecoder(resp.Body).Decode(&result)
return &result, nil
}
func main() {
tasks, err := listTasks("COMPLETED", 10, 0)
if err != nil {
fmt.Println("错误:", err)
return
}
fmt.Printf("总任务数:%d\n", tasks.TotalCount)
for _, task := range tasks.Tasks {
fmt.Printf("- %s:%s\n", task.TaskID, task.Status)
}
}
用例
1. 仪表板 - 显示最近的任务
Copy
def get_recent_tasks(api_key: str, limit: int = 10):
"""获取用于仪表板的最近任务。"""
response = httpx.get(
"http://localhost:8080/api/v1/tasks",
headers={"X-API-Key": api_key},
params={"limit": limit, "offset": 0}
)
return response.json()["tasks"]
recent = get_recent_tasks("sk_test_123456", limit=5)
for task in recent:
print(f"{task['created_at']}:{task['query'][:50]}...")
2. 成本追踪
Copy
def calculate_session_cost(api_key: str, session_id: str):
"""计算会话的总成本。"""
response = httpx.get(
"http://localhost:8080/api/v1/tasks",
headers={"X-API-Key": api_key},
params={"session_id": session_id, "limit": 100}
)
total_cost = 0
for task in response.json()["tasks"]:
if task["total_token_usage"]:
total_cost += task["total_token_usage"]["cost_usd"]
return total_cost
cost = calculate_session_cost("sk_test_123456", "user-123-chat")
print(f"会话成本:${cost:.4f}")
3. 失败监控
Copy
import time
def monitor_failures(api_key: str, interval: int = 60):
"""监控失败的任务。"""
while True:
response = httpx.get(
"http://localhost:8080/api/v1/tasks",
headers={"X-API-Key": api_key},
params={"status": "FAILED", "limit": 10}
)
failed_tasks = response.json()["tasks"]
if failed_tasks:
print(f"⚠️ {len(failed_tasks)} 个失败任务:")
for task in failed_tasks:
print(f" - {task['task_id']}:{task['query']}")
# 获取详情
details = httpx.get(
f"http://localhost:8080/api/v1/tasks/{task['task_id']}",
headers={"X-API-Key": api_key}
)
error = details.json().get("error", "未知错误")
print(f" 错误:{error}")
time.sleep(interval)
monitor_failures("sk_test_123456")
实现注意事项
速率限制
此端点受速率限制。默认限制:- 每分钟 100 个请求 每个 API 密钥
- 每秒 20 个请求 突发
性能
- 索引查询:按
status和session_id筛选很快 - 分页:使用
limit和offset避免大型响应 - 总计数:包括所有匹配的任务,不仅是返回的页面
排序
任务按反向时间顺序返回(最新优先)。最佳实践
1. 使用分页
总是对大型结果集进行分页:Copy
def fetch_page(offset, limit=20):
return httpx.get(
"http://localhost:8080/api/v1/tasks",
headers={"X-API-Key": "sk_test_123456"},
params={"limit": limit, "offset": offset}
).json()
# 获取前 3 页
for page in range(3):
tasks = fetch_page(offset=page * 20, limit=20)
process_tasks(tasks)
2. 按状态筛选
如果只需要特定状态,不要获取所有任务:Copy
# ✅ 好 - 服务器端筛选
running = httpx.get(...)
# ❌ 不好 - 客户端筛选
all_tasks = httpx.get(...)
running = [t for t in all_tasks if t["status"] == "RUNNING"]
3. 缓存结果
缓存任务列表用于仪表板:Copy
import time
cache = {"data": None, "timestamp": 0}
def get_cached_tasks():
now = time.time()
if cache["data"] is None or now - cache["timestamp"] > 60:
# 每 60 秒刷新一次
response = httpx.get(...)
cache["data"] = response.json()
cache["timestamp"] = now
return cache["data"]
相关端点
注意
SDK 替代方案:Python SDK 不公开
list_tasks() 方法。此端点仅限 REST。对于 SDK 使用,存储任务 ID 并使用 client.get_status(task_id) 单独查询状态。