跳转到主要内容
正在扩展错误处理文档。下面显示了核心模式。

概述

Shannon Python SDK 提供全面的错误处理机制,帮助您构建健壮的应用程序。所有 SDK 异常都继承自 ShannonError。

异常层次结构

ShannonError                    # 基础异常
├── ConnectionError             # 网络/连接问题(SDK 自定义异常)
├── AuthenticationError         # API 密钥/身份验证问题
├── PermissionDeniedError       # 权限不足/禁止访问
├── ValidationError             # 无效参数
├── RateLimitError              # 速率限制/请求过多
├── ServerError                 # 上游 5xx 服务器错误
├── TaskNotFoundError           # 任务不存在
├── TaskTimeoutError            # 任务超出超时限制
├── TaskCancelledError          # 任务被取消
├── SessionNotFoundError        # 会话不存在
├── SessionExpiredError         # 会话已过期
├── TemplateError               # 模板/路由相关错误
└── TemplateNotFoundError       # 模板不存在
预算和任务失败处理:预算超出和任务失败不会抛出异常。要判断任务是否失败,请检查 status.status;令牌用量与成本合计请通过 list_tasks() 从任务列表中的 total_token_usage 读取。

基本错误处理

Try-Except 模式

from shannon import (
    ShannonClient,
    ShannonError,
    AuthenticationError,
    PermissionDeniedError,
    ConnectionError,
    RateLimitError,
    ServerError,
    TaskTimeoutError,
    TaskStatusEnum,
)

# Create a client that talks to the Shannon endpoint at localhost:8080.
client = ShannonClient(base_url="http://localhost:8080")

try:
    handle = client.submit_task(query="分析此数据")
    # Wait for the task with a timeout of 120.
    status = client.wait(handle.task_id, timeout=120)

    # A failed task is reported through the status value, not raised as an
    # exception, so it has to be checked explicitly.
    if status.status == TaskStatusEnum.FAILED:
        print(f"任务失败:{status.error_message}")
    else:
        print("结果:", status.result)

# The specific SDK exceptions come first so each one gets its own message.
except ConnectionError:
    print("无法连接到 Shannon 服务器")
except AuthenticationError:
    print("无效的 API 凭证")
except PermissionDeniedError:
    print("禁止访问:权限不足")
except RateLimitError:
    print("触发速率限制,请降低频率或加入退避")
except ServerError:
    print("服务器错误(5xx),稍后再试")
except TaskTimeoutError:
    print("任务超出超时限制")
# ShannonError is the SDK base class: it catches any SDK error not listed above.
except ShannonError as e:
    print(f"Shannon 错误:{e}")
# Last resort for anything that is not an SDK exception.
except Exception as e:
    print(f"意外错误:{e}")

特定错误类型

连接错误

处理网络和连接问题:
import time
from shannon import ShannonClient, ConnectionError

def connect_with_retry(max_retries=3):
    """Return a ShannonClient once a probe task can be submitted.

    A trivial task is submitted as a connectivity check. When the SDK raises
    ConnectionError the probe is retried with exponential backoff
    (1s, 2s, 4s, ...).

    Args:
        max_retries: Maximum number of probe attempts.

    Returns:
        The connected ShannonClient. If max_retries is less than 1 no attempt
        is made and None is returned.

    Raises:
        ConnectionError: If the final attempt still cannot connect.
    """
    client = ShannonClient()

    for attempt in range(max_retries):
        try:
            # Only the side effect matters here; the task handle is not needed.
            client.submit_task(query="ping")
        except ConnectionError:
            if attempt == max_retries - 1:
                # Out of attempts: report and let the caller see the error.
                print(f"{max_retries} 次尝试后无法连接")
                raise
            wait_time = 2 ** attempt  # exponential backoff
            print(f"连接失败,{wait_time}s 后重试...")
            time.sleep(wait_time)
        else:
            return client

成本与失败检查

通过状态判断失败,并使用 list_tasks() 获取用量与成本合计:
from shannon import ShannonClient, TaskStatusEnum

client = ShannonClient()
handle = client.submit_task(query="Analyze data")
status = client.wait(handle.task_id)

