Skip to main content

Overview

Shannon provides real-time event streaming so you can monitor task execution as it happens. This is essential for:
  • Providing live feedback to users
  • Debugging workflow execution
  • Building interactive UIs
  • Monitoring long-running tasks

Streaming Technologies

Shannon supports two streaming protocols:
ProtocolUse CaseFeatures
SSE (Server-Sent Events)One-way server→clientSimple, HTTP-based, auto-reconnect
WebSocketBidirectionalFull duplex, lower latency
SSE is recommended for most use cases due to its simplicity and built-in reconnection handling.

Server-Sent Events (SSE)

Using cURL

curl -N http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}
Output:
event: WORKFLOW_STARTED
data: {"workflow_id":"task-123","agent_id":"orchestrator","message":"Task processing started"}

event: DATA_PROCESSING
data: {"workflow_id":"task-123","message":"Preparing context"}

event: PROGRESS
data: {"workflow_id":"task-123","agent_id":"planner","message":"Created a plan with 3 steps"}

event: AGENT_STARTED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Processing query"}

event: LLM_PROMPT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Searching for information..."}

event: TOOL_INVOKED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Calling web_search with query='topic A'"}

event: TOOL_OBSERVATION
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Found 5 results"}

event: LLM_OUTPUT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Research complete: Found relevant information"}

event: AGENT_COMPLETED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Task done"}

event: WORKFLOW_COMPLETED
data: {"workflow_id":"task-123","message":"Final synthesized result ready"}

Using Python SDK

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# Submit task
handle = client.submit_task(
    query="Research AI trends and create summary"
)

# Stream events
for event in client.stream(handle.workflow_id):
    print(f"[{event.type}] {event.message}")

    if event.type == "WORKFLOW_COMPLETED":
        print(f"Task completed: {event.message}")
        break

Event Types

Shannon emits 26 event types across different categories. Here are the most commonly used ones:

Core Workflow Events

Event TypeDescriptionExample Message
WORKFLOW_STARTEDTask processing started"Task processing started"
WORKFLOW_COMPLETEDWorkflow finished successfully"All done"
AGENT_STARTEDAgent began processing"Processing query"
AGENT_COMPLETEDAgent finished"Task done"
STATUS_UPDATEStatus update/progress"Planning next step"
ERROR_OCCURREDError during execution"Failed to connect: timeout"

LLM & Tool Events

Event TypeDescriptionExample Message
LLM_PROMPTPrompt sent to LLM"What is 5 + 5?"
LLM_OUTPUTComplete LLM response"5 + 5 equals 10"
LLM_PARTIALStreaming chunk (often filtered)"5 + 5"
TOOL_INVOKEDTool execution started"Calling database_query"
TOOL_OBSERVATIONTool result/output"Query returned 42 rows"

Progress & Status Events

Event TypeDescriptionExample Message
PROGRESSStep completion update"Created a plan with 3 steps"
DATA_PROCESSINGProcessing/analyzing data"Preparing context"
DELEGATIONTask delegated to another agent"Handing off to simple task"

Multi-Agent Events (requires feature gates)

Event TypeDescriptionExample Message
MESSAGE_SENTAgent sent message (requires p2p_v1)"Please analyze section 3"
MESSAGE_RECEIVEDAgent received message (requires p2p_v1)"Received task"
TEAM_RECRUITEDAgent recruited (requires dynamic_team_v1)"Summarize section 3"
TEAM_RETIREDAgent retired (requires dynamic_team_v1)"Task completed"
For a complete list of all 26 event types, see the Event Types Reference.

WebSocket Streaming

Connect via WebSocket

import asyncio
import websockets
import json

async def stream_task():
    uri = f"ws://localhost:8080/api/v1/stream/ws?workflow_id={workflow_id}"

    # Pass API key via headers (gateway requires header-based auth)
    async with websockets.connect(
        uri, extra_headers={'X-API-Key': 'sk_test_123456'}
    ) as websocket:
        while True:
            message = await websocket.recv()
            event = json.loads(message)
            print(f"Event: {event['type']}")

            if event['type'] == 'WORKFLOW_COMPLETED':
                break

