python-background-jobs

Async task processing patterns for decoupling long-running work from request/response cycles. Covers core patterns, including returning a job ID immediately, configuring task queues with Celery, idempotency strategies, and job state management for visibility. Includes advanced workflows: dead letter queues for failed tasks, status polling endpoints, task chaining, and parallel execution. Provides examples for Celery, RQ, and Dramatiq, plus guidance on cloud-native alternatives such as AWS SQS and Google Cloud Tasks. Emphasizes idempotency through check-before-write, idempotency keys, and deduplication windows to handle at-least-once delivery safely.

INSTALLATION
npx skills add https://github.com/wshobson/agents --skill python-background-jobs
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

Python Background Jobs & Task Queues

Decouple long-running or unreliable work from request/response cycles. Return immediately to the user while background workers handle the heavy lifting asynchronously.

When to Use This Skill

  • Processing tasks that take longer than a few seconds
  • Sending emails, notifications, or webhooks
  • Generating reports or exporting data
  • Processing uploads or media transformations
  • Integrating with unreliable external services
  • Building event-driven architectures

Core Concepts

1. Task Queue Pattern

The API accepts a request, enqueues a job, and returns immediately with a job ID. Workers process jobs asynchronously.
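A minimal in-process sketch of this flow, using the standard library's `queue.Queue` and a worker thread as stand-ins for a real broker and worker process. The names `handle_request`, `worker`, and the `jobs` dict are hypothetical; in production the queue would be a broker such as Redis or RabbitMQ (or a cloud service) and job state would live in a database.

```python
import queue
import threading
import uuid

# Hypothetical in-memory stand-ins for a real broker and job store.
jobs: dict[str, str] = {}
jobs_lock = threading.Lock()
task_queue: "queue.Queue[tuple[str, int]]" = queue.Queue()


def handle_request(n: int) -> str:
    """API side: record the job, enqueue it, and return the ID without waiting."""
    job_id = str(uuid.uuid4())
    with jobs_lock:
        jobs[job_id] = "pending"
    task_queue.put((job_id, n))
    return job_id


def worker() -> None:
    """Worker side: take jobs off the queue and process them one at a time."""
    while True:
        job_id, n = task_queue.get()
        with jobs_lock:
            jobs[job_id] = "running"
        try:
            sum(range(n))  # stand-in for the slow work
            outcome = "succeeded"
        except Exception:
            outcome = "failed"
        with jobs_lock:
            jobs[job_id] = outcome
        task_queue.task_done()


threading.Thread(target=worker, daemon=True).start()
```

`handle_request` returns as soon as the job is queued; the caller can then look up `jobs[job_id]` later, which is exactly what a status polling endpoint does.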

2. Idempotency

Tasks may be retried on failure. Design for safe re-execution.

3. Job State Machine

Jobs transition through states: pending → running → succeeded/failed.
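One way to enforce these transitions is a lookup table of allowed moves that rejects anything else. This sketch assumes retries happen while a job stays RUNNING, so the terminal states have no outgoing edges; if your system re-queues failed jobs, add FAILED → PENDING. `TRANSITIONS`, `InvalidTransition`, `transition`, and `is_terminal` are hypothetical names.

```python
from enum import Enum


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


# Allowed moves; SUCCEEDED and FAILED are terminal, so they have none.
TRANSITIONS: dict[JobStatus, frozenset[JobStatus]] = {
    JobStatus.PENDING: frozenset({JobStatus.RUNNING}),
    JobStatus.RUNNING: frozenset({JobStatus.SUCCEEDED, JobStatus.FAILED}),
    JobStatus.SUCCEEDED: frozenset(),
    JobStatus.FAILED: frozenset(),
}


class InvalidTransition(Exception):
    """Raised when a job is asked to move along an edge not in TRANSITIONS."""


def transition(current: JobStatus, new: JobStatus) -> JobStatus:
    """Return the new state if the move is allowed, otherwise raise."""
    if new not in TRANSITIONS[current]:
        raise InvalidTransition(f"{current.value} -> {new.value} is not allowed")
    return new


def is_terminal(status: JobStatus) -> bool:
    """A state is terminal when it has no outgoing transitions."""
    return not TRANSITIONS[status]
```

Checking transitions in one place catches bugs such as a duplicate delivery trying to restart a job that already succeeded.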

4. At-Least-Once Delivery

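Most task queues provide at-least-once delivery: the same message can be delivered more than once, for example when a worker crashes before acknowledging it. Your code must handle duplicates.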
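A common way to make duplicates harmless is to write results with an upsert keyed on a natural ID, so a second delivery overwrites the same row instead of creating another. This sketch uses SQLite's `INSERT ... ON CONFLICT ... DO UPDATE` (available since SQLite 3.24) through the standard `sqlite3` module; the `payments` table and `handle_message` are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE payments (order_id TEXT PRIMARY KEY, amount_cents INTEGER NOT NULL)"
)


def handle_message(message: dict) -> None:
    """Record a payment; running it twice for the same message leaves one row."""
    with conn:  # commits on success, rolls back on exception
        conn.execute(
            """
            INSERT INTO payments (order_id, amount_cents) VALUES (?, ?)
            ON CONFLICT (order_id) DO UPDATE SET amount_cents = excluded.amount_cents
            """,
            (message["order_id"], message["amount_cents"]),
        )


# Simulate the broker redelivering the same message.
msg = {"order_id": "order-42", "amount_cents": 1999}
handle_message(msg)
handle_message(msg)
```

This protects your own database; side effects in external systems (charges, emails) still need idempotency keys or deduplication.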

Quick Start

This skill uses Celery, a widely adopted task queue, for its examples. Alternatives such as RQ, Dramatiq, and cloud-native services (AWS SQS, Google Cloud Tasks) are equally valid choices.

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

@app.task
def send_email(to: str, subject: str, body: str) -> None:
    # This runs in a background worker; email_client is your own email client
    email_client.send(to, subject, body)

# In your API handler: enqueue the task and return without waiting for it
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")

Fundamental Patterns

Pattern 1: Return Job ID Immediately

For operations exceeding a few seconds, return a job ID and process asynchronously.

from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from uuid import uuid4

class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"

@dataclass
class Job:
    id: str
    status: JobStatus
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    result: dict | None = None
    error: str | None = None

# API endpoint. ExportRequest, JobResponse, jobs_repo, and task_queue are
# application-specific: request/response models, a job store, and a queue client.
async def start_export(request: ExportRequest) -> JobResponse:
    """Start export job and return job ID."""
    job_id = str(uuid4())

    # Persist the job record before enqueueing, so the poll URL works
    # even if a worker picks the task up immediately
    await jobs_repo.create(Job(
        id=job_id,
        status=JobStatus.PENDING,
        created_at=datetime.now(timezone.utc),
    ))

    # Enqueue task for background processing
    await task_queue.enqueue(
        "export_data",
        job_id=job_id,
        params=request.model_dump(),
    )

    # Return immediately with job ID
    return JobResponse(
        job_id=job_id,
        status=JobStatus.PENDING.value,
        poll_url=f"/jobs/{job_id}",
    )

Pattern 2: Celery Task Configuration

Configure Celery tasks with explicit retry and timeout settings.

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

# Global configuration
app.conf.update(
    task_time_limit=3600,             # Hard limit: worker process is killed after 1 hour
    task_soft_time_limit=3000,        # Soft limit: SoftTimeLimitExceeded raised after 50 minutes
    task_acks_late=True,              # Acknowledge after completion, not on receipt
    task_reject_on_worker_lost=True,  # Requeue the task if the worker process dies mid-task
    worker_prefetch_multiplier=1,     # Reserve one task per worker process at a time
    # Redis broker: unacknowledged tasks are redelivered after the visibility
    # timeout (default 1 hour); keep it above task_time_limit
    broker_transport_options={"visibility_timeout": 7200},
)

# payment_gateway, PaymentDeclinedError, and TransientError are application-specific
@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
    """Process payment with automatic retry on transient errors."""
    try:
        result = payment_gateway.charge(payment_id)
        return {"status": "success", "transaction_id": result.id}
    except PaymentDeclinedError as e:
        # Don't retry permanent failures
        return {"status": "declined", "reason": str(e)}
    except TransientError as e:
        # Retry with exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
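The retry logic above separates transient errors (retry with backoff) from permanent ones (fail fast). Stripped of Celery, the same policy can be sketched as follows. `TransientError`, `PermanentError`, and `run_with_retries` are hypothetical names; the delay `base_delay * 2 ** attempt` mirrors `2 ** self.request.retries * 60`, giving 60 s, 120 s, then 240 s. `sleep` is injectable so the schedule can be tested without waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, dropped connections)."""


class PermanentError(Exception):
    """Hypothetical marker for failures that will not succeed on retry."""


def run_with_retries(
    fn: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying TransientError with exponentially growing delays."""
    attempt = 0
    while True:
        try:
            return fn()
        except TransientError:
            if attempt >= max_retries:
                raise  # out of retries: let the caller (or a DLQ) handle it
            sleep(base_delay * 2 ** attempt)
            attempt += 1
        # PermanentError and any other exception propagate immediately.
```

Many systems also add random jitter so that tasks which failed together do not retry in lockstep; for `autoretry_for` retries, Celery's `retry_backoff` and `retry_jitter` task options cover this.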

Pattern 3: Make Tasks Idempotent

A task may run more than once if a worker crashes, a task times out, or the broker redelivers a message. Design for safe re-execution.

@app.task(bind=True)
def process_order(self, order_id: str) -> None:
    """Process order idempotently."""
    order = orders_repo.get(order_id)

    # Already processed? Return early
    if order.status == OrderStatus.COMPLETED:
        logger.info("Order already processed", order_id=order_id)
        return

    # Already PROCESSING (an earlier attempt may have crashed mid-charge)?
    # Continue anyway: the idempotency key below prevents a double charge.

    # Process with idempotency key
    payment_provider.charge(
        amount=order.total,
        idempotency_key=f"order-{order_id}",  # Critical! Same key on every retry
    )

    orders_repo.update(order_id, status=OrderStatus.COMPLETED)

Idempotency Strategies:

  • Check-before-write: Verify state before action
  • Idempotency keys: Use unique tokens with external services
  • Upsert patterns: INSERT ... ON CONFLICT DO UPDATE (PostgreSQL, SQLite)
  • Deduplication window: Track processed IDs for N hours
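A deduplication window can be sketched as a map from message ID to expiry time: an ID seen within the window is skipped, and entries age out after `ttl_seconds`. This in-memory version is a single-process illustration; `DeduplicationWindow` and `first_time` are hypothetical names. Across multiple workers the check-and-record step must be atomic, which is commonly done with Redis `SET key value NX EX seconds`.

```python
import time
from typing import Callable


class DeduplicationWindow:
    """Remember processed message IDs for ttl_seconds (in-memory sketch)."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl_seconds
        self.clock = clock
        self._seen: dict[str, float] = {}  # message_id -> expiry time

    def _evict_expired(self, now: float) -> None:
        for key in [k for k, expiry in self._seen.items() if expiry <= now]:
            del self._seen[key]

    def first_time(self, message_id: str) -> bool:
        """Return True (and record the ID) if it was not seen within the window."""
        now = self.clock()
        self._evict_expired(now)
        if message_id in self._seen:
            return False
        self._seen[message_id] = now + self.ttl
        return True
```

A worker would call `first_time(message_id)` before doing the work and skip the message when it returns `False`. The window only needs to be longer than the longest plausible redelivery delay.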

Pattern 4: Job State Management

Persist job state transitions for visibility and debugging.

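from datetime import datetime, timezone

import structlog

# Assumes structlog: stdlib logging.Logger.info() does not accept keyword fields
logger = structlog.get_logger()

class JobRepository:
    """Repository for managing job state (asyncpg-style $n placeholders)."""

    def __init__(self, db) -> None:
        self._db = db  # e.g. an asyncpg connection or pool

    async def create(self, job: Job) -> Job:
        """Create new job record."""
        await self._db.execute(
            """INSERT INTO jobs (id, status, created_at)
               VALUES ($1, $2, $3)""",
            job.id, job.status.value, job.created_at,
        )
        return job

    async def update_status(
        self,
        job_id: str,
        status: JobStatus,
        **fields,
    ) -> None:
        """Update job status with timestamp."""
        updates = {"status": status.value, **fields}
        if status == JobStatus.RUNNING:
            updates["started_at"] = datetime.now(timezone.utc)
        elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
            updates["completed_at"] = datetime.now(timezone.utc)

        # Build "column = $n" pairs; keys in **fields must be trusted column names
        set_clause = ", ".join(
            f"{column} = ${i}" for i, column in enumerate(updates, start=1)
        )
        await self._db.execute(
            f"UPDATE jobs SET {set_clause} WHERE id = ${len(updates) + 1}",
            *updates.values(), job_id,
        )
        logger.info(
            "Job status updated",
            job_id=job_id,
            status=status.value,
        )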

Advanced Patterns

Pattern 5: Dead Letter Queue

Handle permanently failed tasks for manual inspection.

# send_webhook, WebhookFailedError, and dead_letter_queue are application-specific
@app.task(bind=True, max_retries=3)
def process_webhook(self, webhook_id: str, payload: dict) -> None:
    """Process webhook with DLQ for failures."""
    try:
        result = send_webhook(payload)
        if not result.success:
            raise WebhookFailedError(result.error)
    except Exception as e:
        if self.request.retries >= self.max_retries:
            # Move to dead letter queue for manual inspection
            dead_letter_queue.send({
                "task": "process_webhook",
                "webhook_id": webhook_id,
                "payload": payload,
                "error": str(e),
                "attempts": self.request.retries + 1,
                "failed_at": datetime.now(timezone.utc).isoformat(),
            })
            logger.error(
                "Webhook moved to DLQ after max retries",
                webhook_id=webhook_id,
                error=str(e),
            )
            return

        # Exponential backoff retry
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
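The DLQ decision reduces to: try up to `max_retries + 1` times, and if every attempt fails, record the payload, error, and attempt count somewhere a human can inspect it. Here is a framework-free sketch with an in-memory list standing in for the real dead letter queue. `DeadLetter`, `InMemoryDLQ`, and `process_with_dlq` are hypothetical, and backoff between attempts is omitted to keep the focus on the DLQ handoff.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable


@dataclass
class DeadLetter:
    task: str
    payload: dict
    error: str
    attempts: int
    failed_at: str


@dataclass
class InMemoryDLQ:
    """Hypothetical stand-in for a real dead letter queue."""

    messages: list[DeadLetter] = field(default_factory=list)

    def send(self, letter: DeadLetter) -> None:
        self.messages.append(letter)


def process_with_dlq(
    task_name: str,
    payload: dict,
    handler: Callable[[dict], Any],
    dlq: InMemoryDLQ,
    max_retries: int = 3,
) -> bool:
    """Return True on success; after max_retries + 1 failures, dead-letter the payload."""
    attempts = 0
    while True:
        attempts += 1
        try:
            handler(payload)
            return True
        except Exception as exc:
            if attempts > max_retries:
                dlq.send(DeadLetter(
                    task=task_name,
                    payload=payload,
                    error=str(exc),
                    attempts=attempts,
                    failed_at=datetime.now(timezone.utc).isoformat(),
                ))
                return False
            # A real worker would back off here before the next attempt.
```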

Pattern 6: Status Polling Endpoint

Provide an endpoint for clients to check job status.

from fastapi import FastAPI, HTTPException

app = FastAPI()

# JobStatusResponse is your response model; jobs_repo.get() returns a Job or None
@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str) -> JobStatusResponse:
    """Get current status of a background job."""
    job = await jobs_repo.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail=f"Job {job_id} not found")

    return JobStatusResponse(
        job_id=job.id,
        status=job.status.value,
        created_at=job.created_at,
        started_at=job.started_at,
        completed_at=job.completed_at,
        result=job.result if job.status == JobStatus.SUCCEEDED else None,
        error=job.error if job.status == JobStatus.FAILED else None,
        # Helpful for clients: stop polling once this is True
        is_terminal=job.status in (JobStatus.SUCCEEDED, JobStatus.FAILED),
    )
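On the client side, polling usually backs off between requests and gives up after a deadline. A sketch, assuming `fetch_status` is a hypothetical callable that performs `GET /jobs/{job_id}` and returns the JSON body as a dict (including the `is_terminal` flag from the endpoint above); `sleep` and `clock` are injectable for testing.

```python
import time
from typing import Callable


def poll_until_done(
    fetch_status: Callable[[], dict],
    interval: float = 1.0,
    max_interval: float = 30.0,
    timeout: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Poll until the job reports is_terminal, doubling the wait up to max_interval."""
    deadline = clock() + timeout
    delay = interval
    while True:
        job = fetch_status()
        if job["is_terminal"]:
            return job
        if clock() + delay > deadline:
            raise TimeoutError("job did not reach a terminal state before the timeout")
        sleep(delay)
        delay = min(delay * 2, max_interval)
```

The caller still has to inspect `status` on the returned dict: a terminal job may have failed.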

Pattern 7: Task Chaining and Workflows

Compose complex workflows from simple tasks.

from celery import chain, chord, group

# Signatures (.s()) are lazy: nothing runs until .apply_async() is called

# Simple chain: A → B → C (each task receives the previous task's result)
workflow = chain(
    extract_data.s(source_id),
    transform_data.s(),
    load_data.s(destination_id),
)
workflow.apply_async()

# Parallel execution: A, B, C all at once
parallel = group(
    send_email.s(user_email),
    send_sms.s(user_phone),
    update_analytics.s(event_data),
)
parallel.apply_async()

# Chord: run tasks in parallel, then a callback that receives the list of results.
# Chords require a result backend (set backend= on the Celery app).
# Process all items, then send completion notification
batch = chord(
    [process_item.s(item_id) for item_id in item_ids],
    send_completion_notification.s(batch_id),
)
batch.apply_async()
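To make the semantics of the three primitives concrete, here is a plain-Python sketch using `concurrent.futures`. It is not how Celery executes them (Celery dispatches each signature to workers through the broker), and `run_chain`, `run_group`, and `run_chord` are hypothetical helpers. As in Celery, a chain passes each result to the next step and a chord's callback receives the list of group results; an extra argument such as `destination_id` is bound here with `functools.partial`.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial, reduce
from typing import Any, Callable


def run_chain(initial: Any, *steps: Callable[[Any], Any]) -> Any:
    """Chain: each step receives the previous step's return value."""
    return reduce(lambda acc, step: step(acc), steps, initial)


def run_group(tasks: list[Callable[[], Any]]) -> list[Any]:
    """Group: run independent tasks concurrently; results keep submission order."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(task) for task in tasks]
        return [future.result() for future in futures]


def run_chord(tasks: list[Callable[[], Any]], callback: Callable[[list[Any]], Any]) -> Any:
    """Chord: run a group, then call the callback once with all results."""
    return callback(run_group(tasks))


def load(rows: list[int], destination_id: str) -> str:
    """Hypothetical load step: previous result first, bound argument second."""
    return f"{len(rows)} rows -> {destination_id}"


# Like chain(extract.s(...), transform.s(), load.s("warehouse")) in Celery terms.
summary = run_chain([3, 1, 2], sorted, partial(load, destination_id="warehouse"))
```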

Pattern 8: Alternative Task Queues

Choose the right tool for your needs.

RQ (Redis Queue): Simple, Redis-based

from redis import Redis
from rq import Queue

queue = Queue(connection=Redis())
job = queue.enqueue(send_email, "user@example.com", "Subject", "Body")

Dramatiq: Modern Celery alternative

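import dramatiq
from dramatiq.brokers.redis import RedisBroker

dramatiq.set_broker(RedisBroker())

@dramatiq.actor
def send_email(to: str, subject: str, body: str) -> None:
    email_client.send(to, subject, body)

# Enqueue from your API handler
send_email.send("user@example.com", "Welcome!", "Thanks for signing up")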

Cloud-native options:

  • AWS SQS + Lambda
  • Google Cloud Tasks
  • Azure Functions

Best Practices Summary

  • Return immediately - Don't block requests for long operations
  • Persist job state - Enable status polling and debugging
  • Make tasks idempotent - Safe to retry on any failure
  • Use idempotency keys - For external service calls
  • Set timeouts - Both soft and hard limits
  • Implement DLQ - Capture permanently failed tasks
  • Log transitions - Track job state changes
  • Retry appropriately - Exponential backoff for transient errors
  • Don't retry permanent failures - Validation errors, invalid credentials
  • Monitor queue depth - Alert on backlog growth
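Monitoring queue depth can start as simply as sampling the broker periodically and alerting on two signals: depth above a fixed threshold, and depth that keeps rising across consecutive samples. `BacklogMonitor` and its thresholds are hypothetical; how you read the depth depends on the broker (with Redis, Celery's monitoring guide reads a queue's length with `LLEN` on the queue name).

```python
from collections import deque


class BacklogMonitor:
    """Flag a queue whose depth is too high or keeps rising."""

    def __init__(self, threshold: int, window: int = 5) -> None:
        self.threshold = threshold
        self.samples: deque[int] = deque(maxlen=window)

    def record(self, depth: int) -> list[str]:
        """Add a depth sample and return any alert messages it triggers."""
        self.samples.append(depth)
        alerts = []
        if depth > self.threshold:
            alerts.append(f"queue depth {depth} exceeds threshold {self.threshold}")
        recent = list(self.samples)
        if len(recent) == self.samples.maxlen and all(
            later > earlier for earlier, later in zip(recent, recent[1:])
        ):
            alerts.append(f"queue depth grew for {len(recent)} consecutive samples")
        return alerts
```

The growth check catches a slow backlog (workers falling behind) long before a fixed threshold would fire.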