Overview
Shannon provides real-time event streaming so you can monitor task execution as it happens. This is essential for:
- Providing live feedback to users
- Debugging workflow execution
- Building interactive UIs
- Monitoring long-running tasks
Streaming Technologies
Shannon supports two streaming protocols:
| Protocol | Use Case | Features |
|---|---|---|
| SSE (Server-Sent Events) | One-way server→client | Simple, HTTP-based, auto-reconnect |
| WebSocket | Bidirectional | Full duplex, lower latency |
SSE is recommended for most use cases due to its simplicity and built-in reconnection handling.
Server-Sent Events (SSE)
Using cURL
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
Output:
event: WORKFLOW_STARTED
data: {"workflow_id":"task-123","agent_id":"orchestrator","message":"Task processing started"}
event: DATA_PROCESSING
data: {"workflow_id":"task-123","message":"Preparing context"}
event: PROGRESS
data: {"workflow_id":"task-123","agent_id":"planner","message":"Created a plan with 3 steps"}
event: AGENT_STARTED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Processing query"}
event: LLM_PROMPT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Searching for information..."}
event: TOOL_INVOKED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Calling web_search with query='topic A'"}
event: TOOL_OBSERVATION
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Found 5 results"}
event: LLM_OUTPUT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Research complete: Found relevant information"}
event: AGENT_COMPLETED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Task done"}
event: WORKFLOW_COMPLETED
data: {"workflow_id":"task-123","message":"Final synthesized result ready"}
Using Python SDK
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# Submit task
handle = client.submit_task(
    query="Research AI trends and create summary"
)
# Stream events
for event in client.stream(handle.workflow_id):
    print(f"[{event.type}] {event.message}")
    if event.type == "WORKFLOW_COMPLETED":
        print(f"Task completed: {event.message}")
        break
Event Types
Shannon emits 26 event types across different categories. Here are the most commonly used ones:
Core Workflow Events
| Event Type | Description | Example Message |
|---|---|---|
| WORKFLOW_STARTED | Task processing started | "Task processing started" |
| WORKFLOW_COMPLETED | Workflow finished successfully | "All done" |
| AGENT_STARTED | Agent began processing | "Processing query" |
| AGENT_COMPLETED | Agent finished | "Task done" |
| STATUS_UPDATE | Status update/progress | "Planning next step" |
| ERROR_OCCURRED | Error during execution | "Failed to connect: timeout" |
LLM & Tool Events
| Event Type | Description | Example Message |
|---|---|---|
| LLM_PROMPT | Prompt sent to LLM | "What is 5 + 5?" |
| LLM_OUTPUT | Complete LLM response | "5 + 5 equals 10" |
| LLM_PARTIAL | Streaming chunk (often filtered) | "5 + 5" |
| TOOL_INVOKED | Tool execution started | "Calling database_query" |
| TOOL_OBSERVATION | Tool result/output | "Query returned 42 rows" |
Progress & Status Events
| Event Type | Description | Example Message |
|---|---|---|
| PROGRESS | Step completion update | "Created a plan with 3 steps" |
| DATA_PROCESSING | Processing/analyzing data | "Preparing context" |
| DELEGATION | Task delegated to another agent | "Handing off to simple task" |
Multi-Agent Events (requires feature gates)
| Event Type | Description | Example Message |
|---|---|---|
| MESSAGE_SENT | Agent sent message (requires p2p_v1) | "Please analyze section 3" |
| MESSAGE_RECEIVED | Agent received message (requires p2p_v1) | "Received task" |
| TEAM_RECRUITED | Agent recruited (requires dynamic_team_v1) | "Summarize section 3" |
| TEAM_RETIRED | Agent retired (requires dynamic_team_v1) | "Task completed" |
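These events stream like any other type once the corresponding feature gates are enabled. As a sketch (assuming the gates are on and reusing the client from the SDK example above), a client can surface only team activity:

# Surface only multi-agent activity (assumes the p2p_v1 / dynamic_team_v1
# feature gates are enabled so these events are actually emitted)
team_events = {'MESSAGE_SENT', 'MESSAGE_RECEIVED', 'TEAM_RECRUITED', 'TEAM_RETIRED'}
for event in client.stream(workflow_id):
    if event.type in team_events:
        print(f"[{event.agent_id}] {event.type}: {event.message}")
    elif event.type == 'WORKFLOW_COMPLETED':
        break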
WebSocket Streaming
Connect via WebSocket
import asyncio
import websockets
import json
async def stream_task(workflow_id):
    uri = f"ws://localhost:8080/api/v1/stream/ws?workflow_id={workflow_id}"
    # Pass API key via headers (gateway requires header-based auth)
    async with websockets.connect(
        uri, extra_headers={'X-API-Key': 'sk_test_123456'}
    ) as websocket:
        while True:
            message = await websocket.recv()
            event = json.loads(message)
            print(f"Event: {event['type']}")
            if event['type'] == 'WORKFLOW_COMPLETED':
                break

asyncio.run(stream_task("task-123"))
WebSocket streaming is currently server-to-client only. Use the REST API /api/v1/tasks/{id}/cancel to cancel tasks.
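For example, cancellation can be triggered from Python with a plain HTTP call. This is a minimal sketch; it assumes the cancel endpoint accepts a POST and the same X-API-Key header used elsewhere in this guide:

import requests

task_id = "task-123"
# Assumption: cancel is a POST and honors the standard X-API-Key header
resp = requests.post(
    f"http://localhost:8080/api/v1/tasks/{task_id}/cancel",
    headers={"X-API-Key": "sk_test_123456"},
)
print(resp.status_code)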
Filtering Events
Filter events by type to reduce noise:
# Only show important events
for event in client.stream(workflow_id):
    if event.type in ['PROGRESS', 'AGENT_COMPLETED', 'WORKFLOW_COMPLETED']:
        print(f"{event.type}: {event.message}")
Progress Tracking
Calculate task progress from events:
def track_progress(workflow_id):
    agents_started = 0
    agents_completed = 0
    for event in client.stream(workflow_id):
        if event.type == 'PROGRESS':
            # Track progress updates from planner
            print(f"Progress: {event.message}")
        elif event.type == 'AGENT_STARTED':
            agents_started += 1
            print(f"Agent started: {event.agent_id}")
        elif event.type == 'AGENT_COMPLETED':
            agents_completed += 1
            if agents_started > 0:
                progress = (agents_completed / agents_started) * 100
                print(f"Progress: {progress:.1f}% ({agents_completed}/{agents_started})")
        elif event.type == 'WORKFLOW_COMPLETED':
            print("✅ Task complete!")
            break

track_progress(handle.workflow_id)
Output:
Progress: Created a plan with 3 steps
Agent started: research-agent
Agent started: analysis-agent
Agent started: writer-agent
Progress: 33.3% (1/3)
Progress: 66.7% (2/3)
Progress: 100.0% (3/3)
✅ Task complete!
Real-Time UI Example
React Component
import { useEffect, useState } from 'react';
function TaskMonitor({ taskId, apiKey }) {
  const [events, setEvents] = useState([]);
  const [progress, setProgress] = useState(0);

  useEffect(() => {
    const params = new URLSearchParams({ workflow_id: taskId });
    // Note: Browser EventSource doesn't support custom headers.
    // Production: initiate SSE from a backend that injects auth headers,
    // or use GATEWAY_SKIP_AUTH=1 in development.
    const eventSource = new EventSource(
      `http://localhost:8080/api/v1/stream/sse?${params}`
    );

    const handleEvent = (e) => {
      // The event type is the SSE event name; the JSON payload carries
      // workflow_id, agent_id, and message.
      const event = { type: e.type, ...JSON.parse(e.data) };
      setEvents(prev => [...prev, event]);
      // Update progress based on workflow lifecycle
      if (event.type === 'WORKFLOW_STARTED') {
        setProgress(10);
      } else if (event.type === 'PROGRESS') {
        setProgress(prev => Math.min(prev + 20, 80));
      } else if (event.type === 'WORKFLOW_COMPLETED') {
        setProgress(100);
      }
    };

    // Listen to specific event types (Shannon uses named SSE events)
    const eventTypes = [
      'WORKFLOW_STARTED',
      'WORKFLOW_COMPLETED',
      'AGENT_STARTED',
      'AGENT_COMPLETED',
      'STATUS_UPDATE',
      'PROGRESS',
      'ERROR_OCCURRED',
      'LLM_OUTPUT',
      'TOOL_INVOKED',
      'TOOL_OBSERVATION'
    ];
    eventTypes.forEach(type => {
      eventSource.addEventListener(type, handleEvent);
    });

    // Fallback for unnamed events
    eventSource.onmessage = handleEvent;
    eventSource.onerror = () => {
      console.error('SSE connection error');
    };

    return () => {
      eventTypes.forEach(type => {
        eventSource.removeEventListener(type, handleEvent);
      });
      eventSource.close();
    };
  }, [taskId, apiKey]);

  return (
    <div>
      <progress value={progress} max={100} />
      <ul>
        {events.map((e, i) => (
          <li key={i}>
            <strong>[{e.type}]</strong>{' '}
            {e.agent_id && <span>({e.agent_id})</span>}{' '}
            {e.message}
          </li>
        ))}
      </ul>
    </div>
  );
}
Key Implementation Details:
- Shannon emits named SSE events (e.g., event: AGENT_STARTED), so you must use addEventListener() for each event type
- The browser EventSource API doesn't support custom headers. Do not pass API keys via query params; use a backend proxy that injects the header (see the sketch after this list) or disable auth in development
- Always clean up event listeners in the cleanup function to prevent memory leaks
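If you need authenticated SSE in the browser, one option is a small same-origin relay that adds the X-API-Key header and forwards the stream unchanged. Below is a minimal sketch using Flask and requests; the /proxy/stream route and the hard-coded key are illustrative assumptions, not part of Shannon:

import requests
from flask import Flask, Response, request

app = Flask(__name__)
SHANNON_SSE = "http://localhost:8080/api/v1/stream/sse"
API_KEY = "sk_test_123456"  # load from env/secrets in real deployments

@app.route("/proxy/stream")
def proxy_stream():
    workflow_id = request.args.get("workflow_id")
    upstream = requests.get(
        SHANNON_SSE,
        params={"workflow_id": workflow_id},
        headers={"X-API-Key": API_KEY, "Accept": "text/event-stream"},
        stream=True,
    )
    # Relay raw SSE bytes to the browser without modification
    return Response(
        upstream.iter_content(chunk_size=None),
        mimetype="text/event-stream",
    )

The React component above would then point its EventSource at /proxy/stream instead of the gateway.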
Error Handling
Handle connection failures gracefully:
from shannon import ShannonClient
from shannon.errors import ShannonError, ConnectionError
import time
def robust_streaming(workflow_id, max_retries=3):
    client = ShannonClient(base_url="http://localhost:8080")
    for attempt in range(max_retries):
        try:
            for event in client.stream(workflow_id):
                print(f"Event: {event.type}")
                if event.type == 'WORKFLOW_COMPLETED':
                    # Get final status to retrieve result
                    status = client.get_status(workflow_id.replace('wf-', 'task-'))
                    return status.result
        except (ShannonError, ConnectionError) as e:
            print(f"Stream error (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise
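Reusing the handle from the earlier SDK example:

result = robust_streaming(handle.workflow_id)
print(result)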
Authentication with Streaming
When authentication is enabled, include an API key with streaming requests:
SSE with API Key
curl -N \
-H "X-API-Key: sk_test_123456" \
"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
Python SDK with API Key
client = ShannonClient(
    base_url="http://localhost:8080",
    api_key="sk_test_123456"
)
# API key automatically included in all requests
for event in client.stream(workflow_id):
    print(event)
Event Buffering
Shannon buffers events to prevent overwhelming clients:
# config/shannon.yaml
streaming:
  buffer_size: 100         # Maximum queued events
  flush_interval_ms: 100   # Send batches every 100ms
Keepalive
SSE sends keepalive comments every 15 seconds to prevent timeout:
: keepalive
: keepalive
event: LLM_PROMPT
data: {"message":"..."}
Resource Cleanup
The SDK closes the stream automatically when iteration ends; just break out of the loop when you are done:
# Iterate through events (automatically handles connection)
for event in client.stream(workflow_id):
    if event.type == 'WORKFLOW_COMPLETED':
        break
# No explicit close needed
Multi-Task Monitoring
Monitor multiple tasks simultaneously:
import asyncio
from shannon import AsyncShannonClient

async def monitor_task(client, workflow_id):
    # Assumption: the async client exposes stream() as an async iterator
    # and get_status() as a coroutine
    async for event in client.stream(workflow_id):
        print(f"[{workflow_id}] {event.type}")
        if event.type == 'WORKFLOW_COMPLETED':
            # Get final result from status
            task_id = workflow_id.replace('wf-', 'task-')
            status = await client.get_status(task_id)
            return status.result

async def monitor_all(task_ids):
    client = AsyncShannonClient(base_url="http://localhost:8080")
    # Monitor all tasks in parallel
    results = await asyncio.gather(*[
        monitor_task(client, tid) for tid in task_ids
    ])
    return results

# Run
task_ids = ["task-1", "task-2", "task-3"]
results = asyncio.run(monitor_all(task_ids))
Dashboard Integration
Shannon’s built-in dashboard (http://localhost:2111) uses SSE for real-time updates:
- Live event feed for active tasks
- Real-time metrics updates
- Agent status visualization
- Token usage graphs
Best Practices
1. Use SSE for Simple Monitoring
# ✅ Good: Simple SSE streaming
for event in client.stream(workflow_id):
    print(event.type)
2. Handle Disconnections
# ✅ Good: Retry logic
for attempt in range(3):
    try:
        for event in client.stream(workflow_id):
            process_event(event)
        break
    except (ShannonError, ConnectionError):
        if attempt == 2:
            raise
        time.sleep(2)
3. Filter Unnecessary Events
# ✅ Good: Only critical events
critical_events = ['PROGRESS', 'WORKFLOW_COMPLETED', 'ERROR_OCCURRED']
for event in client.stream(workflow_id):
    if event.type in critical_events:
        handle_event(event)
4. Set Timeouts
# ✅ Good: Timeout protection
import signal
def timeout_handler(signum, frame):
    raise TimeoutError("Stream timeout")

signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(300)  # 5 minute timeout

try:
    for event in client.stream(workflow_id):
        print(event)
finally:
    signal.alarm(0)  # Cancel alarm
Next Steps