fastapi-async-patterns

FastAPI async patterns for building high-performance, concurrent APIs with optimal resource usage. Covers async route handlers, database operations (SQLAlchemy, asyncpg, Motor, Tortoise ORM), and connection pooling strategies for efficient resource management Includes real-time communication patterns: WebSockets with authentication and broadcasting, Server-Sent Events (SSE), and streaming responses for large files or generated content Provides concurrent request handling with asyncio.gather for parallel operations, background tasks with BackgroundTasks, and async context managers for resource cleanup Demonstrates performance optimization techniques including caching, batch operations, debouncing, and prefetching related data to minimize latency

INSTALLATION
npx skills add https://github.com/thebushidocollective/han --skill fastapi-async-patterns
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

FastAPI Async Patterns

Master async patterns in FastAPI for building high-performance,

concurrent APIs with optimal resource usage.

Basic Async Route Handlers

Understanding async vs sync endpoints in FastAPI.

from fastapi import FastAPI

import time

import asyncio

app = FastAPI()

Sync endpoint (blocks the event loop)

@app.get('/sync')

def sync_endpoint():

time.sleep(1) # Blocks the entire server

return {'message': 'Completed after 1 second'}

Async endpoint (non-blocking)

@app.get('/async')

async def async_endpoint():

await asyncio.sleep(1) # Other requests can be handled

return {'message': 'Completed after 1 second'}

CPU-bound work (use sync)

@app.get('/cpu-intensive')

def cpu_intensive():

result = sum(i * i for i in range(10000000))

return {'result': result}

I/O-bound work (use async)

@app.get('/io-intensive')

async def io_intensive():

async with httpx.AsyncClient() as client:

response = await client.get('https://api.example.com/data')

return response.json()

## Async Database Operations

Async database patterns with popular ORMs and libraries.

from fastapi import FastAPI, Depends, HTTPException

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

from sqlalchemy.orm import sessionmaker

from sqlalchemy import select

import asyncpg

from motor.motor_asyncio import AsyncIOMotorClient

from tortoise import Tortoise

from tortoise.contrib.fastapi import register_tortoise

app = FastAPI()

SQLAlchemy async setup

DATABASE_URL = 'postgresql+asyncpg://user:pass@localhost/db'

engine = create_async_engine(DATABASE_URL, echo=True, future=True)

AsyncSessionLocal = sessionmaker(

engine, class_=AsyncSession, expire_on_commit=False

)

async def get_db() -> AsyncSession:

async with AsyncSessionLocal() as session:

try:

yield session

await session.commit()

except Exception:

await session.rollback()

raise

@app.get('/users/{user_id}')

async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):

result = await db.execute(select(User).where(User.id == user_id))

user = result.scalar_one_or_none()

if not user:

raise HTTPException(status_code=404, detail='User not found')

return user

Direct asyncpg (lower level, faster)

async def get_asyncpg_pool():

pool = await asyncpg.create_pool(

'postgresql://user:pass@localhost/db',

min_size=10,

max_size=20

)

try:

yield pool

finally:

await pool.close()

@app.get('/users-fast/{user_id}')

async def get_user_fast(user_id: int, pool = Depends(get_asyncpg_pool)):

async with pool.acquire() as conn:

row = await conn.fetchrow(

'SELECT * FROM users WHERE id = $1', user_id

)

if not row:

raise HTTPException(status_code=404, detail='User not found')

return dict(row)

MongoDB with Motor

mongo_client = AsyncIOMotorClient('mongodb://localhost:27017')

db = mongo_client.mydatabase

@app.get('/documents/{doc_id}')

async def get_document(doc_id: str):

document = await db.collection.find_one({'_id': doc_id})

if not document:

raise HTTPException(status_code=404, detail='Document not found')

return document

@app.post('/documents')

async def create_document(data: dict):

result = await db.collection.insert_one(data)

return {'id': str(result.inserted_id)}

Tortoise ORM async

register_tortoise(

app,

db_url='postgres://user:pass@localhost/db',

modules={'models': ['app.models']},

generate_schemas=True,

add_exception_handlers=True,

)

from tortoise.models import Model

from tortoise import fields

class UserModel(Model):

id = fields.IntField(pk=True)

name = fields.CharField(max_length=255)

email = fields.CharField(max_length=255)

@app.get('/tortoise-users/{user_id}')

async def get_tortoise_user(user_id: int):

user = await UserModel.get_or_none(id=user_id)

if not user:

raise HTTPException(status_code=404, detail='User not found')

return user


## Background Tasks

Fire-and-forget tasks without blocking the response.

from fastapi import BackgroundTasks, FastAPI