# Failure check: TaskStatus.status is an enum value.
task_failed = status.status == TaskStatusEnum.FAILED
if task_failed:
    print(f"失败并出现错误:{status.error_message}")

# Usage and cost are read from the task list, not from the status object.
tasks, _ = client.list_tasks(limit=50)
matching = [t for t in tasks if t.task_id == handle.task_id]
summary = matching[0] if matching else None

usage = None
if summary:
    usage = summary.total_token_usage
if usage:
    print(f"tokens={usage.total_tokens} prompt={usage.prompt_tokens} completion={usage.completion_tokens} cost=${usage.cost_usd:.6f}")

# Note: TaskSummary.status is a plain string, so compare against "FAILED".
if summary and summary.status == "FAILED":
    print("列表摘要指示失败")

超时错误

处理长时间运行的操作:
import asyncio
from shannon import AsyncShannonClient, TaskTimeoutError

async def with_timeout_handling():
    """Submit a task and wait for it, handling both kinds of timeout.

    Returns:
        The value returned by client.wait(), or None when either the
        client-side asyncio timeout or a Shannon task timeout occurs.
    """
    async with AsyncShannonClient() as client:
        try:
            # asyncio.TimeoutError is raised by the wait_for wrapper (client side).
            submit_call = client.submit_task(query="复杂分析")
            handle = await asyncio.wait_for(submit_call, timeout=10.0)

            # TaskTimeoutError comes from Shannon when the task exceeds its
            # own timeout.
            wait_call = client.wait(handle.task_id)
            return await asyncio.wait_for(wait_call, timeout=60.0)

        except asyncio.TimeoutError:
            print("操作超时(客户端 asyncio 超时)")
        except TaskTimeoutError:
            print("任务超时(Shannon 报告超时)")
        return None

速率限制

优雅地处理 API 速率限制:
import time

from shannon import ShannonClient, ConnectionError, RateLimitError

def handle_rate_limits(queries):
    """Run every query, retrying with exponential backoff on transient errors.

    Each query is retried until it succeeds; the delay starts at 1 second,
    doubles after every failure and is capped at 30 seconds. The delay is
    reset for every new query.

    Args:
        queries: Iterable of query strings to submit.

    Returns:
        A list with the result of client.wait() for each query, in order.
    """
    client = ShannonClient()
    results = []

    for query in queries:
        # Must be initialised before the retry loop: the except branch reads it.
        backoff = 1
        while True:
            try:
                handle = client.submit_task(query=query)
                result = client.wait(handle.task_id)
            except (RateLimitError, ConnectionError):
                # Rate limiting or a transient network error. RateLimitError is
                # a separate SDK exception, so it is listed explicitly.
                print(f"触发速率限制/暂时性错误,{backoff}s 后重试…")
                time.sleep(backoff)
                backoff = min(backoff * 2, 30)
            else:
                results.append(result)
                break  # success, move on to the next query

    return results
导入说明:from shannon import ConnectionError 指的是 SDK 的异常类型(不是 Python 内置的 ConnectionError)。速率限制与断路器等高级模式属于参考实现,请根据生产环境验证与调整。

验证错误

处理无效参数:
from shannon import ShannonClient, ValidationError

def validate_and_submit(query, session_id=None):
    """Submit a task, turning a ValidationError into a None result.

    Args:
        query: The task query text.
        session_id: Optional session identifier passed through to submit_task.

    Returns:
        The value returned by submit_task, or None if the SDK rejected the
        parameters with ValidationError.
    """
    client = ShannonClient()

    try:
        handle = client.submit_task(query=query, session_id=session_id)
    except ValidationError as e:
        print(f"无效参数:{e}")
        return None
    return handle

任务失败处理

处理任务执行失败:
from shannon import ShannonClient, TaskStatusEnum, TaskTimeoutError

