Overview
Shannon provides real-time event streaming through Server-Sent Events (SSE) and WebSocket protocols. Use streaming to monitor task execution, display progress, and receive results as they’re generated.
Authentication: Streaming endpoints require the same headers as other APIs.
Browsers cannot send custom headers with EventSource.
- Development: set GATEWAY_SKIP_AUTH=1.
- Production: proxy SSE via your backend and inject X-API-Key or Bearer headers.
Do not pass API keys in URL query parameters.
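The production advice above can be sketched as a small server-side helper that adds the key before the request reaches the gateway. This is a minimal sketch using only the Python standard library, not the gateway's own tooling: GATEWAY_URL, the SHANNON_API_KEY environment variable, build_upstream_request, and relay_sse are hypothetical names chosen for illustration, and write_chunk stands in for whatever your web framework uses to write bytes to the browser.

```python
import os
import urllib.parse
import urllib.request

# Assumed gateway address; adjust to your deployment.
GATEWAY_URL = "http://localhost:8080"


def build_upstream_request(workflow_id: str, api_key: str) -> urllib.request.Request:
    """Build the gateway SSE request with the API key injected server-side."""
    query = urllib.parse.urlencode({"workflow_id": workflow_id})
    return urllib.request.Request(
        f"{GATEWAY_URL}/api/v1/stream/sse?{query}",
        headers={"X-API-Key": api_key, "Accept": "text/event-stream"},
    )


def relay_sse(workflow_id: str, write_chunk) -> None:
    """Copy the gateway stream to the browser connection line by line.

    The key is read from the server environment (hypothetical variable name),
    so it never appears in browser code or in a URL.
    """
    request = build_upstream_request(workflow_id, os.environ["SHANNON_API_KEY"])
    with urllib.request.urlopen(request) as upstream:
        for raw_line in upstream:
            write_chunk(raw_line)
```

The browser then opens an EventSource against your backend route rather than the gateway, so no custom header is needed on the client side.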
Endpoints
| Method | Endpoint | Protocol | Description |
| POST | /api/v1/tasks/stream | HTTP+SSE | Submit task and get stream URL (recommended) |
| GET | /api/v1/stream/sse | SSE | Server-Sent Events endpoint |
| GET | /api/v1/stream/ws | WebSocket | WebSocket streaming endpoint |
| GET | /api/v1/tasks/{id}/events | HTTP | Get historical events (paginated) |
Unified Submit + Stream (Recommended)
POST /api/v1/tasks/stream
The easiest way to submit a task and immediately start streaming its events. This endpoint combines task submission with streaming setup in one call.
Best for frontend applications: This endpoint is perfect for real-time UIs where you want to show progress immediately after submitting a task.
Authentication
Required: Yes
X-API-Key: sk_test_123456
Or:
Authorization: Bearer YOUR_TOKEN
Request Body
| Parameter | Type | Required | Description |
| query | string | Yes | Natural language task description |
| session_id | string | No | Session identifier for multi-turn conversations |
| context | object | No | Additional context data as key-value pairs |
| model_tier | string | No | Preferred tier: small, medium, or large |
| model_override | string | No | Specific model name (canonical; e.g., gpt-5) |
| provider_override | string | No | Force provider (e.g., openai, anthropic, google) |
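The parameter table above can be turned into a small payload builder. This is a sketch under stated assumptions: build_task_payload and ALLOWED_TIERS are hypothetical names, and the client-side tier check only mirrors the three values listed in the table; how the gateway itself reacts to other values is not specified here.

```python
from typing import Any, Dict, Optional

# Tier names taken from the model_tier row of the table.
ALLOWED_TIERS = {"small", "medium", "large"}


def build_task_payload(
    query: str,
    session_id: Optional[str] = None,
    context: Optional[Dict[str, Any]] = None,
    model_tier: Optional[str] = None,
    model_override: Optional[str] = None,
    provider_override: Optional[str] = None,
) -> Dict[str, Any]:
    """Build the JSON body for POST /api/v1/tasks/stream, omitting unset fields."""
    if not query or not query.strip():
        raise ValueError("query is required")
    if model_tier is not None and model_tier not in ALLOWED_TIERS:
        raise ValueError(f"model_tier must be one of {sorted(ALLOWED_TIERS)}")

    payload: Dict[str, Any] = {"query": query}
    optional = {
        "session_id": session_id,
        "context": context,
        "model_tier": model_tier,
        "model_override": model_override,
        "provider_override": provider_override,
    }
    # Only send the optional fields the caller actually set.
    payload.update({key: value for key, value in optional.items() if value is not None})
    return payload


print(build_task_payload("Analyze Q4 revenue trends", model_tier="small"))
```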
Response
Status: 201 Created
Body:
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "workflow_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "stream_url": "/api/v1/stream/sse?workflow_id=task_01HQZX3Y9K8M2P4N5S7T9W2V"
}
Response Fields
| Field | Type | Description |
| task_id | string | Task identifier (same value as workflow_id in the example above) |
| workflow_id | string | Task/workflow identifier |
| stream_url | string | Relative URL to SSE stream endpoint |
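Because stream_url is relative, a client has to combine it with the gateway origin before connecting. A minimal sketch, assuming the gateway runs at http://localhost:8080 as in the other examples on this page; resolve_stream_url is a hypothetical helper name.

```python
from urllib.parse import urljoin


def resolve_stream_url(gateway_base: str, stream_url: str) -> str:
    """Turn the relative stream_url from the response into an absolute URL."""
    return urljoin(gateway_base, stream_url)


# Response body copied from the example above.
response_body = {
    "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
    "workflow_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
    "stream_url": "/api/v1/stream/sse?workflow_id=task_01HQZX3Y9K8M2P4N5S7T9W2V",
}
print(resolve_stream_url("http://localhost:8080", response_body["stream_url"]))
```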
Example: JavaScript/TypeScript
async function submitAndStream(query, onEvent, onComplete, onError) {
  try {
    // 1. Submit task and get stream URL
    const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer YOUR_TOKEN'
      },
      body: JSON.stringify({
        query: query,
        session_id: `session-${Date.now()}`
      })
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    const { task_id, workflow_id, stream_url } = await response.json();
    console.log(`Task submitted: ${workflow_id}`);

    // 2. Connect to SSE stream
    const eventSource = new EventSource(
      `http://localhost:8080${stream_url}`,
      { withCredentials: false }
    );

    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);
      onEvent(event);

      // Check for completion
      if (event.type === 'WORKFLOW_COMPLETED') {
        eventSource.close();
        onComplete(event);
      }
    };

    eventSource.onerror = (err) => {
      console.error('SSE error:', err);
      eventSource.close();
      onError(err);
    };

    return { workflow_id, eventSource };
  } catch (error) {
    onError(error);
    throw error;
  }
}

// Usage
submitAndStream(
  "Analyze Q4 revenue trends",
  (event) => console.log(`[${event.type}]`, event.message),
  (final) => console.log("Completed:", final.result),
  (error) => console.error("Error:", error)
);
Example: React Hook
import { useState, useCallback, useRef } from 'react';

function useTaskStream(apiUrl = 'http://localhost:8080') {
  const [events, setEvents] = useState([]);
  const [isStreaming, setIsStreaming] = useState(false);
  const [error, setError] = useState(null);
  const [workflowId, setWorkflowId] = useState(null);
  const eventSourceRef = useRef(null);

  const submitTask = useCallback(async (query, sessionId = null) => {
    setEvents([]);
    setError(null);
    setIsStreaming(true);

    try {
      // Submit task
      const response = await fetch(`${apiUrl}/api/v1/tasks/stream`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${localStorage.getItem('token')}`
        },
        body: JSON.stringify({
          query,
          session_id: sessionId || `session-${Date.now()}`
        })
      });

      if (!response.ok) throw new Error(`HTTP ${response.status}`);

      const { workflow_id, stream_url } = await response.json();
      setWorkflowId(workflow_id);

      // Connect to stream
      const eventSource = new EventSource(`${apiUrl}${stream_url}`);
      eventSourceRef.current = eventSource;

      eventSource.onmessage = (e) => {
        const event = JSON.parse(e.data);
        setEvents(prev => [...prev, event]);

        if (event.type === 'WORKFLOW_COMPLETED' || event.type === 'ERROR_OCCURRED') {
          eventSource.close();
          setIsStreaming(false);
        }
      };

      eventSource.onerror = () => {
        // EventSource error events carry no message, so wrap them in an Error
        // that the component below can render via error.message.
        setError(new Error('Stream connection failed'));
        setIsStreaming(false);
        eventSource.close();
      };
    } catch (err) {
      setError(err);
      setIsStreaming(false);
    }
  }, [apiUrl]);

  const stopStreaming = useCallback(() => {
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      setIsStreaming(false);
    }
  }, []);

  return { submitTask, stopStreaming, events, isStreaming, error, workflowId };
}

// Usage in component
function TaskStreamDemo() {
  const { submitTask, stopStreaming, events, isStreaming, error, workflowId } = useTaskStream();

  return (
    <div>
      <button
        onClick={() => submitTask("What is 15 + 25?")}
        disabled={isStreaming}
      >
        {isStreaming ? 'Processing...' : 'Submit Task'}
      </button>
      {workflowId && <p>Workflow ID: {workflowId}</p>}
      {error && <p style={{color: 'red'}}>Error: {error.message}</p>}
      <div>
        {events.map((event, idx) => (
          <div key={idx}>
            <strong>{event.type}</strong>: {event.message || event.agent_id}
          </div>
        ))}
      </div>
      {isStreaming && <button onClick={stopStreaming}>Stop</button>}
    </div>
  );
}
Example: Vue 3 Composition API
<script setup>
import { ref } from 'vue';

const events = ref([]);
const isStreaming = ref(false);
const error = ref(null);
const workflowId = ref(null);
let eventSource = null;

async function submitTask(query) {
  events.value = [];
  error.value = null;
  isStreaming.value = true;

  try {
    const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${localStorage.getItem('token')}`
      },
      body: JSON.stringify({
        query,
        session_id: `session-${Date.now()}`
      })
    });

    if (!response.ok) throw new Error(`HTTP ${response.status}`);

    const { workflow_id, stream_url } = await response.json();
    workflowId.value = workflow_id;

    eventSource = new EventSource(`http://localhost:8080${stream_url}`);

    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);
      events.value.push(event);

      if (event.type === 'WORKFLOW_COMPLETED') {
        eventSource.close();
        isStreaming.value = false;
      }
    };

    eventSource.onerror = () => {
      // EventSource error events carry no message, so wrap them in an Error
      // that the template can render via error.message.
      error.value = new Error('Stream connection failed');
      eventSource.close();
      isStreaming.value = false;
    };
  } catch (err) {
    error.value = err;
    isStreaming.value = false;
  }
}

function stopStreaming() {
  if (eventSource) {
    eventSource.close();
    isStreaming.value = false;
  }
}
</script>

<template>
  <div>
    <button @click="submitTask('Analyze data')" :disabled="isStreaming">
      {{ isStreaming ? 'Processing...' : 'Submit Task' }}
    </button>
    <div v-if="workflowId">Workflow: {{ workflowId }}</div>
    <div v-if="error" style="color: red">Error: {{ error.message }}</div>
    <div v-for="(event, idx) in events" :key="idx">
      <strong>{{ event.type }}</strong>: {{ event.message || event.agent_id }}
    </div>
    <button v-if="isStreaming" @click="stopStreaming">Stop</button>
  </div>
</template>
Example: Python
import json
import time

import httpx

def submit_and_stream(query: str, api_key: str):
    """Submit task and stream events."""
    # 1. Submit task
    response = httpx.post(
        "http://localhost:8080/api/v1/tasks/stream",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={
            "query": query,
            "session_id": f"session-{int(time.time())}"
        }
    )
    response.raise_for_status()  # Fail fast on 4xx/5xx instead of parsing an error body
    data = response.json()
    workflow_id = data["workflow_id"]
    stream_url = data["stream_url"]
    print(f"Task submitted: {workflow_id}")

    # 2. Stream events
    with httpx.stream(
        "GET",
        f"http://localhost:8080{stream_url}",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=None
    ) as stream_response:
        for line in stream_response.iter_lines():
            if line.startswith("data:"):
                event = json.loads(line[5:])
                print(f"[{event.get('type')}] {event.get('message', '')}")
                if event.get('type') == 'WORKFLOW_COMPLETED':
                    return event
    return None  # Stream ended without a completion event

# Usage
result = submit_and_stream("What is the capital of France?", "sk_test_123456")
if result is not None:
    print("Final result:", result.get('result'))
Why use this endpoint? The unified endpoint ensures you start streaming immediately after submission, preventing any missed events that could occur if you submit and then connect separately.
Server-Sent Events (SSE)
GET /api/v1/stream/sse
Real-time event streaming using Server-Sent Events.
Authentication
Required: Yes
X-API-Key: sk_test_123456
Query Parameters
| Parameter | Type | Required | Description |
workflow_id | string | Yes | Task/workflow identifier |
types | string | No | Comma-separated event types to filter |
last_event_id | string | No | Resume from specific event ID. Accepts a Redis stream ID (e.g., 1700000000000-0) or a numeric sequence (e.g., 42). When numeric, replay includes events with seq > last_event_id. |
Each event follows the SSE specification:
id: <event_id>
event: <event_type>
data: <json_payload>
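The three fields above can be read with a small line-based parser. This is a minimal sketch of the standard SSE framing rules (a blank line ends an event, lines starting with a colon are comments, one leading space after the colon is dropped), not Shannon's own client code; parse_sse is a hypothetical name, and the sketch assumes every data payload is JSON, as in the examples on this page.

```python
import json
from typing import Dict, Iterable, Iterator, List, Optional


def parse_sse(lines: Iterable[str]) -> Iterator[Dict]:
    """Group SSE lines into events with id, event, and decoded data."""
    event_id: Optional[str] = None
    event_type: Optional[str] = None
    data_lines: List[str] = []

    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                yield {
                    "id": event_id,
                    "event": event_type,
                    "data": json.loads("\n".join(data_lines)),
                }
            event_type, data_lines = None, []
            continue
        if line.startswith(":"):
            continue  # Comment line, often used as a keep-alive.
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]
        if field == "id":
            event_id = value
        elif field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)


sample = [
    "id: 1",
    "event: WORKFLOW_STARTED",
    'data: {"workflow_id":"task_abc123","message":"Workflow started"}',
    "",
]
for event in parse_sse(sample):
    print(event["id"], event["event"], event["data"]["message"])
```

Keeping the id alongside each event is what makes resuming with last_event_id possible later; the simpler examples on this page read only the data: lines.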
Example Request
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task_abc123" \
-H "X-API-Key: sk_test_123456"
Example Response
id: 1
event: WORKFLOW_STARTED
data: {"workflow_id":"task_abc123","timestamp":"2025-10-22T10:30:00Z","message":"Workflow started"}

id: 2
event: AGENT_THINKING
data: {"workflow_id":"task_abc123","agent_id":"agent_1","message":"Analyzing query...","timestamp":"2025-10-22T10:30:01Z"}

id: 3
event: TOOL_INVOKED
data: {"workflow_id":"task_abc123","tool":"web_search","params":{"query":"Python programming"},"timestamp":"2025-10-22T10:30:02Z"}

id: 4
event: TOOL_OBSERVATION
data: {"workflow_id":"task_abc123","tool":"web_search","result":"...","timestamp":"2025-10-22T10:30:05Z"}

id: 5
event: AGENT_COMPLETED
data: {"workflow_id":"task_abc123","agent_id":"agent_1","result":"Python is a high-level programming language...","timestamp":"2025-10-22T10:30:06Z"}

id: 6
event: WORKFLOW_COMPLETED
data: {"workflow_id":"task_abc123","status":"COMPLETED","result":"...","timestamp":"2025-10-22T10:30:07Z"}
WebSocket
GET /api/v1/stream/ws
Bidirectional streaming via WebSocket.
Authentication
The gateway authenticates WebSocket connections via headers only (X-API-Key or Authorization). Browsers cannot set custom headers during the WebSocket handshake. For browser usage:
- Run locally with GATEWAY_SKIP_AUTH=1, or
- Use a reverse proxy that injects the header before forwarding to the gateway.
Header-based examples for server environments:
Node (ws):
import WebSocket from 'ws';

const ws = new WebSocket('ws://localhost:8080/api/v1/stream/ws', {
  headers: { 'X-API-Key': 'sk_test_123456' },
});

ws.on('open', () => {
  ws.send(JSON.stringify({ type: 'subscribe', workflow_id: 'task_abc123' }));
});

ws.on('message', (msg) => {
  const data = JSON.parse(msg.toString());
  console.log('Event:', data.type, data.message);
});

ws.on('error', (err) => console.error('WebSocket error:', err));
Python (websockets):
import asyncio
import json

import websockets

async def main():
    # Note: recent websockets releases (14+) name this parameter
    # additional_headers; extra_headers applies to older releases
    # and to the legacy client.
    async with websockets.connect(
        'ws://localhost:8080/api/v1/stream/ws',
        extra_headers={'X-API-Key': 'sk_test_123456'},
    ) as ws:
        await ws.send(json.dumps({'type': 'subscribe', 'workflow_id': 'task_abc123'}))
        async for message in ws:
            data = json.loads(message)
            print('Event:', data.get('type'), data.get('message'))

asyncio.run(main())
Passing the API key in the query string or via an “auth” message after connect is not supported by the gateway.
Message Types
Client → Server:
// Subscribe to workflow
{
  "type": "subscribe",
  "workflow_id": "task_abc123",
  "types": ["AGENT_THINKING", "TOOL_INVOKED"]
}

// Unsubscribe
{
  "type": "unsubscribe",
  "workflow_id": "task_abc123"
}

// Ping (keep-alive)
{
  "type": "ping"
}
Server → Client:
// Event
{
  "type": "AGENT_THINKING",
  "workflow_id": "task_abc123",
  "message": "Analyzing query...",
  "timestamp": "2025-10-22T10:30:00Z"
}

// Pong (keep-alive response)
{
  "type": "pong"
}
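The message shapes above can be wrapped in a few helpers so client code never builds the JSON by hand. This is a sketch, not part of any Shannon SDK: subscribe_message, unsubscribe_message, ping_message, and classify_server_message are hypothetical names, and the only server messages it distinguishes are the two documented here (pong and workflow events).

```python
import json
from typing import Dict, Iterable, Optional, Tuple


def subscribe_message(workflow_id: str, types: Optional[Iterable[str]] = None) -> str:
    """Client -> server: subscribe, optionally restricted to some event types."""
    message: Dict = {"type": "subscribe", "workflow_id": workflow_id}
    if types:
        message["types"] = list(types)
    return json.dumps(message)


def unsubscribe_message(workflow_id: str) -> str:
    """Client -> server: stop receiving events for a workflow."""
    return json.dumps({"type": "unsubscribe", "workflow_id": workflow_id})


def ping_message() -> str:
    """Client -> server: keep-alive ping."""
    return json.dumps({"type": "ping"})


def classify_server_message(raw: str) -> Tuple[str, Dict]:
    """Server -> client: separate keep-alive pongs from workflow events."""
    message = json.loads(raw)
    kind = "pong" if message.get("type") == "pong" else "event"
    return kind, message


print(subscribe_message("task_abc123", ["AGENT_THINKING", "TOOL_INVOKED"]))
```

With any WebSocket client, the strings returned by the first three helpers are what gets passed to send, and classify_server_message is applied to each received text frame.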
Event Types
Core Events
| Event Type | Description | When Fired |
| WORKFLOW_STARTED | Workflow execution began | Start of task |
| WORKFLOW_COMPLETED | Workflow finished successfully | End of task (success) |
Agent Events
| Event Type | Description | Payload Fields |
| AGENT_THINKING | Agent reasoning/planning | agent_id, message |
| AGENT_COMPLETED | Agent finished execution | agent_id, result |
Tool Events
| Event Type | Description | Payload Fields |
| TOOL_INVOKED | Tool execution started | tool, params |
| TOOL_OBSERVATION | Agent observes tool result | tool, result |
LLM Events
| Event Type | Description | Payload Fields |
| LLM_PROMPT | Prompt sent to LLM | text |
| LLM_PARTIAL | Streaming LLM output | text |
| LLM_OUTPUT | Final LLM output | text |
Progress & System Events
| Event Type | Description | Payload Fields |
| PROGRESS | Progress update | progress, message |
| DATA_PROCESSING | Data processing in progress | message |
| WAITING | Waiting for resources | message |
| ERROR_OCCURRED | Error occurred | error, severity |
| ERROR_RECOVERY | Error recovery attempt | message |
| WORKSPACE_UPDATED | Memory/context updated | message |
Team & Approvals
| Event Type | Description |
| TEAM_RECRUITED | Multi-agent team assembled |
| TEAM_RETIRED | Team disbanded |
| TEAM_STATUS | Team coordination update |
| ROLE_ASSIGNED | Agent role assigned |
| DELEGATION | Task delegated |
| DEPENDENCY_SATISFIED | Dependency resolved |
| APPROVAL_REQUESTED | Human approval needed |
| APPROVAL_DECISION | Approval decision recorded |
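The event types listed above can be kept in one lookup so a client can route events by group or build a types filter for a whole group at once. This is an illustrative sketch: the group labels ("core", "agent", "tool", "llm", "system", "team") and the helper names category_of and types_param are assumptions made here, not identifiers defined by the API.

```python
from typing import Dict, Set

# Event type names copied from the tables above; group labels are illustrative.
EVENT_CATEGORIES: Dict[str, Set[str]] = {
    "core": {"WORKFLOW_STARTED", "WORKFLOW_COMPLETED"},
    "agent": {"AGENT_THINKING", "AGENT_COMPLETED"},
    "tool": {"TOOL_INVOKED", "TOOL_OBSERVATION"},
    "llm": {"LLM_PROMPT", "LLM_PARTIAL", "LLM_OUTPUT"},
    "system": {
        "PROGRESS", "DATA_PROCESSING", "WAITING",
        "ERROR_OCCURRED", "ERROR_RECOVERY", "WORKSPACE_UPDATED",
    },
    "team": {
        "TEAM_RECRUITED", "TEAM_RETIRED", "TEAM_STATUS", "ROLE_ASSIGNED",
        "DELEGATION", "DEPENDENCY_SATISFIED",
        "APPROVAL_REQUESTED", "APPROVAL_DECISION",
    },
}


def category_of(event_type: str) -> str:
    """Return the group an event type belongs to, or "unknown"."""
    for name, members in EVENT_CATEGORIES.items():
        if event_type in members:
            return name
    return "unknown"


def types_param(*categories: str) -> str:
    """Build a comma-separated value for the `types` query parameter."""
    selected: Set[str] = set()
    for name in categories:
        selected |= EVENT_CATEGORIES[name]
    return ",".join(sorted(selected))


print(category_of("TOOL_INVOKED"))
print(types_param("agent", "tool"))
```

Returning "unknown" rather than raising keeps a client working if the server emits an event type that is not in this list.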
Code Examples
Python with httpx (SSE)
import httpx
import json

def stream_task_events(task_id: str, api_key: str):
    """Stream task events using SSE."""
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
    with httpx.stream(
        "GET",
        url,
        headers={"X-API-Key": api_key},
        timeout=None  # No timeout for streaming
    ) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                data = json.loads(line[5:])  # Remove "data:" prefix
                yield data

# Usage
for event in stream_task_events("task_abc123", "sk_test_123456"):
    print(f"[{event.get('type')}] {event.get('message', '')}")
    if event.get('type') == 'WORKFLOW_COMPLETED':
        print("Final result:", event.get('result'))
        break
Python - Stream with Event Filtering
def stream_filtered_events(task_id: str, api_key: str, event_types: list):
    """Stream only specific event types."""
    types_param = ",".join(event_types)
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}&types={types_param}"
    # timeout=None: httpx's default read timeout would otherwise cut off
    # a stream that stays quiet for a few seconds.
    with httpx.stream("GET", url, headers={"X-API-Key": api_key}, timeout=None) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                yield json.loads(line[5:])

# Only receive agent thinking and tool events
for event in stream_filtered_events(
    "task_abc123",
    "sk_test_123456",
    ["AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"]
):
    print(f"{event['type']}: {event.get('message', event.get('tool'))}")
JavaScript/Node.js (SSE)
// Assumes the eventsource package v2.x, whose constructor accepts a `headers`
// option. Version 3+ uses a named export and a custom `fetch` option instead.
const EventSource = require('eventsource');

function streamTaskEvents(taskId, apiKey) {
  const url = `http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}`;
  const eventSource = new EventSource(url, {
    headers: {
      'X-API-Key': apiKey
    }
  });

  eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    console.log(`[${data.type}] ${data.message || ''}`);

    if (data.type === 'WORKFLOW_COMPLETED') {
      console.log('Final result:', data.result);
      eventSource.close();
    }
  };

  eventSource.onerror = (error) => {
    console.error('SSE error:', error);
    eventSource.close();
  };

  return eventSource;
}

// Usage
const stream = streamTaskEvents('task_abc123', 'sk_test_123456');

// Close manually
setTimeout(() => stream.close(), 60000);
JavaScript/Node.js - WebSocket (ws)
import WebSocket from 'ws';

function connectWebSocket(taskId, apiKey) {
  const ws = new WebSocket(
    `ws://localhost:8080/api/v1/stream/ws?workflow_id=${taskId}`,
    { headers: { 'X-API-Key': apiKey } }
  );

  ws.on('open', () => console.log('✓ Connected'));

  ws.on('message', (msg) => {
    const data = JSON.parse(msg.toString());
    switch (data.type) {
      case 'AGENT_THINKING':
        console.log(`💭 ${data.message}`);
        break;
      case 'TOOL_INVOKED':
        console.log(`🔧 Tool: ${data.tool}`);
        break;
      case 'TOOL_OBSERVATION':
        console.log(`✓ Result: ${data.result}`);
        break;
      case 'WORKFLOW_COMPLETED':
        console.log(`✓ Done: ${data.result}`);
        ws.close();
        break;
      default:
        console.log(`[${data.type}] ${data.message || ''}`);
    }
  });

  ws.on('error', (err) => console.error('❌ Error:', err));
  ws.on('close', () => console.log('Connection closed'));

  return ws;
}

const ws = connectWebSocket('task_abc123', 'sk_test_123456');
Go (SSE)
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

// Event holds the fields this example reads from each data payload.
// encoding/json has no "inline" tag option, so other payload fields
// are simply ignored here.
type Event struct {
	Type       string `json:"type"`
	WorkflowID string `json:"workflow_id"`
	Message    string `json:"message,omitempty"`
}

func streamEvents(taskID, apiKey string) error {
	url := fmt.Sprintf(
		"http://localhost:8080/api/v1/stream/sse?workflow_id=%s",
		taskID,
	)
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return err
	}
	req.Header.Set("X-API-Key", apiKey)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, "data:") {
			continue
		}
		data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		var event Event
		if err := json.Unmarshal([]byte(data), &event); err != nil {
			return fmt.Errorf("decode event: %w", err)
		}
		fmt.Printf("[%s] %s\n", event.Type, event.Message)
		if event.Type == "WORKFLOW_COMPLETED" {
			fmt.Println("✓ Task completed")
			break
		}
	}
	return scanner.Err()
}

func main() {
	err := streamEvents("task_abc123", "sk_test_123456")
	if err != nil {
		fmt.Println("Error:", err)
	}
}
Bash/curl (SSE)
#!/bin/bash
API_KEY="sk_test_123456"
TASK_ID="$1"

curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=$TASK_ID" \
  -H "X-API-Key: $API_KEY" \
  | while IFS= read -r line; do
    if [[ $line == data:* ]]; then
      # Extract JSON from "data: {...}"
      JSON="${line#data:}"
      # Parse and display
      TYPE=$(echo "$JSON" | jq -r '.type')
      MESSAGE=$(echo "$JSON" | jq -r '.message // ""')
      echo "[$(date +%T)] $TYPE: $MESSAGE"
      # Exit on completion
      if [[ "$TYPE" == "WORKFLOW_COMPLETED" ]]; then
        echo ""
        echo "$JSON" | jq -r '.result'
        break
      fi
    fi
  done
Use Cases
1. Real-Time Progress Display
def display_progress(task_id: str, api_key: str):
    """Display real-time progress to user."""
    print(f"Task {task_id} starting...")
    for event in stream_task_events(task_id, api_key):
        event_type = event.get('type')
        if event_type == 'AGENT_THINKING':
            print(f"💭 {event['message']}")
        elif event_type == 'TOOL_INVOKED':
            print(f"🔧 Using tool: {event['tool']}")
        elif event_type == 'TOOL_OBSERVATION':
            print("✓ Tool completed")
        elif event_type == 'LLM_PARTIAL':
            print(event['text'], end='', flush=True)
        elif event_type == 'WORKFLOW_COMPLETED':
            print("\n\n✓ Done!")
            return event['result']
2. Log All Events to File
import json
from datetime import datetime

def log_events_to_file(task_id: str, api_key: str, log_file: str):
    """Log all events to JSON Lines file."""
    with open(log_file, 'a') as f:
        for event in stream_task_events(task_id, api_key):
            # Add timestamp
            event['logged_at'] = datetime.now().isoformat()
            # Write as JSON Lines
            f.write(json.dumps(event) + '\n')
            f.flush()
            if event.get('type') == 'WORKFLOW_COMPLETED':
                break

log_events_to_file("task_abc123", "sk_test_123456", "task_events.jsonl")
3. Collect Tool Metrics
def collect_tool_metrics(task_id: str, api_key: str):
    """Collect metrics about tool usage."""
    metrics = {
        "tools_invoked": 0,
        "tools_succeeded": 0,
        "tools_failed": 0,
        "tool_list": []
    }
    for event in stream_task_events(task_id, api_key):
        if event.get('type') == 'TOOL_INVOKED':
            metrics['tools_invoked'] += 1
            metrics['tool_list'].append(event['tool'])
        elif event.get('type') == 'TOOL_OBSERVATION':
            metrics['tools_succeeded'] += 1
        elif event.get('type') == 'ERROR_OCCURRED':
            metrics['tools_failed'] += 1
        elif event.get('type') == 'WORKFLOW_COMPLETED':
            break
    return metrics

metrics = collect_tool_metrics("task_abc123", "sk_test_123456")
print(f"Tools used: {metrics['tool_list']}")
print(f"Success rate: {metrics['tools_succeeded']}/{metrics['tools_invoked']}")
4. React UI Integration
import { useState, useEffect } from 'react';

function TaskMonitor({ taskId }) {
  const [events, setEvents] = useState([]);
  const [status, setStatus] = useState('connecting');

  useEffect(() => {
    // The browser EventSource cannot send custom headers such as X-API-Key.
    // Run the gateway with GATEWAY_SKIP_AUTH=1 in development, or point this
    // URL at a backend proxy that injects the header in production.
    const eventSource = new EventSource(
      `http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}`
    );

    eventSource.onopen = () => setStatus('connected');

    eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setEvents(prev => [...prev, data]);
      if (data.type === 'WORKFLOW_COMPLETED') {
        setStatus('completed');
        eventSource.close();
      }
    };

    eventSource.onerror = () => {
      setStatus('error');
      eventSource.close();
    };

    return () => eventSource.close();
  }, [taskId]);

  return (
    <div>
      <h3>Status: {status}</h3>
      <ul>
        {events.map((event, i) => (
          <li key={i}>
            <strong>{event.type}</strong>: {event.message || event.tool}
          </li>
        ))}
      </ul>
    </div>
  );
}
Best Practices
1. Handle Connection Errors
import time

def stream_with_retry(task_id: str, api_key: str, max_retries: int = 3):
    """Stream with automatic retry on connection failure."""
    for attempt in range(max_retries):
        try:
            for event in stream_task_events(task_id, api_key):
                yield event
                if event.get('type') == 'WORKFLOW_COMPLETED':
                    return
        except Exception:
            if attempt < max_retries - 1:
                print(f"Connection failed, retrying in {2 ** attempt}s...")
                time.sleep(2 ** attempt)
            else:
                raise
2. Implement Timeout
import signal

def stream_with_timeout(task_id: str, api_key: str, timeout: int = 300):
    """Stream with timeout."""
    # signal.SIGALRM is Unix-only and must be set from the main thread.
    def timeout_handler(signum, frame):
        raise TimeoutError("Stream timeout")

    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout)
    try:
        for event in stream_task_events(task_id, api_key):
            yield event
            if event.get('type') == 'WORKFLOW_COMPLETED':
                break
    finally:
        signal.alarm(0)  # Cancel alarm
3. Filter Events Client-Side
def stream_specific_events(task_id: str, api_key: str, event_types: set):
    """Filter events client-side."""
    for event in stream_task_events(task_id, api_key):
        is_final = event.get('type') == 'WORKFLOW_COMPLETED'
        # Always pass the completion event through, but only once,
        # even if it is also listed in event_types.
        if is_final or event.get('type') in event_types:
            yield event
        if is_final:
            break

# Only receive agent and tool events
for event in stream_specific_events(
    "task_abc123",
    "sk_test_123456",
    {"AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"}
):
    print(event)
4. Resume from Last Event
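def stream_with_resume(task_id: str, api_key: str, last_event_id: str = None):
    """Resume streaming from specific event."""
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
    if last_event_id:
        url += f"&last_event_id={last_event_id}"
    # Stream events the same way as stream_task_events, using the resume URL
    with httpx.stream("GET", url, headers={"X-API-Key": api_key}, timeout=None) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                yield json.loads(line[5:])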
Note: last_event_id accepts either a Redis stream ID (e.g., 1700000000000-0) or a numeric sequence (e.g., 42). When numeric, replay includes events with seq > last_event_id.
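To resume, a client has to remember the id of the newest event it received and pass that value back on reconnect. The sketch below shows one way to do so with the standard library only. build_stream_url and ResumeCursor are hypothetical names, and the base URL is the local gateway address assumed throughout this page.

```python
from typing import Iterable, Optional
from urllib.parse import urlencode


def build_stream_url(
    base_url: str,
    workflow_id: str,
    types: Optional[Iterable[str]] = None,
    last_event_id: Optional[str] = None,
) -> str:
    """Build the SSE URL with optional `types` and `last_event_id` parameters."""
    params = {"workflow_id": workflow_id}
    if types:
        params["types"] = ",".join(types)
    if last_event_id is not None:
        params["last_event_id"] = str(last_event_id)
    # Keep commas readable in the comma-separated types list.
    return f"{base_url}/api/v1/stream/sse?{urlencode(params, safe=',')}"


class ResumeCursor:
    """Remember the id of the newest event seen so a reconnect can resume."""

    def __init__(self) -> None:
        self.last_event_id: Optional[str] = None

    def observe(self, event_id: Optional[str]) -> None:
        # Ignore events that arrive without an id line.
        if event_id:
            self.last_event_id = event_id

    def resume_url(self, base_url: str, workflow_id: str) -> str:
        return build_stream_url(base_url, workflow_id, last_event_id=self.last_event_id)


cursor = ResumeCursor()
for seen_id in ["1", "2", "3"]:
    cursor.observe(seen_id)
print(cursor.resume_url("http://localhost:8080", "task_abc123"))
```

The cursor needs the value of each event's id: line, so it pairs with a parser that reads that field rather than only the data: lines.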
Comparison: SSE vs WebSocket vs Polling
| Feature | SSE | WebSocket | Polling |
| Direction | Server → Client | Bidirectional | Client → Server |
| Protocol | HTTP | WebSocket | HTTP |
| Auto-reconnect | Yes (browser) | No (manual) | N/A |
| Overhead | Low | Very Low | High |
| Simplicity | High | Medium | High |
| Use Case | Real-time updates | Interactive apps | Simple status |
| Shannon Support | ✅ Recommended | ✅ Available | ⚠️ Not ideal |
When to Use Each
- SSE: Most use cases, real-time monitoring, progress display
- WebSocket: Interactive applications, bidirectional communication needed
- Polling (GET /api/v1/tasks/): Legacy systems, no streaming support
Notes
Event Retention: Events are stored in Redis for 24 hours. Use last_event_id to resume streaming if connection drops.
Connection Limits: Maximum 100 concurrent streaming connections per API key. Consider multiplexing multiple workflows over a single WebSocket.
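Multiplexing over one WebSocket mostly comes down to routing each incoming event by its workflow_id. The sketch below shows that routing step in isolation, without opening a connection. WorkflowRouter and its methods are hypothetical names, and it assumes every event message carries a workflow_id field, as in the Server → Client example above.

```python
import json
from collections import defaultdict
from typing import Callable, DefaultDict, Dict, List

Handler = Callable[[Dict], None]


class WorkflowRouter:
    """Route events from one shared connection to per-workflow handlers."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def watch(self, workflow_id: str, handler: Handler) -> None:
        """Register a handler for one workflow."""
        self._handlers[workflow_id].append(handler)

    def subscribe_messages(self) -> List[str]:
        """One subscribe message per watched workflow, to send after connecting."""
        return [
            json.dumps({"type": "subscribe", "workflow_id": workflow_id})
            for workflow_id in self._handlers
        ]

    def dispatch(self, raw: str) -> int:
        """Deliver a received message; return how many handlers were called."""
        message = json.loads(raw)
        handlers = self._handlers.get(message.get("workflow_id"), [])
        for handler in handlers:
            handler(message)
        return len(handlers)


router = WorkflowRouter()
router.watch("task_a", lambda event: print("A:", event["type"]))
router.watch("task_b", lambda event: print("B:", event["type"]))
router.dispatch('{"type": "AGENT_THINKING", "workflow_id": "task_b"}')
```

A single connection then counts once against the per-key limit regardless of how many workflows are watched; messages without a matching workflow_id, such as pong, are simply not delivered to any handler.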