import asyncio

from datetime import datetime

app = FastAPI()

Simple background task

async def send_email(email: str, message: str):

await asyncio.sleep(2) # Simulate email sending

print(f'Email sent to {email}: {message}')

@app.post('/send-email')

async def send_email_endpoint(

email: str,

message: str,

background_tasks: BackgroundTasks

):

background_tasks.add_task(send_email, email, message)

return {'status': 'Email will be sent in background'}

Multiple background tasks

async def log_activity(user_id: int, action: str):

await asyncio.sleep(0.5)

print(f'[{datetime.now()}] User {user_id} performed: {action}')

async def update_analytics(action: str):

await asyncio.sleep(1)

print(f'Analytics updated for action: {action}')

@app.post('/users/{user_id}/action')

async def perform_action(

user_id: int,

action: str,

background_tasks: BackgroundTasks

):

# Add multiple tasks

background_tasks.add_task(log_activity, user_id, action)

background_tasks.add_task(update_analytics, action)

return {'status': 'Action logged'}

Background cleanup

async def cleanup_temp_files(file_path: str):

await asyncio.sleep(60) # Wait before cleanup

import os

if os.path.exists(file_path):

os.remove(file_path)

print(f'Cleaned up: {file_path}')

@app.post('/upload')

async def upload_file(

file: UploadFile,

background_tasks: BackgroundTasks

):

temp_path = f'/tmp/{file.filename}'

with open(temp_path, 'wb') as f:

content = await file.read()

f.write(content)

# Schedule cleanup

background_tasks.add_task(cleanup_temp_files, temp_path)

return {'filename': file.filename, 'path': temp_path}


## WebSocket Handling

Real-time bidirectional communication patterns.

from fastapi import WebSocket, WebSocketDisconnect, Depends

from typing import List

import json

app = FastAPI()

Simple WebSocket

@app.websocket('/ws')

async def websocket_endpoint(websocket: WebSocket):

await websocket.accept()

try:

while True:

data = await websocket.receive_text()

await websocket.send_text(f'Echo: {data}')

except WebSocketDisconnect:

print('Client disconnected')

WebSocket with authentication

async def get_current_user_ws(websocket: WebSocket):

token = websocket.query_params.get('token')

if not token or not verify_token(token):

await websocket.close(code=1008) # Policy violation

raise HTTPException(status_code=401, detail='Unauthorized')

return decode_token(token)

@app.websocket('/ws/authenticated')

async def authenticated_websocket(

websocket: WebSocket,

user = Depends(get_current_user_ws)

):

await websocket.accept()

try:

await websocket.send_text(f'Welcome {user["name"]}')

while True:

data = await websocket.receive_text()

await websocket.send_text(f'{user["name"]}: {data}')

except WebSocketDisconnect:

print(f'User {user["name"]} disconnected')

Broadcasting to multiple connections

class ConnectionManager:

def __init__(self):

self.active_connections: List[WebSocket] = []

async def connect(self, websocket: WebSocket):

await websocket.accept()

self.active_connections.append(websocket)

def disconnect(self, websocket: WebSocket):

self.active_connections.remove(websocket)

async def send_personal_message(self, message: str, websocket: WebSocket):

await websocket.send_text(message)

async def broadcast(self, message: str):

for connection in self.active_connections:

await connection.send_text(message)

manager = ConnectionManager()

@app.websocket('/ws/chat/{client_id}')

async def chat_endpoint(websocket: WebSocket, client_id: str):

await manager.connect(websocket)

await manager.broadcast(f'Client {client_id} joined the chat')

try:

while True:

data = await websocket.receive_text()

await manager.broadcast(f'Client {client_id}: {data}')

except WebSocketDisconnect:

manager.disconnect(websocket)

await manager.broadcast(f'Client {client_id} left the chat')

WebSocket with JSON messages

@app.websocket('/ws/json')

async def json_websocket(websocket: WebSocket):

await websocket.accept()

try:

while True:

data = await websocket.receive_json()

message_type = data.get('type')

if message_type == 'ping':

await websocket.send_json({'type': 'pong'})

elif message_type == 'message':

await websocket.send_json({

'type': 'response',

'data': f'Received: {data.get("content")}'

})

except WebSocketDisconnect:

print('Client disconnected')


## Server-Sent Events (SSE)

One-way streaming from server to client.

from fastapi import FastAPI

from sse_starlette.sse import EventSourceResponse

import asyncio

app = FastAPI()

@app.get('/sse')

async def sse_endpoint():

async def event_generator():

for i in range(10):

await asyncio.sleep(1)

yield {

'event': 'message',

'data': f'Message {i}'

}

