Skip to main content

Overview

Shannon provides real-time event streaming through Server-Sent Events (SSE) and WebSocket protocols. Use streaming to monitor task execution, display progress, and receive results as they’re generated.
Authentication: Streaming endpoints require the same headers as other APIs. Browsers cannot send custom headers with EventSource.
  • Development: set GATEWAY_SKIP_AUTH=1.
  • Production: proxy SSE via your backend and inject X-API-Key or Bearer headers. Do not pass API keys in URL query parameters.

Endpoints

MethodEndpointProtocolDescription
POST/api/v1/tasks/streamHTTP+SSESubmit task and get stream URL (recommended)
GET/api/v1/stream/sseSSEServer-Sent Events endpoint
GET/api/v1/stream/wsWebSocketWebSocket streaming endpoint
GET/api/v1/tasks/{id}/eventsHTTPGet historical events (paginated)

POST /api/v1/tasks/stream

The easiest way to submit a task and immediately start streaming its events. This endpoint combines task submission with streaming setup in one call.
Best for frontend applications: This endpoint is perfect for real-time UIs where you want to show progress immediately after submitting a task.

Authentication

Required: Yes
X-API-Key: sk_test_123456
Or:
Authorization: Bearer YOUR_TOKEN

Request Body

ParameterTypeRequiredDescription
querystringYesNatural language task description
session_idstringNoSession identifier for multi-turn conversations
contextobjectNoAdditional context data as key-value pairs
model_tierstringNoPreferred tier: small, medium, or large
model_overridestringNoSpecific model name (canonical; e.g., gpt-5)
provider_overridestringNoForce provider (e.g., openai, anthropic, google)

Response

Status: 201 Created Body:
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "workflow_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "stream_url": "/api/v1/stream/sse?workflow_id=task_01HQZX3Y9K8M2P4N5S7T9W2V"
}

Response Fields

FieldTypeDescription
workflow_idstringTask/workflow identifier
stream_urlstringRelative URL to SSE stream endpoint

Example: JavaScript/TypeScript

async function submitAndStream(query, onEvent, onComplete, onError) {
  try {
    // 1. Submit task and get stream URL
    const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer YOUR_TOKEN'
      },
      body: JSON.stringify({
        query: query,
        session_id: `session-${Date.now()}`
      })
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    const { task_id, workflow_id, stream_url } = await response.json();
    console.log(`Task submitted: ${workflow_id}`);

    // 2. Connect to SSE stream
    const eventSource = new EventSource(
      `http://localhost:8080${stream_url}`,
      { withCredentials: false }
    );

    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);
      onEvent(event);

      // Check for completion
      if (event.type === 'WORKFLOW_COMPLETED') {
        eventSource.close();
        onComplete(event);
      }
    };

    eventSource.onerror = (err) => {
      console.error('SSE error:', err);
      eventSource.close();
      onError(err);
    };

    return { workflow_id, eventSource };

  } catch (error) {
    onError(error);
    throw error;
  }
}

// Usage
submitAndStream(
  "Analyze Q4 revenue trends",
  (event) => console.log(`[${event.type}]`, event.message),
  (final) => console.log("Completed:", final.result),
  (error) => console.error("Error:", error)
);

Example: React Hook

import { useState, useCallback, useRef } from 'react';

