Redis Caching Patterns: Cache-Aside, Write-Through & Cache Invalidation

Need to scale your backend without throwing money at servers? Redis caching patterns are your answer.

Most databases can handle hundreds of queries per second, but at thousands per second queries start to queue and your app slows to a crawl. Adding another database server just moves the problem. The real solution: serve the vast majority of reads from memory using Redis.

The Problem

Database Load Under Scale

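100 requests/sec for "get product details":
• Database: 200ms per query
• 100 requests/sec × 200ms = 20 seconds of query time per second, so about 20 queries are in flight at any moment
• At 1,000 concurrent users, queries arrive faster than the database can finish them and latency climbs sharply
• Adding more application servers doesn't help (they all queue at the database)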
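
To make the arithmetic above concrete, here is a small sketch of how average read latency and database traffic change with the cache hit rate. Only the 200ms query time and the 100 requests/sec figure come from the example above; the 0.5ms cache latency and the function names are assumptions for illustration.

```python
def average_latency_ms(hit_rate, cache_ms=0.5, db_ms=200.0):
    """Expected latency per read for a hit rate between 0.0 and 1.0.

    Simplification: a miss is charged only the database cost.
    """
    return hit_rate * cache_ms + (1.0 - hit_rate) * db_ms


def db_queries_per_sec(requests_per_sec, hit_rate):
    """Requests that still reach the database after the cache absorbs hits."""
    return requests_per_sec * (1.0 - hit_rate)


for rate in (0.0, 0.8, 0.99):
    latency = average_latency_ms(rate)
    db_load = db_queries_per_sec(100, rate)
    print(f"hit rate {rate:.0%}: {latency:.1f} ms average, {db_load:.0f} DB queries/sec")
```

Under these assumptions, the database only sees the misses, which is why the hit rate matters more than raw Redis speed.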

The Solution

Redis Caching Patterns

Redis stores data in memory, so a typical lookup completes in well under a millisecond instead of the tens or hundreds of milliseconds a database query can take. But which caching pattern you choose determines whether your cache helps or hurts.

TL;DR

  • Cache-Aside: Check cache first, miss triggers database load + populate cache (most flexible)
  • Write-Through: Update cache and database together (most consistent)
  • TTL Strategy: Keys expire automatically, which bounds how long stale data can survive without manual invalidation
  • Cache Invalidation: Delete cache when data updates, triggering refresh on next request
  • Stampede Prevention: Avoid a thundering herd of database queries when a hot key expires or many requests miss at once

Pattern 1: Cache-Aside (Lazy Loading)

How It Works

1. Request comes in
2. Check Redis cache
3. Cache HIT → return immediately (typically under a millisecond)
4. Cache MISS → query database (tens to hundreds of milliseconds)
5. Populate cache with TTL
6. Next request hits cache

Python Implementation

cache_aside_python.py
import json
import time
from functools import wraps

import redis

redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)


def cache_aside(ttl=3600):
    """Decorator implementing cache-aside pattern.

    The first positional argument of the decorated call is the cache key;
    the remaining arguments are passed through to the wrapped function.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(key, *args, **kwargs):
            # Check cache first
            cached = redis_client.get(key)
            if cached is not None:
                # Cache hit
                return json.loads(cached)
            # Cache miss: call function (hits database)
            result = func(*args, **kwargs)
            # Populate cache with TTL
            redis_client.setex(key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator


# Database query (expensive)
def get_user_from_db(user_id):
    # Simulated database query
    time.sleep(0.2)  # 200ms
    return {"id": user_id, "name": f"User {user_id}", "email": f"user{user_id}@example.com"}


# Decorated function
@cache_aside(ttl=3600)
def get_user(user_id):
    return get_user_from_db(user_id)


# Usage: cache key first, then the function's own arguments
print(get_user("user:1", 1))  # First call: ~200ms (cache miss)
print(get_user("user:1", 1))  # Second call: typically <1ms (cache hit)

Node.js Implementation

cache_aside_node.ts
import {createClient} from 'redis';

// node-redis v4+ clients are promise-based, so no promisify wrappers are needed
const redis = createClient();

// Database query (expensive)
async function getUserFromDB(userId: string) {
  // Simulated database query
  await new Promise((resolve) => setTimeout(resolve, 200));
  return {
    id: userId,
    name: `User ${userId}`,
    email: `user${userId}@example.com`,
  };
}

// Cache-aside implementation
async function getUser(userId: string, ttl = 3600) {
  const cacheKey = `user:${userId}`;
  // Check cache
  const cached = await redis.get(cacheKey);
  if (cached !== null) {
    return JSON.parse(cached);
  }
  // Cache miss: hit database
  const user = await getUserFromDB(userId);
  // Populate cache with a TTL in seconds
  await redis.set(cacheKey, JSON.stringify(user), {EX: ttl});
  return user;
}

// Usage: pass the bare ID, getUser adds the "user:" prefix itself
(async () => {
  await redis.connect();
  console.time('first');
  await getUser('1'); // ~200ms (miss)
  console.timeEnd('first');
  console.time('second');
  await getUser('1'); // typically <1ms (hit)
  console.timeEnd('second');
  await redis.quit();
})();

Go Implementation

cache_aside_go.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

type User struct {
	ID    int    `json:"id"`
	Name  string `json:"name"`
	Email string `json:"email"`
}

var rdb = redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})

func getUserFromDB(ctx context.Context, userID int) (User, error) {
	// Simulated database query
	time.Sleep(200 * time.Millisecond)
	return User{
		ID:    userID,
		Name:  fmt.Sprintf("User %d", userID),
		Email: fmt.Sprintf("user%d@example.com", userID),
	}, nil
}

func getUser(ctx context.Context, userID int) (User, error) {
	cacheKey := fmt.Sprintf("user:%d", userID)

	// Check cache
	val, err := rdb.Get(ctx, cacheKey).Result()
	if err == nil {
		var user User
		// Only trust the cached entry if it decodes cleanly
		if err := json.Unmarshal([]byte(val), &user); err == nil {
			return user, nil
		}
	}

	// Cache miss (redis.Nil), Redis error, or corrupt entry: hit database
	user, err := getUserFromDB(ctx, userID)
	if err != nil {
		return user, err
	}

	// Populate cache (1 hour TTL); a failed cache write should not fail the read
	if data, err := json.Marshal(user); err == nil {
		rdb.Set(ctx, cacheKey, data, time.Hour)
	}
	return user, nil
}

func main() {
	user, err := getUser(context.Background(), 1)
	fmt.Println(user, err)
}

Pattern 2: Write-Through

How It Works

1. Data update arrives
2. Update cache AND database together
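3. If either write fails, remove the cache entry so the next read reloads from the database (best-effort: Redis and the database do not share a transaction, so this is not truly atomic)
4. Next read hits cache (consistent with the database as long as every write goes through this path)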

When to Use

  • Critical data (payments, user accounts)
  • Data that changes frequently
  • When consistency is more important than speed

Python Implementation

write_through_python.py
import json

import redis

redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)


def update_user_write_through(user_id: int, user_data: dict):
    """Update database AND cache together"""
    cache_key = f"user:{user_id}"
    try:
        # Write the source of truth first so readers never see unsaved data
        # Assuming `db.update_user(user_id, user_data)` exists
        db.update_user(user_id, user_data)
        # Then update the cache
        redis_client.setex(cache_key, 3600, json.dumps(user_data))
        return True
    except Exception:
        # If either write fails, drop the cache entry so the next read
        # reloads from the database
        redis_client.delete(cache_key)
        raise


# Usage
update_user_write_through(42, {
    "name": "Alice",
    "email": "alice@example.com"
})
# Next read hits cache (the value is stored as a JSON string)
user = json.loads(redis_client.get("user:42"))

Node.js Implementation

write_through_node.ts
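// Assumes the connected `redis` client from the cache-aside example
// and a `db.updateUser(userId, userData)` helper
async function updateUserWriteThrough(userId: string, userData: Record<string, unknown>) {
  const cacheKey = `user:${userId}`;
  try {
    // Write the source of truth first so readers never see unsaved data
    await db.updateUser(userId, userData);
    // Then update cache (TTL in seconds)
    await redis.set(cacheKey, JSON.stringify(userData), {EX: 3600});
    return true;
  } catch (error) {
    // Rollback: invalidate cache if either write fails
    await redis.del(cacheKey);
    throw error;
  }
}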

Pattern 3: TTL Strategy

Automatic Expiration

Redis automatically deletes keys once their TTL expires, so cached data can be stale for at most one TTL without any manual invalidation.

ttl_strategies.py
import redis

redis_client = redis.Redis()


# Different TTL for different data types
def cache_with_ttl(key: str, value: str, data_type: str):
    ttl_map = {
        "user_profile": 3600,   # 1 hour (changes infrequently)
        "product": 86400,       # 24 hours (stable data)
        "product_price": 300,   # 5 minutes (changes frequently)
        "session": 1800,        # 30 minutes (security)
        "statistics": 60,       # 1 minute (real-time)
    }
    ttl = ttl_map.get(data_type, 3600)
    redis_client.setex(key, ttl, value)


# Usage
cache_with_ttl("product:123", "iPhone 15", "product")  # 24 hours
cache_with_ttl("price:apple", "$999", "product_price")  # 5 minutes
cache_with_ttl("stats:daily", "1000 users", "statistics")  # 1 minute

Sliding Window TTL

Reset TTL on each access (useful for sessions):

sliding_ttl.py
def get_with_sliding_ttl(key: str, ttl: int):
    """Get value and reset TTL"""
    value = redis_client.get(key)
    if value is not None:
        # Reset TTL on access
        redis_client.expire(key, ttl)
    return value


# Usage: Session stays alive as long as user is active
session = get_with_sliding_ttl("session:abc123", 1800)  # 30 min
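
To see the sliding behaviour without a Redis server, here is a minimal in-memory sketch. The `SlidingCache` class and its injected clock are hypothetical stand-ins, not part of redis-py; they only mimic what GET followed by EXPIRE does to a key's deadline.

```python
class SlidingCache:
    """In-memory stand-in for a key-value store with sliding expiration."""

    def __init__(self, clock):
        self._clock = clock   # callable returning the current time in seconds
        self._data = {}       # key -> (value, expires_at)

    def set(self, key, value, ttl):
        self._data[key] = (value, self._clock() + ttl)

    def get(self, key, ttl):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            # Deadline passed: behave like an expired key
            del self._data[key]
            return None
        # Sliding window: every successful read pushes the deadline out again
        self._data[key] = (value, self._clock() + ttl)
        return value


now = [0]  # fake clock so the example does not need to sleep
cache = SlidingCache(clock=lambda: now[0])
cache.set("session:abc123", "alice", ttl=1800)

now[0] = 1700  # user is active shortly before the first deadline
still_alive = cache.get("session:abc123", 1800)

now[0] = 3400  # past the original deadline, but inside the refreshed one
after_refresh = cache.get("session:abc123", 1800)

now[0] = 5300  # idle for longer than the 30 minute window
after_idle = cache.get("session:abc123", 1800)

print(still_alive, after_refresh, after_idle)
```

The session survives as long as reads keep arriving inside the window and disappears once the user has been idle for a full TTL.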

Pattern 4: Cache Invalidation

Event-Driven Invalidation

Delete cache when data changes:

invalidation_event.py
def update_product(product_id: int, new_data: dict):
    # Update database
    db.update_product(product_id, new_data)
    # Invalidate cache immediately
    redis_client.delete(f"product:{product_id}")
    # Also invalidate related caches (cascade invalidation)
    redis_client.delete(f"category:{new_data['category']}")
    redis_client.delete("products:all")


# When product price changes
update_product(123, {"price": "$899", "category": "electronics"})
# → Deletes "product:123", "category:electronics", "products:all"

Tag-Based Invalidation

Invalidate multiple keys with one operation:

tag_invalidation.py
def cache_with_tags(key: str, value: str, tags: list):
    """Cache value and tag it for bulk invalidation"""
    # Store value
    redis_client.setex(key, 3600, value)
    # Store tag reference (set of keys with this tag)
    for tag in tags:
        redis_client.sadd(f"tag:{tag}", key)


def invalidate_by_tag(tag: str):
    """Invalidate all keys with a tag"""
    # Get all keys with this tag
    keys = redis_client.smembers(f"tag:{tag}")
    # Delete all tagged keys
    if keys:
        redis_client.delete(*keys)
    # Clean up tag
    redis_client.delete(f"tag:{tag}")


# Usage
cache_with_tags("product:1", "iPhone", ["electronics", "apple"])
cache_with_tags("product:2", "MacBook", ["electronics", "apple"])
cache_with_tags("product:3", "Apple Watch", ["wearables", "apple"])
# Invalidate all Apple products at once
invalidate_by_tag("apple")
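
The tag index is easier to reason about with plain Python containers. The sketch below mirrors the two functions above using a dict for the cached values and a dict of sets for the tag index; both containers are stand-ins for Redis keys and sets, not the real client.

```python
store = {}  # key -> value (stands in for the cached string keys)
tags = {}   # tag -> set of keys (stands in for the "tag:<name>" sets)


def cache_with_tags(key, value, tag_names):
    store[key] = value
    for tag in tag_names:
        tags.setdefault(tag, set()).add(key)


def invalidate_by_tag(tag):
    # Remove the tag's set and delete every key it referenced
    for key in tags.pop(tag, set()):
        store.pop(key, None)


cache_with_tags("product:1", "iPhone", ["electronics", "apple"])
cache_with_tags("product:2", "MacBook", ["electronics", "apple"])
cache_with_tags("product:3", "Apple Watch", ["wearables", "apple"])

invalidate_by_tag("apple")

remaining_values = dict(store)
leftover_refs = sorted(tags["electronics"])
print(remaining_values, leftover_refs)
```

One thing this makes visible: after invalidating "apple", the "electronics" set still lists keys that no longer exist. Deleting a missing key is harmless, but the tag sets grow over time unless you also clean them up or give them a TTL.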

Preventing Cache Stampedes

The Problem

1. Thousands of requests arrive for a popular key
2. The key expires
3. ALL requests hit the database simultaneously (thundering herd)
4. Database overload, service degradation
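
The effect is easy to reproduce in a single process. This sketch is an illustration under stated assumptions: a dict plays the cache, threads play concurrent requests, and an in-process `threading.Lock` stands in for a distributed lock. A barrier forces every request to observe the miss before anyone refills the cache, which is the worst case described above.

```python
import threading
import time

N_REQUESTS = 20


def run(use_lock):
    """Return how many database queries N_REQUESTS simultaneous misses cause."""
    cache = {}
    db_calls = 0
    counter_lock = threading.Lock()   # protects the db_calls counter
    rebuild_lock = threading.Lock()   # lets one request rebuild at a time
    all_missed = threading.Barrier(N_REQUESTS)

    def query_database():
        nonlocal db_calls
        with counter_lock:
            db_calls += 1
        time.sleep(0.01)  # simulated slow query
        return "product details"

    def handle_request():
        value = cache.get("product:123")
        all_missed.wait()  # every request sees the expired key first
        if value is not None:
            return
        if not use_lock:
            cache["product:123"] = query_database()
            return
        with rebuild_lock:
            # Re-check: another request may have refilled the cache meanwhile
            if cache.get("product:123") is None:
                cache["product:123"] = query_database()

    threads = [threading.Thread(target=handle_request) for _ in range(N_REQUESTS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return db_calls


naive_calls = run(use_lock=False)
guarded_calls = run(use_lock=True)
print(f"without lock: {naive_calls} DB queries, with lock: {guarded_calls}")
```

Without coordination every request pays for its own query; with a lock and a re-check, one request rebuilds the entry and the rest reuse it.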

Solution: Probabilistic Regeneration

stampede_prevention.py
import random

import redis

redis_client = redis.Redis()


def get_with_low_ttl_regen(key: str, expensive_calculation, ttl=3600):
    """
    Regenerate cache probabilistically near expiration
    Avoids thundering herd when TTL reaches 0
    """
    value = redis_client.get(key)
    if value is not None:
        # Check TTL (-1: key has no expiry, -2: key expired since the GET)
        remaining_ttl = redis_client.ttl(key)
        # Near end of life? Regenerate probabilistically
        if 0 <= remaining_ttl < ttl * 0.2:  # Last 20% of life
            # Probability of regeneration increases as TTL decreases
            prob = 1 - (remaining_ttl / (ttl * 0.2))
            if random.random() < prob:
                # This request refreshes early; requests that lose the
                # roll keep serving the cached value until it is replaced
                new_value = expensive_calculation()
                redis_client.setex(key, ttl, new_value)
                return new_value
        return value
    # Cache miss: calculate fresh data
    result = expensive_calculation()
    redis_client.setex(key, ttl, result)
    return result
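
To get a feel for the probability ramp used above, this sketch isolates the formula in a pure function. The function name and the `window` parameter are illustrative; the 20% window and the linear ramp are taken from the code above.

```python
def regeneration_probability(remaining_ttl, ttl, window=0.2):
    """Chance that a single request refreshes the key early."""
    threshold = ttl * window
    if remaining_ttl >= threshold:
        return 0.0  # still fresh: never regenerate
    if remaining_ttl <= 0:
        return 1.0  # at or past expiry: always regenerate
    return 1.0 - remaining_ttl / threshold


for remaining in (3600, 720, 540, 360, 180, 0):
    prob = regeneration_probability(remaining, ttl=3600)
    print(f"{remaining:>4}s left: {prob:.2f}")
```

Because each request rolls independently, a busy key is very likely to be refreshed by some request well before it expires, while a quiet key may simply expire and be rebuilt on the next miss.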

Distributed Lock Pattern

Prevent parallel cache recalculations:

lock_pattern.py
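import time
import uuid

# Assumes the shared `redis_client` from the earlier examples

# Delete the lock only if we still own it. Running the check and the
# delete inside one Lua script makes the release atomic.
RELEASE_LOCK_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
end
return 0
"""


def get_with_lock(key: str, expensive_calculation, lock_timeout=10):
    """Use lock to prevent multiple calculations"""
    lock_key = f"lock:{key}"
    cache_key = key
    # Check cache
    cached = redis_client.get(cache_key)
    if cached is not None:
        return cached
    # Try to acquire lock (NX: only if absent, EX: auto-expire if we crash)
    lock_id = str(uuid.uuid4())
    acquired = redis_client.set(lock_key, lock_id, nx=True, ex=lock_timeout)
    if acquired:
        # We got the lock, calculate
        try:
            result = expensive_calculation()
            redis_client.setex(cache_key, 3600, result)
            return result
        finally:
            # Release lock (no-op if it expired and someone else holds it)
            redis_client.eval(RELEASE_LOCK_SCRIPT, 1, lock_key, lock_id)
    # Someone else got the lock, wait for result
    for _ in range(10):
        time.sleep(0.1)
        cached = redis_client.get(cache_key)
        if cached is not None:
            return cached
    # Timeout: calculate ourselves
    return expensive_calculation()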

Serialization Strategies

JSON vs MessagePack vs Protocol Buffers

serialization.py
import json

import msgpack

# Assumes the shared `redis_client` from the earlier examples


def store_json(key, obj):
    """Simple but larger"""
    redis_client.set(key, json.dumps(obj))


def store_msgpack(key, obj):
    """Smaller, usually faster (binary: read it back with a client
    that does not set decode_responses=True)"""
    redis_client.set(key, msgpack.packb(obj))


# Size comparison: 1000 products
products = [{"id": i, "name": f"Product {i}", "price": 99.99} for i in range(1000)]
json_size = len(json.dumps(products))  # ~50KB
msgpack_size = len(msgpack.packb(products))  # smaller; the exact saving depends on the payload
print(json_size, msgpack_size)
# Choice depends on:
# - JSON: Human readable, broad ecosystem
# - MsgPack: Smaller, faster
# - Protobuf: Strongly typed, enterprise

Monitoring Cache Performance

Track Hit Rate

monitoring.py
import redis

redis_client = redis.Redis()


def get_cache_stats():
    """Get cache performance metrics"""
    stats = redis_client.info('stats')
    # used_memory_human is reported in the "memory" section, not "stats"
    memory = redis_client.info('memory')
    hits = stats.get('keyspace_hits', 0)
    misses = stats.get('keyspace_misses', 0)
    hit_rate = (hits / (hits + misses)) * 100 if (hits + misses) > 0 else 0
    return {
        "hit_rate": hit_rate,
        "total_hits": hits,
        "total_misses": misses,
        "evictions": stats.get('evicted_keys', 0),
        "used_memory": memory.get('used_memory_human', 'N/A'),
    }


# Target: >80% hit rate
# <50%: Cache is ineffective, reconsider keys/TTLs
# >95%: Good fit for caching strategy
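
The thresholds in the comments above can be turned into a small helper for dashboards or alerts. This is a sketch: the function names and the labels for the middle bands are assumptions, while the 50%, 80%, and 95% cut-offs come from the comments above.

```python
def hit_rate_percent(hits, misses):
    """Hit rate as a percentage; 0.0 when there has been no traffic yet."""
    total = hits + misses
    return (hits / total) * 100 if total > 0 else 0.0


def classify_hit_rate(rate):
    """Map a hit rate percentage onto the bands described above."""
    if rate < 50:
        return "ineffective"   # reconsider keys/TTLs
    if rate <= 80:
        return "below target"  # assumed label for the 50-80% band
    if rate <= 95:
        return "on target"     # assumed label for the 80-95% band
    return "good fit"


for hits, misses in ((300, 700), (900, 100), (990, 10)):
    rate = hit_rate_percent(hits, misses)
    print(f"{rate:.1f}% -> {classify_hit_rate(rate)}")
```

Note that `keyspace_hits` and `keyspace_misses` are counters since the server started (or since the last stats reset), so for alerting you may prefer to compare deltas between two samples.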

Connection Pooling

Reuse connections instead of creating new ones:

connection_pooling.py
import redis


# Inefficient: redis.Redis() creates its own connection pool, so building
# a new client per request throws the pooled connections away each time
def get_without_shared_pool(key):
    return redis.Redis().get(key)


# Efficient: create one pool at startup and share it
pool = redis.ConnectionPool.from_url(
    'redis://localhost:6379',
    max_connections=50,
    decode_responses=True,
)
redis_client = redis.Redis(connection_pool=pool)
# Reuses connections automatically
for i in range(1000):
    redis_client.get(f"key:{i}")

Best Practices

1. Consistent Key Naming

# Good: Hierarchical, searchable
"user:123"
"user:123:settings"
"product:456"
"product:456:reviews"
"order:789:items"
# Bad: Ambiguous
"u123"
"prod_456"
"item-789"

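One way to keep keys consistent is to build them in a single place instead of formatting strings by hand. The `make_key` helper below is a hypothetical sketch of that idea; the rule that segments may not contain colons or spaces is an assumption chosen to keep the hierarchy unambiguous.

```python
def make_key(entity, entity_id, *parts):
    """Build a hierarchical key such as "user:123:settings"."""
    segments = [str(entity), str(entity_id), *(str(p) for p in parts)]
    for segment in segments:
        # Reject values that would break the "a:b:c" hierarchy
        if not segment or ":" in segment or " " in segment:
            raise ValueError(f"invalid key segment: {segment!r}")
    return ":".join(segments)


print(make_key("user", 123))
print(make_key("user", 123, "settings"))
print(make_key("order", 789, "items"))
```

Routing every key through one helper also makes it easy to add a version or environment prefix later without hunting for string literals.
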
2. Set Reasonable TTLs

TTL_STRATEGY = {
    "user_profile": 3600,      # 1 hour
    "product_catalog": 86400,  # 24 hours
    "session": 1800,           # 30 minutes
    "api_response": 300,       # 5 minutes
    "real_time_data": 60,      # 1 minute
}

3. Plan for Failures

def safe_cache_get(key: str, fallback_fn):
    """Gracefully handle Redis failures"""
    try:
        value = redis_client.get(key)
        if value is not None:
            return value
    except (redis.ConnectionError, redis.TimeoutError):
        # Redis down or too slow? Use fallback
        pass
    # No cache: call function
    return fallback_fn()

Conclusion

Redis caching is the difference between a responsive application and a slow one.

Choose the right pattern (cache-aside for flexibility, write-through for consistency), set appropriate TTLs, prevent stampedes, and monitor hit rates. Combined with database optimization, caching forms the foundation of high-performance systems.
