跳转到主要内容

端点

GET http://localhost:8080/api/v1/tasks/{id}

描述

获取指定任务的当前状态、结果和元数据。使用此端点检查任务进度或获取最终结果。

身份验证

必需:是 在请求头中包含 API 密钥:
X-API-Key: sk_test_123456

请求

路径参数

参数类型必需描述
idstring任务 ID(也用作工作流 ID)

请求头

请求头必需描述
X-API-Key身份验证密钥

响应

成功响应

状态200 OK 响应头
  • X-Workflow-ID:Temporal 工作流标识符(与任务 ID 相同)
响应体
{
  "task_id": "string",
  "status": "string",
  "result": "string",
  "response": {},
  "error": "string",
  "created_at": "timestamp",
  "updated_at": "timestamp",
  "query": "string",
  "session_id": "string",
  "mode": "string",
  "model_used": "string",
  "provider": "string",
  "usage": {
    "total_tokens": 0,
    "input_tokens": 0,
    "output_tokens": 0,
    "estimated_cost": 0.0
  }
}

响应字段

字段类型描述
task_idstring唯一任务标识符
statusstring当前任务状态
resultstring原始 LLM 输出(纯文本或 JSON 字符串)
responseobject解析后的 JSON(仅当 result 可解析为 JSON 时出现)
errorstring错误消息(无错误时为空)
created_attimestamp任务创建时间
updated_attimestamp最后更新时间
querystring原始任务查询
session_idstring会话标识符
modestring使用的执行模式
model_usedstring使用的主要模型(例如 gpt-5-mini-2025-08-07
providerstring提供商名称(例如 openaianthropic
usageobjectToken 使用量和成本:{ total_tokens, input_tokens?, output_tokens?, estimated_cost? }

状态值

  • TASK_STATUS_UNSPECIFIED - 状态未知
  • TASK_STATUS_QUEUED - 等待执行
  • TASK_STATUS_RUNNING - 正在执行
  • TASK_STATUS_COMPLETED - 成功完成
  • TASK_STATUS_FAILED - 执行失败
  • TASK_STATUS_CANCELLED - 用户已取消
另请参阅:POST /api/v1/tasks/{id}/cancel 可用于取消正在运行的任务。
  • TASK_STATUS_TIMEOUT - 超出超时限制

执行模式值

  • EXECUTION_MODE_SIMPLE - 单个 LLM 调用,无工具
  • EXECUTION_MODE_STANDARD - 多步骤执行,包含工具
  • EXECUTION_MODE_COMPLEX - 高级推理模式

示例

检查任务状态

curl -X GET "http://localhost:8080/api/v1/tasks/task_01HQZX3Y9K8M2P4N5S7T9W2V" \
  -H "X-API-Key: sk_test_123456"
响应(已排队)
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "status": "TASK_STATUS_QUEUED",
  "response": null,
  "error": "",
  "created_at": "2025-10-22T10:30:00Z",
  "updated_at": "2025-10-22T10:30:00Z",
  "query": "法国的首都是什么?",
  "session_id": "user-123-session",
  "mode": "EXECUTION_MODE_SIMPLE"
}
响应(运行中)
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "status": "TASK_STATUS_RUNNING",
  "response": null,
  "error": "",
  "created_at": "2025-10-22T10:30:00Z",
  "updated_at": "2025-10-22T10:30:02Z",
  "query": "法国的首都是什么?",
  "session_id": "user-123-session",
  "mode": "EXECUTION_MODE_SIMPLE"
}
响应(已完成)
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "status": "TASK_STATUS_COMPLETED",
  "result": "法国的首都是巴黎。巴黎自 987 年以来一直是首都,位于法国的中北部。",
  "error": "",
  "created_at": "2025-10-22T10:30:00Z",
  "updated_at": "2025-10-22T10:30:05Z",
  "query": "法国的首都是什么?",
  "session_id": "user-123-session",
  "mode": "EXECUTION_MODE_SIMPLE",
  "model_used": "gpt-5-mini-2025-08-07",
  "provider": "openai",
  "usage": {
    "total_tokens": 300,
    "input_tokens": 200,
    "output_tokens": 100,
    "estimated_cost": 0.006
  }
}
响应(失败)
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "status": "TASK_STATUS_FAILED",
  "response": null,
  "error": "LLM 服务不可用:连接超时",
  "created_at": "2025-10-22T10:30:00Z",
  "updated_at": "2025-10-22T10:30:10Z",
  "query": "法国的首都是什么?",
  "session_id": "user-123-session",
  "mode": "EXECUTION_MODE_SIMPLE"
}