function useTaskStream(apiUrl = 'http://localhost:8080') {
  const [events, setEvents] = useState([]);
  const [isStreaming, setIsStreaming] = useState(false);
  const [error, setError] = useState(null);
  const [workflowId, setWorkflowId] = useState(null);
  const eventSourceRef = useRef(null);

  const submitTask = useCallback(async (query, sessionId = null) => {
    setEvents([]);
    setError(null);
    setIsStreaming(true);

    try {
      // Submit task
      const response = await fetch(`${apiUrl}/api/v1/tasks/stream`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${localStorage.getItem('token')}`
        },
        body: JSON.stringify({
          query,
          session_id: sessionId || `session-${Date.now()}`
        })
      });

      if (!response.ok) throw new Error(`HTTP ${response.status}`);

      const { task_id, workflow_id, stream_url } = await response.json();
      setWorkflowId(workflow_id);

      // Connect to stream
      const eventSource = new EventSource(`${apiUrl}${stream_url}`);
      eventSourceRef.current = eventSource;

      eventSource.onmessage = (e) => {
        const event = JSON.parse(e.data);
        setEvents(prev => [...prev, event]);

      if (event.type === 'WORKFLOW_COMPLETED' || event.type === 'ERROR_OCCURRED') {
        eventSource.close();
        setIsStreaming(false);
      }
      };

      eventSource.onerror = (err) => {
        setError(err);
        setIsStreaming(false);
        eventSource.close();
      };

    } catch (err) {
      setError(err);
      setIsStreaming(false);
    }
  }, [apiUrl]);

  const stopStreaming = useCallback(() => {
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      setIsStreaming(false);
    }
  }, []);

  return { submitTask, stopStreaming, events, isStreaming, error, workflowId };
}

// Usage in component
function TaskStreamDemo() {
  const { submitTask, stopStreaming, events, isStreaming, error, workflowId } = useTaskStream();

  return (
    <div>
      <button
        onClick={() => submitTask("What is 15 + 25?")}
        disabled={isStreaming}
      >
        {isStreaming ? 'Processing...' : 'Submit Task'}
      </button>

      {workflowId && <p>Workflow ID: {workflowId}</p>}
      {error && <p style={{color: 'red'}}>Error: {error.message}</p>}

      <div>
        {events.map((event, idx) => (
          <div key={idx}>
            <strong>{event.type}</strong>: {event.message || event.agent_id}
          </div>
        ))}
      </div>

      {isStreaming && <button onClick={stopStreaming}>Stop</button>}
    </div>
  );
}

Example: Vue 3 Composition API

<script setup>
import { ref } from 'vue';

const events = ref([]);
const isStreaming = ref(false);
const error = ref(null);
const workflowId = ref(null);
let eventSource = null;

async function submitTask(query) {
  events.value = [];
  error.value = null;
  isStreaming.value = true;

  try {
    const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${localStorage.getItem('token')}`
      },
      body: JSON.stringify({
        query,
        session_id: `session-${Date.now()}`
      })
    });

    if (!response.ok) throw new Error(`HTTP ${response.status}`);

    const { task_id, workflow_id, stream_url } = await response.json();
    workflowId.value = workflow_id;

    eventSource = new EventSource(`http://localhost:8080${stream_url}`);

    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);
      events.value.push(event);

      if (event.type === 'WORKFLOW_COMPLETED') {
        eventSource.close();
        isStreaming.value = false;
      }
    };

    eventSource.onerror = (err) => {
      error.value = err;
      eventSource.close();
      isStreaming.value = false;
    };
  } catch (err) {
    error.value = err;
    isStreaming.value = false;
  }
}

function stopStreaming() {
  if (eventSource) {
    eventSource.close();
    isStreaming.value = false;
  }
}
</script>

<template>
  <div>
    <button @click="submitTask('Analyze data')" :disabled="isStreaming">
      {{ isStreaming ? 'Processing...' : 'Submit Task' }}
    </button>

    <div v-if="workflowId">Workflow: {{ workflowId }}</div>
    <div v-if="error" style="color: red">Error: {{ error.message }}</div>

    <div v-for="(event, idx) in events" :key="idx">
      <strong>{{ event.type }}</strong>: {{ event.message || event.agent_id }}
    </div>

    <button v-if="isStreaming" @click="stopStreaming">Stop</button>
  </div>
</template>

Example: Python

import httpx
import json

def submit_and_stream(query: str, api_key: str):
    """Submit task and stream events."""

    # 1. Submit task
    response = httpx.post(
        "http://localhost:8080/api/v1/tasks/stream",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={
            "query": query,
            "session_id": f"session-{int(time.time())}"
        }
    )

    data = response.json()
    workflow_id = data["workflow_id"]
    stream_url = data["stream_url"]

    print(f"Task submitted: {workflow_id}")

    # 2. Stream events
    with httpx.stream(
        "GET",
        f"http://localhost:8080{stream_url}",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=None
    ) as stream_response:
        for line in stream_response.iter_lines():
            if line.startswith("data:"):
                event = json.loads(line[5:])
                print(f"[{event['type']}] {event.get('message', '')}")

                if event['type'] in ['WORKFLOW_COMPLETED']:
                    return event

# Usage
result = submit_and_stream("What is the capital of France?", "sk_test_123456")
print("Final result:", result.get('result'))
Why use this endpoint? The unified endpoint ensures you start streaming immediately after submission, preventing any missed events that could occur if you submit and then connect separately.