return EventSourceResponse(event_generator())

SSE with real-time updates

@app.get('/sse/updates')

async def sse_updates():

async def update_generator():

while True:

# Simulate fetching updates

await asyncio.sleep(2)

update = await fetch_latest_update()

yield {

'event': 'update',

'data': json.dumps(update)

}

return EventSourceResponse(update_generator())

SSE with heartbeat

@app.get('/sse/heartbeat')

async def sse_heartbeat():

async def heartbeat_generator():

try:

while True:

await asyncio.sleep(30)

yield {

'event': 'heartbeat',

'data': datetime.now().isoformat()

}

except asyncio.CancelledError:

print('SSE connection closed')

return EventSourceResponse(heartbeat_generator())


## Streaming Responses

Stream large files or generated content.

from fastapi import FastAPI

from fastapi.responses import StreamingResponse

import io

import csv

app = FastAPI()

Stream large file

@app.get('/download/{filename}')

async def download_file(filename: str):

async def file_stream():

with open(f'/data/{filename}', 'rb') as f:

while chunk := f.read(8192):

yield chunk

return StreamingResponse(

file_stream(),

media_type='application/octet-stream',

headers={'Content-Disposition': f'attachment; filename={filename}'}

)

Stream generated CSV

@app.get('/export/users')

async def export_users():

async def csv_stream():

output = io.StringIO()

writer = csv.writer(output)

# Write header

writer.writerow(['ID', 'Name', 'Email'])

yield output.getvalue()

output.truncate(0)

output.seek(0)

# Stream users in batches

offset = 0

batch_size = 100

while True:

users = await fetch_users_batch(offset, batch_size)

if not users:

break

for user in users:

writer.writerow([user.id, user.name, user.email])

yield output.getvalue()

output.truncate(0)

output.seek(0)

offset += batch_size

return StreamingResponse(

csv_stream(),

media_type='text/csv',

headers={'Content-Disposition': 'attachment; filename=users.csv'}

)

Stream generated content

@app.get('/generate/report')

async def generate_report():

async def report_stream():

yield b'<html><body><h1>Report</h1>'

for section in ['users', 'orders', 'analytics']:

await asyncio.sleep(0.5) # Simulate processing

data = await fetch_section_data(section)

yield f'<h2>{section.title()}</h2>'.encode()

yield f'<pre>{data}</pre>'.encode()

yield b'</body></html>'

return StreamingResponse(report_stream(), media_type='text/html')


## Concurrent Request Handling

Parallel processing patterns for multiple operations.

from fastapi import FastAPI

import asyncio

import httpx

app = FastAPI()

Parallel API calls

@app.get('/aggregate/user/{user_id}')

async def aggregate_user_data(user_id: int):

async with httpx.AsyncClient() as client:

# Fetch from multiple sources in parallel

profile_task = client.get(f'https://api.example.com/users/{user_id}')

posts_task = client.get(f'https://api.example.com/users/{user_id}/posts')

comments_task = client.get(f'https://api.example.com/users/{user_id}/comments')

profile, posts, comments = await asyncio.gather(

profile_task,

posts_task,

comments_task

)

return {

'profile': profile.json(),

'posts': posts.json(),

'comments': comments.json()

}

Parallel database queries

@app.get('/dashboard')

async def get_dashboard(db: AsyncSession = Depends(get_db)):

# Execute multiple queries in parallel

users_query = db.execute(select(User).limit(10))

orders_query = db.execute(select(Order).limit(10))

stats_query = db.execute(select(func.count(User.id)))

users, orders, stats = await asyncio.gather(

users_query,

orders_query,

stats_query

)

return {

'users': users.scalars().all(),

'orders': orders.scalars().all(),

'total_users': stats.scalar()

}

Race condition (first to complete wins)

@app.get('/fastest-price/{product_id}')

async def get_fastest_price(product_id: str):

async with httpx.AsyncClient() as client:

tasks = [

client.get(f'https://store1.com/price/{product_id}'),

client.get(f'https://store2.com/price/{product_id}'),

client.get(f'https://store3.com/price/{product_id}')

]

done, pending = await asyncio.wait(

tasks,

return_when=asyncio.FIRST_COMPLETED

)

# Cancel pending requests

for task in pending:

task.cancel()

result = done.pop().result()

return result.json()


## Async Context Managers

Resource management with async context managers.

from contextlib import asynccontextmanager

from fastapi import FastAPI

import asyncio

Async context manager for lifespan events

@asynccontextmanager

async def lifespan(app: FastAPI):

# Startup

print('Starting up...')

db_pool = await create_db_pool()

redis_client = await create_redis_client()

