python-observability

Structured logging, metrics, and distributed tracing patterns for Python production systems. Covers four core observability areas: structured JSON logging with structlog, Prometheus metrics for the four golden signals (latency, traffic, errors, saturation), correlation ID propagation across service boundaries, and OpenTelemetry distributed tracing Includes semantic log level guidance, bounded cardinality rules for metrics to prevent storage explosion, and context manager patterns for consistent operation timing Provides FastAPI middleware examples for correlation ID injection, decorator patterns for request instrumentation, and best practices for avoiding unbounded metric labels Demonstrates end-to-end patterns from request ingress through downstream service calls, with code examples for logging, metrics collection, and span creation

INSTALLATION
npx skills add https://github.com/wshobson/agents --skill python-observability
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

Python Observability

Instrument Python applications with structured logs, metrics, and traces. When something breaks in production, you need to answer "what, where, and why" without deploying new code.

When to Use This Skill

  • Adding structured logging to applications
  • Implementing metrics collection with Prometheus
  • Setting up distributed tracing across services
  • Propagating correlation IDs through request chains
  • Debugging production issues
  • Building observability dashboards

Core Concepts

1. Structured Logging

Emit logs as JSON with consistent fields for production environments. Machine-readable logs enable powerful queries and alerts. For local development, consider human-readable formats.

2. The Four Golden Signals

Track latency, traffic, errors, and saturation for every service boundary.

3. Correlation IDs

Thread a unique ID through all logs and spans for a single request, enabling end-to-end tracing.

4. Bounded Cardinality

Keep metric label values bounded. Unbounded labels (like user IDs) explode storage costs.

Quick Start

import structlog

structlog.configure(

    processors=[

        structlog.processors.TimeStamper(fmt="iso"),

        structlog.processors.JSONRenderer(),

    ],

)

logger = structlog.get_logger()

logger.info("Request processed", user_id="123", duration_ms=45)

Fundamental Patterns

Pattern 1: Structured Logging with Structlog

Configure structlog for JSON output with consistent fields.

import logging

import structlog

def configure_logging(log_level: str = "INFO") -> None:

    """Configure structured logging for the application."""

    structlog.configure(

        processors=[

            structlog.contextvars.merge_contextvars,

            structlog.processors.add_log_level,

            structlog.processors.TimeStamper(fmt="iso"),

            structlog.processors.StackInfoRenderer(),

            structlog.processors.format_exc_info,

            structlog.processors.JSONRenderer(),

        ],

        wrapper_class=structlog.make_filtering_bound_logger(

            getattr(logging, log_level.upper())

        ),

        context_class=dict,

        logger_factory=structlog.PrintLoggerFactory(),

        cache_logger_on_first_use=True,

    )

# Initialize at application startup

configure_logging("INFO")

logger = structlog.get_logger()

Pattern 2: Consistent Log Fields

Every log entry should include standard fields for filtering and correlation.

import structlog

from contextvars import ContextVar

# Store correlation ID in context

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

logger = structlog.get_logger()

def process_request(request: Request) -> Response:

    """Process request with structured logging."""

    logger.info(

        "Request received",

        correlation_id=correlation_id.get(),

        method=request.method,

        path=request.path,

        user_id=request.user_id,

    )

    try:

        result = handle_request(request)

        logger.info(

            "Request completed",

            correlation_id=correlation_id.get(),

            status_code=200,

            duration_ms=elapsed,

        )

        return result

    except Exception as e:

        logger.error(

            "Request failed",

            correlation_id=correlation_id.get(),

            error_type=type(e).__name__,

            error_message=str(e),

        )

        raise

Pattern 3: Semantic Log Levels

Use log levels consistently across the application.

Level

Purpose

Examples

DEBUG

Development diagnostics

Variable values, internal state

INFO

Request lifecycle, operations

Request start/end, job completion

WARNING

Recoverable anomalies

Retry attempts, fallback used

ERROR

Failures needing attention

Exceptions, service unavailable

# DEBUG: Detailed internal information

logger.debug("Cache lookup", key=cache_key, hit=cache_hit)

# INFO: Normal operational events

logger.info("Order created", order_id=order.id, total=order.total)

# WARNING: Abnormal but handled situations

logger.warning(

    "Rate limit approaching",

    current_rate=950,

    limit=1000,

    reset_seconds=30,

)

# ERROR: Failures requiring investigation

logger.error(

    "Payment processing failed",

    order_id=order.id,

    error=str(e),

    payment_provider="stripe",

)

Never log expected behavior at ERROR. A user entering a wrong password is INFO, not ERROR.

Pattern 4: Correlation ID Propagation

Generate a unique ID at ingress and thread it through all operations.

from contextvars import ContextVar

import uuid

import structlog

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

def set_correlation_id(cid: str | None = None) -> str:

    """Set correlation ID for current context."""

    cid = cid or str(uuid.uuid4())

    correlation_id.set(cid)

    structlog.contextvars.bind_contextvars(correlation_id=cid)

    return cid

# FastAPI middleware example

from fastapi import Request

async def correlation_middleware(request: Request, call_next):

    """Middleware to set and propagate correlation ID."""

    # Use incoming header or generate new

    cid = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())

    set_correlation_id(cid)

    response = await call_next(request)

    response.headers["X-Correlation-ID"] = cid

    return response

Propagate to outbound requests:

import httpx