Server-Sent Events (SSE)

GET /api/v1/stream/sse

Real-time event streaming using Server-Sent Events.

Authentication

Required: Yes
X-API-Key: sk_test_123456

Query Parameters

ParameterTypeRequiredDescription
workflow_idstringYesTask/workflow identifier
typesstringNoComma-separated event types to filter
last_event_idstringNoResume from specific event ID. Accepts a Redis stream ID (e.g., 1700000000000-0) or a numeric sequence (e.g., 42). When numeric, replay includes events with seq > last_event_id.

Event Format

Each event follows SSE specification:
id: <event_id>
event: <event_type>
data: <json_payload>

Example Request

curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task_abc123" \
  -H "X-API-Key: sk_test_123456"

Example Response

id: 1
event: WORKFLOW_STARTED
data: {"workflow_id":"task_abc123","timestamp":"2025-10-22T10:30:00Z","message":"Workflow started"}

id: 2
event: AGENT_THINKING
data: {"workflow_id":"task_abc123","agent_id":"agent_1","message":"Analyzing query...","timestamp":"2025-10-22T10:30:01Z"}

id: 3
event: TOOL_INVOKED
data: {"workflow_id":"task_abc123","tool":"web_search","params":{"query":"Python programming"},"timestamp":"2025-10-22T10:30:02Z"}

id: 4
event: TOOL_OBSERVATION
data: {"workflow_id":"task_abc123","tool":"web_search","result":"...","timestamp":"2025-10-22T10:30:05Z"}

id: 5
event: AGENT_COMPLETED
data: {"workflow_id":"task_abc123","agent_id":"agent_1","result":"Python is a high-level programming language...","timestamp":"2025-10-22T10:30:06Z"}

id: 6
event: WORKFLOW_COMPLETED
data: {"workflow_id":"task_abc123","status":"COMPLETED","result":"...","timestamp":"2025-10-22T10:30:07Z"}

WebSocket

GET /api/v1/stream/ws

Bidirectional streaming via WebSocket.

Authentication

The gateway authenticates WebSocket connections via headers only (X-API-Key or Authorization). Browsers cannot set custom headers during the WebSocket handshake. For browser usage:
  • Run locally with GATEWAY_SKIP_AUTH=1, or
  • Use a reverse proxy that injects the header before forwarding to the gateway.
Header-based examples for server environments: Node (ws):
import WebSocket from 'ws';

const ws = new WebSocket('ws://localhost:8080/api/v1/stream/ws', {
  headers: { 'X-API-Key': 'sk_test_123456' },
});

ws.on('open', () => {
  ws.send(JSON.stringify({ type: 'subscribe', workflow_id: 'task_abc123' }));
});

ws.on('message', (msg) => {
  const data = JSON.parse(msg.toString());
  console.log('Event:', data.type, data.message);
});

ws.on('error', (err) => console.error('WebSocket error:', err));
Python (websockets):
import asyncio, json, websockets

async def main():
    async with websockets.connect(
        'ws://localhost:8080/api/v1/stream/ws',
        extra_headers={'X-API-Key': 'sk_test_123456'},
    ) as ws:
        await ws.send(json.dumps({'type': 'subscribe', 'workflow_id': 'task_abc123'}))
        async for message in ws:
            data = json.loads(message)
            print('Event:', data.get('type'), data.get('message'))

asyncio.run(main())
Passing the API key in the query string or via an “auth” message after connect is not supported by the gateway.

Message Types

Client → Server:
// Subscribe to workflow
{
  "type": "subscribe",
  "workflow_id": "task_abc123",
  "types": ["AGENT_THINKING", "TOOL_INVOKED"]
}

// Unsubscribe
{
  "type": "unsubscribe",
  "workflow_id": "task_abc123"
}

// Ping (keep-alive)
{
  "type": "ping"
}
Server → Client:
// Event
{
  "type": "AGENT_THINKING",
  "workflow_id": "task_abc123",
  "message": "Analyzing query...",
  "timestamp": "2025-10-22T10:30:00Z"
}

// Pong (keep-alive response)
{
  "type": "pong"
}

Event Types

Core Events

