microservices-patterns

Comprehensive guide to designing distributed systems with service boundaries, communication patterns, and resilience strategies.

  • Covers service decomposition by business capability and domain-driven design, with the Strangler Fig pattern for gradual monolith migration
  • Includes synchronous (REST, gRPC) and asynchronous (Kafka, message queues) communication patterns with event-driven architecture examples
  • Provides a Saga pattern implementation for distributed transactions with compensating actions and eventual consistency
  • Features resilience patterns including Circuit Breaker, retry with exponential backoff, and bulkhead isolation to prevent cascade failures
  • Includes production-ready Python code examples for API gateways, event buses, saga orchestration, and circuit breaker implementations

INSTALLATION
npx skills add https://github.com/wshobson/agents --skill microservices-patterns
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

Microservices Patterns

Master microservices architecture patterns including service boundaries, inter-service communication, data management, and resilience patterns for building distributed systems.

When to Use This Skill

  • Decomposing monoliths into microservices
  • Designing service boundaries and contracts
  • Implementing inter-service communication
  • Managing distributed data and transactions
  • Building resilient distributed systems
  • Implementing service discovery and load balancing
  • Designing event-driven architectures

Core Concepts

1. Service Decomposition Strategies

By Business Capability

  • Organize services around business functions
  • Each service owns its domain
  • Example: OrderService, PaymentService, InventoryService

By Subdomain (DDD)

  • Core domain, supporting subdomains
  • Bounded contexts map to services
  • Clear ownership and responsibility

Strangler Fig Pattern

  • Gradually extract from monolith
  • New functionality as microservices
  • Proxy routes to old/new systems
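
The routing decision behind the Strangler Fig proxy can be sketched in a few lines. This is a minimal illustration rather than a full reverse proxy: StranglerRouter, its methods, and the URLs are hypothetical names chosen for the example, and the sketch only decides which upstream a request path belongs to.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class StranglerRouter:
    """Decide whether a request goes to the monolith or an extracted service."""

    legacy_url: str
    # Path prefix -> base URL of the microservice that now owns it (hypothetical)
    migrated: Dict[str, str] = field(default_factory=dict)

    def migrate(self, prefix: str, service_url: str) -> None:
        """Mark a path prefix as extracted from the monolith."""
        self.migrated[prefix] = service_url

    def resolve(self, path: str) -> str:
        """Return the full upstream URL for a request path."""
        # Match whole path segments only, so "/orders" does not capture "/ordersfoo"
        matches = [
            p for p in self.migrated
            if path == p or path.startswith(p + "/")
        ]
        if matches:
            # Longest prefix wins, so a sub-path can be migrated on its own
            prefix = max(matches, key=len)
            return self.migrated[prefix] + path
        # Everything not yet extracted still goes to the monolith
        return self.legacy_url + path


router = StranglerRouter(legacy_url="http://monolith:8080")
router.migrate("/orders", "http://order-service:8000")
```

As more prefixes are migrated, the share of traffic reaching the monolith shrinks until it can be retired.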

2. Communication Patterns

Synchronous (Request/Response)

  • REST APIs
  • gRPC
  • GraphQL

Asynchronous (Events/Messages)

  • Event streaming (Kafka)
  • Message queues (RabbitMQ, SQS)
  • Pub/Sub patterns

3. Data Management

Database Per Service

  • Each service owns its data
  • No shared databases
  • Loose coupling

Saga Pattern

  • Distributed transactions
  • Compensating actions
  • Eventual consistency

4. Resilience Patterns

Circuit Breaker

  • Fail fast on repeated errors
  • Prevent cascade failures

Retry with Backoff

  • Transient fault handling
  • Exponential backoff
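
To make the backoff schedule concrete, here is a small standard-library sketch. The names backoff_delays and retry_with_backoff, the default parameters, and the choice of "full jitter" (sleeping a random time up to the computed ceiling) are assumptions made for illustration; libraries such as tenacity provide the same idea ready-made.

```python
import asyncio
import random


def backoff_delays(attempts: int, base: float = 0.5, cap: float = 10.0) -> list:
    """Delay ceiling before each retry: base * 2**n, capped at `cap`."""
    return [min(cap, base * 2 ** n) for n in range(attempts)]


async def retry_with_backoff(
    func,
    *,
    attempts: int = 3,
    base: float = 0.5,
    cap: float = 10.0,
    retry_on=(ConnectionError, TimeoutError),
    sleep=asyncio.sleep,
):
    """Call func(); retry only on transient errors, with jittered backoff."""
    delays = backoff_delays(attempts - 1, base, cap)
    for attempt in range(attempts):
        try:
            return await func()
        except retry_on:
            # Out of attempts: surface the last error to the caller
            if attempt == attempts - 1:
                raise
            # Full jitter spreads retries out so clients do not retry in lockstep
            await sleep(random.uniform(0, delays[attempt]))
```

Only errors listed in retry_on are retried; anything else (for example a validation error) propagates immediately, since repeating the call would not help.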

Bulkhead

  • Isolate resources
  • Limit impact of failures
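
One way to picture a bulkhead is a per-dependency cap on in-flight calls. The sketch below is an assumption-laden illustration: Bulkhead and BulkheadFullError are hypothetical names, and rejecting immediately when full (rather than queueing) is one possible policy among several.

```python
import asyncio


class BulkheadFullError(Exception):
    """Raised when the bulkhead has no free slot."""


class Bulkhead:
    """Cap the number of concurrent calls to one dependency."""

    def __init__(self, max_concurrent: int):
        self._max = max_concurrent
        self._in_flight = 0

    async def call(self, func, *args, **kwargs):
        # Reject instead of queueing so a slow dependency cannot pile up callers
        if self._in_flight >= self._max:
            raise BulkheadFullError("bulkhead full")
        self._in_flight += 1
        try:
            return await func(*args, **kwargs)
        finally:
            # Always free the slot, even if the call failed
            self._in_flight -= 1


async def demo():
    payments = Bulkhead(max_concurrent=2)

    async def slow_call(i):
        await asyncio.sleep(0.01)
        return i

    # Three concurrent calls against two slots: the third is rejected
    return await asyncio.gather(
        *(payments.call(slow_call, i) for i in range(3)),
        return_exceptions=True,
    )


results = asyncio.run(demo())
```

Giving each downstream service its own Bulkhead instance means that saturating one of them leaves capacity for the others untouched.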

Service Decomposition Patterns

Pattern 1: By Business Capability

# E-commerce example: one service per business capability.
# Order, OrderItem, PaymentRequest, PaymentResult, ReservationResult and the
# event classes are domain types assumed to be defined elsewhere.

from typing import List


# Order Service
class OrderService:
    """Handles order lifecycle."""

    async def create_order(self, order_data: dict) -> Order:
        order = Order.create(order_data)

        # Publish event for other services
        await self.event_bus.publish(
            OrderCreatedEvent(
                order_id=order.id,
                customer_id=order.customer_id,
                items=order.items,
                total=order.total
            )
        )
        return order


# Payment Service (separate service)
class PaymentService:
    """Handles payment processing."""

    async def process_payment(self, payment_request: PaymentRequest) -> PaymentResult:
        # Process payment
        result = await self.payment_gateway.charge(
            amount=payment_request.amount,
            customer=payment_request.customer_id
        )

        # Only a successful charge produces an event
        if result.success:
            await self.event_bus.publish(
                PaymentCompletedEvent(
                    order_id=payment_request.order_id,
                    transaction_id=result.transaction_id
                )
            )
        return result


# Inventory Service (separate service)
class InventoryService:
    """Handles inventory management."""

    async def reserve_items(self, order_id: str, items: List[OrderItem]) -> ReservationResult:
        # Check availability; fail on the first item that is short
        for item in items:
            available = await self.inventory_repo.get_available(item.product_id)
            if available < item.quantity:
                return ReservationResult(
                    success=False,
                    error=f"Insufficient inventory for {item.product_id}"
                )

        # Reserve items
        reservation = await self.create_reservation(order_id, items)
        await self.event_bus.publish(
            InventoryReservedEvent(
                order_id=order_id,
                reservation_id=reservation.id
            )
        )
        return ReservationResult(success=True, reservation=reservation)

Pattern 2: API Gateway

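import asyncio

import httpx
from circuitbreaker import CircuitBreakerError, circuit
from fastapi import Depends, FastAPI, HTTPException

app = FastAPI()


class APIGateway:
    """Central entry point for all client requests."""

    def __init__(self):
        self.order_service_url = "http://order-service:8000"
        self.payment_service_url = "http://payment-service:8001"
        self.inventory_service_url = "http://inventory-service:8002"
        self.http_client = httpx.AsyncClient(timeout=5.0)

    async def _request(self, base_url: str, path: str, method: str = "GET", **kwargs):
        """Send a request and return the decoded JSON body."""
        response = await self.http_client.request(method, f"{base_url}{path}", **kwargs)
        response.raise_for_status()
        return response.json()

    # One breaker per downstream service, so a failing service trips only its own circuit
    @circuit(failure_threshold=5, recovery_timeout=30)
    async def call_order_service(self, path: str, method: str = "GET", **kwargs):
        """Call order service with circuit breaker."""
        return await self._request(self.order_service_url, path, method, **kwargs)

    @circuit(failure_threshold=5, recovery_timeout=30)
    async def call_payment_service(self, path: str, method: str = "GET", **kwargs):
        """Call payment service with circuit breaker."""
        return await self._request(self.payment_service_url, path, method, **kwargs)

    @circuit(failure_threshold=5, recovery_timeout=30)
    async def call_inventory_service(self, path: str, method: str = "GET", **kwargs):
        """Call inventory service with circuit breaker."""
        return await self._request(self.inventory_service_url, path, method, **kwargs)

    async def create_order_aggregate(self, order_id: str) -> dict:
        """Aggregate data from multiple services."""
        # Parallel requests; failures come back as exception objects
        order, payment, inventory = await asyncio.gather(
            self.call_order_service(f"/orders/{order_id}"),
            self.call_payment_service(f"/payments/order/{order_id}"),
            self.call_inventory_service(f"/reservations/order/{order_id}"),
            return_exceptions=True
        )

        # The order itself is required; without it there is nothing to return
        if isinstance(order, Exception):
            raise order

        # Handle partial failures: payment and inventory are optional
        result = {"order": order}
        if not isinstance(payment, Exception):
            result["payment"] = payment
        if not isinstance(inventory, Exception):
            result["inventory"] = inventory
        return result


# Share one gateway (and one HTTP connection pool) across requests
gateway_instance = APIGateway()


def get_gateway() -> APIGateway:
    return gateway_instance


@app.post("/api/orders")
async def create_order(
    order_data: dict,
    gateway: APIGateway = Depends(get_gateway)
):
    """API Gateway endpoint."""
    try:
        # Route to order service
        order = await gateway.call_order_service(
            "/orders",
            method="POST",
            json=order_data
        )
        return {"order": order}
    except (httpx.HTTPError, CircuitBreakerError):
        # Covers both a failed request and a circuit that is already open
        raise HTTPException(status_code=503, detail="Order service unavailable")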
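
The partial-failure handling used by the aggregate call can be tried without any network. The sketch below replaces the three service calls with hypothetical stubs (fetch_order, fetch_payment, fetch_inventory) and adds a "degraded" list, which is an illustrative extension rather than part of the gateway above, so a client can tell which sections are missing.

```python
import asyncio


# Hypothetical stand-ins for the three downstream calls
async def fetch_order(order_id):
    return {"id": order_id, "status": "confirmed"}


async def fetch_payment(order_id):
    raise TimeoutError("payment service timed out")


async def fetch_inventory(order_id):
    return {"reservation_id": "r-1"}


async def aggregate(order_id):
    # return_exceptions=True turns failures into values instead of raising
    order, payment, inventory = await asyncio.gather(
        fetch_order(order_id),
        fetch_payment(order_id),
        fetch_inventory(order_id),
        return_exceptions=True,
    )
    # The order is mandatory: without it the whole request fails
    if isinstance(order, Exception):
        raise order

    result = {"order": order, "degraded": []}
    for name, value in (("payment", payment), ("inventory", inventory)):
        if isinstance(value, Exception):
            # Record which optional section is missing
            result["degraded"].append(name)
        else:
            result[name] = value
    return result


summary = asyncio.run(aggregate("o-1"))
```

Here the payment stub times out, so the response still carries the order and inventory data and lists "payment" as degraded.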

Communication Patterns

Pattern 1: Synchronous REST Communication

# Service A calls Service B

import httpx
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)


class ServiceClient:
    """HTTP client with retries and timeout."""

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(5.0, connect=2.0),
            limits=httpx.Limits(max_keepalive_connections=20)
        )

    @retry(
        # Retry only transport-level faults (timeouts, connection errors);
        # an HTTP 4xx/5xx status from raise_for_status() is not retried
        retry=retry_if_exception_type(httpx.TransportError),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        reraise=True
    )
    async def get(self, path: str, **kwargs):
        """GET with automatic retries."""
        response = await self.client.get(f"{self.base_url}{path}", **kwargs)
        response.raise_for_status()
        return response.json()

    async def post(self, path: str, **kwargs):
        """POST request (not retried: a repeated POST may not be idempotent)."""
        response = await self.client.post(f"{self.base_url}{path}", **kwargs)
        response.raise_for_status()
        return response.json()


# Usage
payment_client = ServiceClient("http://payment-service:8001")


async def submit_payment(payment_data: dict):
    return await payment_client.post("/payments", json=payment_data)

Pattern 2: Asynchronous Event-Driven

# Event-driven communication with Kafka

import json
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Awaitable, Callable, List

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


@dataclass
class DomainEvent:
    event_id: str
    event_type: str
    aggregate_id: str
    occurred_at: datetime
    data: dict


class EventBus:
    """Event publishing and subscription."""

    def __init__(self, bootstrap_servers: List[str], group_id: str = "my-service"):
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id
        self.producer = None

    async def start(self):
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            # default=str converts the datetime in occurred_at to a string
            value_serializer=lambda v: json.dumps(v, default=str).encode()
        )
        await self.producer.start()

    async def stop(self):
        if self.producer is not None:
            await self.producer.stop()

    async def publish(self, event: DomainEvent):
        """Publish event to Kafka topic."""
        # One topic per event type; keying by aggregate keeps its events ordered
        topic = event.event_type
        await self.producer.send_and_wait(
            topic,
            value=asdict(event),
            key=event.aggregate_id.encode()
        )

    async def subscribe(self, topic: str, handler: Callable[[dict], Awaitable[None]]):
        """Subscribe to events."""
        consumer = AIOKafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            value_deserializer=lambda v: json.loads(v.decode()),
            group_id=self.group_id
        )
        await consumer.start()
        try:
            async for message in consumer:
                event_data = message.value
                await handler(event_data)
        finally:
            await consumer.stop()


# Order Service publishes event
# (save_order and event_bus are assumed to be defined elsewhere)
async def create_order(order_data: dict):
    order = await save_order(order_data)
    event = DomainEvent(
        event_id=str(uuid.uuid4()),
        event_type="OrderCreated",
        aggregate_id=order.id,
        occurred_at=datetime.now(timezone.utc),
        data={
            "order_id": order.id,
            "customer_id": order.customer_id,
            # Consumers need the items; they must be JSON-serializable
            "items": order.items,
            "total": order.total
        }
    )
    await event_bus.publish(event)


# Inventory Service listens for OrderCreated
# (reserve_inventory is assumed to be defined elsewhere)
async def handle_order_created(event_data: dict):
    """React to order creation."""
    order_id = event_data["data"]["order_id"]
    items = event_data["data"]["items"]

    # Reserve inventory
    await reserve_inventory(order_id, items)
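
Because json.dumps cannot serialize a datetime on its own, the event's occurred_at field needs explicit handling on both sides of the wire. The following is a minimal round-trip sketch using only the standard library; encode_event, decode_event, and the ISO 8601 choice are assumptions for illustration, not part of aiokafka.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone


@dataclass
class DomainEvent:
    event_id: str
    event_type: str
    aggregate_id: str
    occurred_at: datetime
    data: dict


def _json_default(obj):
    """Serialize datetimes as ISO 8601 strings; reject anything else."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


def encode_event(event: DomainEvent) -> bytes:
    """Turn an event into the bytes a producer would send."""
    return json.dumps(asdict(event), default=_json_default).encode()


def decode_event(raw: bytes) -> DomainEvent:
    """Rebuild the event on the consumer side."""
    fields = json.loads(raw.decode())
    # Restore the timestamp from its ISO 8601 string form
    fields["occurred_at"] = datetime.fromisoformat(fields["occurred_at"])
    return DomainEvent(**fields)


event = DomainEvent(
    event_id="e-1",
    event_type="OrderCreated",
    aggregate_id="o-1",
    occurred_at=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
    data={"order_id": "o-1", "total": 100},
)
restored = decode_event(encode_event(event))
```

Using a timezone-aware UTC timestamp keeps the value unambiguous when producer and consumer run in different time zones.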

Pattern 3: Saga Pattern (Distributed Transactions)

# Saga orchestration for order fulfillment.
# order_service, inventory_service and payment_service are service clients
# assumed to be defined elsewhere.

import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, List, Optional

logger = logging.getLogger(__name__)


class SagaStatus(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    FAILED = "failed"


@dataclass
class StepResult:
    """Outcome of a single saga step."""
    success: bool
    data: dict = field(default_factory=dict)
    error: Optional[str] = None


@dataclass
class SagaResult:
    """Outcome of the whole saga."""
    status: SagaStatus
    data: Optional[dict] = None
    error: Optional[str] = None


class SagaStep:
    """Single step in saga."""

    def __init__(
        self,
        name: str,
        action: Callable,
        compensation: Callable
    ):
        self.name = name
        self.action = action
        self.compensation = compensation


class OrderFulfillmentSaga:
    """Orchestrated saga for order fulfillment."""

    def __init__(self):
        self.steps: List[SagaStep] = [
            SagaStep(
                "create_order",
                action=self.create_order,
                compensation=self.cancel_order
            ),
            SagaStep(
                "reserve_inventory",
                action=self.reserve_inventory,
                compensation=self.release_inventory
            ),
            SagaStep(
                "process_payment",
                action=self.process_payment,
                compensation=self.refund_payment
            ),
            SagaStep(
                "confirm_order",
                action=self.confirm_order,
                compensation=self.cancel_order_confirmation
            )
        ]

    async def execute(self, order_data: dict) -> SagaResult:
        """Execute saga steps."""
        completed_steps = []
        context = {"order_data": order_data}

        try:
            for step in self.steps:
                # Execute step
                result = await step.action(context)
                if not result.success:
                    # Compensate the steps that already succeeded
                    await self.compensate(completed_steps, context)
                    return SagaResult(
                        status=SagaStatus.FAILED,
                        error=result.error
                    )
                completed_steps.append(step)
                context.update(result.data)

            return SagaResult(status=SagaStatus.COMPLETED, data=context)
        except Exception as e:
            # Compensate on error
            await self.compensate(completed_steps, context)
            return SagaResult(status=SagaStatus.FAILED, error=str(e))

    async def compensate(self, completed_steps: List[SagaStep], context: dict):
        """Execute compensating actions in reverse order."""
        for step in reversed(completed_steps):
            try:
                await step.compensation(context)
            except Exception:
                # Log compensation failure and keep undoing the remaining steps
                logger.exception("Compensation failed for %s", step.name)

    # Step implementations

    async def create_order(self, context: dict) -> StepResult:
        order = await order_service.create(context["order_data"])
        return StepResult(success=True, data={"order_id": order.id})

    async def cancel_order(self, context: dict):
        await order_service.cancel(context["order_id"])

    async def reserve_inventory(self, context: dict) -> StepResult:
        result = await inventory_service.reserve(
            context["order_id"],
            context["order_data"]["items"]
        )
        return StepResult(
            success=result.success,
            data={"reservation_id": result.reservation_id},
            error=result.error
        )

    async def release_inventory(self, context: dict):
        await inventory_service.release(context["reservation_id"])

    async def process_payment(self, context: dict) -> StepResult:
        result = await payment_service.charge(
            context["order_id"],
            context["order_data"]["total"]
        )
        return StepResult(
            success=result.success,
            data={"transaction_id": result.transaction_id},
            error=result.error
        )

    async def refund_payment(self, context: dict):
        await payment_service.refund(context["transaction_id"])

    async def confirm_order(self, context: dict) -> StepResult:
        # order_service.confirm is an assumed client method
        await order_service.confirm(context["order_id"])
        return StepResult(success=True)

    async def cancel_order_confirmation(self, context: dict):
        # Never reached in this flow: confirm_order is the last step, so no
        # later step can fail after it has completed
        pass
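
The reverse-order compensation is easiest to see with a stripped-down saga that only records what it did. This is a toy sketch under stated assumptions: step and run_saga are hypothetical helpers, the steps do no real work, and the third step is forced to fail.

```python
import asyncio

log = []


def step(name, fail=False):
    """Build an (action, compensation) pair that records what ran."""

    async def action():
        if fail:
            raise RuntimeError(f"{name} failed")
        log.append(f"do {name}")

    async def compensation():
        log.append(f"undo {name}")

    return action, compensation


async def run_saga(steps):
    completed = []
    try:
        for action, compensation in steps:
            await action()
            # Remember how to undo this step only once it has succeeded
            completed.append(compensation)
    except Exception:
        # Undo the completed steps, most recent first
        for compensation in reversed(completed):
            await compensation()
        return "failed"
    return "completed"


status = asyncio.run(run_saga([
    step("create_order"),
    step("reserve_inventory"),
    step("process_payment", fail=True),
]))
```

After the run, log shows the two successful steps followed by their compensations in reverse: the inventory reservation is released before the order is cancelled. The failed payment step is not compensated because it never completed.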

Resilience Patterns

Circuit Breaker Pattern

import time
from enum import Enum
from typing import Any, Callable, Optional


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Circuit breaker for service calls."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.failure_count = 0
        self.success_count = 0
        self.state = CircuitState.CLOSED
        self.opened_at: Optional[float] = None

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker."""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                # Let trial calls through and start counting successes afresh
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise CircuitBreakerOpenError("Circuit breaker is open")

        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        """Handle successful call."""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.success_count = 0

    def _on_failure(self):
        """Handle failed call."""
        self.failure_count += 1
        # A single failure while half-open reopens the circuit immediately
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN
            self.opened_at = time.monotonic()

    def _should_attempt_reset(self) -> bool:
        """Check if enough time passed to try again."""
        # A monotonic clock is unaffected by system clock adjustments
        return time.monotonic() - self.opened_at > self.recovery_timeout


# Usage (payment_client is the ServiceClient from the REST example)
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)


async def call_payment_service(payment_data: dict):
    return await breaker.call(
        payment_client.post,
        "/payments",
        json=payment_data
    )