
Overview

Use SSE (Server-Sent Events) for simple, reliable real-time updates. An optional WebSocket transport is available for environments that require it.
from shannon import ShannonClient, EventType
client = ShannonClient()

h = client.submit_task("Analyze market trends")
for e in client.stream(h.workflow_id, types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]):
  print(f"{e.type}: {e.message}")
  if e.type == EventType.WORKFLOW_COMPLETED:
    break
Resume from the last event:
last_id = None  # no event seen yet
for e in client.stream(h.workflow_id, last_event_id=last_id):
  # ...
  last_id = e.id  # remember the most recent event ID so a later call can resume from it
Notes:
  • last_event_id accepts either a Redis stream ID (preferred) or a numeric sequence number. The SDK exposes event.id, which returns stream_id when present and falls back to seq otherwise.
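The resume pattern above can be wrapped in a small reconnect loop. The following is a minimal sketch, not part of the SDK: stream_with_resume, max_retries, and backoff_seconds are hypothetical names, and it assumes a dropped connection surfaces as Python's built-in ConnectionError, which may not match what the SDK actually raises. It relies only on client.stream(workflow_id, last_event_id=...) and event.id as described above.

```python
import time
from typing import Any, Iterator, Optional


def _is_completed(event: Any) -> bool:
    # Accept either an enum member (use its .value) or a raw string type.
    return getattr(event.type, "value", event.type) == "WORKFLOW_COMPLETED"


def stream_with_resume(
    client: Any,
    workflow_id: str,
    max_retries: int = 3,          # hypothetical knob, not an SDK parameter
    backoff_seconds: float = 1.0,  # hypothetical knob, not an SDK parameter
) -> Iterator[Any]:
    """Yield events, resuming from the last seen event ID after a dropped connection."""
    last_id: Optional[str] = None
    failures = 0
    while True:
        try:
            for event in client.stream(workflow_id, last_event_id=last_id):
                last_id = event.id  # record position before handing the event out
                failures = 0        # a delivered event resets the retry budget
                yield event
                if _is_completed(event):
                    return
            return  # the stream ended without an error
        except ConnectionError:  # assumption: the real SDK may raise a different type
            failures += 1
            if failures > max_retries:
                raise
            time.sleep(backoff_seconds * failures)  # linear backoff before reconnecting
```

Used as `for e in stream_with_resume(client, h.workflow_id): ...`, the caller sees one continuous sequence of events. For resuming across process restarts, last_id would also need to be persisted somewhere durable, which this sketch does not do.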

WebSocket (optional)

Requires the websockets package: pip install websockets.
from shannon import ShannonClient
client = ShannonClient()

h = client.submit_task("Stream via WS")
for e in client.stream_ws(h.workflow_id):
  print(e.type, e.message)
  if e.type == "WORKFLOW_COMPLETED":
    break

Common Event Types

  • WORKFLOW_STARTED / WORKFLOW_COMPLETED
  • LLM_PROMPT / LLM_PARTIAL / LLM_OUTPUT
  • TOOL_INVOKED / TOOL_OBSERVATION
  • APPROVAL_REQUESTED / APPROVAL_DECISION
  • ERROR_OCCURRED
Tip: Filter using EventType enums or raw strings.
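Because filters may be given as enums or raw strings, client-side checks are easier if both forms are normalized first. The sketch below is illustrative only: the EventType class here is a stand-in for shannon.EventType that covers a few of the names listed above, it assumes each member's value equals its name, and type_name and wanted are hypothetical helpers rather than SDK functions.

```python
from enum import Enum
from typing import Iterable, Union


class EventType(str, Enum):
    """Stand-in for shannon.EventType (assumed: each value equals its name)."""
    WORKFLOW_STARTED = "WORKFLOW_STARTED"
    WORKFLOW_COMPLETED = "WORKFLOW_COMPLETED"
    LLM_OUTPUT = "LLM_OUTPUT"
    ERROR_OCCURRED = "ERROR_OCCURRED"


TypeLike = Union[Enum, str]


def type_name(t: TypeLike) -> str:
    """Return the plain string name for an enum member or a raw string."""
    return t.value if isinstance(t, Enum) else str(t)


def wanted(event_type: TypeLike, types: Iterable[TypeLike]) -> bool:
    """True if event_type matches any entry in types, whichever form each uses."""
    return type_name(event_type) in {type_name(t) for t in types}
```

With this in place, a check such as `wanted(e.type, [EventType.LLM_OUTPUT, "ERROR_OCCURRED"])` behaves the same whether e.type arrives as an enum member or as a string.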