错误响应

401 未授权

{
  "error": "Unauthorized"
}

404 未找到

{
  "error": "Task not found"
}

500 内部服务器错误

{
  "error": "Failed to get task status: database error"
}

代码示例

Python - 简单状态检查

import httpx

def get_task_status(task_id: str, api_key: str):
    """获取任务状态。"""
    response = httpx.get(
        f"http://localhost:8080/api/v1/tasks/{task_id}",
        headers={"X-API-Key": api_key}
    )

    if response.status_code == 404:
        return None

    return response.json()

status = get_task_status("task_abc123", "sk_test_123456")
if status:
    print(f"状态:{status['status']}")
    if status['status'] == 'TASK_STATUS_COMPLETED':
        print(f"结果:{status['result']}")

Python - 轮询直到完成

import httpx
import time

def wait_for_completion(task_id: str, api_key: str, timeout: int = 300):
    """轮询任务状态直到完成或超时。"""
    start_time = time.time()

    while True:
        response = httpx.get(
            f"http://localhost:8080/api/v1/tasks/{task_id}",
            headers={"X-API-Key": api_key}
        )

        status = response.json()
        current_status = status["status"]

        # 检查终止状态
        if current_status == "TASK_STATUS_COMPLETED":
            return status.get("result", "")
        elif current_status == "TASK_STATUS_FAILED":
            raise Exception(f"任务失败:{status['error']}")
        elif current_status == "TASK_STATUS_TIMEOUT":
            raise Exception("任务超时")
        elif current_status == "TASK_STATUS_CANCELLED":
            raise Exception("任务已被取消")

        # 检查轮询超时
        if time.time() - start_time > timeout:
            raise TimeoutError(f"轮询超时,已等待 {timeout}s")

        # 等待后再轮询
        time.sleep(2)

# 用法
try:
    result = wait_for_completion("task_abc123", "sk_test_123456")
    print("结果:", result)
except Exception as e:
    print("错误:", e)

JavaScript/Node.js

const axios = require('axios');

async function getTaskStatus(taskId) {
  try {
    const response = await axios.get(
      `http://localhost:8080/api/v1/tasks/${taskId}`,
      {
        headers: {
          'X-API-Key': 'sk_test_123456'
        }
      }
    );

    const status = response.data;

    console.log('状态:', status.status);

    if (status.status === 'TASK_STATUS_COMPLETED') {
      console.log('结果:', status.response.result);
    } else if (status.status === 'TASK_STATUS_FAILED') {
      console.error('错误:', status.error);
    }

    return status;
  } catch (error) {
    if (error.response?.status === 404) {
      console.error('任务未找到');
    } else {
      console.error('错误:', error.response?.data || error.message);
    }
    throw error;
  }
}

getTaskStatus('task_abc123');

JavaScript - 使用 Async/Await 轮询

async function waitForCompletion(taskId, timeout = 300000) {
  const startTime = Date.now();
  const pollInterval = 2000; // 2 秒

  while (true) {
    const response = await axios.get(
      `http://localhost:8080/api/v1/tasks/${taskId}`,
      { headers: { 'X-API-Key': 'sk_test_123456' } }
    );

    const { status, response: result, error } = response.data;

    if (status === 'TASK_STATUS_COMPLETED') {
      return result;
    } else if (status === 'TASK_STATUS_FAILED') {
      throw new Error(`任务失败:${error}`);
    } else if (status === 'TASK_STATUS_TIMEOUT') {
      throw new Error('任务超时');
    }

    // 检查超时
    if (Date.now() - startTime > timeout) {
      throw new Error(`轮询超时,已等待 ${timeout}ms`);
    }

    // 等待后再轮询
    await new Promise(resolve => setTimeout(resolve, pollInterval));
  }
}