Event TypeDescriptionWhen Fired
WORKFLOW_STARTEDWorkflow execution beganStart of task
WORKFLOW_COMPLETEDWorkflow finished successfullyEnd of task (success)

Agent Events

Event TypeDescriptionPayload Fields
AGENT_THINKINGAgent reasoning/planningagent_id, message
AGENT_COMPLETEDAgent finished executionagent_id, result

Tool Events

Event TypeDescriptionPayload Fields
TOOL_INVOKEDTool execution startedtool, params
TOOL_OBSERVATIONAgent observes tool resulttool, result

LLM Events

Event TypeDescriptionPayload Fields
LLM_PROMPTPrompt sent to LLMtext
LLM_PARTIALStreaming LLM outputtext
LLM_OUTPUTFinal LLM outputtext

Progress & System Events

Event TypeDescriptionPayload Fields
PROGRESSProgress updateprogress, message
DATA_PROCESSINGData processing in progressmessage
WAITINGWaiting for resourcesmessage
ERROR_OCCURREDError occurrederror, severity
ERROR_RECOVERYError recovery attemptmessage
WORKSPACE_UPDATEDMemory/context updatedmessage

Team & Approvals

Event TypeDescription
TEAM_RECRUITEDMulti-agent team assembled
TEAM_RETIREDTeam disbanded
TEAM_STATUSTeam coordination update
ROLE_ASSIGNEDAgent role assigned
DELEGATIONTask delegated
DEPENDENCY_SATISFIEDDependency resolved
APPROVAL_REQUESTEDHuman approval needed
APPROVAL_DECISIONApproval decision recorded

Code Examples

Python with httpx (SSE)

import httpx
import json

def stream_task_events(task_id: str, api_key: str):
    """Stream task events using SSE."""
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"

    with httpx.stream(
        "GET",
        url,
        headers={"X-API-Key": api_key},
        timeout=None  # No timeout for streaming
    ) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                data = json.loads(line[5:])  # Remove "data:" prefix
                yield data

# Usage
for event in stream_task_events("task_abc123", "sk_test_123456"):
    print(f"[{event.get('type')}] {event.get('message', '')}")

    if event.get('type') == 'WORKFLOW_COMPLETED':
        print("Final result:", event.get('result'))
        break

Python - Stream with Event Filtering

def stream_filtered_events(task_id: str, api_key: str, event_types: list):
    """Stream only specific event types."""
    types_param = ",".join(event_types)
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}&types={types_param}"

    with httpx.stream("GET", url, headers={"X-API-Key": api_key}) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                yield json.loads(line[5:])

# Only receive agent thinking and tool events
for event in stream_filtered_events(
    "task_abc123",
    "sk_test_123456",
    ["AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"]
):
    print(f"{event['type']}: {event.get('message', event.get('tool'))}")

JavaScript/Node.js (SSE)

const EventSource = require('eventsource');

function streamTaskEvents(taskId, apiKey) {
  const url = `http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}`;

  const eventSource = new EventSource(url, {
    headers: {
      'X-API-Key': apiKey
    }
  });

  eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    console.log(`[${data.type}] ${data.message || ''}`);

    if (data.type === 'WORKFLOW_COMPLETED') {
      console.log('Final result:', data.result);
      eventSource.close();
    }
  };

  eventSource.onerror = (error) => {
    console.error('SSE error:', error);
    eventSource.close();
  };

  return eventSource;
}

// Usage
const stream = streamTaskEvents('task_abc123', 'sk_test_123456');

// Close manually
setTimeout(() => stream.close(), 60000);

JavaScript/Node.js - WebSocket (ws)

import WebSocket from 'ws';

function connectWebSocket(taskId, apiKey) {
  const ws = new WebSocket(
    `ws://localhost:8080/api/v1/stream/ws?workflow_id=${taskId}`,
    { headers: { 'X-API-Key': apiKey } }
  );

  ws.on('open', () => console.log('✓ Connected'));

  ws.on('message', (msg) => {
    const data = JSON.parse(msg.toString());
    switch (data.type) {
      case 'AGENT_THINKING':
        console.log(`💭 ${data.message}`);
        break;
      case 'TOOL_INVOKED':
        console.log(`🔧 Tool: ${data.tool}`);
        break;
      case 'TOOL_OBSERVATION':
        console.log(`✓ Result: ${data.result}`);
        break;
      case 'WORKFLOW_COMPLETED':
        console.log(`✓ Done: ${data.result}`);
        ws.close();
        break;
      default:
        console.log(`[${data.type}] ${data.message || ''}`);
    }
  });

  ws.on('error', (err) => console.error('❌ Error:', err));
  ws.on('close', () => console.log('Connection closed'));

  return ws;
}

