完整的异步文档即将推出。基本模式如下所示。
概述
Shannon Python SDK 提供同步和异步客户端以实现最大灵活性。异步操作适用于:- 并发任务提交
- 非阻塞事件流式传输
- 并行 API 调用
- 高吞吐量应用
AsyncShannonClient
基本用法
Copy
import asyncio
from shannon import AsyncShannonClient
async def main():
async with AsyncShannonClient(base_url="http://localhost:8080") as client:
handle = await client.submit_task(
query=(
"将段落总结为 3 个重点关注收入趋势的要点。"
"输出:Markdown 列表。"
),
model_tier="small",
mode="standard",
)
final = await client.wait(handle.task_id)
print(final.result)
asyncio.run(main())
并发任务
同时提交多个任务:Copy
import asyncio
from shannon import AsyncShannonClient
async def main():
async with AsyncShannonClient() as client:
# 并发提交多个任务
tasks = [
client.submit_task(query="5+5 等于多少?"),
client.submit_task(query="10*2 等于多少?"),
client.submit_task(query="100/4 等于多少?"),
]
# 等待所有提交完成
handles = await asyncio.gather(*tasks)
# 获取所有结果
results = await asyncio.gather(*[client.wait(h.task_id) for h in handles])
for i, r in enumerate(results, 1):
print(f"任务 {i}: {r.result}")
asyncio.run(main())
异步流式传输
非阻塞流式事件。提示:不要在async for 循环内再调用其它 await 方法;先 break 退出循环,再进行后续 await:
Copy
import asyncio
from shannon import AsyncShannonClient
async def main():
async with AsyncShannonClient() as client:
# 提交任务
h = await client.submit_task(query="法国的首都是哪里?")
# 流式读取事件
async for e in client.stream(h.workflow_id):
print(f"{e.type}: {e.message[:50]}")
if e.type == "WORKFLOW_COMPLETED":
break # 先干净地退出循环
# 现在再进行 await 是安全的
final = await client.wait(h.task_id)
print(f"\n最终结果:{final.result}")
asyncio.run(main())
超时处理
Copy
async def with_timeout():
async with AsyncShannonClient() as client:
try:
# 带超时的提交
handle = await asyncio.wait_for(
client.submit_task(
query=(
"从段落中提取前 3 个见解,并返回 Markdown 项目符号列表。"
)
),
timeout=30.0
)
# 带超时等待完成
result = await asyncio.wait_for(
client.wait(handle.task_id),
timeout=300.0
)
except asyncio.TimeoutError:
print("Operation timed out")
后台任务
在执行其他工作时在后台运行任务:Copy
import asyncio
from shannon import AsyncShannonClient
async def main():
async with AsyncShannonClient() as client:
# 提交任务
handle = await client.submit_task(query="生成一篇约 500 字的季度营收驱动与风险报告。输出:Markdown。")
# 启动后台等待(不阻塞)
task = asyncio.create_task(client.wait(handle.task_id))
# 在任务运行时执行其他工作
print("在后台处理…")
await asyncio.sleep(2) # 这里放你的其他异步工作
# 检查状态
if not task.done():
print("仍在处理…")
# 需要时获取结果
result = await task
print(f"结果:{result.result}")
asyncio.run(main())
错误处理
Copy
import asyncio
from shannon import AsyncShannonClient, ConnectionError, TaskTimeoutError
async def main():
async with AsyncShannonClient() as client:
try:
handle = await client.submit_task(query="什么是 AI?")
result = await client.wait(handle.task_id)
print(f"结果:{result.result}")
except ConnectionError:
print("无法连接到 Shannon")
except TaskTimeoutError:
print("任务超时")
except Exception as e:
print(f"未预期的错误:{e}")
asyncio.run(main())
与 Web 框架集成
FastAPI 示例
Copy
from contextlib import asynccontextmanager
from fastapi import FastAPI
from shannon import AsyncShannonClient
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动:创建客户端
app.state.client = AsyncShannonClient()
yield
# 关闭:释放客户端
await app.state.client.close()
app = FastAPI(lifespan=lifespan)
@app.post("/analyze")
async def analyze(query: str):
client = app.state.client
handle = await client.submit_task(query=query)
result = await client.wait(handle.task_id)
return {"result": result.result}
Copy
# 安装 FastAPI 与 uvicorn
pip install fastapi uvicorn
# 启动服务
uvicorn your_file:app --reload
# 测试接口
curl -X POST "http://127.0.0.1:8000/analyze?query=What+is+AI?"
最佳实践
- 使用上下文管理器(
async with)以正确清理 - 处理超时用于长时间运行的操作
- 实现重试逻辑用于网络故障
- **使用 gather()**进行并发操作
- 流式传输事件以获得实时更新