This examples collection is growing. More use cases will be added regularly.
Basic Examples
Model Selection
Copy
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
handle = client.submit_task(
"Summarize this document",
model_tier="small",
# model_override="gpt-5-nano-2025-08-07",
# provider_override="openai",
mode="simple",
)
print(client.wait(handle.task_id).result)
Simple Question Answering
Copy
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# Ask a simple question
handle = client.submit_task(query="What is the capital of France?")
final = client.wait(handle.task_id)
print(f"Answer: {final.result}")
Data Analysis
Copy
# Analyze CSV data
handle = client.submit_task(
query="Analyze the sales trends in this CSV file",
context={"file_path": "/data/sales_2024.csv"}
)
# Stream progress
for event in client.stream(handle.workflow_id):
print(f"[{event.type}] {event.message}")
if event.type == "WORKFLOW_COMPLETED":
break
Advanced Examples
Multi-Step Workflow
Copy
from shannon import ShannonClient
def complex_analysis():
client = ShannonClient()
session_id = "quarterly-analysis"
# Step 1: Load data
load_handle = client.submit_task(
query="Load Q4 financial data from database",
session_id=session_id,
)
client.wait(load_handle.task_id)
# Step 2: Analyze trends (has context from step 1)
analyze_handle = client.submit_task(
query="Identify top 3 revenue trends",
session_id=session_id,
)
trends = client.wait(analyze_handle.task_id)
# Step 3: Generate report
report_handle = client.submit_task(
query="Create executive summary with visualizations",
session_id=session_id,
)
return client.wait(report_handle.task_id)
Parallel Processing
Copy
import asyncio
from shannon import AsyncShannonClient
async def parallel_research(topics):
async with AsyncShannonClient() as client:
tasks = [
client.submit_task(query=f"Research latest developments in {topic}")
for topic in topics
]
# Wait for all handles
handles = await asyncio.gather(*tasks)
# Collect all results
results = await asyncio.gather(*[
client.wait(h.task_id)
for h in handles
])
return dict(zip(topics, results))
# Use it
topics = ["AI", "Quantum Computing", "Biotechnology"]
results = asyncio.run(parallel_research(topics))
Streaming with filters
Copy
from shannon import EventType
handle = client.submit_task(query="Research cloud providers")
for event in client.stream(handle.workflow_id, types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]):
print(event.type)
if event.type == EventType.WORKFLOW_COMPLETED:
break
Real-World Use Cases
Code Review Assistant
Copy
def automated_code_review(file_path, language="python"):
client = ShannonClient()
with open(file_path, 'r') as f:
code = f.read()
handle = client.submit_task(
query=f"Review this {language} code for bugs, security issues, and improvements",
# Mode auto-selected,
context={"code": code}
)
# Track progress (generic)
for event in client.stream(handle.workflow_id):
print(f"[{event.type}] {event.message}")
if event.type == "WORKFLOW_COMPLETED":
break
return client.wait(handle.task_id)
# Use it
review = automated_code_review("app.py")
print(review)
Error Handling Examples
Copy
from shannon import ShannonClient, ShannonError, ConnectionError, TaskTimeoutError
import time
def robust_task_submission(query, max_retries=3):
client = ShannonClient()
for attempt in range(max_retries):
try:
# Submit task
handle = client.submit_task(query=query) # Mode auto-selected
# Wait with timeout
result = client.wait(handle.task_id, timeout=300)
return result
except ConnectionError:
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
continue
raise
except TaskTimeoutError:
print("Task timed out")
raise
except ShannonError as e:
print(f"Shannon error: {e}")
raise