# Store in app state

app.state.db_pool = db_pool

app.state.redis = redis_client

yield

# Shutdown

print('Shutting down...')

await db_pool.close()

await redis_client.close()

app = FastAPI(lifespan=lifespan)

Custom async context manager

class AsyncDatabaseSession:

def __init__(self, pool):

self.pool = pool

self.conn = None

async def __aenter__(self):

self.conn = await self.pool.acquire()

return self.conn

async def __aexit__(self, exc_type, exc_val, exc_tb):

await self.pool.release(self.conn)

if exc_type is not None:

# Handle exception

await self.conn.rollback()

return False

@app.get('/data')

async def get_data():

async with AsyncDatabaseSession(app.state.db_pool) as conn:

result = await conn.fetch('SELECT * FROM data')

return result


## Connection Pooling

Efficient connection management for databases and HTTP clients.

from fastapi import FastAPI, Depends

import asyncpg

import httpx

from typing import AsyncGenerator

app = FastAPI()

Database connection pool

class DatabasePool:

def __init__(self):

self.pool = None

async def create_pool(self):

self.pool = await asyncpg.create_pool(

'postgresql://user:pass@localhost/db',

min_size=10,

max_size=20,

command_timeout=60,

max_queries=50000,

max_inactive_connection_lifetime=300

)

async def close_pool(self):

await self.pool.close()

async def get_connection(self):

async with self.pool.acquire() as connection:

yield connection

db_pool = DatabasePool()

@app.on_event('startup')

async def startup():

await db_pool.create_pool()

@app.on_event('shutdown')

async def shutdown():

await db_pool.close_pool()

@app.get('/users')

async def get_users(conn = Depends(db_pool.get_connection)):

rows = await conn.fetch('SELECT * FROM users')

return [dict(row) for row in rows]

HTTP client pool

class HTTPClientPool:

def __init__(self):

self.client = None

async def get_client(self) -> AsyncGenerator[httpx.AsyncClient, None]:

if self.client is None:

self.client = httpx.AsyncClient(

limits=httpx.Limits(max_keepalive_connections=20, max_connections=100),

timeout=httpx.Timeout(10.0)

)

yield self.client

async def close(self):

if self.client:

await self.client.aclose()

http_pool = HTTPClientPool()

@app.get('/external-api')

async def call_external_api(client: httpx.AsyncClient = Depends(http_pool.get_client)):

response = await client.get('https://api.example.com/data')

return response.json()


## Performance Optimization

Async patterns for optimal performance.

from fastapi import FastAPI

import asyncio

from functools import lru_cache

app = FastAPI()

Cache expensive async operations

from aiocache import Cache

from aiocache.serializers import JsonSerializer

cache = Cache(Cache.MEMORY, serializer=JsonSerializer())

@app.get('/expensive-data/{key}')

async def get_expensive_data(key: str):

# Check cache first

cached = await cache.get(key)

if cached:

return {'data': cached, 'cached': True}

# Expensive operation

await asyncio.sleep(2)

data = compute_expensive_result(key)

# Store in cache

await cache.set(key, data, ttl=300)

return {'data': data, 'cached': False}

Batch operations

@app.post('/users/batch')

async def create_users_batch(users: List[UserCreate], db = Depends(get_db)):

# Create users in batch (more efficient than one-by-one)

user_objects = [User(**user.dict()) for user in users]

db.add_all(user_objects)

await db.flush()

return user_objects

Debouncing with asyncio

class Debouncer:

def __init__(self, delay: float):

self.delay = delay

self.task = None

async def debounce(self, coro):

if self.task:

self.task.cancel()

async def delayed():

await asyncio.sleep(self.delay)

await coro

self.task = asyncio.create_task(delayed())

await self.task

debouncer = Debouncer(delay=1.0)

Prefetching related data

@app.get('/posts/{post_id}')

async def get_post_with_relations(post_id: int, db = Depends(get_db)):

# Fetch post and related data in parallel

post_task = db.get(Post, post_id)

comments_task = db.execute(

select(Comment).where(Comment.post_id == post_id)

)

author_task = db.execute(

select(User).where(User.id == Post.author_id)

)

post, comments_result, author_result = await asyncio.gather(

post_task, comments_task, author_task

)

return {

'post': post,

'comments': comments_result.scalars().all(),

'author': author_result.scalar_one()

}

BrowserAct

Let your agent run on any real-world website

Bypass CAPTCHA & anti-bot for free. Start local, scale to cloud.

Explore BrowserAct Skills →

Stop writing automation&scrapers

Install the CLI. Run your first Skill in 30 seconds. Scale when you're ready.

Start free
free · no credit card