asyncio.run(stream_task())
WebSocket streaming is currently server-to-client only. Use the REST API /api/v1/tasks/{id}/cancel to cancel tasks.

Filtering Events

Filter events by type to reduce noise:
# Only show important events
for event in client.stream(workflow_id):
    if event.type in ['PROGRESS', 'AGENT_COMPLETED', 'WORKFLOW_COMPLETED']:
        print(f"{event.type}: {event.message}")

Progress Tracking

Calculate task progress from events:
def track_progress(workflow_id):
    agents_started = 0
    agents_completed = 0

    for event in client.stream(workflow_id):
        if event.type == 'PROGRESS':
            # Track progress updates from planner
            print(f"Progress: {event.message}")

        elif event.type == 'AGENT_STARTED':
            agents_started += 1
            print(f"Agent started: {event.agent_id}")

        elif event.type == 'AGENT_COMPLETED':
            agents_completed += 1
            if agents_started > 0:
                progress = (agents_completed / agents_started) * 100
                print(f"Progress: {progress:.1f}% ({agents_completed}/{agents_started})")

        elif event.type == 'WORKFLOW_COMPLETED':
            print("✅ Task complete!")
            break

track_progress(handle.workflow_id)
Output:
Progress: Created a plan with 3 steps
Agent started: research-agent
Agent started: analysis-agent
Agent started: writer-agent
Progress: 33.3% (1/3)
Progress: 66.7% (2/3)
Progress: 100.0% (3/3)
✅ Task complete!

Real-Time UI Example

React Component

import { useEffect, useState } from 'react';

function TaskMonitor({ taskId, apiKey }) {
  const [events, setEvents] = useState([]);
  const [progress, setProgress] = useState(0);

  useEffect(() => {
    const params = new URLSearchParams({ workflow_id: taskId });

    // Note: Browser EventSource doesn't support custom headers.
    // Production: initiate SSE from a backend that injects auth headers,
    // or use GATEWAY_SKIP_AUTH=1 in development.

    const eventSource = new EventSource(
      `http://localhost:8080/api/v1/stream/sse?${params}`
    );

    const handleEvent = (e) => {
      const event = JSON.parse(e.data);
      setEvents(prev => [...prev, event]);

      // Update progress based on workflow lifecycle
      if (event.type === 'WORKFLOW_STARTED') {
        setProgress(10);
      } else if (event.type === 'PROGRESS') {
        setProgress(prev => Math.min(prev + 20, 80));
      } else if (event.type === 'WORKFLOW_COMPLETED') {
        setProgress(100);
      }
    };

    // Listen to specific event types (Shannon uses named SSE events)
    const eventTypes = [
      'WORKFLOW_STARTED',
      'WORKFLOW_COMPLETED',
      'AGENT_STARTED',
      'AGENT_COMPLETED',
      'STATUS_UPDATE',
      'PROGRESS',
      'ERROR_OCCURRED',
      'LLM_OUTPUT',
      'TOOL_INVOKED',
      'TOOL_OBSERVATION'
    ];

    eventTypes.forEach(type => {
      eventSource.addEventListener(type, handleEvent);
    });

    // Fallback for unnamed events
    eventSource.onmessage = handleEvent;

    eventSource.onerror = () => {
      console.error('SSE connection error');
    };

    return () => {
      eventTypes.forEach(type => {
        eventSource.removeEventListener(type, handleEvent);
      });
      eventSource.close();
    };
  }, [taskId, apiKey]);

  return (
    <div>
      <progress value={progress} max={100} />
      <ul>
        {events.map((e, i) => (
          <li key={i}>
            <strong>[{e.type}]</strong>{' '}
            {e.agent_id && <span>({e.agent_id})</span>}{' '}
            {e.message}
          </li>
        ))}
      </ul>
    </div>
  );
}
Key Implementation Details:
  • Shannon emits named SSE events (e.g., event: AGENT_STARTED), so you must use addEventListener() for each event type
  • Browser EventSource API doesn’t support custom headers. Do not pass API keys via query params; use a backend proxy to inject headers or disable auth in development
  • Always clean up event listeners in the cleanup function to prevent memory leaks

