跳转到主要内容

概述

Shannon 提供实时事件流式传输,因此您可以在任务执行时进行监控。这对于以下场景至关重要:
  • 向用户提供实时反馈
  • 调试工作流执行
  • 构建交互式 UI
  • 监控长时间运行的任务

流式传输技术

Shannon 支持两种流式传输协议:
协议用例特性
SSE(服务器发送事件)单向服务器→客户端简单、基于 HTTP、自动重连
WebSocket双向全双工、较低延迟
由于 SSE 的简单性和内置重连处理,建议大多数用例使用 SSE。

服务器发送事件(SSE)

使用 cURL

curl -N http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}
输出:
event: WORKFLOW_STARTED
data: {"workflow_id":"task-123","agent_id":"orchestrator","message":"任务处理已开始"}

event: DATA_PROCESSING
data: {"workflow_id":"task-123","message":"准备上下文"}

event: PROGRESS
data: {"workflow_id":"task-123","agent_id":"planner","message":"创建了包含 3 个步骤的计划"}

event: AGENT_STARTED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"正在处理查询"}

event: LLM_PROMPT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"正在搜索信息..."}

event: TOOL_INVOKED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"调用 web_search,查询='主题 A'"}

event: TOOL_OBSERVATION
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"找到 5 个结果"}

event: LLM_OUTPUT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"研究完成:找到相关信息"}

event: AGENT_COMPLETED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"任务完成"}

event: WORKFLOW_COMPLETED
data: {"workflow_id":"task-123","message":"最终综合结果已准备就绪"}

使用 Python SDK

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# 提交任务
handle = client.submit_task(
    query="研究 AI 趋势并创建摘要"
)

# 流式传输事件
for event in client.stream(handle.workflow_id):
    print(f"[{event.type}] {event.message}")

    if event.type == "WORKFLOW_COMPLETED":
        print(f"任务已完成:{event.message}")
        break

事件类型

Shannon 在不同类别中发出 26 种事件类型。以下是最常用的事件类型:

核心工作流事件

事件类型描述示例消息
WORKFLOW_STARTED任务处理开始"任务处理已开始"
WORKFLOW_COMPLETED工作流成功完成"全部完成"
AGENT_STARTED智能体开始处理"正在处理查询"
AGENT_COMPLETED智能体完成"任务完成"
STATUS_UPDATE状态更新/进度"正在规划下一步"
ERROR_OCCURRED执行期间发生错误"连接失败:超时"

LLM 和工具事件

事件类型描述示例消息
LLM_PROMPT发送给 LLM 的提示"5 + 5 等于多少?"
LLM_OUTPUT完整的 LLM 响应"5 + 5 等于 10"
LLM_PARTIAL流式传输块(通常已过滤)"5 + 5"
TOOL_INVOKED工具执行开始"调用 database_query"
TOOL_OBSERVATION工具结果/输出"查询返回 42 行"

进度和状态事件

事件类型描述示例消息
PROGRESS步骤完成更新"创建了包含 3 个步骤的计划"
DATA_PROCESSING处理/分析数据"准备上下文"
DELEGATION任务委托给另一个智能体"移交给简单任务"

多智能体事件(需要功能门控)

