跳转到主要内容
完整的异步文档即将推出。基本模式如下所示。

概述

Shannon Python SDK 提供同步和异步客户端以实现最大灵活性。异步操作适用于:
  • 并发任务提交
  • 非阻塞事件流式传输
  • 并行 API 调用
  • 高吞吐量应用

AsyncShannonClient

基本用法

import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient(base_url="http://localhost:8080") as client:
        handle = await client.submit_task(
            query=(
                "将段落总结为 3 个重点关注收入趋势的要点。"
                "输出:Markdown 列表。"
            ),
            model_tier="small",
            mode="standard",
        )
        final = await client.wait(handle.task_id)
        print(final.result)

asyncio.run(main())

并发任务

同时提交多个任务:
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # 并发提交多个任务
        tasks = [
            client.submit_task(query="5+5 等于多少?"),
            client.submit_task(query="10*2 等于多少?"),
            client.submit_task(query="100/4 等于多少?"),
        ]

        # 等待所有提交完成
        handles = await asyncio.gather(*tasks)

        # 获取所有结果
        results = await asyncio.gather(*[client.wait(h.task_id) for h in handles])

        for i, r in enumerate(results, 1):
            print(f"任务 {i}: {r.result}")

asyncio.run(main())

异步流式传输

非阻塞流式事件。提示:不要在 async for 循环内再调用其它 await 方法;先 break 退出循环,再进行后续 await
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # 提交任务
        h = await client.submit_task(query="法国的首都是哪里?")

        # 流式读取事件
        async for e in client.stream(h.workflow_id):
            print(f"{e.type}: {e.message[:50]}")
            if e.type == "WORKFLOW_COMPLETED":
                break  # 先干净地退出循环

        # 现在再进行 await 是安全的
        final = await client.wait(h.task_id)
        print(f"\n最终结果:{final.result}")

asyncio.run(main())

超时处理

async def with_timeout():
    async with AsyncShannonClient() as client:
        try:
            # 带超时的提交
            handle = await asyncio.wait_for(
                client.submit_task(
                    query=(
                        "从段落中提取前 3 个见解,并返回 Markdown 项目符号列表。"
                    )
                ),
                timeout=30.0
            )

            # 带超时等待完成
            result = await asyncio.wait_for(
                client.wait(handle.task_id),
                timeout=300.0
            )

        except asyncio.TimeoutError:
            print("Operation timed out")

后台任务

在执行其他工作时在后台运行任务:
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # 提交任务
        handle = await client.submit_task(query="生成一篇约 500 字的季度营收驱动与风险报告。输出:Markdown。")

        # 启动后台等待(不阻塞)
        task = asyncio.create_task(client.wait(handle.task_id))

        # 在任务运行时执行其他工作
        print("在后台处理…")
        await asyncio.sleep(2)  # 这里放你的其他异步工作

        # 检查状态
        if not task.done():
            print("仍在处理…")

        # 需要时获取结果
        result = await task
        print(f"结果:{result.result}")

asyncio.run(main())

错误处理

import asyncio
from shannon import AsyncShannonClient, ConnectionError, TaskTimeoutError

async def main():
    async with AsyncShannonClient() as client:
        try:
            handle = await client.submit_task(query="什么是 AI?")
            result = await client.wait(handle.task_id)
            print(f"结果:{result.result}")

        except ConnectionError:
            print("无法连接到 Shannon")
        except TaskTimeoutError:
            print("任务超时")
        except Exception as e:
            print(f"未预期的错误:{e}")

asyncio.run(main())

与 Web 框架集成

FastAPI 示例

from contextlib import asynccontextmanager
from fastapi import FastAPI
from shannon import AsyncShannonClient

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动:创建客户端
    app.state.client = AsyncShannonClient()
    yield
    # 关闭:释放客户端
    await app.state.client.close()

app = FastAPI(lifespan=lifespan)

@app.post("/analyze")
async def analyze(query: str):
    client = app.state.client
    handle = await client.submit_task(query=query)
    result = await client.wait(handle.task_id)
    return {"result": result.result}
测试方法:
# 安装 FastAPI 与 uvicorn
pip install fastapi uvicorn

# 启动服务
uvicorn your_file:app --reload

# 测试接口
curl -X POST "http://127.0.0.1:8000/analyze?query=What+is+AI?"

最佳实践

  1. 使用上下文管理器async with)以正确清理
  2. 处理超时用于长时间运行的操作
  3. 实现重试逻辑用于网络故障
  4. **使用 gather()**进行并发操作
  5. 流式传输事件以获得实时更新

下一步