SKILL.md
Redis Patterns
Quick reference for Redis best practices across common backend use cases.
How It Works
Redis is an in-memory data structure store that supports strings, hashes, lists, sets, sorted sets, streams, and more. Individual Redis commands are atomic on a single instance; multi-step workflows require Lua scripts, MULTI/EXEC transactions, or explicit synchronization to stay atomic. Data is optionally persisted via RDB snapshots or AOF logs. Clients communicate over TCP using the RESP protocol; connection pools are essential to avoid per-request handshake overhead.
When to Activate
- Adding caching to an application
- Implementing rate limiting or throttling
- Building distributed locks or coordination
- Setting up session or token storage
- Using Pub/Sub or Redis Streams for messaging
- Configuring Redis in production (pooling, eviction, clustering)
Data Structure Cheat Sheet
Use Case
Structure
Example Key
Simple cache
String
product:123
User session
Hash
session:abc
Leaderboard
Sorted Set
scores:weekly
Unique visitors
Set
visitors:2024-01-01
Activity feed
List
feed:user:456
Event stream
Stream
events:orders
Counters / rate limits
String (INCR)
ratelimit:user:123
Bloom filter / HLL
HyperLogLog
hll:pageviews
Core Patterns
Cache-Aside (Lazy Loading)
import redis
import json
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def get_product(product_id: int):
cache_key = f"product:{product_id}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)
product = db.query("SELECT * FROM products WHERE id = %s", product_id)
r.setex(cache_key, 3600, json.dumps(product)) # TTL: 1 hour
return product
Write-Through Cache
def update_product(product_id: int, data: dict):
# Write to DB first
db.execute("UPDATE products SET ... WHERE id = %s", product_id)
# Immediately update cache
cache_key = f"product:{product_id}"
r.setex(cache_key, 3600, json.dumps(data))
Cache Invalidation
# Tag-based invalidation — group related keys under a set
def cache_product(product_id: int, category_id: int, data: dict):
key = f"product:{product_id}"
tag = f"tag:category:{category_id}"
pipe = r.pipeline(transaction=True)
pipe.setex(key, 3600, json.dumps(data))
pipe.sadd(tag, key)
pipe.expire(tag, 3600)
pipe.execute()
def invalidate_category(category_id: int):
tag = f"tag:category:{category_id}"
keys = r.smembers(tag)
if keys:
r.delete(*keys)
r.delete(tag)
Session Storage
import time
import uuid
def create_session(user_id: int, ttl: int = 86400) -> str:
session_id = str(uuid.uuid4())
key = f"session:{session_id}"
pipe = r.pipeline(transaction=True)
pipe.hset(key, mapping={
"user_id": user_id,
"created_at": int(time.time()),
})
pipe.expire(key, ttl)
pipe.execute()
return session_id
def get_session(session_id: str) -> dict | None:
data = r.hgetall(f"session:{session_id}")
return data if data else None
def delete_session(session_id: str):
r.delete(f"session:{session_id}")
Rate Limiting
Fixed Window (Simple)
def is_rate_limited(user_id: int, limit: int = 100, window: int = 60) -> bool:
key = f"ratelimit:{user_id}:{int(time.time()) // window}"
pipe = r.pipeline(transaction=True)
pipe.incr(key)
pipe.expire(key, window)
count, _ = pipe.execute()
return count > limit
Sliding Window (Lua — Atomic)
-- sliding_window.lua
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)
if count < limit then
-- Use unique member (now + sequence) to avoid collisions within the same millisecond
local seq_key = key .. ':seq'
local seq = redis.call('INCR', seq_key)
redis.call('EXPIRE', seq_key, math.ceil(window / 1000))
redis.call('ZADD', key, now, now .. '-' .. seq)
redis.call('EXPIRE', key, math.ceil(window / 1000))
return 1
end
return 0
sliding_window = r.register_script(open('sliding_window.lua').read())
def allow_request(user_id: int) -> bool:
key = f"ratelimit:sliding:{user_id}"
now = int(time.time() * 1000)
return bool(sliding_window(keys=[key], args=[now, 60000, 100]))
Distributed Locks
Distributed Lock (Single Node — SET NX PX)
import uuid
def acquire_lock(resource: str, ttl_ms: int = 5000) -> str | None:
lock_key = f"lock:{resource}"
token = str(uuid.uuid4())
acquired = r.set(lock_key, token, px=ttl_ms, nx=True)
return token if acquired else None
def release_lock(resource: str, token: str) -> bool:
release_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
result = r.eval(release_script, 1, f"lock:{resource}", token)
return bool(result)
# Usage
token = acquire_lock("order:payment:123")
if token:
try:
process_payment()
finally:
release_lock("order:payment:123", token)
For multi-node setups use the redlock-py library which implements the full Redlock algorithm.
Pub/Sub & Streams
Pub/Sub (Fire-and-Forget)
# Publisher
def publish_event(channel: str, payload: dict):
r.publish(channel, json.dumps(payload))
# Subscriber (blocking — run in separate thread/process)
def subscribe_events(channel: str):
pubsub = r.pubsub()
pubsub.subscribe(channel)
for message in pubsub.listen():
if message['type'] == 'message':
handle(json.loads(message['data']))
Redis Streams (Durable Queue)
# Producer
def emit(stream: str, event: dict):
r.xadd(stream, event, maxlen=10000) # Cap stream length
# Consumer group — guarantees at-least-once delivery
try:
r.xgroup_create('events:orders', 'processor', id='0', mkstream=True)
except Exception:
pass # Group already exists
def consume(stream: str, group: str, consumer: str):
while True:
messages = r.xreadgroup(group, consumer, {stream: '>'}, count=10, block=2000)
for _, entries in (messages or []):
for msg_id, data in entries:
process(data)
r.xack(stream, group, msg_id)
Prefer Streams over Pub/Sub when you need delivery guarantees, consumer groups, or replay.
Key Design
Naming Conventions
# Pattern: resource:id:field
user:123:profile
order:456:status
cache:product:789
# Pattern: namespace:resource:id
myapp:session:abc123
myapp:ratelimit:user:123
# Pattern: resource:date (time-bound keys)
stats:pageviews:2024-01-01
TTL Strategy
Data Type
Suggested TTL
User session
24h (86400)
API response cache
5–15 min
Rate limit window
Match window size
Short-lived tokens
5–10 min
Leaderboard
1h–24h
Static/reference data
1h–1 week
Always set a TTL. Keys without TTL accumulate indefinitely and cause memory pressure.
Connection Management
Connection Pooling
from redis import ConnectionPool, Redis
pool = ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=20,
decode_responses=True,
socket_connect_timeout=2,
socket_timeout=2,
)
r = Redis(connection_pool=pool)
Cluster Mode
from redis.cluster import RedisCluster
r = RedisCluster(
startup_nodes=[{"host": "redis-1", "port": 6379}],
decode_responses=True,
skip_full_coverage_check=True,
)
Sentinel (High Availability)
from redis.sentinel import Sentinel
sentinel = Sentinel(
[('sentinel-1', 26379), ('sentinel-2', 26379)],
socket_timeout=0.5,
)
master = sentinel.master_for('mymaster', decode_responses=True)
replica = sentinel.slave_for('mymaster', decode_responses=True)
Eviction Policies
Policy
Behavior
Best For
noeviction
Error on write when full
Queues / critical data
allkeys-lru
Evict least recently used
General cache
volatile-lru
LRU only among keys with TTL
Mixed data store
allkeys-lfu
Evict least frequently used
Skewed access patterns
volatile-ttl
Evict soonest-to-expire
Prioritize long-lived data
Set via redis.conf: maxmemory-policy allkeys-lru
Anti-Patterns
Anti-Pattern
Problem
Fix
Keys with no TTL
Memory grows unbounded
Always set TTL
KEYS * in production
Blocks the server (O(N))
Use SCAN cursor
Storing large blobs (>100KB)
Slow serialization, memory pressure
Store reference + fetch from object store
Single Redis for everything
No isolation between cache & queue
Use separate DBs or instances
Ignoring connection pool limits
Connection exhaustion under load
Size pool to workload
Not handling cache miss stampede
Thundering herd on cold start
Use locks or probabilistic early expiry
FLUSHALL without thought
Wipes entire instance
Scope deletes by key pattern
Cache Miss Stampede Prevention
import threading
_locks: dict[str, threading.Lock] = {}
_locks_mutex = threading.Lock()
def get_with_lock(key: str, fetch_fn, ttl: int = 300):
cached = r.get(key)
if cached:
return json.loads(cached)
with _locks_mutex:
if key not in _locks:
_locks[key] = threading.Lock()
lock = _locks[key]
with lock:
cached = r.get(key) # Re-check after acquiring lock
if cached:
return json.loads(cached)
value = fetch_fn()
r.setex(key, ttl, json.dumps(value))
return value
Note: for multi-process deployments, replace the in-process lock with acquire_lock/release_lock from the Distributed Locks section above.
Examples
Add caching to a Django/Flask API endpoint:
Use cache-aside with setex and a 5-minute TTL on the response. Key on the request parameters.
Rate-limit an API by user:
Use fixed-window with pipeline(transaction=True) for low-traffic endpoints; use sliding-window Lua for accurate per-user throttling.
Coordinate a background job across workers:
Use acquire_lock with a TTL that exceeds the expected job duration. Always release in a finally block.
Fan-out notifications to multiple subscribers:
Use Pub/Sub for fire-and-forget. Switch to Streams if you need guaranteed delivery or replay for late consumers.
Quick Reference
Pattern
When to Use
Cache-aside
Read-heavy, tolerate slight staleness
Write-through
Strong consistency required
Distributed lock
Prevent concurrent access to a resource
Sliding window rate limit
Accurate per-user throttling
Redis Streams
Durable event queue with consumer groups
Pub/Sub
Broadcast with no delivery guarantees needed
Sorted Set leaderboard
Ranked scoring, pagination
HyperLogLog
Approximate unique count at low memory
Related
- Skill:
postgres-patterns— relational data patterns
- Skill:
backend-patterns— API and service layer patterns
- Skill:
database-migrations— schema versioning
- Skill:
django-patterns— Django cache framework integration
- Agent:
database-reviewer— full database review workflow