Full async documentation coming soon. Basic patterns are shown below.
Overview
The Shannon Python SDK provides both synchronous and asynchronous clients. Use async operations for:
- Concurrent task submission
- Non-blocking event streaming
- Parallel API calls
- High-throughput applications
AsyncShannonClient
Basic Usage
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient(base_url="http://localhost:8080") as client:
        handle = await client.submit_task(
            query="Analyze this data",
            model_tier="small",
            mode="standard",
        )
        final = await client.wait(handle.task_id)
        print(final.result)

asyncio.run(main())
Concurrent Tasks
Submit multiple tasks simultaneously:
async def analyze_multiple():
    async with AsyncShannonClient() as client:
        # Build one submission coroutine per dataset (nothing runs yet)
        tasks = [client.submit_task(query=f"Analyze dataset {i}") for i in range(1, 4)]
        # Submit all tasks concurrently and collect their handles
        handles = await asyncio.gather(*tasks)
        # Wait for every task to complete and collect the results
        results = await asyncio.gather(*[client.wait(h.task_id) for h in handles])
        return [r.result for r in results]
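When the number of queries is large, submitting everything at once may put more load on the server than intended. One common way to cap the number of in-flight operations is asyncio.Semaphore. The following is a minimal sketch under stated assumptions: fake_submit_and_wait is a hypothetical stand-in for client.submit_task followed by client.wait (so the example runs without a Shannon server), and the limit of 2 is arbitrary.

```python
import asyncio


async def fake_submit_and_wait(query: str) -> str:
    """Hypothetical stand-in for client.submit_task(...) + client.wait(...)."""
    await asyncio.sleep(0.01)  # simulate network and processing latency
    return f"result for {query}"


async def run_bounded(queries: list[str], limit: int = 2) -> list[str]:
    """Run all queries with at most `limit` of them in flight at any time."""
    semaphore = asyncio.Semaphore(limit)

    async def worker(query: str) -> str:
        async with semaphore:  # waits here while `limit` workers are running
            return await fake_submit_and_wait(query)

    # gather() returns results in the same order as the input queries
    return await asyncio.gather(*(worker(q) for q in queries))


if __name__ == "__main__":
    queries = [f"Analyze dataset {i}" for i in range(1, 6)]
    print(asyncio.run(run_bounded(queries)))
```

With a real client, the body of worker would call the client's own methods in place of the stand-in.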
Async Streaming
Stream events without blocking. Tip: don’t await other client calls inside the async for loop — break out first, then await:
async def stream():
    async with AsyncShannonClient() as client:
        h = await client.submit_task(query="Complex analysis")
        async for e in client.stream(h.workflow_id):
            print(e.type)
            if e.type == "WORKFLOW_COMPLETED":
                break  # exit the loop cleanly
        # Now make additional awaits safely
        final = await client.wait(h.task_id)
        return final.result
Timeout Handling
async def with_timeout():
    async with AsyncShannonClient() as client:
        try:
            # Submit with timeout
            handle = await asyncio.wait_for(
                client.submit_task(query="Analyze data"),
                timeout=30.0
            )
            # Wait for completion with timeout
            result = await asyncio.wait_for(
                client.wait(handle.task_id),
                timeout=300.0
            )
            return result
        except asyncio.TimeoutError:
            # Raised by asyncio.wait_for when either step exceeds its timeout
            print("Operation timed out")
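It can help to know what asyncio.wait_for does when the timeout expires: it cancels the awaited coroutine locally and then raises asyncio.TimeoutError. The sketch below demonstrates this with slow_wait, a hypothetical stand-in for client.wait. Whether a timed-out task keeps running on the Shannon server is not covered here and should not be inferred from this example.

```python
import asyncio


async def slow_wait(delay: float, state: dict) -> str:
    """Hypothetical stand-in for client.wait(task_id)."""
    try:
        await asyncio.sleep(delay)
        return "done"
    except asyncio.CancelledError:
        state["cancelled"] = True  # wait_for cancels this coroutine on timeout
        raise  # always re-raise so cancellation propagates


async def wait_or_default(delay: float, timeout: float, state: dict) -> str:
    """Return the result, or a fallback string if the timeout expires."""
    try:
        return await asyncio.wait_for(slow_wait(delay, state), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"


if __name__ == "__main__":
    state: dict = {}
    print(asyncio.run(wait_or_default(1.0, 0.01, state)), state)
```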
Background Tasks
Run tasks in background while doing other work:
async def background_processing():
    async with AsyncShannonClient() as client:
        # Submit task
        handle = await client.submit_task(query="Long running analysis")
        # Start background wait
        task = asyncio.create_task(client.wait(handle.task_id))
        # Do other work (do_other_work is a placeholder for your own coroutine)
        await do_other_work()
        # Check if done
        if not task.done():
            print("Still processing...")
        # Wait for result when needed
        result = await task
        return result
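The same pattern extends to periodic progress checks while the background task runs. The following is a minimal sketch, assuming fake_wait as a hypothetical stand-in for client.wait and an arbitrary polling interval. It relies only on asyncio.create_task and Task.done().

```python
import asyncio


async def fake_wait(task_id: str) -> str:
    """Hypothetical stand-in for client.wait(task_id)."""
    await asyncio.sleep(0.05)
    return f"result for {task_id}"


async def wait_with_heartbeat(task_id: str, interval: float = 0.01) -> tuple[str, int]:
    """Await a background task, counting how many times we checked on it."""
    background = asyncio.create_task(fake_wait(task_id))
    checks = 0
    while not background.done():
        checks += 1  # a real application might log progress here
        await asyncio.sleep(interval)  # yield so the background task can run
    result = await background  # already finished, so this returns immediately
    return result, checks


if __name__ == "__main__":
    print(asyncio.run(wait_with_heartbeat("task-1")))
```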
Error Handling
async def robust_submission():
    from shannon import ConnectionError, TaskTimeoutError

    async with AsyncShannonClient() as client:
        try:
            handle = await client.submit_task(query="Analyze data")
            result = await client.wait(handle.task_id)
            return result
        except ConnectionError:
            print("Failed to connect to Shannon")
        except TaskTimeoutError:
            print("Task timed out")
        except Exception as e:
            print(f"Error: {e}")
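When several operations run concurrently, one failure should not necessarily discard the results of the others. A minimal sketch of collecting partial results with asyncio.gather(..., return_exceptions=True) follows. fake_task is a hypothetical stand-in for a submit-and-wait call, and RuntimeError stands in for whatever exception the SDK would raise.

```python
import asyncio


async def fake_task(query: str) -> str:
    """Hypothetical stand-in for submit_task + wait; fails for 'bad' queries."""
    await asyncio.sleep(0)
    if "bad" in query:
        raise RuntimeError(f"failed: {query}")
    return f"ok: {query}"


async def run_all(queries: list[str]) -> tuple[list[str], list[BaseException]]:
    """Run every query and split the outcomes into successes and failures."""
    # With return_exceptions=True, exceptions are returned in place of
    # results instead of being raised at the first failure.
    outcomes = await asyncio.gather(
        *(fake_task(q) for q in queries), return_exceptions=True
    )
    successes = [o for o in outcomes if not isinstance(o, BaseException)]
    failures = [o for o in outcomes if isinstance(o, BaseException)]
    return successes, failures


if __name__ == "__main__":
    ok, failed = asyncio.run(run_all(["a", "bad b", "c"]))
    print(ok, failed)
```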
Integration with Web Frameworks
FastAPI Example
from fastapi import FastAPI
from shannon import AsyncShannonClient

app = FastAPI()
client = AsyncShannonClient()

@app.post("/analyze")
async def analyze(query: str):
    handle = await client.submit_task(query=query)
    result = await client.wait(handle.task_id)
    return {"result": result.result}
aiohttp Example
from aiohttp import web
from shannon import AsyncShannonClient

async def analyze_handler(request):
    data = await request.json()
    async with AsyncShannonClient() as client:
        handle = await client.submit_task(query=data['query'])
        result = await client.wait(handle.task_id)
        return web.json_response({'result': result.result})
Best Practices
- Use context managers (async with) for proper cleanup
- Handle timeouts for long-running operations
- Implement retry logic for network failures
- Use gather() for concurrent operations
- Stream events for real-time updates
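The retry recommendation above can be sketched as a small helper with exponential backoff and jitter. This is an illustration under stated assumptions, not part of the SDK: retry_async and its parameters are hypothetical names, and the default retry_on uses Python's built-in ConnectionError. To retry on the SDK's own ConnectionError, pass that class in retry_on.

```python
import asyncio
import random


async def retry_async(operation, *, attempts=3, base_delay=0.1,
                      retry_on=(ConnectionError,)):
    """Await `operation()` until it succeeds or `attempts` is exhausted.

    `operation` is a zero-argument callable returning a fresh awaitable,
    because a coroutine object cannot be awaited twice.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Exponential backoff (base, 2*base, 4*base, ...) plus jitter
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 2))


if __name__ == "__main__":
    calls = 0

    async def flaky():
        # Hypothetical operation that fails twice before succeeding
        global calls
        calls += 1
        if calls < 3:
            raise ConnectionError("transient failure")
        return "ok"

    print(asyncio.run(retry_async(flaky, base_delay=0.01)))
```

With a real client, operation could be a lambda such as lambda: client.submit_task(query="Analyze data"), so that each attempt creates a new coroutine.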
Next Steps