Error Handling

Handle connection failures gracefully:
from shannon import ShannonClient
from shannon.errors import ShannonError, ConnectionError
import time

def robust_streaming(workflow_id, max_retries=3):
    client = ShannonClient(base_url="http://localhost:8080")

    for attempt in range(max_retries):
        try:
            for event in client.stream(workflow_id):
                print(f"Event: {event.type}")

                if event.type == 'WORKFLOW_COMPLETED':
                    # Get final status to retrieve result
                    status = client.get_status(workflow_id.replace('wf-', 'task-'))
                    return status.result

        except (ShannonError, ConnectionError) as e:
            print(f"Stream error (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise

Authentication with Streaming

When authentication is enabled, provide API key:

SSE with API Key

curl -N \
  -H "X-API-Key: sk_test_123456" \
  http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}

Python SDK with API Key

client = ShannonClient(
    base_url="http://localhost:8080",
    api_key="sk_test_123456"
)

# API key automatically included in all requests
for event in client.stream(workflow_id):
    print(event)

Performance Considerations

Event Buffering

Shannon buffers events to prevent overwhelming clients:
# config/shannon.yaml
streaming:
  buffer_size: 100  # Maximum queued events
  flush_interval_ms: 100  # Send batches every 100ms

Keepalive

SSE sends keepalive comments every 15 seconds to prevent timeout:
: keepalive
: keepalive
event: LLM_PROMPT
data: {"message":"..."}

Resource Cleanup

Always close streams when done:
# Iterate through events (automatically handles connection)
for event in client.stream(workflow_id):
    if event.type == 'WORKFLOW_COMPLETED':
        break
# No explicit close needed

Multi-Task Monitoring

Monitor multiple tasks simultaneously:
import asyncio

async def monitor_task(client, workflow_id):
    # Use sync stream() in async context
    for event in client.stream(workflow_id):
        print(f"[{workflow_id}] {event.type}")
        if event.type == 'WORKFLOW_COMPLETED':
            # Get final result from status
            task_id = workflow_id.replace('wf-', 'task-')
            status = client.get_status(task_id)
            return status.result

async def monitor_all(task_ids):
    client = AsyncShannonClient(base_url="http://localhost:8080")

    # Monitor all tasks in parallel
    results = await asyncio.gather(*[
        monitor_task(client, tid) for tid in task_ids
    ])

    return results

# Run
task_ids = ["task-1", "task-2", "task-3"]
results = asyncio.run(monitor_all(task_ids))

Dashboard Integration

Shannon’s built-in dashboard (http://localhost:2111) uses SSE for real-time updates:
  • Live event feed for active tasks
  • Real-time metrics updates
  • Agent status visualization
  • Token usage graphs

Best Practices

1. Use SSE for Simple Monitoring

# ✅ Good: Simple SSE streaming
for event in client.stream(workflow_id):
    print(event.type)

2. Handle Disconnections

# ✅ Good: Retry logic
for attempt in range(3):
    try:
        for event in client.stream(workflow_id):
            process_event(event)
        break
    except (ShannonError, ConnectionError):
        if attempt == 2:
            raise
        time.sleep(2)

3. Filter Unnecessary Events

# ✅ Good: Only critical events
critical_events = ['PROGRESS', 'WORKFLOW_COMPLETED', 'ERROR_OCCURRED']
for event in client.stream(workflow_id):
    if event.type in critical_events:
        handle_event(event)

4. Set Timeouts

# ✅ Good: Timeout protection
import signal

def timeout_handler(signum, frame):
    raise TimeoutError("Stream timeout")

signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(300)  # 5 minute timeout

try:
    for event in client.stream(workflow_id):
        print(event)
finally:
    signal.alarm(0)  # Cancel alarm

Next Steps