redis-patterns

Redis data structure patterns, caching strategies, distributed locks, rate limiting, pub/sub, and connection management for production applications.

INSTALLATION
npx skills add https://github.com/affaan-m/everything-claude-code --skill redis-patterns
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

Redis Patterns

Quick reference for Redis best practices across common backend use cases.

How It Works

Redis is an in-memory data structure store that supports strings, hashes, lists, sets, sorted sets, streams, and more. Individual Redis commands are atomic on a single instance; multi-step workflows require Lua scripts, MULTI/EXEC transactions, or explicit synchronization to stay atomic. Data is optionally persisted via RDB snapshots or AOF logs. Clients communicate over TCP using the RESP protocol; connection pools are essential to avoid per-request handshake overhead.
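To make the wire format concrete, here is a minimal sketch of how a client could frame a command in RESP: an array header followed by one length-prefixed bulk string per argument. The helper name `encode_resp_command` is hypothetical; real clients such as redis-py do this framing for you.

```python
def encode_resp_command(*args: str) -> bytes:
    """Encode a command as a RESP array of bulk strings."""
    # Array header: "*<number of arguments>\r\n"
    parts = [f"*{len(args)}\r\n".encode()]
    for arg in args:
        data = arg.encode("utf-8")
        # Bulk string: "$<byte length>\r\n<bytes>\r\n"
        parts.append(b"$" + str(len(data)).encode() + b"\r\n" + data + b"\r\n")
    return b"".join(parts)


print(encode_resp_command("SET", "product:123", "widget"))
# b'*3\r\n$3\r\nSET\r\n$11\r\nproduct:123\r\n$6\r\nwidget\r\n'
```

Every request pays this framing cost plus a network round trip, which is why reusing pooled connections (and pipelining where possible) matters more than the encoding itself.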

When to Activate

  • Adding caching to an application
  • Implementing rate limiting or throttling
  • Building distributed locks or coordination
  • Setting up session or token storage
  • Using Pub/Sub or Redis Streams for messaging
  • Configuring Redis in production (pooling, eviction, clustering)

Data Structure Cheat Sheet

| Use Case | Structure | Example Key |
| --- | --- | --- |
| Simple cache | String | product:123 |
| User session | Hash | session:abc |
| Leaderboard | Sorted Set | scores:weekly |
| Unique visitors | Set | visitors:2024-01-01 |
| Activity feed | List | feed:user:456 |
| Event stream | Stream | events:orders |
| Counters / rate limits | String (INCR) | ratelimit:user:123 |
| Approximate unique counts | HyperLogLog | hll:pageviews |

Core Patterns

Cache-Aside (Lazy Loading)

import json

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_product(product_id: int):
    cache_key = f"product:{product_id}"
    cached = r.get(cache_key)
    if cached is not None:  # Cache hit
        return json.loads(cached)
    # Cache miss: load from the database (`db` is the application's own DB handle)
    product = db.query("SELECT * FROM products WHERE id = %s", product_id)
    r.setex(cache_key, 3600, json.dumps(product))  # TTL: 1 hour
    return product

Write-Through Cache

def update_product(product_id: int, data: dict):
    # Write to DB first
    db.execute("UPDATE products SET ... WHERE id = %s", product_id)
    # Immediately update cache
    cache_key = f"product:{product_id}"
    r.setex(cache_key, 3600, json.dumps(data))

Cache Invalidation

# Tag-based invalidation: group related keys under a set
def cache_product(product_id: int, category_id: int, data: dict):
    key = f"product:{product_id}"
    tag = f"tag:category:{category_id}"
    pipe = r.pipeline(transaction=True)
    pipe.setex(key, 3600, json.dumps(data))
    pipe.sadd(tag, key)
    pipe.expire(tag, 3600)
    pipe.execute()

def invalidate_category(category_id: int):
    tag = f"tag:category:{category_id}"
    keys = r.smembers(tag)
    if keys:
        r.delete(*keys)
    r.delete(tag)

Session Storage

import time
import uuid

def create_session(user_id: int, ttl: int = 86400) -> str:
    session_id = str(uuid.uuid4())
    key = f"session:{session_id}"
    pipe = r.pipeline(transaction=True)
    pipe.hset(key, mapping={
        "user_id": user_id,
        "created_at": int(time.time()),
    })
    pipe.expire(key, ttl)
    pipe.execute()
    return session_id

def get_session(session_id: str) -> dict | None:
    data = r.hgetall(f"session:{session_id}")
    return data if data else None

def delete_session(session_id: str):
    r.delete(f"session:{session_id}")

Rate Limiting

Fixed Window (Simple)

def is_rate_limited(user_id: int, limit: int = 100, window: int = 60) -> bool:
    # One counter per user per window; the window index is part of the key
    key = f"ratelimit:{user_id}:{int(time.time()) // window}"
    pipe = r.pipeline(transaction=True)
    pipe.incr(key)
    pipe.expire(key, window)
    count, _ = pipe.execute()
    return count > limit

Sliding Window (Lua — Atomic)

-- sliding_window.lua
-- KEYS[1]: sorted set of request timestamps
-- ARGV: current time (ms), window size (ms), request limit
-- Note: the ':seq' key is derived inside the script, so this assumes a
-- single instance rather than Redis Cluster.
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])

redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)
if count < limit then
    -- Use unique member (now + sequence) to avoid collisions within the same millisecond
    local seq_key = key .. ':seq'
    local seq = redis.call('INCR', seq_key)
    redis.call('EXPIRE', seq_key, math.ceil(window / 1000))
    redis.call('ZADD', key, now, now .. '-' .. seq)
    redis.call('EXPIRE', key, math.ceil(window / 1000))
    return 1
end
return 0

# Python: register the script once, then call it per request
with open('sliding_window.lua') as f:
    sliding_window = r.register_script(f.read())

def allow_request(user_id: int) -> bool:
    key = f"ratelimit:sliding:{user_id}"
    now = int(time.time() * 1000)  # Milliseconds
    # args: current time (ms), window (ms), request limit
    return bool(sliding_window(keys=[key], args=[now, 60000, 100]))
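To trace the sliding-window logic without a Redis server, the following in-memory model mirrors the script's three steps (drop old entries, count, conditionally add) for a single key. The class name `SlidingWindowModel` is hypothetical, and the sketch assumes timestamps never go backwards; it is meant for reasoning and unit tests, not as a replacement for the atomic Lua version.

```python
from collections import deque


class SlidingWindowModel:
    """In-memory model of a sliding-window log for one key."""

    def __init__(self, window_ms: int, limit: int) -> None:
        self.window_ms = window_ms
        self.limit = limit
        self._hits = deque()  # request timestamps in ms, oldest first

    def allow(self, now_ms: int) -> bool:
        # Mirrors ZREMRANGEBYSCORE key 0 (now - window): drop expired entries.
        while self._hits and self._hits[0] <= now_ms - self.window_ms:
            self._hits.popleft()
        # Mirrors ZCARD followed by a conditional ZADD.
        if len(self._hits) < self.limit:
            self._hits.append(now_ms)
            return True
        return False


limiter = SlidingWindowModel(window_ms=1000, limit=2)
print([limiter.allow(t) for t in (0, 10, 20, 1000)])
# [True, True, False, True]
```

The fourth request is allowed because the entry recorded at 0 ms falls out of the window once the clock reaches 1000 ms.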

Distributed Locks

Distributed Lock (Single Node — SET NX PX)

import uuid

def acquire_lock(resource: str, ttl_ms: int = 5000) -> str | None:
    lock_key = f"lock:{resource}"
    token = str(uuid.uuid4())
    acquired = r.set(lock_key, token, px=ttl_ms, nx=True)
    return token if acquired else None

def release_lock(resource: str, token: str) -> bool:
    # Delete the key only if it still holds our token (compare-and-delete)
    release_script = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """
    result = r.eval(release_script, 1, f"lock:{resource}", token)
    return bool(result)

# Usage
token = acquire_lock("order:payment:123")
if token:
    try:
        process_payment()
    finally:
        release_lock("order:payment:123", token)

For multi-node setups, use the redlock-py library, which implements the Redlock algorithm across several independent Redis nodes.
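The acquire/try/finally usage above can be wrapped in a context manager so callers cannot forget the release. This is a minimal single-node sketch: `redis_lock` is a hypothetical helper, and `client` is assumed to be a redis.Redis instance (anything exposing `set` and `eval` with the same behaviour will do).

```python
import uuid
from contextlib import contextmanager

# Compare-and-delete: only the holder of the token may release the lock.
RELEASE_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
end
return 0
"""


@contextmanager
def redis_lock(client, resource: str, ttl_ms: int = 5000):
    """Yield True if the lock was acquired, False otherwise."""
    key = f"lock:{resource}"
    token = str(uuid.uuid4())
    acquired = bool(client.set(key, token, nx=True, px=ttl_ms))
    try:
        yield acquired
    finally:
        # Release only what we acquired; a lock that already expired and was
        # taken by someone else is left alone by the script's token check.
        if acquired:
            client.eval(RELEASE_SCRIPT, 1, key, token)
```

A caller would write `with redis_lock(r, "order:payment:123") as ok:` and run the protected work only when `ok` is true. The TTL still has to exceed the expected duration of that work.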

Pub/Sub & Streams

Pub/Sub (Fire-and-Forget)

# Publisher
def publish_event(channel: str, payload: dict):
    r.publish(channel, json.dumps(payload))

# Subscriber (blocking; run in a separate thread/process)
def subscribe_events(channel: str):
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            handle(json.loads(message['data']))

Redis Streams (Durable Queue)

# Producer
def emit(stream: str, event: dict):
    r.xadd(stream, event, maxlen=10000)  # Cap stream length

# Consumer group: entries stay pending until acknowledged (at-least-once delivery)
try:
    r.xgroup_create('events:orders', 'processor', id='0', mkstream=True)
except redis.ResponseError as exc:
    if "BUSYGROUP" not in str(exc):  # Ignore only "group already exists"
        raise

def consume(stream: str, group: str, consumer: str):
    while True:
        # '>' requests entries not yet delivered to any consumer in the group
        messages = r.xreadgroup(group, consumer, {stream: '>'}, count=10, block=2000)
        for _, entries in (messages or []):
            for msg_id, data in entries:
                process(data)
                r.xack(stream, group, msg_id)  # Acknowledge only after processing succeeds

Prefer Streams over Pub/Sub when you need delivery guarantees, consumer groups, or replay.
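At-least-once delivery only holds if unacknowledged entries are eventually retried. A consumer that reads with `'>'` never sees entries it received before a crash, so one common approach is to drain its own pending list on startup by reading with ID `'0'`. The sketch below assumes `client` is a redis.Redis instance; `drain_pending` and `handler` are hypothetical names.

```python
def drain_pending(client, stream: str, group: str, consumer: str,
                  handler, count: int = 10) -> int:
    """Re-process entries delivered to this consumer but never acknowledged.

    Returns the number of entries handled.
    """
    processed = 0
    while True:
        # ID "0" (instead of ">") returns this consumer's pending entries.
        response = client.xreadgroup(group, consumer, {stream: "0"}, count=count)
        entries = [entry for _, batch in (response or []) for entry in batch]
        if not entries:
            return processed
        for msg_id, data in entries:
            handler(data)
            # Acknowledging removes the entry from the pending list,
            # so the next read returns only what is still outstanding.
            client.xack(stream, group, msg_id)
            processed += 1
```

Calling `drain_pending(r, 'events:orders', 'processor', 'worker-1', process)` before entering the main `consume` loop covers restarts of the same consumer name. Entries owned by a consumer that never comes back need to be claimed by another one, for example with XAUTOCLAIM (Redis 6.2+).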

Key Design

Naming Conventions

# Pattern: resource:id:field
user:123:profile
order:456:status
cache:product:789

# Pattern: namespace:resource:id
myapp:session:abc123
myapp:ratelimit:user:123

# Pattern: resource:date (time-bound keys)
stats:pageviews:2024-01-01

TTL Strategy

| Data Type | Suggested TTL |
| --- | --- |
| User session | 24h (86400 s) |
| API response cache | 5–15 min |
| Rate limit window | Match window size |
| Short-lived tokens | 5–10 min |
| Leaderboard | 1h–24h |
| Static/reference data | 1h–1 week |

Always set a TTL on cache, session, and rate-limit keys. Keys without a TTL never expire on their own, so they accumulate until memory runs out or the eviction policy removes them.
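One way to audit this rule is to walk the keyspace with SCAN and report keys whose TTL is -1, which is what Redis returns for a key that exists but has no expiry. This is a sketch, assuming `client` is a redis.Redis instance; `keys_without_ttl` is a hypothetical helper name.

```python
from typing import Iterator


def keys_without_ttl(client, pattern: str = "*", batch: int = 500) -> Iterator[str]:
    """Yield keys matching `pattern` that have no expiry set."""
    # scan_iter walks the keyspace incrementally instead of blocking like KEYS.
    for key in client.scan_iter(match=pattern, count=batch):
        # TTL returns -1 for "no expiry" and -2 for "key does not exist".
        if client.ttl(key) == -1:
            yield key
```

For example, `list(keys_without_ttl(r, "cache:*"))` would list cache entries that were written without `setex` or `expire`. On a large keyspace this issues one TTL call per key, so run it off-peak or against a replica.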

Connection Management

Connection Pooling

from redis import ConnectionPool, Redis

pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=20,
    decode_responses=True,
    socket_connect_timeout=2,
    socket_timeout=2,
)

r = Redis(connection_pool=pool)

Cluster Mode

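from redis.cluster import ClusterNode, RedisCluster

r = RedisCluster(
    # redis-py expects ClusterNode objects here rather than plain dicts
    startup_nodes=[ClusterNode("redis-1", 6379)],
    decode_responses=True,
    require_full_coverage=False,  # Tolerate uncovered hash slots
)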

Sentinel (High Availability)

from redis.sentinel import Sentinel

sentinel = Sentinel(
    [('sentinel-1', 26379), ('sentinel-2', 26379)],
    socket_timeout=0.5,
)

master = sentinel.master_for('mymaster', decode_responses=True)   # Writes
replica = sentinel.slave_for('mymaster', decode_responses=True)   # Reads

Eviction Policies

| Policy | Behavior | Best For |
| --- | --- | --- |
| noeviction | Error on write when full | Queues / critical data |
| allkeys-lru | Evict least recently used | General cache |
| volatile-lru | LRU only among keys with TTL | Mixed data store |
| allkeys-lfu | Evict least frequently used | Skewed access patterns |
| volatile-ttl | Evict soonest-to-expire | Prioritize long-lived data |

Set via redis.conf: maxmemory-policy allkeys-lru. The policy only takes effect once a maxmemory limit is also configured.
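The same settings can be applied at runtime with CONFIG SET, which redis-py exposes as `config_set`. The sketch below validates the policy name first; `apply_eviction_policy` is a hypothetical helper, `client` is assumed to be a redis.Redis instance, and the policy list reflects the names available in Redis 4.0 through 7.x.

```python
VALID_POLICIES = {
    "noeviction",
    "allkeys-lru", "volatile-lru",
    "allkeys-lfu", "volatile-lfu",
    "allkeys-random", "volatile-random",
    "volatile-ttl",
}


def apply_eviction_policy(client, policy: str, maxmemory: str) -> None:
    """Set the memory limit and eviction policy on a running server."""
    if policy not in VALID_POLICIES:
        raise ValueError(f"unknown eviction policy: {policy!r}")
    # Eviction never triggers unless a memory limit is configured.
    client.config_set("maxmemory", maxmemory)
    client.config_set("maxmemory-policy", policy)
```

For example, `apply_eviction_policy(r, "allkeys-lru", "2gb")`. Runtime changes are lost on restart unless they are also written to redis.conf, and many managed Redis services disable CONFIG entirely, in which case the policy is set through the provider's own configuration.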

Anti-Patterns

| Anti-Pattern | Problem | Fix |
| --- | --- | --- |
| Keys with no TTL | Memory grows unbounded | Always set TTL |
| KEYS * in production | Blocks the server (O(N)) | Use SCAN cursor |
| Storing large blobs (>100KB) | Slow serialization, memory pressure | Store reference + fetch from object store |
| Single Redis for everything | No isolation between cache & queue | Use separate DBs or instances |
| Ignoring connection pool limits | Connection exhaustion under load | Size pool to workload |
| Not handling cache miss stampede | Thundering herd on cold start | Use locks or probabilistic early expiry |
| FLUSHALL without thought | Wipes entire instance | Scope deletes by key pattern |
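Two of the fixes above (SCAN instead of KEYS, and pattern-scoped deletes instead of FLUSHALL) combine naturally. The following sketch deletes keys matching a pattern in batches; `delete_by_pattern` is a hypothetical helper and `client` is assumed to be a redis.Redis instance on a single node (in cluster mode, multi-key deletes must stay within one hash slot).

```python
def delete_by_pattern(client, pattern: str, batch: int = 500) -> int:
    """Delete all keys matching `pattern`; return how many were removed."""
    deleted = 0
    chunk = []
    # scan_iter iterates with a cursor, so the server is never blocked for long.
    for key in client.scan_iter(match=pattern, count=batch):
        chunk.append(key)
        if len(chunk) >= batch:
            # UNLINK (Redis 4.0+) frees memory in the background, unlike DEL.
            deleted += client.unlink(*chunk)
            chunk.clear()
    if chunk:
        deleted += client.unlink(*chunk)
    return deleted
```

For example, `delete_by_pattern(r, "cache:product:*")` clears product cache entries while leaving sessions, locks, and queues untouched.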

Cache Miss Stampede Prevention

import threading

_locks: dict[str, threading.Lock] = {}
_locks_mutex = threading.Lock()

def get_with_lock(key: str, fetch_fn, ttl: int = 300):
    cached = r.get(key)
    if cached:
        return json.loads(cached)

    # Get or create the per-key lock
    with _locks_mutex:
        if key not in _locks:
            _locks[key] = threading.Lock()
        lock = _locks[key]

    with lock:
        cached = r.get(key)  # Re-check after acquiring lock
        if cached:
            return json.loads(cached)
        value = fetch_fn()
        r.setex(key, ttl, json.dumps(value))
        return value

Note: for multi-process deployments, replace the in-process lock with acquire_lock/release_lock from the Distributed Locks section above.
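The anti-patterns table also names probabilistic early expiry as an alternative to locking. A common formulation (often called XFetch) lets each reader decide at random to refresh a key shortly before it expires, with the probability rising as expiry approaches and as the recompute cost grows. The sketch below shows only the decision function; `should_refresh_early` and its parameters are hypothetical names, and the caller is assumed to store the recompute duration alongside the cached value.

```python
import math
import random
from typing import Optional


def should_refresh_early(ttl_remaining: float, recompute_cost: float,
                         beta: float = 1.0,
                         rand: Optional[float] = None) -> bool:
    """Decide whether this reader should recompute before the key expires.

    ttl_remaining:  seconds until the cached value expires
    recompute_cost: seconds the last recomputation took
    beta:           values above 1.0 favour earlier refreshes
    rand:           uniform sample in (0, 1]; injectable for testing
    """
    u = random.random() if rand is None else rand
    u = max(u, 1e-12)  # Guard against log(0)
    # -log(u) is exponentially distributed, so most readers draw a small
    # head start and only a few refresh well before expiry.
    return -recompute_cost * beta * math.log(u) >= ttl_remaining
```

A reader would call this on every cache hit, using the key's remaining TTL, and recompute when it returns true. Because the decision is made independently per request, no coordination between processes is needed.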

Examples

Add caching to a Django/Flask API endpoint:

Use cache-aside with setex and a 5-minute TTL on the response. Key on the request parameters.
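Keying on request parameters works best when the key is deterministic regardless of parameter order. One possible sketch: serialize the parameters canonically and hash them. The function name `response_cache_key` and the `cache:response:` prefix are illustrative assumptions, and the parameters are assumed to be JSON-serializable.

```python
import hashlib
import json


def response_cache_key(path: str, params: dict) -> str:
    """Build a stable cache key from an endpoint path and its parameters."""
    # sort_keys makes {"a": 1, "b": 2} and {"b": 2, "a": 1} serialize identically.
    canonical = json.dumps(params, sort_keys=True, separators=(",", ":"))
    # A truncated SHA-256 keeps keys short; 16 hex chars is an arbitrary choice.
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
    return f"cache:response:{path}:{digest}"
```

The resulting key can be passed straight to the cache-aside pattern, for example `r.setex(response_cache_key("/products", request_args), 300, body)`. Include anything that changes the response (user, locale, API version) in `params`, or different callers will share cached results.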

Rate-limit an API by user:

Use fixed-window with pipeline(transaction=True) for low-traffic endpoints; use sliding-window Lua for accurate per-user throttling.

Coordinate a background job across workers:

Use acquire_lock with a TTL that exceeds the expected job duration. Always release in a finally block.

Fan-out notifications to multiple subscribers:

Use Pub/Sub for fire-and-forget. Switch to Streams if you need guaranteed delivery or replay for late consumers.

Quick Reference

| Pattern | When to Use |
| --- | --- |
| Cache-aside | Read-heavy, tolerate slight staleness |
| Write-through | Cache must reflect writes immediately |
| Distributed lock | Prevent concurrent access to a resource |
| Sliding window rate limit | Accurate per-user throttling |
| Redis Streams | Durable event queue with consumer groups |
| Pub/Sub | Broadcast with no delivery guarantees needed |
| Sorted Set leaderboard | Ranked scoring, pagination |
| HyperLogLog | Approximate unique count at low memory |

Related

  • Skill: postgres-patterns — relational data patterns
  • Skill: backend-patterns — API and service layer patterns
  • Skill: database-migrations — schema versioning
  • Skill: django-patterns — Django cache framework integration
  • Agent: database-reviewer — full database review workflow