Shannon 提供实时事件流式传输,因此您可以在任务执行时进行监控。这对于以下场景至关重要:
- 向用户提供实时反馈
- 调试工作流执行
- 构建交互式 UI
- 监控长时间运行的任务
流式传输技术
Shannon 支持两种流式传输协议:
| 协议 | 用例 | 特性 |
| SSE(服务器发送事件) | 单向服务器→客户端 | 简单、基于 HTTP、自动重连 |
| WebSocket | 双向 | 全双工、较低延迟 |
由于 SSE 的简单性和内置重连处理,建议大多数用例使用 SSE。
服务器发送事件(SSE)
使用 cURL
curl -N http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}
输出:
event: WORKFLOW_STARTED
data: {"workflow_id":"task-123","agent_id":"orchestrator","message":"任务处理已开始"}
event: DATA_PROCESSING
data: {"workflow_id":"task-123","message":"准备上下文"}
event: PROGRESS
data: {"workflow_id":"task-123","agent_id":"planner","message":"创建了包含 3 个步骤的计划"}
event: AGENT_STARTED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"正在处理查询"}
event: LLM_PROMPT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"正在搜索信息..."}
event: TOOL_INVOKED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"调用 web_search,查询='主题 A'"}
event: TOOL_OBSERVATION
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"找到 5 个结果"}
event: LLM_OUTPUT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"研究完成:找到相关信息"}
event: AGENT_COMPLETED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"任务完成"}
event: WORKFLOW_COMPLETED
data: {"workflow_id":"task-123","message":"最终综合结果已准备就绪"}
使用 Python SDK
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# 提交任务
handle = client.submit_task(
query="研究 AI 趋势并创建摘要"
)
# 流式传输事件
for event in client.stream(handle.workflow_id):
print(f"[{event.type}] {event.message}")
if event.type == "WORKFLOW_COMPLETED":
print(f"任务已完成:{event.message}")
break
事件类型
Shannon 在不同类别中发出 26 种事件类型。以下是最常用的事件类型:
核心工作流事件
| 事件类型 | 描述 | 示例消息 |
WORKFLOW_STARTED | 任务处理开始 | "任务处理已开始" |
WORKFLOW_COMPLETED | 工作流成功完成 | "全部完成" |
AGENT_STARTED | 智能体开始处理 | "正在处理查询" |
AGENT_COMPLETED | 智能体完成 | "任务完成" |
STATUS_UPDATE | 状态更新/进度 | "正在规划下一步" |
ERROR_OCCURRED | 执行期间发生错误 | "连接失败:超时" |
LLM 和工具事件
| 事件类型 | 描述 | 示例消息 |
LLM_PROMPT | 发送给 LLM 的提示 | "5 + 5 等于多少?" |
LLM_OUTPUT | 完整的 LLM 响应 | "5 + 5 等于 10" |
LLM_PARTIAL | 流式传输块(通常已过滤) | "5 + 5" |
TOOL_INVOKED | 工具执行开始 | "调用 database_query" |
TOOL_OBSERVATION | 工具结果/输出 | "查询返回 42 行" |
进度和状态事件
| 事件类型 | 描述 | 示例消息 |
PROGRESS | 步骤完成更新 | "创建了包含 3 个步骤的计划" |
DATA_PROCESSING | 处理/分析数据 | "准备上下文" |
DELEGATION | 任务委托给另一个智能体 | "移交给简单任务" |
多智能体事件(需要功能门控)
| 事件类型 | 描述 | 示例消息 |
MESSAGE_SENT | 智能体发送消息(需要 p2p_v1) | "请分析第 3 节" |
MESSAGE_RECEIVED | 智能体接收消息(需要 p2p_v1) | "收到任务" |
TEAM_RECRUITED | 招募智能体(需要 dynamic_team_v1) | "总结第 3 节" |
TEAM_RETIRED | 智能体退役(需要 dynamic_team_v1) | "任务已完成" |
WebSocket 流式传输
通过 WebSocket 连接
import asyncio
import websockets
import json
async def stream_task():
uri = f"ws://localhost:8080/api/v1/stream/ws?workflow_id={workflow_id}"
# 通过标头传递 API 密钥(网关需要基于标头的身份验证)
async with websockets.connect(
uri, extra_headers={'X-API-Key': 'sk_test_123456'}
) as websocket:
while True:
message = await websocket.recv()
event = json.loads(message)
print(f"事件:{event['type']}")
if event['type'] == 'WORKFLOW_COMPLETED':
break
asyncio.run(stream_task())
WebSocket 流式传输目前仅支持服务器到客户端。使用 REST API /api/v1/tasks/{id}/cancel 取消任务。
过滤事件
按类型过滤事件以减少噪音:
# 仅显示重要事件
for event in client.stream(workflow_id):
if event.type in ['PROGRESS', 'AGENT_COMPLETED', 'WORKFLOW_COMPLETED']:
print(f"{event.type}:{event.message}")
进度跟踪
从事件计算任务进度:
def track_progress(workflow_id):
agents_started = 0
agents_completed = 0
for event in client.stream(workflow_id):
if event.type == 'PROGRESS':
# 跟踪规划器的进度更新
print(f"进度:{event.message}")
elif event.type == 'AGENT_STARTED':
agents_started += 1
print(f"智能体已启动:{event.agent_id}")
elif event.type == 'AGENT_COMPLETED':
agents_completed += 1
if agents_started > 0:
progress = (agents_completed / agents_started) * 100
print(f"进度:{progress:.1f}% ({agents_completed}/{agents_started})")
elif event.type == 'WORKFLOW_COMPLETED':
print("✅ 任务完成!")
break
track_progress(handle.workflow_id)
输出:
进度:创建了包含 3 个步骤的计划
智能体已启动:research-agent
智能体已启动:analysis-agent
智能体已启动:writer-agent
进度:33.3% (1/3)
进度:66.7% (2/3)
进度:100.0% (3/3)
✅ 任务完成!
实时 UI 示例
React 组件
import { useEffect, useState } from 'react';
function TaskMonitor({ taskId, apiKey }) {
const [events, setEvents] = useState([]);
const [progress, setProgress] = useState(0);
useEffect(() => {
const params = new URLSearchParams({ workflow_id: taskId });
// 注意:浏览器 EventSource 不支持自定义请求头。
// 生产环境请由后端发起 SSE 并注入认证请求头;
// 开发环境可使用 GATEWAY_SKIP_AUTH=1 关闭认证。
const eventSource = new EventSource(
`http://localhost:8080/api/v1/stream/sse?${params}`
);
const handleEvent = (e) => {
const event = JSON.parse(e.data);
setEvents(prev => [...prev, event]);
// 根据工作流生命周期更新进度
if (event.type === 'WORKFLOW_STARTED') {
setProgress(10);
} else if (event.type === 'PROGRESS') {
setProgress(prev => Math.min(prev + 20, 80));
} else if (event.type === 'WORKFLOW_COMPLETED') {
setProgress(100);
}
};
// 监听特定事件类型(Shannon 使用命名的 SSE 事件)
const eventTypes = [
'WORKFLOW_STARTED',
'WORKFLOW_COMPLETED',
'AGENT_STARTED',
'AGENT_COMPLETED',
'STATUS_UPDATE',
'PROGRESS',
'ERROR_OCCURRED',
'LLM_OUTPUT',
'TOOL_INVOKED',
'TOOL_OBSERVATION'
];
eventTypes.forEach(type => {
eventSource.addEventListener(type, handleEvent);
});
// 未命名事件的备用处理器
eventSource.onmessage = handleEvent;
eventSource.onerror = () => {
console.error('SSE 连接错误');
};
return () => {
eventTypes.forEach(type => {
eventSource.removeEventListener(type, handleEvent);
});
eventSource.close();
};
}, [taskId, apiKey]);
return (
<div>
<progress value={progress} max={100} />
<ul>
{events.map((e, i) => (
<li key={i}>
<strong>[{e.type}]</strong>{' '}
{e.agent_id && <span>({e.agent_id})</span>}{' '}
{e.message}
</li>
))}
</ul>
</div>
);
}
关键实现细节:
- Shannon 发出命名的 SSE 事件(例如,
event: AGENT_STARTED),因此您必须为每种事件类型使用 addEventListener()
- 浏览器的
EventSource API 不支持自定义请求头。不要通过查询参数传递 API 密钥;应由后端代理注入请求头(或在开发环境关闭认证)
- 始终在清理函数中清除事件监听器,以防止内存泄漏
错误处理
优雅地处理连接失败:
from shannon import ShannonClient
from shannon.errors import ShannonError, ConnectionError
import time
def robust_streaming(workflow_id, max_retries=3):
client = ShannonClient(base_url="http://localhost:8080")
for attempt in range(max_retries):
try:
for event in client.stream(workflow_id):
print(f"事件:{event.type}")
if event.type == 'WORKFLOW_COMPLETED':
# 获取最终状态以检索结果
status = client.get_status(workflow_id.replace('wf-', 'task-'))
return status.result
except (ShannonError, ConnectionError) as e:
print(f"流错误(尝试 {attempt + 1}):{e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 指数退避
else:
raise
流式传输身份验证
启用身份验证时,提供 API 密钥:
带 API 密钥的 SSE
curl -N \
-H "X-API-Key: sk_test_123456" \
http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}
带 API 密钥的 Python SDK
client = ShannonClient(
base_url="http://localhost:8080",
api_key="sk_test_123456"
)
# API 密钥自动包含在所有请求中
for event in client.stream(workflow_id):
print(event)
性能考虑
事件缓冲
Shannon 缓冲事件以防止压垮客户端:
# config/shannon.yaml
streaming:
buffer_size: 100 # 最大排队事件数
flush_interval_ms: 100 # 每 100 毫秒发送批次
保持连接
SSE 每 15 秒发送保持连接注释以防止超时:
: keepalive
: keepalive
event: LLM_PROMPT
data: {"message":"..."}
资源清理
完成后始终关闭流:
# 遍历事件(自动处理连接)
for event in client.stream(workflow_id):
if event.type == 'WORKFLOW_COMPLETED':
break
# 无需显式关闭
多任务监控
同时监控多个任务:
import asyncio
async def monitor_task(client, workflow_id):
# 在异步上下文中使用同步 stream()
for event in client.stream(workflow_id):
print(f"[{workflow_id}] {event.type}")
if event.type == 'WORKFLOW_COMPLETED':
# 从状态获取最终结果
task_id = workflow_id.replace('wf-', 'task-')
status = client.get_status(task_id)
return status.result
async def monitor_all(task_ids):
client = AsyncShannonClient(base_url="http://localhost:8080")
# 并行监控所有任务
results = await asyncio.gather(*[
monitor_task(client, tid) for tid in task_ids
])
return results
# 运行
task_ids = ["task-1", "task-2", "task-3"]
results = asyncio.run(monitor_all(task_ids))
仪表板集成
Shannon 的内置仪表板(http://localhost:2111)使用 SSE 进行实时更新:
- 活动任务的实时事件源
- 实时指标更新
- 智能体状态可视化
- 令牌使用图表
最佳实践
1. 使用 SSE 进行简单监控
# ✅ 好:简单的 SSE 流式传输
for event in client.stream(workflow_id):
print(event.type)
2. 处理断开连接
# ✅ 好:重试逻辑
for attempt in range(3):
try:
for event in client.stream(workflow_id):
process_event(event)
break
except (ShannonError, ConnectionError):
if attempt == 2:
raise
time.sleep(2)
3. 过滤不必要的事件
# ✅ 好:仅关键事件
critical_events = ['PROGRESS', 'WORKFLOW_COMPLETED', 'ERROR_OCCURRED']
for event in client.stream(workflow_id):
if event.type in critical_events:
handle_event(event)
4. 设置超时
# ✅ 好:超时保护
import signal
def timeout_handler(signum, frame):
raise TimeoutError("流超时")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(300) # 5 分钟超时
try:
for event in client.stream(workflow_id):
print(event)
finally:
signal.alarm(0) # 取消警报
下一步