事件类型描述示例消息
MESSAGE_SENT智能体发送消息(需要 p2p_v1"请分析第 3 节"
MESSAGE_RECEIVED智能体接收消息(需要 p2p_v1"收到任务"
TEAM_RECRUITED招募智能体(需要 dynamic_team_v1"总结第 3 节"
TEAM_RETIRED智能体退役(需要 dynamic_team_v1"任务已完成"
有关所有 26 种事件类型的完整列表,请参阅事件类型参考

WebSocket 流式传输

通过 WebSocket 连接

import asyncio
import websockets
import json

async def stream_task():
    uri = f"ws://localhost:8080/api/v1/stream/ws?workflow_id={workflow_id}"

    # 通过标头传递 API 密钥(网关需要基于标头的身份验证)
    async with websockets.connect(
        uri, extra_headers={'X-API-Key': 'sk_test_123456'}
    ) as websocket:
        while True:
            message = await websocket.recv()
            event = json.loads(message)
            print(f"事件:{event['type']}")

            if event['type'] == 'WORKFLOW_COMPLETED':
                break

asyncio.run(stream_task())
WebSocket 流式传输目前仅支持服务器到客户端。使用 REST API /api/v1/tasks/{id}/cancel 取消任务。

过滤事件

按类型过滤事件以减少噪音:
# 仅显示重要事件
for event in client.stream(workflow_id):
    if event.type in ['PROGRESS', 'AGENT_COMPLETED', 'WORKFLOW_COMPLETED']:
        print(f"{event.type}{event.message}")

进度跟踪

从事件计算任务进度:
def track_progress(workflow_id):
    agents_started = 0
    agents_completed = 0

    for event in client.stream(workflow_id):
        if event.type == 'PROGRESS':
            # 跟踪规划器的进度更新
            print(f"进度:{event.message}")

        elif event.type == 'AGENT_STARTED':
            agents_started += 1
            print(f"智能体已启动:{event.agent_id}")

        elif event.type == 'AGENT_COMPLETED':
            agents_completed += 1
            if agents_started > 0:
                progress = (agents_completed / agents_started) * 100
                print(f"进度:{progress:.1f}% ({agents_completed}/{agents_started})")

        elif event.type == 'WORKFLOW_COMPLETED':
            print("✅ 任务完成!")
            break

track_progress(handle.workflow_id)
输出:
进度:创建了包含 3 个步骤的计划
智能体已启动:research-agent
智能体已启动:analysis-agent
智能体已启动:writer-agent
进度:33.3% (1/3)
进度:66.7% (2/3)
进度:100.0% (3/3)
✅ 任务完成!

实时 UI 示例

React 组件

import { useEffect, useState } from 'react';

function TaskMonitor({ taskId, apiKey }) {
  const [events, setEvents] = useState([]);
  const [progress, setProgress] = useState(0);

  useEffect(() => {
    const params = new URLSearchParams({ workflow_id: taskId });

    // 注意:浏览器 EventSource 不支持自定义请求头。
    // 生产环境请由后端发起 SSE 并注入认证请求头;
    // 开发环境可使用 GATEWAY_SKIP_AUTH=1 关闭认证。

    const eventSource = new EventSource(
      `http://localhost:8080/api/v1/stream/sse?${params}`
    );

    const handleEvent = (e) => {
      const event = JSON.parse(e.data);
      setEvents(prev => [...prev, event]);

      // 根据工作流生命周期更新进度
      if (event.type === 'WORKFLOW_STARTED') {
        setProgress(10);
      } else if (event.type === 'PROGRESS') {
        setProgress(prev => Math.min(prev + 20, 80));
      } else if (event.type === 'WORKFLOW_COMPLETED') {
        setProgress(100);
      }
    };

    // 监听特定事件类型(Shannon 使用命名的 SSE 事件)
    const eventTypes = [
      'WORKFLOW_STARTED',
      'WORKFLOW_COMPLETED',
      'AGENT_STARTED',
      'AGENT_COMPLETED',
      'STATUS_UPDATE',
      'PROGRESS',
      'ERROR_OCCURRED',
      'LLM_OUTPUT',
      'TOOL_INVOKED',
      'TOOL_OBSERVATION'
    ];

    eventTypes.forEach(type => {
      eventSource.addEventListener(type, handleEvent);
    });

    // 未命名事件的备用处理器
    eventSource.onmessage = handleEvent;

    eventSource.onerror = () => {
      console.error('SSE 连接错误');
    };

    return () => {
      eventTypes.forEach(type => {
        eventSource.removeEventListener(type, handleEvent);
      });
      eventSource.close();
    };
  }, [taskId, apiKey]);

  return (
    <div>
      <progress value={progress} max={100} />
      <ul>
        {events.map((e, i) => (
          <li key={i}>
            <strong>[{e.type}]</strong>{' '}
            {e.agent_id && <span>({e.agent_id})</span>}{' '}
            {e.message}
          </li>
        ))}
      </ul>
    </div>
  );
}
关键实现细节:
  • Shannon 发出命名的 SSE 事件(例如,event: AGENT_STARTED),因此您必须为每种事件类型使用 addEventListener()
  • 浏览器的 EventSource API 不支持自定义请求头。不要通过查询参数传递 API 密钥;应由后端代理注入请求头(或在开发环境关闭认证)
  • 始终在清理函数中清除事件监听器,以防止内存泄漏

错误处理

优雅地处理连接失败:
from shannon import ShannonClient
from shannon.errors import ShannonError, ConnectionError
import time

def robust_streaming(workflow_id, max_retries=3):
    client = ShannonClient(base_url="http://localhost:8080")

    for attempt in range(max_retries):
        try:
            for event in client.stream(workflow_id):
                print(f"事件:{event.type}")

                if event.type == 'WORKFLOW_COMPLETED':
                    # 获取最终状态以检索结果
                    status = client.get_status(workflow_id.replace('wf-', 'task-'))
                    return status.result

        except (ShannonError, ConnectionError) as e:
            print(f"流错误(尝试 {attempt + 1}):{e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # 指数退避
            else:
                raise

流式传输身份验证

启用身份验证时,提供 API 密钥:

带 API 密钥的 SSE

curl -N \
  -H "X-API-Key: sk_test_123456" \
  http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}

带 API 密钥的 Python SDK

client = ShannonClient(
    base_url="http://localhost:8080",
    api_key="sk_test_123456"
)

# API 密钥自动包含在所有请求中
for event in client.stream(workflow_id):
    print(event)

性能考虑

事件缓冲

Shannon 缓冲事件以防止压垮客户端:
# config/shannon.yaml
streaming:
  buffer_size: 100  # 最大排队事件数
  flush_interval_ms: 100  # 每 100 毫秒发送批次

保持连接

SSE 每 15 秒发送保持连接注释以防止超时:
: keepalive
: keepalive
event: LLM_PROMPT
data: {"message":"..."}

资源清理

完成后始终关闭流:
# 遍历事件(自动处理连接)
for event in client.stream(workflow_id):
    if event.type == 'WORKFLOW_COMPLETED':
        break
# 无需显式关闭

多任务监控

同时监控多个任务:
import asyncio

async def monitor_task(client, workflow_id):
    # 在异步上下文中使用同步 stream()
    for event in client.stream(workflow_id):
        print(f"[{workflow_id}] {event.type}")
        if event.type == 'WORKFLOW_COMPLETED':
            # 从状态获取最终结果
            task_id = workflow_id.replace('wf-', 'task-')
            status = client.get_status(task_id)
            return status.result

async def monitor_all(task_ids):
    client = AsyncShannonClient(base_url="http://localhost:8080")

    # 并行监控所有任务
    results = await asyncio.gather(*[
        monitor_task(client, tid) for tid in task_ids
    ])

    return results

# 运行
task_ids = ["task-1", "task-2", "task-3"]
results = asyncio.run(monitor_all(task_ids))

仪表板集成

Shannon 的内置仪表板(http://localhost:2111)使用 SSE 进行实时更新:
  • 活动任务的实时事件源
  • 实时指标更新
  • 智能体状态可视化
  • 令牌使用图表

最佳实践

1. 使用 SSE 进行简单监控

# ✅ 好:简单的 SSE 流式传输
for event in client.stream(workflow_id):
    print(event.type)

2. 处理断开连接

# ✅ 好:重试逻辑
for attempt in range(3):
    try:
        for event in client.stream(workflow_id):
            process_event(event)
        break
    except (ShannonError, ConnectionError):
        if attempt == 2:
            raise
        time.sleep(2)

3. 过滤不必要的事件

# ✅ 好:仅关键事件
critical_events = ['PROGRESS', 'WORKFLOW_COMPLETED', 'ERROR_OCCURRED']
for event in client.stream(workflow_id):
    if event.type in critical_events:
        handle_event(event)

4. 设置超时

# ✅ 好:超时保护
import signal

def timeout_handler(signum, frame):
    raise TimeoutError("流超时")

signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(300)  # 5 分钟超时

try:
    for event in client.stream(workflow_id):
        print(event)
finally:
    signal.alarm(0)  # 取消警报

下一步