rust-async-patterns

Production patterns for async Rust with Tokio, channels, error handling, and concurrent task management.

  • Covers seven core patterns: concurrent task execution with JoinSet and select!, Tokio channels (mpsc, broadcast, oneshot, watch), custom error types with anyhow and thiserror, graceful shutdown via CancellationToken, async traits with async_trait, streams and async iteration, and resource pooling with RwLock and Semaphore
  • Includes practical examples for racing futures, limiting concurrency, timeout handling, and task cancellation across shutdown scenarios
  • Provides debugging guidance using tokio-console, tracing instrumentation, and common pitfalls like blocking in async contexts or holding locks across awaits
  • Best practices section covers when to use channels over shared state, proper error propagation, and spawn limits to prevent resource exhaustion

INSTALLATION
npx skills add https://github.com/wshobson/agents --skill rust-async-patterns
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

Rust Async Patterns

Production patterns for async Rust programming with Tokio runtime, including tasks, channels, streams, and error handling.

When to Use This Skill

  • Building async Rust applications
  • Implementing concurrent network services
  • Using Tokio for async I/O
  • Handling async errors properly
  • Debugging async code issues
  • Optimizing async performance

Core Concepts

1. Async Execution Model

Future (lazy) → poll() → Ready(value) | Pending
                ↑           ↓
              Waker ← Runtime schedules
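
The poll/wake cycle in the diagram can be reproduced with the standard library alone. The sketch below is a toy executor, not how Tokio is implemented: `YieldOnce`, `CountingWaker`, and `block_on_counting` are hypothetical names introduced for illustration, and the busy loop stands in for a real scheduler that would sleep until woken.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A future that returns Pending on the first poll and Ready on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.yielded {
            Poll::Ready("done")
        } else {
            self.yielded = true;
            // Tell the executor this future wants to be polled again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// A waker that only counts how often it was invoked.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls `fut` until it is Ready. Returns (output, number of polls, number of wake-ups).
fn block_on_counting<F: Future>(fut: F) -> (F::Output, usize, usize) {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = std::pin::pin!(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return (value, polls, counter.wakes.load(Ordering::SeqCst));
        }
    }
}

fn main() {
    // Nothing happens when the future is created; work starts at the first poll.
    let fut = YieldOnce { yielded: false };
    let (out, polls, wakes) = block_on_counting(fut);
    println!("{out} after {polls} polls and {wakes} wake-up(s)");
}
```

A production runtime parks the thread between polls and uses the waker to decide which task to poll next, rather than spinning.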

2. Key Abstractions

| Concept  | Purpose                                  |
| -------- | ---------------------------------------- |
| Future   | Lazy computation that may complete later |
| async fn | Function returning impl Future           |
| await    | Suspend until future completes           |
| Task     | Spawned future running concurrently      |
| Runtime  | Executor that polls futures              |

Quick Start

# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
# Also used by the patterns further down
thiserror = "1.0"
tokio-util = "0.7"
async-stream = "0.3"

// src/main.rs
use anyhow::Result;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize tracing
    tracing_subscriber::fmt::init();

    // Async operations
    let result = fetch_data("https://api.example.com").await?;
    println!("Got: {}", result);

    Ok(())
}

async fn fetch_data(url: &str) -> Result<String> {
    // Simulated async operation
    sleep(Duration::from_millis(100)).await;
    Ok(format!("Data from {}", url))
}

Patterns

Pattern 1: Concurrent Task Execution

use anyhow::Result;
use tokio::task::JoinSet;

// Spawn one task per URL and collect whatever succeeds.
// Results arrive in completion order, not input order.
async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> {
    let mut set = JoinSet::new();

    for url in urls {
        set.spawn(async move { fetch_data(&url).await });
    }

    let mut results = Vec::new();
    while let Some(res) = set.join_next().await {
        match res {
            Ok(Ok(data)) => results.push(data),
            Ok(Err(e)) => tracing::error!("Task failed: {}", e), // task returned an error
            Err(e) => tracing::error!("Join error: {}", e),      // task panicked or was cancelled
        }
    }

    Ok(results)
}

// With concurrency limit
use futures::stream::{self, StreamExt};

async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> {
    stream::iter(urls)
        .map(|url| async move { fetch_data(&url).await })
        .buffer_unordered(limit) // At most `limit` futures in flight at once
        .collect()
        .await
}

// Select first to complete; the losing future is dropped, which cancels it
use tokio::select;

async fn race_requests(url1: &str, url2: &str) -> Result<String> {
    select! {
        result = fetch_data(url1) => result,
        result = fetch_data(url2) => result,
    }
}

Pattern 2: Channels for Communication

use tokio::sync::{mpsc, broadcast, oneshot, watch};

// Multi-producer, single-consumer
async fn mpsc_example() {
    let (tx, mut rx) = mpsc::channel::<String>(100);

    // Spawn producer
    let tx2 = tx.clone();
    tokio::spawn(async move {
        tx2.send("Hello".to_string()).await.unwrap();
    });

    // Drop the original sender; otherwise recv() never returns None
    // and the loop below waits forever.
    drop(tx);

    // Consume until every sender has been dropped
    while let Some(msg) = rx.recv().await {
        println!("Got: {}", msg);
    }
}

// Broadcast: multi-producer, multi-consumer
async fn broadcast_example() {
    let (tx, _) = broadcast::channel::<String>(100);
    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();

    tx.send("Event".to_string()).unwrap();

    // Both receivers get the message
    let _ = rx1.recv().await;
    let _ = rx2.recv().await;
}

// Oneshot: single value, single use
async fn oneshot_example() -> String {
    let (tx, rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        tx.send("Result".to_string()).unwrap();
    });

    rx.await.unwrap()
}

// Watch: single producer, multi-consumer, latest value
async fn watch_example() {
    let (tx, mut rx) = watch::channel("initial".to_string());

    tokio::spawn(async move {
        // changed() returns Err once the sender is dropped, which ends the loop
        // instead of panicking on unwrap()
        while rx.changed().await.is_ok() {
            println!("New value: {}", *rx.borrow());
        }
    });

    tx.send("updated".to_string()).unwrap();
}

Pattern 3: Async Error Handling

use anyhow::{Context, Result};
use std::time::Duration;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum ServiceError {
    #[error("Network error: {0}")]
    Network(#[from] reqwest::Error),

    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),

    #[error("Not found: {0}")]
    NotFound(String),

    #[error("Timeout after {0:?}")]
    Timeout(std::time::Duration),
}

// Using anyhow for application errors
// (Response, User, db and parse_response are assumed to be defined by the application)
async fn process_request(id: &str) -> Result<Response> {
    let data = fetch_data(id)
        .await
        .context("Failed to fetch data")?;

    let parsed = parse_response(&data)
        .context("Failed to parse response")?;

    Ok(parsed)
}

// Using custom errors for library code
async fn get_user(id: &str) -> Result<User, ServiceError> {
    // `?` converts sqlx::Error into ServiceError::Database via #[from]
    let result = db.query(id).await?;

    match result {
        Some(user) => Ok(user),
        None => Err(ServiceError::NotFound(id.to_string())),
    }
}

// Timeout wrapper
use tokio::time::timeout;

async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>
where
    F: std::future::Future<Output = Result<T, ServiceError>>,
{
    match timeout(duration, future).await {
        Ok(result) => result, // finished in time: pass the inner result through
        Err(_elapsed) => Err(ServiceError::Timeout(duration)),
    }
}

Pattern 4: Graceful Shutdown

use anyhow::Result;
use std::time::Duration;
use tokio::signal;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

// do_work() stands for the application's own unit of work.

async fn run_server() -> Result<()> {
    // Method 1: CancellationToken
    let token = CancellationToken::new();
    let token_clone = token.clone();

    // Spawn task that respects cancellation; keep the handle so it can be awaited
    let worker = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token_clone.cancelled() => {
                    tracing::info!("Task shutting down");
                    break;
                }
                _ = do_work() => {}
            }
        }
    });

    // Wait for shutdown signal
    signal::ctrl_c().await?;
    tracing::info!("Shutdown signal received");

    // Cancel all tasks
    token.cancel();

    // Give the task up to 5 seconds to clean up, returning as soon as it finishes
    if tokio::time::timeout(Duration::from_secs(5), worker).await.is_err() {
        tracing::warn!("Worker did not stop within the grace period");
    }

    Ok(())
}

// Method 2: Broadcast channel for shutdown
async fn run_with_broadcast() -> Result<()> {
    let (shutdown_tx, _) = broadcast::channel::<()>(1);
    let mut rx = shutdown_tx.subscribe();

    tokio::spawn(async move {
        tokio::select! {
            _ = rx.recv() => {
                tracing::info!("Received shutdown");
            }
            _ = async { loop { do_work().await } } => {}
        }
    });

    signal::ctrl_c().await?;
    let _ = shutdown_tx.send(());

    Ok(())
}

Pattern 5: Async Traits

use anyhow::Result;
use async_trait::async_trait;

// Entity is assumed to be an application type with `id` and `data` fields.
#[async_trait]
pub trait Repository {
    async fn get(&self, id: &str) -> Result<Entity>;
    async fn save(&self, entity: &Entity) -> Result<()>;
    async fn delete(&self, id: &str) -> Result<()>;
}

pub struct PostgresRepository {
    pool: sqlx::PgPool,
}

// The sqlx query macros check SQL at compile time, so they need a reachable
// DATABASE_URL (or prepared offline data) when building.
#[async_trait]
impl Repository for PostgresRepository {
    async fn get(&self, id: &str) -> Result<Entity> {
        sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id)
            .fetch_one(&self.pool)
            .await
            .map_err(Into::into)
    }

    async fn save(&self, entity: &Entity) -> Result<()> {
        sqlx::query!(
            "INSERT INTO entities (id, data) VALUES ($1, $2)
             ON CONFLICT (id) DO UPDATE SET data = $2",
            entity.id,
            entity.data
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn delete(&self, id: &str) -> Result<()> {
        sqlx::query!("DELETE FROM entities WHERE id = $1", id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
}

// Trait object usage
async fn process(repo: &dyn Repository, id: &str) -> Result<()> {
    let entity = repo.get(id).await?;
    // Process...
    repo.save(&entity).await
}

Pattern 6: Streams and Async Iteration

use async_stream::stream;
use futures::stream::{self, Stream, StreamExt};
use std::time::Duration;

// Create stream from async iterator
fn numbers_stream() -> impl Stream<Item = i32> {
    stream! {
        for i in 0..10 {
            tokio::time::sleep(Duration::from_millis(100)).await;
            yield i;
        }
    }
}

// Process stream
async fn process_stream() {
    let stream = numbers_stream();

    // Map and filter
    let processed: Vec<_> = stream
        .filter(|n| futures::future::ready(*n % 2 == 0))
        .map(|n| n * 2)
        .collect()
        .await;

    println!("{:?}", processed);
}

// Chunked processing
async fn process_in_chunks() {
    let stream = numbers_stream();
    let chunks = stream.chunks(3);
    // Streams built with stream! are not Unpin, so pin before calling next()
    tokio::pin!(chunks);

    while let Some(chunk) = chunks.next().await {
        println!("Processing chunk: {:?}", chunk);
    }
}

// Merge multiple streams
async fn merge_streams() {
    let stream1 = numbers_stream();
    let stream2 = numbers_stream();

    let merged = stream::select(stream1, stream2);

    merged
        .for_each(|n| async move {
            println!("Got: {}", n);
        })
        .await;
}

Pattern 7: Resource Management

use std::collections::HashMap;
use std::sync::Mutex; // std Mutex: held only briefly and never across an .await
use tokio::sync::{RwLock, Semaphore, SemaphorePermit};

// Shared state with RwLock (prefer for read-heavy); wrap in Arc to share across tasks
struct Cache {
    data: RwLock<HashMap<String, String>>,
}

impl Cache {
    async fn get(&self, key: &str) -> Option<String> {
        self.data.read().await.get(key).cloned()
    }

    async fn set(&self, key: String, value: String) {
        self.data.write().await.insert(key, value);
    }
}

// Connection pool with semaphore (Connection is the application's own type)
struct Pool {
    semaphore: Semaphore,
    connections: Mutex<Vec<Connection>>,
}

impl Pool {
    fn new(size: usize) -> Self {
        Self {
            semaphore: Semaphore::new(size),
            connections: Mutex::new((0..size).map(|_| Connection::new()).collect()),
        }
    }

    async fn acquire(&self) -> PooledConnection<'_> {
        // Holding a permit guarantees that at least one connection is in the Vec
        let permit = self.semaphore.acquire().await.expect("semaphore closed");
        let conn = self
            .connections
            .lock()
            .unwrap()
            .pop()
            .expect("permit implies a free connection");
        PooledConnection { pool: self, conn: Some(conn), _permit: permit }
    }
}

struct PooledConnection<'a> {
    pool: &'a Pool,
    conn: Option<Connection>,
    _permit: SemaphorePermit<'a>,
}

impl Drop for PooledConnection<'_> {
    fn drop(&mut self) {
        // Return the connection synchronously. Spawning a task here would not
        // compile: tokio::spawn needs a 'static future and `pool` is borrowed.
        if let Some(conn) = self.conn.take() {
            self.pool.connections.lock().unwrap().push(conn);
        }
        // `_permit` is dropped after this body runs, which frees the slot
    }
}

Debugging Tips

// Enable tokio-console for runtime debugging
// Cargo.toml: tokio = { version = "1", features = ["full", "tracing"] }
//             plus the console-subscriber crate, with console_subscriber::init() at startup
// Run: RUSTFLAGS="--cfg tokio_unstable" cargo run
// Then: tokio-console

// Instrument async functions
use tracing::{instrument, Instrument};

#[instrument(skip(pool))]
async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> {
    tracing::debug!("Fetching user");
    // ...
}

// Track task spawning (`.instrument` comes from the tracing::Instrument trait)
let span = tracing::info_span!("worker", id = %worker_id);
tokio::spawn(async move {
    // Enters span when polled
}.instrument(span));

Best Practices

Do's

  • Use tokio::select! - For racing futures
  • Prefer channels - Over shared state when possible
  • Use JoinSet - For managing multiple tasks
  • Instrument with tracing - For debugging async code
  • Handle cancellation - Check CancellationToken

Don'ts

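  • Don't block - Never call std::thread::sleep or other blocking APIs in async code; use tokio::time::sleep or spawn_blocking
  • Don't hold locks across awaits - A std::sync::Mutex guard held over an .await can deadlock and makes the future non-Send; a Tokio lock held that long stalls every other waiter
  • Don't spawn unboundedly - Use semaphores for limits
  • Don't ignore errors - Propagate with ? or log
  • Don't forget Send bounds - Futures passed to tokio::spawn must be Send + 'static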