def handle_task_failure(query):
    """Submit a task and return its status, or None on failure or timeout.

    Args:
        query: The task query text.

    Returns:
        The status returned by client.wait(), or None when the task ended in
        the FAILED state or TaskTimeoutError was raised.
    """
    client = ShannonClient()

    try:
        handle = client.submit_task(query=query)
        status = client.wait(handle.task_id, timeout=120)

        # Task failure is reported through the status, not as an exception.
        if status.status == TaskStatusEnum.FAILED:
            print(f"任务失败:{status.error_message}")
            return None
        return status
    except TaskTimeoutError:
        print("任务超时;请考虑增加超时或简化请求。")
        return None

记录错误

实现全面的错误日志:
import logging
from shannon import ShannonClient, ShannonError

# Configure logging: INFO level, with timestamp, logger name and level per line.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('shannon')

def logged_task_submission(query):
    """Submit a task and wait for it, logging each step and any error.

    Args:
        query: The task query text; only its first 50 characters are logged.

    Returns:
        The `result` attribute of the status returned by client.wait().

    Raises:
        ShannonError: Re-raised after being logged at ERROR level.
        Exception: Any other error, re-raised after being logged at CRITICAL.
    """
    client = ShannonClient()

    try:
        logger.info(f"提交任务:{query[:50]}...")
        handle = client.submit_task(query=query)
        logger.info(f"任务已提交:task_id={handle.task_id}")

        status = client.wait(handle.task_id)
        logger.info("任务成功完成")
        return status.result

    except ShannonError as e:
        # logger.exception logs at ERROR level with the traceback attached.
        logger.exception(f"Shannon 错误:{e}")
        raise

    except Exception as e:
        logger.critical(f"意外错误:{e}", exc_info=True)
        raise

断路器模式

实现断路器以提升系统弹性:
import time


class CircuitBreaker:
    """Minimal circuit breaker around an arbitrary callable.

    States:
        closed:    calls pass through; consecutive failures are counted.
        open:      calls are rejected until reset_timeout seconds have passed
                   since the last failure.
        half-open: one trial call is allowed; success closes the breaker,
                   failure opens it again.
    """

    def __init__(self, failure_threshold=5, reset_timeout=60):
        """Create a closed breaker.

        Args:
            failure_threshold: Consecutive failures that open the breaker.
            reset_timeout: Seconds to stay open before allowing a trial call.
        """
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """Invoke func(*args, **kwargs) through the breaker.

        Returns:
            Whatever func returns.

        Raises:
            Exception: With the message "断路器是打开的" when the breaker is
                open and the reset timeout has not elapsed yet.
            Exception: Any exception raised by func is recorded as a failure
                and re-raised unchanged.
        """
        if self.state == "open":
            if time.time() - self.last_failure_time > self.reset_timeout:
                # Cool-down elapsed: let a single trial call through.
                self.state = "half-open"
            else:
                raise Exception("断路器是打开的")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()

            # A failed trial call reopens the breaker immediately; otherwise
            # it opens once the consecutive-failure threshold is reached.
            if self.state == "half-open" or self.failure_count >= self.failure_threshold:
                self.state = "open"

            raise

        # Any success ends the failure streak, so only consecutive failures
        # can trip the breaker.
        self.failure_count = 0
        self.state = "closed"
        return result

# Usage: route the SDK call through the breaker instead of calling it directly.
breaker = CircuitBreaker()
client = ShannonClient()

try:
    result = breaker.call(client.submit_task, query="分析数据")
except Exception as e:
    # Covers both a rejected call (breaker open) and a failure of the call itself.
    print(f"服务不可用:{e}")

最佳实践

  1. 始终先捕获具体异常,再捕获通用异常
  2. 使用指数退避实现重试逻辑
  3. 记录错误以供调试和监控
  4. 为关键操作提供回退选项
  5. 设置合理的超时以避免挂起
  6. 在提交前验证输入
  7. 对外部依赖项使用断路器

后续步骤

完整示例:带重试逻辑

#!/usr/bin/env python3
"""带重试逻辑的错误处理示例"""

import time

from shannon import (
    ShannonClient,
    ConnectionError,
    TaskTimeoutError,
    ShannonError,
    TaskStatusEnum,
)

