跳转到主要内容
此示例集合正在增长。将定期添加更多用例。

基本示例

模型选择

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")
handle = client.submit_task(
  "将段落总结为三个重点关注收入趋势的要点。输出:Markdown 列表。",
  model_tier="small",
  # model_override="gpt-5-nano-2025-08-07",
  # provider_override="openai",
  mode="simple",
)
print(client.wait(handle.task_id).result)

简单问答

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# 提出一个简单问题
handle = client.submit_task(query="法国的首都是什么?")
final = client.wait(handle.task_id)
print(f"答案:{final.result}")

数据分析

from shannon import ShannonClient

client = ShannonClient()

# 贴近真实的数据分析
handle = client.submit_task(
    query=(
        "给定以下销售数据,请计算:\n"
        "1. 各产品的总营收\n"
        "2. 环比增长百分比\n"
        "3. 返回总营收 Top 3\n\n"
        "数据:\n"
        "ProductA: Jan=$10000, Feb=$12000\n"
        "ProductB: Jan=$8000, Feb=$9500\n"
        "ProductC: Jan=$15000, Feb=$14000\n"
        "ProductD: Jan=$5000, Feb=$6000\n\n"
        "格式:JSON 数组 [{product, revenue, mom_growth_pct}]"
    )
)

result = client.wait(handle.task_id)
print(result.result)

高级示例

多步工作流(带 session_id)

"""多步工作流示例(Shannon SDK)"""
from shannon import ShannonClient

def main():
    client = ShannonClient()
    session_id = "quarterly-analysis-demo"

    print("=" * 60)
    print("多步工作流示例")
    print("=" * 60)

    # 第 1 步:初始数据查询
    print("\n[步骤 1] 加载 Q4 数据...")
    h1 = client.submit_task(
        query="1000 + 500 等于多少?其中 1000 表示 Q4 营收,500 表示支出。",
        session_id=session_id
    )
    result1 = client.wait(h1.task_id)
    print(f"结果:{result1.result}")

    # 第 2 步:基于上一结果的分析
    print("\n[步骤 2] 分析趋势...")
    h2 = client.submit_task(
        query="基于你刚刚计算的 Q4 数字,利润率(百分比)是多少?",
        session_id=session_id
    )
    result2 = client.wait(h2.task_id)
    print(f"结果:{result2.result}")

    # 第 3 步:汇总
    print("\n[步骤 3] 生成管理摘要...")
    h3 = client.submit_task(
        query="用两句话总结 Q4 财务分析。",
        session_id=session_id
    )
    result3 = client.wait(h3.task_id)
    print(f"结果:{result3.result}")

    print("\n" + "=" * 60)
    print("✅ 多步工作流完成!")
    print(f"会话 ID: {session_id}")
    print("=" * 60)

if __name__ == "__main__":
    main()

并行处理

"""并行处理示例(Shannon SDK)"""
import asyncio
from shannon import AsyncShannonClient

async def main():
    print("=" * 60)
    print("并行处理示例")
    print("=" * 60)

    async with AsyncShannonClient() as client:
        # 并行处理的主题
        topics = ["AI", "量子计算", "生物技术"]

        print(f"\n[步骤 1] 并行提交 {len(topics)} 个任务…")

        # 非阻塞并发提交
        tasks = [
            client.submit_task(
                query=f"用一句话概述 {topic} 及其主要应用。"
            )
            for topic in topics
        ]

        # 等待所有提交完成
        handles = await asyncio.gather(*tasks)
        print(f"✅ 已提交 {len(handles)} 个任务")

        print("\n[步骤 2] 并行等待所有结果…")

        # 并行等待所有结果
        results = await asyncio.gather(
            *[client.wait(h.task_id) for h in handles]
        )
        print(f"✅ 已完成 {len(results)} 个任务")

        print("\n[步骤 3] 结果:")
        print("-" * 60)

        # 展示结果
        for topic, result in zip(topics, results):
            print(f"\n📌 {topic}:")
            print(f"   {result.result}")

            # 如有可用,安全访问元数据
            metadata = []
            if hasattr(result, 'metadata') and result.metadata:
                model = result.metadata.get('model_used') or result.metadata.get('model')
                if model:
                    metadata.append(f"Model: {model}")
                tokens = result.metadata.get('total_tokens')
                if tokens:
                    metadata.append(f"Tokens: {tokens}")

            if metadata:
                print(f"   ({', '.join(metadata)})")

        print("\n" + "=" * 60)
        print("✅ 并行处理完成!")
        print(f"共并行处理 {len(topics)} 个主题")
        print("=" * 60)

if __name__ == "__main__":
    asyncio.run(main())

带过滤的流式

"""带过滤器的流式传输示例"""
from shannon import ShannonClient, EventType

def main():
    print("=" * 60)
    print("带过滤器的流式传输示例")
    print("=" * 60)

    client = ShannonClient()

    print("\n[步骤 1] 提交任务…")
    handle = client.submit_task(
        query=(
            "用 3 句话比较 AWS、Azure 与 GCP 的定价模式。"
            "在生成时以流式方式输出。"
        )
    )
    print(f"✅ 已提交任务:{handle.task_id}")

    print("\n[步骤 2] 使用过滤器流式传输 [LLM_OUTPUT, WORKFLOW_COMPLETED]…")
    print("-" * 60)

    event_count = 0
    llm_outputs = []

    # 仅流式 LLM_OUTPUT 和 WORKFLOW_COMPLETED
    for event in client.stream(
        handle.workflow_id,
        types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
    ):
        event_count += 1
        print(f"[{event_count}] {event.type}")

        if event.type == EventType.LLM_OUTPUT:
            print(f"    内容: {event.message[:80]}…")
            llm_outputs.append(event.message)

        if event.type == EventType.WORKFLOW_COMPLETED:
            print(f"    状态: {event.message}")
            break

    print("\n" + "-" * 60)
    print(f"✅ 收到 {event_count} 条过滤后的事件")
    print(f"✅ 捕获 {len(llm_outputs)} 条 LLM 输出")

    print("\n[步骤 3] 完整结果:")
    print("-" * 60)
    final = client.wait(handle.task_id)
    print(final.result)

    print("\n" + "=" * 60)
    print("✅ 带过滤器的流式传输完成!")
    print("=" * 60)

if __name__ == "__main__":
    main()
完整事件类型列表参见事件类型目录
可用事件类型
  • LLM_OUTPUT — 最终 LLM 响应
  • LLM_PARTIAL — 实时流式 token
  • WORKFLOW_COMPLETED — 任务完成
  • AGENT_THINKING — 代理推理/思考
  • AGENT_STARTED — 代理开始工作
  • AGENT_COMPLETED — 代理完成工作
  • PROGRESS — 进度更新
  • ERROR_OCCURRED — 错误
常见过滤组合
  • 仅最终输出:types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
  • 实时流式输出:types=[EventType.LLM_PARTIAL, EventType.WORKFLOW_COMPLETED]
  • 监控代理生命周期:types=[EventType.AGENT_STARTED, EventType.AGENT_COMPLETED]
  • 全部(不加过滤):types=None(或省略该参数)

真实用例

错误处理示例

参见专门页面获取完整的重试逻辑与模式:错误处理 运行方法:
# 确保 Shannon 已运行
make dev

# 运行示例
python3 error_handling.py
何时使用重试:
  • 网络不稳定
  • 临时性服务问题
  • 对可靠性有要求的生产系统
不建议重试:
  • 超时(任务本身过于复杂/耗时)
  • API 错误(参数无效等)
  • 认证失败

后续步骤