const ws = connectWebSocket('task_abc123', 'sk_test_123456');

Go (SSE)

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "net/http"
    "strings"
)

type Event struct {
    Type      string                 `json:"type"`
    WorkflowID string                `json:"workflow_id"`
    Message   string                 `json:"message,omitempty"`
    Data      map[string]interface{} `json:",inline"`
}

func streamEvents(taskID, apiKey string) error {
    url := fmt.Sprintf(
        "http://localhost:8080/api/v1/stream/sse?workflow_id=%s",
        taskID,
    )

    req, _ := http.NewRequest("GET", url, nil)
    req.Header.Set("X-API-Key", apiKey)

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    scanner := bufio.NewScanner(resp.Body)

    for scanner.Scan() {
        line := scanner.Text()

        if strings.HasPrefix(line, "data:") {
            data := strings.TrimPrefix(line, "data:")

            var event Event
            json.Unmarshal([]byte(data), &event)

            fmt.Printf("[%s] %s\n", event.Type, event.Message)

            if event.Type == "WORKFLOW_COMPLETED" {
                fmt.Println("✓ Task completed")
                break
            }
        }
    }

    return scanner.Err()
}

func main() {
    err := streamEvents("task_abc123", "sk_test_123456")
    if err != nil {
        fmt.Println("Error:", err)
    }
}

Bash/curl (SSE)

#!/bin/bash

API_KEY="sk_test_123456"
TASK_ID="$1"

curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=$TASK_ID" \
  -H "X-API-Key: $API_KEY" \
  | while IFS= read -r line; do
    if [[ $line == data:* ]]; then
      # Extract JSON from "data: {...}"
      JSON="${line#data:}"

      # Parse and display
      TYPE=$(echo "$JSON" | jq -r '.type')
      MESSAGE=$(echo "$JSON" | jq -r '.message // ""')

      echo "[$(date +%T)] $TYPE: $MESSAGE"

      # Exit on completion
      if [[ "$TYPE" == "WORKFLOW_COMPLETED" ]]; then
        echo ""
        echo "$JSON" | jq -r '.result'
        break
      fi
    fi
  done

Use Cases

1. Real-Time Progress Display

def display_progress(task_id: str, api_key: str):
    """Display real-time progress to user."""
    print(f"Task {task_id} starting...")

    for event in stream_task_events(task_id, api_key):
        event_type = event.get('type')

        if event_type == 'AGENT_THINKING':
            print(f"💭 {event['message']}")
        elif event_type == 'TOOL_INVOKED':
            print(f"🔧 Using tool: {event['tool']}")
        elif event_type == 'TOOL_OBSERVATION':
            print(f"✓ Tool completed")
        elif event_type == 'LLM_PARTIAL':
            print(event['text'], end='', flush=True)
        elif event_type == 'WORKFLOW_COMPLETED':
            print(f"\n\n✓ Done!")
            return event['result']

2. Log All Events to File

import json
from datetime import datetime

def log_events_to_file(task_id: str, api_key: str, log_file: str):
    """Log all events to JSON Lines file."""
    with open(log_file, 'a') as f:
        for event in stream_task_events(task_id, api_key):
            # Add timestamp
            event['logged_at'] = datetime.now().isoformat()

            # Write as JSON Lines
            f.write(json.dumps(event) + '\n')
            f.flush()

            if event.get('type') == 'WORKFLOW_COMPLETED':
                break

log_events_to_file("task_abc123", "sk_test_123456", "task_events.jsonl")

3. Collect Tool Usage Metrics

def collect_tool_metrics(task_id: str, api_key: str):
    """Collect metrics about tool usage."""
    metrics = {
        "tools_invoked": 0,
        "tools_succeeded": 0,
        "tools_failed": 0,
        "tool_list": []
    }

    for event in stream_task_events(task_id, api_key):
        if event.get('type') == 'TOOL_INVOKED':
            metrics['tools_invoked'] += 1
            metrics['tool_list'].append(event['tool'])
        elif event.get('type') == 'TOOL_OBSERVATION':
            metrics['tools_succeeded'] += 1
        elif event.get('type') == 'ERROR_OCCURRED':
            metrics['tools_failed'] += 1
        elif event.get('type') == 'WORKFLOW_COMPLETED':
            break

    return metrics