// 用法
waitForCompletion('task_abc123')
  .then(result => console.log('结果:', result))
  .catch(error => console.error('错误:', error.message));

Go

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

type TaskStatusResponse struct {
    TaskID    string                 `json:"task_id"`
    Status    string                 `json:"status"`
    Response  map[string]interface{} `json:"response"`
    Error     string                 `json:"error"`
    Query     string                 `json:"query"`
    SessionID string                 `json:"session_id"`
    Mode      string                 `json:"mode"`
}

func getTaskStatus(taskID string) (*TaskStatusResponse, error) {
    url := fmt.Sprintf("http://localhost:8080/api/v1/tasks/%s", taskID)

    req, _ := http.NewRequest("GET", url, nil)
    req.Header.Set("X-API-Key", "sk_test_123456")

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode == 404 {
        return nil, fmt.Errorf("任务未找到")
    }

    var status TaskStatusResponse
    json.NewDecoder(resp.Body).Decode(&status)

    return &status, nil
}

func waitForCompletion(taskID string, timeout time.Duration) (map[string]interface{}, error) {
    start := time.Now()

    for {
        status, err := getTaskStatus(taskID)
        if err != nil {
            return nil, err
        }

        switch status.Status {
        case "TASK_STATUS_COMPLETED":
            return status.Response, nil
        case "TASK_STATUS_FAILED":
            return nil, fmt.Errorf("任务失败:%s", status.Error)
        case "TASK_STATUS_TIMEOUT":
            return nil, fmt.Errorf("任务超时")
        }

        if time.Since(start) > timeout {
            return nil, fmt.Errorf("轮询超时")
        }

        time.Sleep(2 * time.Second)
    }
}

func main() {
    result, err := waitForCompletion("task_abc123", 5*time.Minute)
    if err != nil {
        fmt.Println("错误:", err)
        return
    }

    fmt.Println("结果:", result)
}

Bash - 监控任务进度

#!/bin/bash

API_KEY="sk_test_123456"
TASK_ID="$1"

if [ -z "$TASK_ID" ]; then
  echo "用法:$0 <task_id>"
  exit 1
fi

echo "监控任务:$TASK_ID"
echo ""

while true; do
  RESPONSE=$(curl -s "http://localhost:8080/api/v1/tasks/$TASK_ID" \
    -H "X-API-Key: $API_KEY")

  STATUS=$(echo $RESPONSE | jq -r '.status')

  echo "[$(date +%T)] 状态:$STATUS"

  case $STATUS in
    "TASK_STATUS_COMPLETED")
      echo ""
      echo "✓ 任务完成!"
      echo ""
      echo $RESPONSE | jq -r '.response.result'
      exit 0
      ;;
    "TASK_STATUS_FAILED")
      echo ""
      echo "✗ 任务失败!"
      ERROR=$(echo $RESPONSE | jq -r '.error')
      echo "错误:$ERROR"
      exit 1
      ;;
    "TASK_STATUS_TIMEOUT")
      echo ""
      echo "✗ 任务超时!"
      exit 1
      ;;
    "TASK_STATUS_CANCELLED")
      echo ""
      echo "✗ 任务已取消!"
      exit 1
      ;;
  esac

  sleep 2
done

用例

1. 提交并等待模式

import httpx
import time

def submit_and_wait(query: str, api_key: str):
    """提交任务并等待结果。"""
    # 提交
    submit_response = httpx.post(
        "http://localhost:8080/api/v1/tasks",
        headers={"X-API-Key": api_key},
        json={"query": query}
    )
    task_id = submit_response.json()["task_id"]
    print(f"任务已提交:{task_id}")

    # 等待
    while True:
        status_response = httpx.get(
            f"http://localhost:8080/api/v1/tasks/{task_id}",
            headers={"X-API-Key": api_key}
        )
        status = status_response.json()

        if status["status"] == "TASK_STATUS_COMPLETED":
            return status["response"]["result"]
        elif status["status"] == "TASK_STATUS_FAILED":
            raise Exception(status["error"])

        time.sleep(2)

result = submit_and_wait("Python 是什么?", "sk_test_123456")
print(result)

