正在扩展错误处理文档。下面显示了核心模式。
概述
Shannon Python SDK 提供全面的错误处理以帮助您构建强大的应用程序。所有 SDK 异常都继承自ShannonError。
异常层次结构
Copy
ShannonError # 基础异常
├── ConnectionError # 网络/连接问题(SDK 自定义异常)
├── AuthenticationError # API 密钥/身份验证问题
├── PermissionDeniedError # 权限不足/禁止访问
├── ValidationError # 无效参数
├── RateLimitError # 速率限制/请求过多
├── ServerError # 上游 5xx 服务器错误
├── TaskNotFoundError # 任务不存在
├── TaskTimeoutError # 任务超出超时
├── TaskCancelledError # 任务被取消
├── SessionNotFoundError # 会话不存在
├── SessionExpiredError # 会话已过期
├── TemplateError # 模板/路由相关错误
└── TemplateNotFoundError # 模板不存在
预算和任务失败处理:预算超出和任务失败不是异常。失败请检查
status.status。令牌用量与成本合计请通过 list_tasks() 读取任务列表中的 total_token_usage。基本错误处理
Try-Catch 模式
Copy
from shannon import (
ShannonClient,
ShannonError,
AuthenticationError,
PermissionDeniedError,
ConnectionError,
RateLimitError,
ServerError,
TaskTimeoutError,
TaskStatusEnum,
)
client = ShannonClient(base_url="http://localhost:8080")
try:
handle = client.submit_task(query="分析此数据")
status = client.wait(handle.task_id, timeout=120)
if status.status == TaskStatusEnum.FAILED:
print(f"任务失败:{status.error_message}")
else:
print("结果:", status.result)
except ConnectionError:
print("无法连接到 Shannon 服务器")
except AuthenticationError:
print("无效的 API 凭证")
except PermissionDeniedError:
print("禁止访问:权限不足")
except RateLimitError:
print("触发速率限制,请降低频率或加入退避")
except ServerError:
print("服务器错误(5xx),稍后再试")
except TaskTimeoutError:
print("任务超出超时限制")
except ShannonError as e:
print(f"Shannon 错误:{e}")
except Exception as e:
print(f"意外错误:{e}")
特定错误类型
连接错误
处理网络和连接问题:Copy
import time
from shannon import ShannonClient, ConnectionError
def connect_with_retry(max_retries=3):
client = ShannonClient()
for attempt in range(max_retries):
try:
# 使用简单任务测试连接
handle = client.submit_task(query="ping")
return client
except ConnectionError as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # 指数退避
print(f"连接失败,{wait_time}s 后重试...")
time.sleep(wait_time)
else:
print(f"{max_retries} 次尝试后无法连接")
raise
成本与失败检查
通过状态判断失败,并使用list_tasks() 获取用量与成本合计:
Copy
from shannon import ShannonClient, TaskStatusEnum
client = ShannonClient()
handle = client.submit_task(query="Analyze data")
status = client.wait(handle.task_id)
# 失败检查(TaskStatus 返回的是枚举)
if status.status == TaskStatusEnum.FAILED:
print(f"失败并出现错误:{status.error_message}")
# 用量与成本(从任务列表读取)
tasks, _ = client.list_tasks(limit=50)
summary = next((t for t in tasks if t.task_id == handle.task_id), None)
usage = summary.total_token_usage if summary else None
if usage:
print(f"tokens={usage.total_tokens} prompt={usage.prompt_tokens} completion={usage.completion_tokens} cost=${usage.cost_usd:.6f}")
# 注意:TaskSummary.status 是字符串
if summary and summary.status == "FAILED":
print("列表摘要指示失败")
超时错误
处理长时间运行的操作:Copy
import asyncio
from shannon import AsyncShannonClient, TaskTimeoutError
async def with_timeout_handling():
async with AsyncShannonClient() as client:
try:
# asyncio.TimeoutError 来自 wait_for 包装器(客户端)
handle = await asyncio.wait_for(
client.submit_task(query="复杂分析"),
timeout=10.0,
)
# TaskTimeoutError 来自 Shannon,如果任务超出了自己的超时
result = await asyncio.wait_for(client.wait(handle.task_id), timeout=60.0)
return result
except asyncio.TimeoutError:
print("操作超时(客户端 asyncio 超时)")
return None
except TaskTimeoutError:
print("任务超时(Shannon 报告超时)")
return None
速率限制
优雅地处理 API 速率限制:Copy
from shannon import ShannonClient, ConnectionError
import time
def handle_rate_limits(queries):
client = ShannonClient()
results = []
for query in queries:
while True:
try:
handle = client.submit_task(query=query)
result = client.wait(handle.task_id)
results.append(result)
break # 成功,移到下一个
except ConnectionError:
# 速率限制或瞬时网络错误
print(f"触发速率限制/暂时性错误,{backoff}s 后重试…")
time.sleep(backoff)
backoff = min(backoff * 2, 30)
return results
导入说明:
from shannon import ConnectionError 指的是 SDK 的异常类型(不是 Python 内置的 ConnectionError)。速率限制与断路器等高级模式属于参考实现,请根据生产环境验证与调整。验证错误
处理无效参数:Copy
from shannon import ShannonClient, ValidationError
def validate_and_submit(query, session_id=None):
client = ShannonClient()
try:
return client.submit_task(query=query, session_id=session_id)
except ValidationError as e:
print(f"无效参数:{e}")
return None
任务失败处理
处理任务执行失败:Copy
from shannon import ShannonClient
def handle_task_failure(query):
client = ShannonClient()
try:
handle = client.submit_task(query=query)
status = client.wait(handle.task_id, timeout=120)
if status.status == TaskStatusEnum.FAILED:
print(f"任务失败:{status.error_message}")
return None
return status
except TaskTimeoutError:
print("任务超时;请考虑增加超时或简化请求。")
return None
记录错误
实现全面的错误日志:Copy
import logging
from shannon import ShannonClient, ShannonError
# 配置日志记录
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('shannon')
def logged_task_submission(query):
client = ShannonClient()
try:
logger.info(f"提交任务:{query[:50]}...")
handle = client.submit_task(query=query)
logger.info(f"任务已提交:task_id={handle.task_id}")
result = client.wait(handle.task_id)
logger.info("任务成功完成")
return result.result
except ShannonError as e:
logger.error(f"Shannon 错误:{e}", exc_info=True)
raise
except Exception as e:
logger.critical(f"意外错误:{e}", exc_info=True)
raise
断路器模式
为弹性实现断路器:Copy
class CircuitBreaker:
def __init__(self, failure_threshold=5, reset_timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.last_failure_time = None
self.state = "closed" # closed, open, half-open
def call(self, func, *args, **kwargs):
if self.state == "open":
if time.time() - self.last_failure_time > self.reset_timeout:
self.state = "half-open"
else:
raise Exception("断路器是打开的")
try:
result = func(*args, **kwargs)
if self.state == "half-open":
self.state = "closed"
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
raise
# 用法
breaker = CircuitBreaker()
client = ShannonClient()
try:
result = breaker.call(
client.submit_task,
query="分析数据"
)
except Exception as e:
print(f"服务不可用:{e}")
最佳实践
- 始终在泛型异常之前捕获特定异常
- 使用指数退避实现重试逻辑
- 记录错误以供调试和监控
- 为关键操作提供回退选项
- 设置合理的超时以避免挂起
- 在提交前验证输入
- 对外部依赖项使用断路器
后续步骤
完整示例:带重试逻辑
Copy
#!/usr/bin/env python3
"""带重试逻辑的错误处理示例"""
import time
from shannon import ShannonClient, ConnectionError, TaskTimeoutError, ShannonError
def robust_task_submission(query: str, max_retries: int = 3):
"""
使用重试逻辑提交任务,并包含全面的错误处理。
参数:
query: 任务查询内容
max_retries: 可重试错误的最大重试次数
返回:
TaskStatus 对象;若所有尝试均失败则返回 None
"""
client = ShannonClient()
for attempt in range(max_retries):
try:
print(f"\n[尝试 {attempt + 1}/{max_retries}] 正在提交任务…")
handle = client.submit_task(query=query)
print(f"✅ 已提交任务:{handle.task_id}")
print("⏳ 等待结果(300s 超时)…")
result = client.wait(handle.task_id, timeout=300)
print("✅ 任务成功完成")
return result
except ConnectionError as e:
wait_time = 2 ** attempt # 1s, 2s, 4s
print(f"❌ 连接错误:{e}")
if attempt < max_retries - 1:
print(f"⏳ {wait_time} 秒后重试…")
time.sleep(wait_time)
else:
print(f"❌ 已达最大重试次数({max_retries}),放弃。")
raise
except TaskTimeoutError as e:
print(f"❌ 任务超时:{e}")
print("⚠️ 超过 300 秒限制,不再重试。")
raise
except ShannonError as e:
print(f"❌ Shannon API 错误:{e}")
print("⚠️ 遇到 API 错误,不再重试。")
raise
except Exception as e:
print(f"❌ 未预期错误:{type(e).__name__}: {e}")
raise
return None
def main():
print("=" * 60)
print("错误处理示例")
print("=" * 60)
print("\n📝 示例 1:正常执行")
print("-" * 60)
try:
result = robust_task_submission("2+2 等于多少?")
if result:
print(f"\n最终结果:{result.result}")
except Exception as e:
print(f"\n⚠️ 失败:{e}")
print("\n\n📝 示例 2:超时场景")
print("-" * 60)
print("提示:此处演示超时处理")
try:
result = robust_task_submission(
"分析一个复杂数据集…(模拟耗时较长的任务)"
)
if result:
print(f"\n最终结果:{result.result}")
except TaskTimeoutError:
print("\n⚠️ 任务超时——不重试(预期行为)")
except Exception as e:
print(f"\n⚠️ 错误:{e}")
print("\n\n📝 示例 3:简单的 try/except 模式")
print("-" * 60)
client = ShannonClient()
try:
handle = client.submit_task(query="法国的首都是哪里?")
result = client.wait(handle.task_id)
print(f"✅ 成功:{result.result}")
except ConnectionError:
print("❌ 网络问题——请检查 Shannon 服务")
except TaskTimeoutError:
print("❌ 任务耗时过长")
except ShannonError as e:
print(f"❌ API 错误:{e}")
except Exception as e:
print(f"❌ 未预期错误:{e}")
print("\n" + "=" * 60)
print("✅ 错误处理示例完成!")
print("=" * 60)
if __name__ == "__main__":
main()
Copy
# 确保 Shannon 已运行
make dev
# 运行示例
python3 error_handling.py
- 网络不稳定
- 临时性服务问题
- 对可靠性有要求的生产系统
- 超时(任务本身过于复杂/耗时)
- API 错误(参数无效等)
- 认证失败