metrics = collect_tool_metrics("task_abc123", "sk_test_123456")
print(f"Tools used: {metrics['tool_list']}")
print(f"Success rate: {metrics['tools_succeeded']}/{metrics['tools_invoked']}")

4. React UI Integration

import { useState, useEffect } from 'react';

function TaskMonitor({ taskId, apiKey }) {
  const [events, setEvents] = useState([]);
  const [status, setStatus] = useState('connecting');

  useEffect(() => {
    const eventSource = new EventSource(
      `http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}`,
      { headers: { 'X-API-Key': apiKey } }
    );

    eventSource.onopen = () => setStatus('connected');

    eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setEvents(prev => [...prev, data]);

      if (data.type === 'WORKFLOW_COMPLETED') {
        setStatus('completed');
        eventSource.close();
      }
    };

    eventSource.onerror = () => {
      setStatus('error');
      eventSource.close();
    };

    return () => eventSource.close();
  }, [taskId, apiKey]);

  return (
    <div>
      <h3>Status: {status}</h3>
      <ul>
        {events.map((event, i) => (
          <li key={i}>
            <strong>{event.type}</strong>: {event.message || event.tool}
          </li>
        ))}
      </ul>
    </div>
  );
}

Best Practices

1. Handle Connection Errors

import time

def stream_with_retry(task_id: str, api_key: str, max_retries: int = 3):
    """Stream with automatic retry on connection failure."""
    for attempt in range(max_retries):
        try:
            for event in stream_task_events(task_id, api_key):
                yield event

                if event.get('type') == 'WORKFLOW_COMPLETED':
                    return
        except Exception as e:
            if attempt < max_retries - 1:
                print(f"Connection failed, retrying in {2 ** attempt}s...")
                time.sleep(2 ** attempt)
            else:
                raise

2. Implement Timeout

import signal

def stream_with_timeout(task_id: str, api_key: str, timeout: int = 300):
    """Stream with timeout."""
    def timeout_handler(signum, frame):
        raise TimeoutError("Stream timeout")

    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout)

    try:
        for event in stream_task_events(task_id, api_key):
            yield event
            if event.get('type') == 'WORKFLOW_COMPLETED':
                break
    finally:
        signal.alarm(0)  # Cancel alarm

3. Filter Events Client-Side

def stream_specific_events(task_id: str, api_key: str, event_types: set):
    """Filter events client-side."""
    for event in stream_task_events(task_id, api_key):
        if event.get('type') in event_types:
            yield event

        if event.get('type') == 'WORKFLOW_COMPLETED':
            yield event
            break

# Only receive agent and tool events
for event in stream_specific_events(
    "task_abc123",
    "sk_test_123456",
    {"AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"}
):
    print(event)

4. Resume from Last Event

def stream_with_resume(task_id: str, api_key: str, last_event_id: str = None):
    """Resume streaming from specific event."""
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"

    if last_event_id:
        url += f"&last_event_id={last_event_id}"

    # Stream events...
Note: last_event_id accepts either a Redis stream ID (e.g., 1700000000000-0) or a numeric sequence (e.g., 42). When numeric, replay includes events with seq > last_event_id.

Comparison: SSE vs WebSocket vs Polling

FeatureSSEWebSocketPolling
DirectionServer → ClientBidirectionalClient → Server
ProtocolHTTPWebSocketHTTP
Auto-reconnectYes (browser)No (manual)N/A
OverheadLowVery LowHigh
SimplicityHighMediumHigh
Use CaseReal-time updatesInteractive appsSimple status
Shannon Support✅ Recommended✅ Available⚠️ Not ideal

When to Use Each

  • SSE: Most use cases, real-time monitoring, progress display
  • WebSocket: Interactive applications, bidirectional communication needed
  • Polling (GET /api/v1/tasks/): Legacy systems, no streaming support

Notes

Event Retention: Events are stored in Redis for 24 hours. Use last_event_id to resume streaming if connection drops.
Connection Limits: Maximum 100 concurrent streaming connections per API key. Consider multiplexing multiple workflows over a single WebSocket.