Full async documentation coming soon. Basic patterns are shown below.

Overview

The Shannon Python SDK provides both synchronous and asynchronous clients. Use async operations for:
  • Concurrent task submission
  • Non-blocking event streaming
  • Parallel API calls
  • High-throughput applications

AsyncShannonClient

Basic Usage

import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient(base_url="http://localhost:8080") as client:
        handle = await client.submit_task(
            query="Analyze this data",
            model_tier="small",
            mode="standard",
        )
        final = await client.wait(handle.task_id)
        print(final.result)

asyncio.run(main())

Concurrent Tasks

Submit multiple tasks simultaneously:
async def analyze_multiple():
    async with AsyncShannonClient() as client:
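        # Create one submission coroutine per dataset (nothing runs yet)
        submissions = [client.submit_task(query=f"Analyze dataset {i}") for i in range(1, 4)]

        # Submit all tasks concurrently and collect their handles
        handles = await asyncio.gather(*submissions)

        # Wait for every task to finish
        results = await asyncio.gather(*[client.wait(h.task_id) for h in handles])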
        return [r.result for r in results]
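
For high-throughput use, an unbounded `asyncio.gather()` can open many requests at once. The following is a minimal sketch of capping concurrency with `asyncio.Semaphore`. The helper `gather_bounded` and the stand-in coroutine `fake_analyze` are hypothetical names used only for illustration and are not part of the Shannon SDK.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")


async def gather_bounded(
    factories: Iterable[Callable[[], Awaitable[T]]],
    limit: int,
) -> List[T]:
    """Run the awaitables produced by `factories`, at most `limit` at a time.

    Results come back in the same order as `factories`.
    """
    semaphore = asyncio.Semaphore(limit)

    async def run_one(factory: Callable[[], Awaitable[T]]) -> T:
        async with semaphore:  # waits while `limit` calls are already in flight
            return await factory()

    return await asyncio.gather(*(run_one(f) for f in factories))


# Hypothetical stand-in for a submit-and-wait round trip against the SDK.
async def fake_analyze(i: int) -> str:
    await asyncio.sleep(0.01)
    return f"result {i}"


async def main() -> None:
    # Each factory creates its coroutine only when a slot is free.
    jobs = [lambda i=i: fake_analyze(i) for i in range(1, 6)]
    print(await gather_bounded(jobs, limit=2))


if __name__ == "__main__":
    asyncio.run(main())
```

With the real client, each factory could wrap a call such as `client.submit_task(query=...)`. Passing factories rather than coroutine objects avoids creating coroutines that sit unawaited while they queue for a slot.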

Async Streaming

Stream events without blocking. Tip: do not await other client calls inside the async for loop. Break out of the loop first, then await:
async def stream():
    async with AsyncShannonClient() as client:
        h = await client.submit_task(query="Complex analysis")
        async for e in client.stream(h.workflow_id):
            print(e.type)
            if e.type == "WORKFLOW_COMPLETED":
                break  # exit the loop cleanly

        # Now make additional awaits safely
        final = await client.wait(h.task_id)
        return final.result

Timeout Handling

async def with_timeout():
    async with AsyncShannonClient() as client:
        try:
            # Submit with timeout
            handle = await asyncio.wait_for(
                client.submit_task(query="Analyze data"),
                timeout=30.0
            )

            # Wait for completion with timeout
            result = await asyncio.wait_for(
                client.wait(handle.task_id),
                timeout=300.0
            )
            return result

        except asyncio.TimeoutError:
            print("Operation timed out")
            return None
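
When a single deadline should cover submission and waiting together, the two steps can be wrapped in one coroutine and passed to `asyncio.wait_for()` once. This is a sketch under assumptions: `fake_submit`, `fake_wait`, and `run_with_deadline` are hypothetical stand-ins, not SDK functions. `wait_for` cancels the local coroutine on timeout; whether the task keeps running on the server is not covered here.

```python
import asyncio
from typing import Optional


# Hypothetical stand-ins for client.submit_task(...) and client.wait(...).
async def fake_submit() -> str:
    await asyncio.sleep(0.01)
    return "task-1"


async def fake_wait(task_id: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"result for {task_id}"


async def submit_and_wait(delay: float) -> str:
    task_id = await fake_submit()
    return await fake_wait(task_id, delay)


async def run_with_deadline(delay: float, deadline: float) -> Optional[str]:
    """Return the result, or None if the whole pipeline exceeds `deadline` seconds."""
    try:
        # One timeout for both steps; the inner coroutine is cancelled on expiry.
        return await asyncio.wait_for(submit_and_wait(delay), timeout=deadline)
    except asyncio.TimeoutError:
        return None


if __name__ == "__main__":
    print(asyncio.run(run_with_deadline(delay=0.01, deadline=1.0)))
    print(asyncio.run(run_with_deadline(delay=1.0, deadline=0.05)))
```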

Background Tasks

Run tasks in background while doing other work:
async def background_processing():
    async with AsyncShannonClient() as client:
        # Submit task
        handle = await client.submit_task(query="Long running analysis")

        # Start background wait
        task = asyncio.create_task(client.wait(handle.task_id))

        # Do other work (do_other_work is a placeholder for your own coroutine)
        await do_other_work()

        # Check if done
        if not task.done():
            print("Still processing...")

        # Wait for result when needed
        result = await task
        return result
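
A background task that is no longer needed should be cancelled rather than left pending. The sketch below shows one way to do that with `Task.cancel()`. The names `fake_wait` and `wait_or_cancel` are hypothetical, and `asyncio.sleep` stands in for the other work.

```python
import asyncio
import contextlib
from typing import Optional


# Hypothetical stand-in for client.wait(...).
async def fake_wait(delay: float) -> str:
    await asyncio.sleep(delay)
    return "done"


async def wait_or_cancel(delay: float, patience: float) -> Optional[str]:
    """Start a background wait, do other work, then take the result or cancel."""
    task = asyncio.create_task(fake_wait(delay))

    await asyncio.sleep(patience)  # stand-in for other work

    if task.done():
        return task.result()

    # Not finished in time: request cancellation and let the task unwind.
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return None


if __name__ == "__main__":
    print(asyncio.run(wait_or_cancel(delay=0.01, patience=0.2)))
    print(asyncio.run(wait_or_cancel(delay=1.0, patience=0.01)))
```

Awaiting the task after `cancel()` gives it a chance to run its cleanup before the surrounding client context exits.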

Error Handling

async def robust_submission():
    # Note: this import shadows the built-in ConnectionError name inside this function
    from shannon import ConnectionError, TaskTimeoutError

    async with AsyncShannonClient() as client:
        try:
            handle = await client.submit_task(query="Analyze data")
            result = await client.wait(handle.task_id)
            return result

        except ConnectionError:
            print("Failed to connect to Shannon")
        except TaskTimeoutError:
            print("Task timed out")
        except Exception as e:
            print(f"Error: {e}")
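
When several operations run concurrently, one failure makes a plain `asyncio.gather()` raise immediately. A possible alternative is `return_exceptions=True`, which returns exceptions alongside results so they can be handled per task. The helper `gather_partitioned` and the stand-in coroutines are hypothetical and exist only for this sketch.

```python
import asyncio
from typing import Awaitable, Iterable, List, Tuple


async def gather_partitioned(
    awaitables: Iterable[Awaitable[object]],
) -> Tuple[List[object], List[BaseException]]:
    """Await everything and return (successes, failures) instead of raising."""
    outcomes = await asyncio.gather(*awaitables, return_exceptions=True)
    successes = [o for o in outcomes if not isinstance(o, BaseException)]
    failures = [o for o in outcomes if isinstance(o, BaseException)]
    return successes, failures


# Hypothetical stand-ins for SDK calls that succeed or fail.
async def fake_ok(i: int) -> int:
    await asyncio.sleep(0)
    return i


async def fake_fail() -> int:
    await asyncio.sleep(0)
    raise ValueError("boom")


async def main() -> None:
    successes, failures = await gather_partitioned([fake_ok(1), fake_fail(), fake_ok(3)])
    print("ok:", successes)
    print("failed:", [type(e).__name__ for e in failures])


if __name__ == "__main__":
    asyncio.run(main())
```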

Integration with Web Frameworks

FastAPI Example

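from contextlib import asynccontextmanager

from fastapi import FastAPI
from shannon import AsyncShannonClient

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Open one shared client at startup and close it at shutdown
    async with AsyncShannonClient() as client:
        app.state.shannon = client
        yield

app = FastAPI(lifespan=lifespan)

@app.post("/analyze")
async def analyze(query: str):
    client = app.state.shannon
    handle = await client.submit_task(query=query)
    result = await client.wait(handle.task_id)
    return {"result": result.result}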

aiohttp Example

from aiohttp import web
from shannon import AsyncShannonClient

async def analyze_handler(request):
    data = await request.json()

    async with AsyncShannonClient() as client:
        handle = await client.submit_task(query=data['query'])
        result = await client.wait(handle.task_id)

    return web.json_response({'result': result.result})

Best Practices

  1. Use context managers (async with) for proper cleanup
  2. Handle timeouts for long-running operations
  3. Implement retry logic for network failures
  4. Use asyncio.gather() for concurrent operations
  5. Stream events for real-time updates
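
The retry advice above can be sketched as a small helper with exponential backoff and jitter. This is an illustration under assumptions, not an SDK feature: `retry_async` is a hypothetical name, and the default `retry_on` uses Python's built-in `ConnectionError`. In real use you would likely pass the exception classes the SDK raises.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
) -> T:
    """Call `operation` until it succeeds or `attempts` tries have failed."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Exponential backoff (base, 2*base, 4*base, ...) plus random jitter.
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 2))
    raise ValueError("attempts must be at least 1")


async def main() -> None:
    calls = 0

    # Hypothetical flaky operation: fails twice, then succeeds.
    async def flaky() -> str:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("temporarily unavailable")
        return "ok"

    print(await retry_async(flaky, attempts=3, base_delay=0.01), "after", calls, "calls")


if __name__ == "__main__":
    asyncio.run(main())
```

Only errors listed in `retry_on` are retried. Anything else propagates immediately, which keeps programming errors from being masked by the retry loop.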

Next Steps