async def call_downstream_service(endpoint: str, data: dict) -> dict:

    """Call downstream service with correlation ID."""

    async with httpx.AsyncClient() as client:

        response = await client.post(

            endpoint,

            json=data,

            headers={"X-Correlation-ID": correlation_id.get()},

        )

        return response.json()

Advanced Patterns

Pattern 5: The Four Golden Signals with Prometheus

Track these metrics for every service boundary:

from prometheus_client import Counter, Histogram, Gauge

# Latency: How long requests take

REQUEST_LATENCY = Histogram(

    "http_request_duration_seconds",

    "Request latency in seconds",

    ["method", "endpoint", "status"],

    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],

)

# Traffic: Request rate

REQUEST_COUNT = Counter(

    "http_requests_total",

    "Total HTTP requests",

    ["method", "endpoint", "status"],

)

# Errors: Error rate

ERROR_COUNT = Counter(

    "http_errors_total",

    "Total HTTP errors",

    ["method", "endpoint", "error_type"],

)

# Saturation: Resource utilization

DB_POOL_USAGE = Gauge(

    "db_connection_pool_used",

    "Number of database connections in use",

)

Instrument your endpoints:

import time

from functools import wraps

def track_request(func):

    """Decorator to track request metrics."""

    @wraps(func)

    async def wrapper(request: Request, *args, **kwargs):

        method = request.method

        endpoint = request.url.path

        start = time.perf_counter()

        try:

            response = await func(request, *args, **kwargs)

            status = str(response.status_code)

            return response

        except Exception as e:

            status = "500"

            ERROR_COUNT.labels(

                method=method,

                endpoint=endpoint,

                error_type=type(e).__name__,

            ).inc()

            raise

        finally:

            duration = time.perf_counter() - start

            REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()

            REQUEST_LATENCY.labels(method=method, endpoint=endpoint, status=status).observe(duration)

    return wrapper

Pattern 6: Bounded Cardinality

Avoid labels with unbounded values to prevent metric explosion.

# BAD: User ID has potentially millions of values

REQUEST_COUNT.labels(method="GET", user_id=user.id)  # Don't do this!

# GOOD: Bounded values only

REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200")

# If you need per-user metrics, use a different approach:

# - Log the user_id and query logs

# - Use a separate analytics system

# - Bucket users by type/tier

REQUEST_COUNT.labels(

    method="GET",

    endpoint="/users",

    user_tier="premium",  # Bounded set of values

)

Pattern 7: Timed Operations with Context Manager

Create a reusable timing context manager for operations.

from contextlib import contextmanager

import time

import structlog

logger = structlog.get_logger()

@contextmanager

def timed_operation(name: str, **extra_fields):

    """Context manager for timing and logging operations."""

    start = time.perf_counter()

    logger.debug("Operation started", operation=name, **extra_fields)

    try:

        yield

    except Exception as e:

        elapsed_ms = (time.perf_counter() - start) * 1000

        logger.error(

            "Operation failed",

            operation=name,

            duration_ms=round(elapsed_ms, 2),

            error=str(e),

            **extra_fields,

        )

        raise

    else:

        elapsed_ms = (time.perf_counter() - start) * 1000

        logger.info(

            "Operation completed",

            operation=name,

            duration_ms=round(elapsed_ms, 2),

            **extra_fields,

        )

# Usage

with timed_operation("fetch_user_orders", user_id=user.id):

    orders = await order_repository.get_by_user(user.id)

Pattern 8: OpenTelemetry Tracing

Set up distributed tracing with OpenTelemetry.

Note: OpenTelemetry is actively evolving. Check the official Python documentation for the latest API patterns and best practices.

from opentelemetry import trace

from opentelemetry.sdk.trace import TracerProvider

from opentelemetry.sdk.trace.export import BatchSpanProcessor

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

def configure_tracing(service_name: str, otlp_endpoint: str) -> None:

    """Configure OpenTelemetry tracing."""

    provider = TracerProvider()

    processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))

    provider.add_span_processor(processor)

    trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

async def process_order(order_id: str) -> Order:

    """Process order with tracing."""

    with tracer.start_as_current_span("process_order") as span:

        span.set_attribute("order.id", order_id)

        with tracer.start_as_current_span("validate_order"):

            validate_order(order_id)

        with tracer.start_as_current_span("charge_payment"):

            charge_payment(order_id)

        with tracer.start_as_current_span("send_confirmation"):

            send_confirmation(order_id)

        return order

Best Practices Summary

  • Use structured logging - JSON logs with consistent fields
  • Propagate correlation IDs - Thread through all requests and logs
  • Track the four golden signals - Latency, traffic, errors, saturation
  • Bound label cardinality - Never use unbounded values as metric labels
  • Log at appropriate levels - Don't cry wolf with ERROR
  • Include context - User ID, request ID, operation name in logs
  • Use context managers - Consistent timing and error handling
  • Separate concerns - Observability code shouldn't pollute business logic
  • Test your observability - Verify logs and metrics in integration tests
  • Set up alerts - Metrics are useless without alerting
BrowserAct

Let your agent run on any real-world website

Bypass CAPTCHA & anti-bot for free. Start local, scale to cloud.

Explore BrowserAct Skills →

Stop writing automation&scrapers

Install the CLI. Run your first Skill in 30 seconds. Scale when you're ready.

Start free
free · no credit card