什么是工作流?
Shannon 中的工作流是持久的、有状态的流程,用于编排 AI 智能体完成复杂任务。基于 Temporal 构建,它们提供:
持久性 :工作流在服务重启和故障后仍然存活
确定性 :可以重放以进行调试
可见性 :完整的执行历史和状态检查
可靠性 :自动重试和错误处理
工作流架构
认知模式
Shannon 为不同类型的任务实现了几种经过验证的认知模式:
思维链(CoT)
顺序推理,每个步骤在逻辑上建立在前一步之上。
最适合:
示例:
client.submit_task(
query = "计算 10 万美元投资在 8% 年回报率下 5 年的投资回报率"
)
Shannon 会自动对顺序数学和分析任务应用思维链推理。
执行:
步骤 1:确定公式(ROI = (最终价值 - 初始价值) / 初始价值)
步骤 2:逐年计算增长
步骤 3:计算最终价值($146,933)
步骤 4:计算 ROI(46.93%)
思维树(ToT)
同时探索多个解决方案路径,评估它们,并选择最佳方案。
最适合:
示例:
client.submit_task(
query = "设计一个可扩展的消息队列系统。评估多个架构并推荐最佳方法。"
)
Shannon 会自动对具有多个解决方案路径的设计和规划任务应用思维树探索。
执行:
分支 1:基于 Redis 的解决方案
├─ 优点:快速、简单、熟悉
├─ 缺点:持久性有限、受内存限制
└─ 评分:7/10
分支 2:基于 Kafka 的解决方案
├─ 优点:高吞吐量、持久、经过实战检验
├─ 缺点:运维复杂、资源密集
└─ 评分:9/10
分支 3:基于 RabbitMQ 的解决方案
├─ 优点:功能丰富、平衡性好
├─ 缺点:吞吐量低于 Kafka
└─ 评分:8/10
选择:Kafka(对需求评分最高)
ReAct(推理 + 行动)
在循环中交替思考和行动,非常适合动态问题解决。
最适合:
示例:
client.submit_task(
query = "查找最新的 Rust 异步运行时基准测试"
)
Shannon 会自动对研究和迭代发现任务应用 ReAct(推理 + 行动)模式。
执行:
思考:"我需要搜索最近的基准测试"
行动:web_search("Rust 异步运行时基准测试 2024")
观察:找到 3 篇来自 2024 年的文章
思考:"让我获取详细的比较数据"
行动:web_search("tokio vs async-std vs smol 基准比较")
观察:找到性能数据
思考:"现在我可以总结发现"
行动:将结果综合成报告
辩论模式
多个智能体争论不同观点,然后综合共识。
最适合:
示例:
client.submit_task(
query = "我们应该将单体架构迁移到微服务吗?提供赞成和反对的论据,然后推荐。"
)
Shannon 会自动对需要多个视角的决策任务应用辩论模式。
执行:
智能体 1(支持微服务):
- 更好的可扩展性
- 独立部署
- 技术灵活性
智能体 2(支持单体):
- 运维更简单
- 更容易调试
- 延迟更低
智能体 3(综合者):
- 评估两方论据
- 考虑上下文
- 提供推荐
任务分解
对于复杂任务,Shannon 会自动将它们分解为子任务:
DAG(有向无环图)执行
并行执行:
没有依赖关系的子任务并行运行,减少延迟。
示例:
# 此查询将自动分解为 3 个并行子任务
client.submit_task(
query = "研究:1) GPT-5 能力,2) Claude 3 功能,3) Gemini 更新"
)
分解策略
Shannon 在内部分析任务并创建执行计划。虽然分解结构不会直接在 SDK 响应中暴露,但您可以通过事件观察工作流执行:
# 流式传输事件以查看分解的实际操作
for event in client.stream(workflow_id):
if event.type == "TASK_DECOMPOSED" :
print ( f "任务已分解: { event.message } " )
elif event.type == "SUBTASK_STARTED" :
print ( f "子任务已开始: { event.message } " )
elif event.type == "SUBTASK_COMPLETED" :
print ( f "子任务已完成: { event.message } " )
内部结构 (用于理解,不能直接访问):
当不存在依赖关系时,子任务并行运行
综合任务等待所有子任务完成
每个子任务分配给专门的智能体
工作流活动
Temporal 工作流由活动组成 - 离散的工作单元:
活动 目的
DecomposeTask 分析任务并创建子任务 ExecuteAgent 运行单个智能体任务 SynthesizeResults 组合多个智能体的输出 UpdateSessionResult 持久化会话状态并附加助手消息到会话历史(修复 2025-11-05) RecordQuery 存储到向量记忆 FetchSessionMemory 检索相关上下文
监控工作流
通过 Python SDK
# 获取任务状态
status = client.get_status(task_id)
print ( f "状态: { status.status } " )
print ( f "进度: { status.progress } " )
if status.result:
print ( f "结果: { status.result } " )
任务分解在 Shannon 内部进行。使用事件流式传输实时观察工作流执行。
通过 Temporal UI
访问 http://localhost:8088 可以查看:
工作流执行时间线
活动状态
输入/输出载荷
错误跟踪
重放历史
确定性重放
Shannon 工作流是确定性的 - 当使用相同输入重放时,它们会产生相同的结果。
用例:
调试 :重放失败的工作流以查找错误
测试 :验证代码更改不会破坏现有工作流
审计 :准确了解发生了什么
示例:
# 导出工作流历史
make replay-export WORKFLOW_ID=task-123 OUT=history.json
# 针对当前代码重放
make replay HISTORY=history.json
# 如果代码以非确定性方式更改,重放失败
工作流配置
工作流行为通过环境变量进行配置(示例):
# 在 .env
MAX_AGENTS_PER_TASK = 5 # 最大并行智能体数(如启用)
MAX_TOKENS_PER_REQUEST = 50000 # 每次请求的令牌预算(LLM 服务)
MAX_COST_PER_REQUEST = 5.0 # 每次请求的成本限制(美元,LLM 服务)
AGENT_TIMEOUT_SECONDS = 600 # 代理执行超时(编排器)
通过流式传输事件监控工作流执行:
handle = client.submit_task( query = "复杂分析任务" )
# 流式传输工作流事件
for event in client.stream(handle.workflow_id):
print ( f "[ { event.type } ] { event.message } " )
有关所有环境变量,请参阅配置指南 。
错误处理
工作流自动处理故障:
活动自动使用指数退避进行重试: 尝试 1:立即
尝试 2:1秒后
尝试 3:2秒后
尝试 4:4秒后
...
最多:5次尝试
如果 LLM 提供商失败,熔断器打开并路由到备用: 主要:OpenAI(失败)
备用:Anthropic(健康)
如果复杂模式失败,自动回退到更简单的执行。
当达到预算限制时,任务立即停止,防止成本超支。
最佳实践
1. 选择正确的策略
将认知策略与您的任务相匹配:
任务类型 推荐策略
简单问答 DIRECT(单智能体) 研究 REACT(网络搜索 + 综合) 分析 DECOMPOSE(分解为部分) 设计 EXPLORATORY(ToT 评估)
2. 使用适当的模式
simple :直接执行,无开销
standard :任务分解,多智能体
complex :完整认知模式(CoT、ToT 等)
3. 监控进度
流式传输事件以跟踪工作流执行:
for event in client.stream(workflow_id):
if event.type == "LLM_PROMPT" :
print ( f "提示: { event.message } " )
elif event.type == "AGENT_COMPLETED" :
print ( f "智能体已完成: { event.agent_id } " )
4. 设置超时
通过环境变量防止工作流无限运行:
# 在 .env
AGENT_TIMEOUT_SECONDS = 300 # 5 分钟限制(每个代理执行)
下一步