Error handling documentation is being expanded. Core patterns are shown below.
Overview
The Shannon Python SDK provides comprehensive error handling to help you build robust applications. All SDK exceptions inherit from ShannonError.
Exception Hierarchy
ShannonError # Base exception
├── ConnectionError # Network/connection issues
├── AuthenticationError # API key/auth problems
├── ValidationError # Invalid parameters
├── TaskNotFoundError # Task doesn't exist
├── SessionNotFoundError # Session doesn't exist
├── TaskTimeoutError # Task exceeded timeout
└── TaskCancelledError # Task was cancelled
Budget and Task Failure Handling: Budget exceeded and task failures are not exceptions. For failures, check status.status. For token usage and cost totals, use list_tasks() and read total_token_usage from the returned task summaries.
Basic Error Handling
Try-Catch Pattern
from shannon import (
ShannonClient,
ShannonError,
AuthenticationError,
ConnectionError,
TaskTimeoutError,
TaskStatusEnum,
)
client = ShannonClient(base_url="http://localhost:8080")
try:
handle = client.submit_task(query="Analyze this data")
status = client.wait(handle.task_id, timeout=120)
if status.status == TaskStatusEnum.FAILED:
print(f"Task failed: {status.error_message}")
else:
print("Result:", status.result)
except ConnectionError:
print("Could not connect to Shannon server")
except AuthenticationError:
print("Invalid API credentials")
except TaskTimeoutError:
print("Task exceeded timeout limit")
except ShannonError as e:
print(f"Shannon error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
Specific Error Types
Connection Errors
Handle network and connection issues:
import time
from shannon import ShannonClient, ConnectionError
def connect_with_retry(max_retries=3):
client = ShannonClient()
for attempt in range(max_retries):
try:
# Test connection with a simple task
handle = client.submit_task(query="ping")
return client
except ConnectionError as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff
print(f"Connection failed, retrying in {wait_time}s...")
time.sleep(wait_time)
else:
print(f"Failed to connect after {max_retries} attempts")
raise
Check Cost and Failures
Monitor failures from status and get usage totals via list_tasks():
from shannon import ShannonClient, TaskStatusEnum
client = ShannonClient()
handle = client.submit_task(query="Analyze data")
status = client.wait(handle.task_id)
# Failure check
if status.status == TaskStatusEnum.FAILED:
print(f"Failed with error: {status.error_message}")
# Usage and cost totals (from task listings)
tasks, _ = client.list_tasks(limit=50)
usage = next((t.total_token_usage for t in tasks if t.task_id == handle.task_id), None)
if usage:
print(f"tokens={usage.total_tokens} prompt={usage.prompt_tokens} completion={usage.completion_tokens} cost=${usage.cost_usd:.6f}")
Timeout Errors
Handle long-running operations:
import asyncio
from shannon import AsyncShannonClient, TaskTimeoutError
async def with_timeout_handling():
async with AsyncShannonClient() as client:
try:
handle = await asyncio.wait_for(
client.submit_task(query="Complex analysis"),
timeout=10.0,
)
result = await asyncio.wait_for(client.wait(handle.task_id), timeout=60.0)
return result
except asyncio.TimeoutError:
print("Operation timed out")
return None
except TaskTimeoutError:
print("Task timed out")
return None
Rate Limiting
Handle API rate limits gracefully:
from shannon import ShannonClient, ConnectionError
import time
def handle_rate_limits(queries):
client = ShannonClient()
results = []
for query in queries:
backoff = 1
while True:
try:
handle = client.submit_task(query=query)
result = client.wait(handle.task_id)
results.append(result)
break # Success, move to next
except ConnectionError:
print(f"Rate limited or transient error, retrying in {backoff}s...")
time.sleep(backoff)
backoff = min(backoff * 2, 30)
return results
Validation Errors
Handle invalid parameters:
from shannon import ShannonClient, ValidationError
def validate_and_submit(query, session_id=None):
client = ShannonClient()
try:
return client.submit_task(query=query, session_id=session_id)
except ValidationError as e:
print(f"Invalid parameters: {e}")
return None
Task Failure Handling
Handle task execution failures:
from shannon import ShannonClient, TaskTimeoutError, TaskStatusEnum
def handle_task_failure(query: str):
client = ShannonClient()
try:
handle = client.submit_task(query=query)
status = client.wait(handle.task_id, timeout=120)
if status.status == TaskStatusEnum.FAILED:
print(f"Task failed: {status.error_message}")
return None
return status
except TaskTimeoutError:
print("Task timed out; consider increasing timeout or simplifying the request.")
return None
Logging Errors
Implement comprehensive error logging:
import logging
from shannon import ShannonClient, ShannonError
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('shannon')
def logged_task_submission(query):
client = ShannonClient()
try:
logger.info(f"Submitting task: {query[:50]}...")
handle = client.submit_task(query=query)
logger.info(f"Task submitted: task_id={handle.task_id}")
result = client.wait(handle.task_id)
logger.info("Task completed successfully")
return result.result
except ShannonError as e:
logger.error(f"Shannon error: {e}", exc_info=True)
raise
except Exception as e:
logger.critical(f"Unexpected error: {e}", exc_info=True)
raise
Circuit Breaker Pattern
Implement circuit breaker for resilience:
class CircuitBreaker:
def __init__(self, failure_threshold=5, reset_timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.last_failure_time = None
self.state = "closed" # closed, open, half-open
def call(self, func, *args, **kwargs):
if self.state == "open":
if time.time() - self.last_failure_time > self.reset_timeout:
self.state = "half-open"
else:
raise Exception("Circuit breaker is open")
try:
result = func(*args, **kwargs)
if self.state == "half-open":
self.state = "closed"
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
raise
# Usage
breaker = CircuitBreaker()
client = ShannonClient()
try:
result = breaker.call(
client.submit_task,
query="Analyze data"
)
except Exception as e:
print(f"Service unavailable: {e}")
Best Practices
- Always catch specific exceptions before generic ones
- Implement retry logic with exponential backoff
- Log errors for debugging and monitoring
- Provide fallback options for critical operations
- Set reasonable timeouts to avoid hanging
- Validate inputs before submission
- Use circuit breakers for external dependencies
Next Steps