2. 仪表板状态部件

def get_task_summary(task_id: str, api_key: str):
    """获取用于仪表板的任务摘要。"""
    response = httpx.get(
        f"http://localhost:8080/api/v1/tasks/{task_id}",
        headers={"X-API-Key": api_key}
    )

    status = response.json()

    return {
        "id": task_id,
        "query": status["query"][:50] + "...",
        "status": status["status"].replace("TASK_STATUS_", ""),
        "mode": status["mode"].replace("EXECUTION_MODE_", ""),
        "created": status["created_at"]
    }

# 在 UI 中显示
summary = get_task_summary("task_abc123", "sk_test_123456")
print(f"{summary['status']}{summary['query']}")

3. 批量状态检查

def check_multiple_tasks(task_ids: list, api_key: str):
    """检查多个任务的状态。"""
    results = {}

    for task_id in task_ids:
        try:
            response = httpx.get(
                f"http://localhost:8080/api/v1/tasks/{task_id}",
                headers={"X-API-Key": api_key},
                timeout=5.0
            )
            results[task_id] = response.json()["status"]
        except Exception as e:
            results[task_id] = f"错误:{e}"

    return results

# 检查 5 个任务
task_ids = ["task_1", "task_2", "task_3", "task_4", "task_5"]
statuses = check_multiple_tasks(task_ids, "sk_test_123456")

for task_id, status in statuses.items():
    print(f"{task_id}{status}")

最佳实践

1. 使用流式传输而不是轮询

对于长时间运行的任务,使用 SSE 流式传输而不是轮询:
# ❌ 不好 - 每 2 秒轮询一次
while True:
    status = httpx.get(f".../{task_id}")
    if status["status"] == "COMPLETED":
        break
    time.sleep(2)

# ✅ 好 - 使用流式传输
for event in client.stream(task_id):
    print(event.type, event.message)
    if event.type == "TASK_COMPLETED":
        break

2. 处理所有状态

status = get_task_status(task_id, api_key)

match status["status"]:
    case "TASK_STATUS_QUEUED":
        print("任务已排队...")
    case "TASK_STATUS_RUNNING":
        print("任务正在运行...")
    case "TASK_STATUS_COMPLETED":
        result = status["response"]["result"]
        print(f"结果:{result}")
    case "TASK_STATUS_FAILED":
        print(f"失败:{status['error']}")
    case "TASK_STATUS_TIMEOUT":
        print("任务超时")
    case "TASK_STATUS_CANCELLED":
        print("任务已被取消")

3. 实施指数退避

import time

def poll_with_backoff(task_id, api_key, max_wait=60):
    """使用指数退避轮询。"""
    wait_time = 1

    while True:
        status = get_task_status(task_id, api_key)

        if status["status"] in ["TASK_STATUS_COMPLETED", "TASK_STATUS_FAILED"]:
            return status

        time.sleep(wait_time)
        wait_time = min(wait_time * 2, max_wait)  # 上限为 60 秒

4. 缓存状态响应

from functools import lru_cache
import time

@lru_cache(maxsize=1000)
def get_cached_status(task_id: str, api_key: str, timestamp: int):
    """缓存状态 5 秒。"""
    return get_task_status(task_id, api_key)

# 用法
current_time = int(time.time() / 5)  # 5 秒单位
status = get_cached_status("task_abc123", "sk_test_123456", current_time)

5. 提取元数据

def extract_task_info(task_id: str, api_key: str):
    """提取有用的元数据。"""
    status = get_task_status(task_id, api_key)

    return {
        "task_id": status["task_id"],
        "query": status["query"],
        "status": status["status"],
        "mode": status["mode"],
        "session_id": status["session_id"],
        "has_result": status["response"] is not None,
        "has_error": bool(status["error"]),
        "workflow_url": f"http://localhost:8088/workflows/{task_id}"
    }

相关端点

注意

勿在生产环境轮询:对于长时间运行的任务,使用流式端点而不是轮询状态。轮询会增加不必要的负载并增加延迟。
会话追踪session_id 字段允许跟踪任务属于哪个会话,这对多轮对话和成本归属很有用。