Skip to main content
Error handling documentation is being expanded. Core patterns are shown below.

Overview

The Shannon Python SDK provides comprehensive error handling to help you build robust applications. All SDK exceptions inherit from ShannonError.

Exception Hierarchy

ShannonError                    # Base exception
├── ConnectionError             # Network/connection issues
├── AuthenticationError         # API key/auth problems
├── ValidationError             # Invalid parameters
├── TaskNotFoundError          # Task doesn't exist
├── SessionNotFoundError       # Session doesn't exist
├── TaskTimeoutError           # Task exceeded timeout
└── TaskCancelledError         # Task was cancelled
Budget and Task Failure Handling: Budget exceeded and task failures are not exceptions. For failures, check status.status. For token usage and cost totals, use list_tasks() and read total_token_usage from the returned task summaries.

Basic Error Handling

Try-Catch Pattern

from shannon import (
    ShannonClient,
    ShannonError,
    AuthenticationError,
    ConnectionError,
    TaskTimeoutError,
    TaskStatusEnum,
)

client = ShannonClient(base_url="http://localhost:8080")

try:
    handle = client.submit_task(query="Analyze this data")
    status = client.wait(handle.task_id, timeout=120)

    if status.status == TaskStatusEnum.FAILED:
        print(f"Task failed: {status.error_message}")
    else:
        print("Result:", status.result)

except ConnectionError:
    print("Could not connect to Shannon server")
except AuthenticationError:
    print("Invalid API credentials")
except TaskTimeoutError:
    print("Task exceeded timeout limit")
except ShannonError as e:
    print(f"Shannon error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Specific Error Types

Connection Errors

Handle network and connection issues:
import time
from shannon import ShannonClient, ConnectionError

def connect_with_retry(max_retries=3):
    client = ShannonClient()

    for attempt in range(max_retries):
        try:
            # Test connection with a simple task
            handle = client.submit_task(query="ping")
            return client

        except ConnectionError as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Connection failed, retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                print(f"Failed to connect after {max_retries} attempts")
                raise

Check Cost and Failures

Monitor failures from status and get usage totals via list_tasks():
from shannon import ShannonClient, TaskStatusEnum

client = ShannonClient()
handle = client.submit_task(query="Analyze data")
status = client.wait(handle.task_id)

# Failure check
if status.status == TaskStatusEnum.FAILED:
    print(f"Failed with error: {status.error_message}")

# Usage and cost totals (from task listings)
tasks, _ = client.list_tasks(limit=50)
usage = next((t.total_token_usage for t in tasks if t.task_id == handle.task_id), None)
if usage:
    print(f"tokens={usage.total_tokens} prompt={usage.prompt_tokens} completion={usage.completion_tokens} cost=${usage.cost_usd:.6f}")

Timeout Errors

Handle long-running operations:
import asyncio
from shannon import AsyncShannonClient, TaskTimeoutError

async def with_timeout_handling():
    async with AsyncShannonClient() as client:
        try:
            handle = await asyncio.wait_for(
                client.submit_task(query="Complex analysis"),
                timeout=10.0,
            )
            result = await asyncio.wait_for(client.wait(handle.task_id), timeout=60.0)
            return result

        except asyncio.TimeoutError:
            print("Operation timed out")
            return None
        except TaskTimeoutError:
            print("Task timed out")
            return None

Rate Limiting

Handle API rate limits gracefully:
from shannon import ShannonClient, ConnectionError
import time

def handle_rate_limits(queries):
    client = ShannonClient()
    results = []

    for query in queries:
        backoff = 1
        while True:
            try:
                handle = client.submit_task(query=query)
                result = client.wait(handle.task_id)
                results.append(result)
                break  # Success, move to next
            except ConnectionError:
                print(f"Rate limited or transient error, retrying in {backoff}s...")
                time.sleep(backoff)
                backoff = min(backoff * 2, 30)

    return results

Validation Errors

Handle invalid parameters:
from shannon import ShannonClient, ValidationError

def validate_and_submit(query, session_id=None):
    client = ShannonClient()

    try:
        return client.submit_task(query=query, session_id=session_id)
    except ValidationError as e:
        print(f"Invalid parameters: {e}")
        return None

Task Failure Handling

Handle task execution failures:
from shannon import ShannonClient, TaskTimeoutError, TaskStatusEnum

def handle_task_failure(query: str):
    client = ShannonClient()

    try:
        handle = client.submit_task(query=query)
        status = client.wait(handle.task_id, timeout=120)

        if status.status == TaskStatusEnum.FAILED:
            print(f"Task failed: {status.error_message}")
            return None
        return status

    except TaskTimeoutError:
        print("Task timed out; consider increasing timeout or simplifying the request.")
        return None

Logging Errors

Implement comprehensive error logging:
import logging
from shannon import ShannonClient, ShannonError

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('shannon')

def logged_task_submission(query):
    client = ShannonClient()

    try:
        logger.info(f"Submitting task: {query[:50]}...")
        handle = client.submit_task(query=query)

        logger.info(f"Task submitted: task_id={handle.task_id}")
        result = client.wait(handle.task_id)

        logger.info("Task completed successfully")
        return result.result

    except ShannonError as e:
        logger.error(f"Shannon error: {e}", exc_info=True)
        raise

    except Exception as e:
        logger.critical(f"Unexpected error: {e}", exc_info=True)
        raise

Circuit Breaker Pattern

Implement circuit breaker for resilience:
class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"
            else:
                raise Exception("Circuit breaker is open")

        try:
            result = func(*args, **kwargs)
            if self.state == "half-open":
                self.state = "closed"
                self.failure_count = 0
            return result

        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = "open"

            raise

# Usage
breaker = CircuitBreaker()
client = ShannonClient()

try:
    result = breaker.call(
        client.submit_task,
        query="Analyze data"
    )
except Exception as e:
    print(f"Service unavailable: {e}")

Best Practices

  1. Always catch specific exceptions before generic ones
  2. Implement retry logic with exponential backoff
  3. Log errors for debugging and monitoring
  4. Provide fallback options for critical operations
  5. Set reasonable timeouts to avoid hanging
  6. Validate inputs before submission
  7. Use circuit breakers for external dependencies

Next Steps