此示例集合正在增长。将定期添加更多用例。
基本示例
模型选择
Copy
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
handle = client.submit_task(
"将段落总结为三个重点关注收入趋势的要点。输出:Markdown 列表。",
model_tier="small",
# model_override="gpt-5-nano-2025-08-07",
# provider_override="openai",
mode="simple",
)
print(client.wait(handle.task_id).result)
简单问答
Copy
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# 提出一个简单问题
handle = client.submit_task(query="法国的首都是什么?")
final = client.wait(handle.task_id)
print(f"答案:{final.result}")
数据分析
Copy
from shannon import ShannonClient
client = ShannonClient()
# 贴近真实的数据分析
handle = client.submit_task(
query=(
"给定以下销售数据,请计算:\n"
"1. 各产品的总营收\n"
"2. 环比增长百分比\n"
"3. 返回总营收 Top 3\n\n"
"数据:\n"
"ProductA: Jan=$10000, Feb=$12000\n"
"ProductB: Jan=$8000, Feb=$9500\n"
"ProductC: Jan=$15000, Feb=$14000\n"
"ProductD: Jan=$5000, Feb=$6000\n\n"
"格式:JSON 数组 [{product, revenue, mom_growth_pct}]"
)
)
result = client.wait(handle.task_id)
print(result.result)
高级示例
多步工作流(带 session_id)
Copy
"""多步工作流示例(Shannon SDK)"""
from shannon import ShannonClient
def main():
client = ShannonClient()
session_id = "quarterly-analysis-demo"
print("=" * 60)
print("多步工作流示例")
print("=" * 60)
# 第 1 步:初始数据查询
print("\n[步骤 1] 加载 Q4 数据...")
h1 = client.submit_task(
query="1000 + 500 等于多少?其中 1000 表示 Q4 营收,500 表示支出。",
session_id=session_id
)
result1 = client.wait(h1.task_id)
print(f"结果:{result1.result}")
# 第 2 步:基于上一结果的分析
print("\n[步骤 2] 分析趋势...")
h2 = client.submit_task(
query="基于你刚刚计算的 Q4 数字,利润率(百分比)是多少?",
session_id=session_id
)
result2 = client.wait(h2.task_id)
print(f"结果:{result2.result}")
# 第 3 步:汇总
print("\n[步骤 3] 生成管理摘要...")
h3 = client.submit_task(
query="用两句话总结 Q4 财务分析。",
session_id=session_id
)
result3 = client.wait(h3.task_id)
print(f"结果:{result3.result}")
print("\n" + "=" * 60)
print("✅ 多步工作流完成!")
print(f"会话 ID: {session_id}")
print("=" * 60)
if __name__ == "__main__":
main()
并行处理
Copy
"""并行处理示例(Shannon SDK)"""
import asyncio
from shannon import AsyncShannonClient
async def main():
print("=" * 60)
print("并行处理示例")
print("=" * 60)
async with AsyncShannonClient() as client:
# 并行处理的主题
topics = ["AI", "量子计算", "生物技术"]
print(f"\n[步骤 1] 并行提交 {len(topics)} 个任务…")
# 非阻塞并发提交
tasks = [
client.submit_task(
query=f"用一句话概述 {topic} 及其主要应用。"
)
for topic in topics
]
# 等待所有提交完成
handles = await asyncio.gather(*tasks)
print(f"✅ 已提交 {len(handles)} 个任务")
print("\n[步骤 2] 并行等待所有结果…")
# 并行等待所有结果
results = await asyncio.gather(
*[client.wait(h.task_id) for h in handles]
)
print(f"✅ 已完成 {len(results)} 个任务")
print("\n[步骤 3] 结果:")
print("-" * 60)
# 展示结果
for topic, result in zip(topics, results):
print(f"\n📌 {topic}:")
print(f" {result.result}")
# 如有可用,安全访问元数据
metadata = []
if hasattr(result, 'metadata') and result.metadata:
model = result.metadata.get('model_used') or result.metadata.get('model')
if model:
metadata.append(f"Model: {model}")
tokens = result.metadata.get('total_tokens')
if tokens:
metadata.append(f"Tokens: {tokens}")
if metadata:
print(f" ({', '.join(metadata)})")
print("\n" + "=" * 60)
print("✅ 并行处理完成!")
print(f"共并行处理 {len(topics)} 个主题")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())
带过滤的流式
Copy
"""带过滤器的流式传输示例"""
from shannon import ShannonClient, EventType
def main():
print("=" * 60)
print("带过滤器的流式传输示例")
print("=" * 60)
client = ShannonClient()
print("\n[步骤 1] 提交任务…")
handle = client.submit_task(
query=(
"用 3 句话比较 AWS、Azure 与 GCP 的定价模式。"
"在生成时以流式方式输出。"
)
)
print(f"✅ 已提交任务:{handle.task_id}")
print("\n[步骤 2] 使用过滤器流式传输 [LLM_OUTPUT, WORKFLOW_COMPLETED]…")
print("-" * 60)
event_count = 0
llm_outputs = []
# 仅流式 LLM_OUTPUT 和 WORKFLOW_COMPLETED
for event in client.stream(
handle.workflow_id,
types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
):
event_count += 1
print(f"[{event_count}] {event.type}")
if event.type == EventType.LLM_OUTPUT:
print(f" 内容: {event.message[:80]}…")
llm_outputs.append(event.message)
if event.type == EventType.WORKFLOW_COMPLETED:
print(f" 状态: {event.message}")
break
print("\n" + "-" * 60)
print(f"✅ 收到 {event_count} 条过滤后的事件")
print(f"✅ 捕获 {len(llm_outputs)} 条 LLM 输出")
print("\n[步骤 3] 完整结果:")
print("-" * 60)
final = client.wait(handle.task_id)
print(final.result)
print("\n" + "=" * 60)
print("✅ 带过滤器的流式传输完成!")
print("=" * 60)
if __name__ == "__main__":
main()
可用事件类型
LLM_OUTPUT— 最终 LLM 响应LLM_PARTIAL— 实时流式 tokenWORKFLOW_COMPLETED— 任务完成AGENT_THINKING— 代理推理/思考AGENT_STARTED— 代理开始工作AGENT_COMPLETED— 代理完成工作PROGRESS— 进度更新ERROR_OCCURRED— 错误
- 仅最终输出:
types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED] - 实时流式输出:
types=[EventType.LLM_PARTIAL, EventType.WORKFLOW_COMPLETED] - 监控代理生命周期:
types=[EventType.AGENT_STARTED, EventType.AGENT_COMPLETED] - 全部(不加过滤):
types=None(或省略该参数)
真实用例
错误处理示例
参见专门页面获取完整的重试逻辑与模式:错误处理。 运行方法:Copy
# 确保 Shannon 已运行
make dev
# 运行示例
python3 error_handling.py
- 网络不稳定
- 临时性服务问题
- 对可靠性有要求的生产系统
- 超时(任务本身过于复杂/耗时)
- API 错误(参数无效等)
- 认证失败