def robust_task_submission(query: str, max_retries: int = 3):
    """
    Submit a task and wait for it, retrying only on connection errors.

    Args:
        query: The task query text.
        max_retries: Maximum number of attempts when ConnectionError is raised.

    Returns:
        The TaskStatus returned by client.wait(). A task that ended in the
        FAILED state is reported on stdout and its status is still returned so
        the caller can inspect it. None is returned only when max_retries is
        less than 1 and no attempt is made.

    Raises:
        ConnectionError: If the last attempt still fails to connect.
        TaskTimeoutError: If the task exceeds the 300 second wait; not retried.
        ShannonError: Any other SDK error; not retried.
    """
    client = ShannonClient()

    for attempt in range(max_retries):
        try:
            print(f"\n[尝试 {attempt + 1}/{max_retries}] 正在提交任务…")
            handle = client.submit_task(query=query)
            print(f"✅ 已提交任务:{handle.task_id}")
            print("⏳ 等待结果(300s 超时)…")
            result = client.wait(handle.task_id, timeout=300)
            # Task failure is reported through the status, not as an
            # exception, so success must not be announced unconditionally.
            if result.status == TaskStatusEnum.FAILED:
                print(f"❌ 任务失败:{result.error_message}")
            else:
                print("✅ 任务成功完成")
            return result

        except ConnectionError as e:
            print(f"❌ 连接错误:{e}")
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # 1s, 2s, 4s
                print(f"⏳ {wait_time} 秒后重试…")
                time.sleep(wait_time)
            else:
                print(f"❌ 已达最大重试次数({max_retries}),放弃。")
                raise

        except TaskTimeoutError as e:
            print(f"❌ 任务超时:{e}")
            print("⚠️  超过 300 秒限制,不再重试。")
            raise

        except ShannonError as e:
            print(f"❌ Shannon API 错误:{e}")
            print("⚠️  遇到 API 错误,不再重试。")
            raise

        except Exception as e:
            print(f"❌ 未预期错误:{type(e).__name__}: {e}")
            raise

    # Reached only when the loop body never ran (max_retries < 1).
    return None


def main():
    """Run the three demo scenarios and print their outcome to stdout."""
    rule = "=" * 60
    divider = "-" * 60

    def section(title):
        # Every scenario starts with its title followed by a dashed divider.
        print(title)
        print(divider)

    print(rule)
    print("错误处理示例")
    print(rule)

    # Scenario 1: normal execution through the retrying helper.
    section("\n📝 示例 1:正常执行")
    try:
        result = robust_task_submission("2+2 等于多少?")
        if result:
            print(f"\n最终结果:{result.result}")
    except Exception as e:
        print(f"\n⚠️  失败:{e}")

    # Scenario 2: a long-running task, to show the timeout path.
    section("\n\n📝 示例 2:超时场景")
    print("提示:此处演示超时处理")
    try:
        result = robust_task_submission(
            "分析一个复杂数据集…(模拟耗时较长的任务)"
        )
        if result:
            print(f"\n最终结果:{result.result}")
    except TaskTimeoutError:
        print("\n⚠️  任务超时——不重试(预期行为)")
    except Exception as e:
        print(f"\n⚠️  错误:{e}")

    # Scenario 3: plain try/except around direct client calls.
    section("\n\n📝 示例 3:简单的 try/except 模式")
    client = ShannonClient()
    try:
        handle = client.submit_task(query="法国的首都是哪里?")
        result = client.wait(handle.task_id)
        print(f"✅ 成功:{result.result}")
    except ConnectionError:
        print("❌ 网络问题——请检查 Shannon 服务")
    except TaskTimeoutError:
        print("❌ 任务耗时过长")
    except ShannonError as e:
        print(f"❌ API 错误:{e}")
    except Exception as e:
        print(f"❌ 未预期错误:{e}")

    print("\n" + rule)
    print("✅ 错误处理示例完成!")
    print(rule)


if __name__ == "__main__":
    main()
运行方法:
# 确保 Shannon 已运行
make dev

# 运行示例
python3 error_handling.py
何时使用重试:
  • 网络不稳定
  • 临时性服务问题
  • 对可靠性有要求的生产系统
不建议重试:
  • 超时(任务本身过于复杂/耗时)
  • API 错误(参数无效等)
  • 认证失败