端点
Copy
GET http://localhost:8080/api/v1/tasks/{id}
描述
获取指定任务的当前状态、结果和元数据。使用此端点检查任务进度或获取最终结果。身份验证
必需:是 在请求头中包含 API 密钥:Copy
X-API-Key: sk_test_123456
请求
路径参数
| 参数 | 类型 | 必需 | 描述 |
|---|---|---|---|
id | string | 是 | 任务 ID(也用作工作流 ID) |
请求头
| 请求头 | 必需 | 描述 |
|---|---|---|
X-API-Key | 是 | 身份验证密钥 |
响应
成功响应
状态:200 OK
响应头:
X-Workflow-ID:Temporal 工作流标识符(与任务 ID 相同)
Copy
{
"task_id": "string",
"status": "string",
"result": "string",
"response": {},
"error": "string",
"created_at": "timestamp",
"updated_at": "timestamp",
"query": "string",
"session_id": "string",
"mode": "string",
"model_used": "string",
"provider": "string",
"usage": {
"total_tokens": 0,
"input_tokens": 0,
"output_tokens": 0,
"estimated_cost": 0.0
}
}
响应字段
| 字段 | 类型 | 描述 |
|---|---|---|
task_id | string | 唯一任务标识符 |
status | string | 当前任务状态 |
result | string | 原始 LLM 输出(纯文本或 JSON 字符串) |
response | object | 解析后的 JSON(仅当 result 可解析为 JSON 时出现) |
error | string | 错误消息(无错误时为空) |
created_at | timestamp | 任务创建时间 |
updated_at | timestamp | 最后更新时间 |
query | string | 原始任务查询 |
session_id | string | 会话标识符 |
mode | string | 使用的执行模式 |
model_used | string | 使用的主要模型(例如 gpt-5-mini-2025-08-07) |
provider | string | 提供商名称(例如 openai、anthropic) |
usage | object | Token 使用量和成本:{ total_tokens, input_tokens?, output_tokens?, estimated_cost? } |
状态值
TASK_STATUS_UNSPECIFIED- 状态未知TASK_STATUS_QUEUED- 等待执行TASK_STATUS_RUNNING- 正在执行TASK_STATUS_COMPLETED- 成功完成TASK_STATUS_FAILED- 执行失败TASK_STATUS_CANCELLED- 用户已取消
POST /api/v1/tasks/{id}/cancel 可用于取消正在运行的任务。
TASK_STATUS_TIMEOUT- 超出超时限制
执行模式值
EXECUTION_MODE_SIMPLE- 单个 LLM 调用,无工具EXECUTION_MODE_STANDARD- 多步骤执行,包含工具EXECUTION_MODE_COMPLEX- 高级推理模式
示例
检查任务状态
Copy
curl -X GET "http://localhost:8080/api/v1/tasks/task_01HQZX3Y9K8M2P4N5S7T9W2V" \
-H "X-API-Key: sk_test_123456"
Copy
{
"task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
"status": "TASK_STATUS_QUEUED",
"response": null,
"error": "",
"created_at": "2025-10-22T10:30:00Z",
"updated_at": "2025-10-22T10:30:00Z",
"query": "法国的首都是什么?",
"session_id": "user-123-session",
"mode": "EXECUTION_MODE_SIMPLE"
}
Copy
{
"task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
"status": "TASK_STATUS_RUNNING",
"response": null,
"error": "",
"created_at": "2025-10-22T10:30:00Z",
"updated_at": "2025-10-22T10:30:02Z",
"query": "法国的首都是什么?",
"session_id": "user-123-session",
"mode": "EXECUTION_MODE_SIMPLE"
}
Copy
{
"task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
"status": "TASK_STATUS_COMPLETED",
"result": "法国的首都是巴黎。巴黎自 987 年以来一直是首都,位于法国的中北部。",
"error": "",
"created_at": "2025-10-22T10:30:00Z",
"updated_at": "2025-10-22T10:30:05Z",
"query": "法国的首都是什么?",
"session_id": "user-123-session",
"mode": "EXECUTION_MODE_SIMPLE",
"model_used": "gpt-5-mini-2025-08-07",
"provider": "openai",
"usage": {
"total_tokens": 300,
"input_tokens": 200,
"output_tokens": 100,
"estimated_cost": 0.006
}
}
Copy
{
"task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
"status": "TASK_STATUS_FAILED",
"response": null,
"error": "LLM 服务不可用:连接超时",
"created_at": "2025-10-22T10:30:00Z",
"updated_at": "2025-10-22T10:30:10Z",
"query": "法国的首都是什么?",
"session_id": "user-123-session",
"mode": "EXECUTION_MODE_SIMPLE"
}
错误响应
401 未授权
Copy
{
"error": "Unauthorized"
}
404 未找到
Copy
{
"error": "Task not found"
}
500 内部服务器错误
Copy
{
"error": "Failed to get task status: database error"
}
代码示例
Python - 简单状态检查
Copy
import httpx
def get_task_status(task_id: str, api_key: str):
"""获取任务状态。"""
response = httpx.get(
f"http://localhost:8080/api/v1/tasks/{task_id}",
headers={"X-API-Key": api_key}
)
if response.status_code == 404:
return None
return response.json()
status = get_task_status("task_abc123", "sk_test_123456")
if status:
print(f"状态:{status['status']}")
if status['status'] == 'TASK_STATUS_COMPLETED':
print(f"结果:{status['result']}")
Python - 轮询直到完成
Copy
import httpx
import time
def wait_for_completion(task_id: str, api_key: str, timeout: int = 300):
"""轮询任务状态直到完成或超时。"""
start_time = time.time()
while True:
response = httpx.get(
f"http://localhost:8080/api/v1/tasks/{task_id}",
headers={"X-API-Key": api_key}
)
status = response.json()
current_status = status["status"]
# 检查终止状态
if current_status == "TASK_STATUS_COMPLETED":
return status.get("result", "")
elif current_status == "TASK_STATUS_FAILED":
raise Exception(f"任务失败:{status['error']}")
elif current_status == "TASK_STATUS_TIMEOUT":
raise Exception("任务超时")
elif current_status == "TASK_STATUS_CANCELLED":
raise Exception("任务已被取消")
# 检查轮询超时
if time.time() - start_time > timeout:
raise TimeoutError(f"轮询超时,已等待 {timeout}s")
# 等待后再轮询
time.sleep(2)
# 用法
try:
result = wait_for_completion("task_abc123", "sk_test_123456")
print("结果:", result)
except Exception as e:
print("错误:", e)
JavaScript/Node.js
Copy
const axios = require('axios');
async function getTaskStatus(taskId) {
try {
const response = await axios.get(
`http://localhost:8080/api/v1/tasks/${taskId}`,
{
headers: {
'X-API-Key': 'sk_test_123456'
}
}
);
const status = response.data;
console.log('状态:', status.status);
if (status.status === 'TASK_STATUS_COMPLETED') {
console.log('结果:', status.response.result);
} else if (status.status === 'TASK_STATUS_FAILED') {
console.error('错误:', status.error);
}
return status;
} catch (error) {
if (error.response?.status === 404) {
console.error('任务未找到');
} else {
console.error('错误:', error.response?.data || error.message);
}
throw error;
}
}
getTaskStatus('task_abc123');
JavaScript - 使用 Async/Await 轮询
Copy
async function waitForCompletion(taskId, timeout = 300000) {
const startTime = Date.now();
const pollInterval = 2000; // 2 秒
while (true) {
const response = await axios.get(
`http://localhost:8080/api/v1/tasks/${taskId}`,
{ headers: { 'X-API-Key': 'sk_test_123456' } }
);
const { status, response: result, error } = response.data;
if (status === 'TASK_STATUS_COMPLETED') {
return result;
} else if (status === 'TASK_STATUS_FAILED') {
throw new Error(`任务失败:${error}`);
} else if (status === 'TASK_STATUS_TIMEOUT') {
throw new Error('任务超时');
}
// 检查超时
if (Date.now() - startTime > timeout) {
throw new Error(`轮询超时,已等待 ${timeout}ms`);
}
// 等待后再轮询
await new Promise(resolve => setTimeout(resolve, pollInterval));
}
}
// 用法
waitForCompletion('task_abc123')
.then(result => console.log('结果:', result))
.catch(error => console.error('错误:', error.message));
Go
Copy
package main
import (
"encoding/json"
"fmt"
"net/http"
"time"
)
type TaskStatusResponse struct {
TaskID string `json:"task_id"`
Status string `json:"status"`
Response map[string]interface{} `json:"response"`
Error string `json:"error"`
Query string `json:"query"`
SessionID string `json:"session_id"`
Mode string `json:"mode"`
}
func getTaskStatus(taskID string) (*TaskStatusResponse, error) {
url := fmt.Sprintf("http://localhost:8080/api/v1/tasks/%s", taskID)
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("X-API-Key", "sk_test_123456")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return nil, fmt.Errorf("任务未找到")
}
var status TaskStatusResponse
json.NewDecoder(resp.Body).Decode(&status)
return &status, nil
}
func waitForCompletion(taskID string, timeout time.Duration) (map[string]interface{}, error) {
start := time.Now()
for {
status, err := getTaskStatus(taskID)
if err != nil {
return nil, err
}
switch status.Status {
case "TASK_STATUS_COMPLETED":
return status.Response, nil
case "TASK_STATUS_FAILED":
return nil, fmt.Errorf("任务失败:%s", status.Error)
case "TASK_STATUS_TIMEOUT":
return nil, fmt.Errorf("任务超时")
}
if time.Since(start) > timeout {
return nil, fmt.Errorf("轮询超时")
}
time.Sleep(2 * time.Second)
}
}
func main() {
result, err := waitForCompletion("task_abc123", 5*time.Minute)
if err != nil {
fmt.Println("错误:", err)
return
}
fmt.Println("结果:", result)
}
Bash - 监控任务进度
Copy
#!/bin/bash
API_KEY="sk_test_123456"
TASK_ID="$1"
if [ -z "$TASK_ID" ]; then
echo "用法:$0 <task_id>"
exit 1
fi
echo "监控任务:$TASK_ID"
echo ""
while true; do
RESPONSE=$(curl -s "http://localhost:8080/api/v1/tasks/$TASK_ID" \
-H "X-API-Key: $API_KEY")
STATUS=$(echo $RESPONSE | jq -r '.status')
echo "[$(date +%T)] 状态:$STATUS"
case $STATUS in
"TASK_STATUS_COMPLETED")
echo ""
echo "✓ 任务完成!"
echo ""
echo $RESPONSE | jq -r '.response.result'
exit 0
;;
"TASK_STATUS_FAILED")
echo ""
echo "✗ 任务失败!"
ERROR=$(echo $RESPONSE | jq -r '.error')
echo "错误:$ERROR"
exit 1
;;
"TASK_STATUS_TIMEOUT")
echo ""
echo "✗ 任务超时!"
exit 1
;;
"TASK_STATUS_CANCELLED")
echo ""
echo "✗ 任务已取消!"
exit 1
;;
esac
sleep 2
done
用例
1. 提交并等待模式
Copy
import httpx
import time
def submit_and_wait(query: str, api_key: str):
"""提交任务并等待结果。"""
# 提交
submit_response = httpx.post(
"http://localhost:8080/api/v1/tasks",
headers={"X-API-Key": api_key},
json={"query": query}
)
task_id = submit_response.json()["task_id"]
print(f"任务已提交:{task_id}")
# 等待
while True:
status_response = httpx.get(
f"http://localhost:8080/api/v1/tasks/{task_id}",
headers={"X-API-Key": api_key}
)
status = status_response.json()
if status["status"] == "TASK_STATUS_COMPLETED":
return status["response"]["result"]
elif status["status"] == "TASK_STATUS_FAILED":
raise Exception(status["error"])
time.sleep(2)
result = submit_and_wait("Python 是什么?", "sk_test_123456")
print(result)
2. 仪表板状态部件
Copy
def get_task_summary(task_id: str, api_key: str):
"""获取用于仪表板的任务摘要。"""
response = httpx.get(
f"http://localhost:8080/api/v1/tasks/{task_id}",
headers={"X-API-Key": api_key}
)
status = response.json()
return {
"id": task_id,
"query": status["query"][:50] + "...",
"status": status["status"].replace("TASK_STATUS_", ""),
"mode": status["mode"].replace("EXECUTION_MODE_", ""),
"created": status["created_at"]
}
# 在 UI 中显示
summary = get_task_summary("task_abc123", "sk_test_123456")
print(f"{summary['status']}:{summary['query']}")
3. 批量状态检查
Copy
def check_multiple_tasks(task_ids: list, api_key: str):
"""检查多个任务的状态。"""
results = {}
for task_id in task_ids:
try:
response = httpx.get(
f"http://localhost:8080/api/v1/tasks/{task_id}",
headers={"X-API-Key": api_key},
timeout=5.0
)
results[task_id] = response.json()["status"]
except Exception as e:
results[task_id] = f"错误:{e}"
return results
# 检查 5 个任务
task_ids = ["task_1", "task_2", "task_3", "task_4", "task_5"]
statuses = check_multiple_tasks(task_ids, "sk_test_123456")
for task_id, status in statuses.items():
print(f"{task_id}:{status}")
最佳实践
1. 使用流式传输而不是轮询
对于长时间运行的任务,使用 SSE 流式传输而不是轮询:Copy
# ❌ 不好 - 每 2 秒轮询一次
while True:
status = httpx.get(f".../{task_id}")
if status["status"] == "COMPLETED":
break
time.sleep(2)
# ✅ 好 - 使用流式传输
for event in client.stream(task_id):
print(event.type, event.message)
if event.type == "TASK_COMPLETED":
break
2. 处理所有状态
Copy
status = get_task_status(task_id, api_key)
match status["status"]:
case "TASK_STATUS_QUEUED":
print("任务已排队...")
case "TASK_STATUS_RUNNING":
print("任务正在运行...")
case "TASK_STATUS_COMPLETED":
result = status["response"]["result"]
print(f"结果:{result}")
case "TASK_STATUS_FAILED":
print(f"失败:{status['error']}")
case "TASK_STATUS_TIMEOUT":
print("任务超时")
case "TASK_STATUS_CANCELLED":
print("任务已被取消")
3. 实施指数退避
Copy
import time
def poll_with_backoff(task_id, api_key, max_wait=60):
"""使用指数退避轮询。"""
wait_time = 1
while True:
status = get_task_status(task_id, api_key)
if status["status"] in ["TASK_STATUS_COMPLETED", "TASK_STATUS_FAILED"]:
return status
time.sleep(wait_time)
wait_time = min(wait_time * 2, max_wait) # 上限为 60 秒
4. 缓存状态响应
Copy
from functools import lru_cache
import time
@lru_cache(maxsize=1000)
def get_cached_status(task_id: str, api_key: str, timestamp: int):
"""缓存状态 5 秒。"""
return get_task_status(task_id, api_key)
# 用法
current_time = int(time.time() / 5) # 5 秒单位
status = get_cached_status("task_abc123", "sk_test_123456", current_time)
5. 提取元数据
Copy
def extract_task_info(task_id: str, api_key: str):
"""提取有用的元数据。"""
status = get_task_status(task_id, api_key)
return {
"task_id": status["task_id"],
"query": status["query"],
"status": status["status"],
"mode": status["mode"],
"session_id": status["session_id"],
"has_result": status["response"] is not None,
"has_error": bool(status["error"]),
"workflow_url": f"http://localhost:8088/workflows/{task_id}"
}
相关端点
注意
勿在生产环境轮询:对于长时间运行的任务,使用流式端点而不是轮询状态。轮询会增加不必要的负载并增加延迟。
会话追踪:
session_id 字段允许跟踪任务属于哪个会话,这对多轮对话和成本归属很有用。