High-performance caching with Redis and in-memory backends using the cashews library.
## Overview
svc-infra provides a comprehensive caching layer:

- Multiple Backends: Redis for production, in-memory for development/testing
- Decorator-Based API: `@cache_read` and `@cache_write` for easy caching
- Resource Helpers: Entity-based caching with `resource()` for standardized key patterns
- TTL Management: Configurable TTL buckets (short, default, long)
- Cache Invalidation: Tag-based invalidation with automatic recaching
- FastAPI Integration: One-liner `add_cache(app)` with lifecycle management
## Quick Start

### Basic Setup
```python
from svc_infra.cache import cache_read, cache_write, init_cache

# Initialize cache (uses CACHE_URL env var or defaults to memory)
init_cache()

@cache_read(key="user:{user_id}", ttl=300)
async def get_user(user_id: int):
    return await db.fetch_user(user_id)

@cache_write(tags=["user:{user_id}"])
async def update_user(user_id: int, data: dict):
    await db.update_user(user_id, data)
    return data
```

### FastAPI Integration
```python
from fastapi import FastAPI
from svc_infra.cache import add_cache, cache_read

app = FastAPI()

# Wire cache lifecycle (startup/shutdown)
add_cache(app)

@app.get("/users/{user_id}")
@cache_read(key="user:{user_id}", ttl=300)
async def get_user(user_id: int):
    return await fetch_user_from_db(user_id)
```

## Configuration
### Environment Variables
| Variable | Default | Description |
|---|---|---|
| `CACHE_URL` | — | Cache backend URL (Redis or memory) |
| `REDIS_URL` | — | Fallback if `CACHE_URL` is not set |
| `CACHE_PREFIX` | `svc` | Namespace prefix for all cache keys |
| `CACHE_VERSION` | `v1` | Version suffix for cache key namespacing |
| `CACHE_TTL_DEFAULT` | `300` | Default TTL in seconds (5 minutes) |
| `CACHE_TTL_SHORT` | `30` | Short TTL in seconds (30 seconds) |
| `CACHE_TTL_LONG` | `3600` | Long TTL in seconds (1 hour) |
### Backend URLs
```bash
# Redis (production)
CACHE_URL=redis://localhost:6379/0
CACHE_URL=redis://user:password@redis.example.com:6379/0
CACHE_URL=rediss://redis.example.com:6379/0  # TLS

# In-memory (development/testing)
CACHE_URL=mem://
```

### Programmatic Configuration
```python
from svc_infra.cache import add_cache, init_cache

# Option 1: With a FastAPI app
add_cache(
    app,
    url="redis://localhost:6379/0",
    prefix="myapp",
    version="v2",
)

# Option 2: Standalone initialization
init_cache(
    url="redis://localhost:6379/0",
    prefix="myapp",
    version="v1",
)
```

## Decorator Patterns
### `@cache_read` - Caching Read Operations
Cache function results with automatic key generation:
```python
from svc_infra.cache import cache_read

# Basic usage with key template
@cache_read(key="user:{user_id}", ttl=300)
async def get_user(user_id: int):
    return await db.fetch_user(user_id)

# Multiple parameters in key
@cache_read(key="org:{org_id}:user:{user_id}", ttl=600)
async def get_org_user(org_id: int, user_id: int):
    return await db.fetch_org_user(org_id, user_id)

# Tuple key format (auto-converted to a template)
@cache_read(key=("user", "profile", "{user_id}"), ttl=300)
async def get_user_profile(user_id: int):
    return await db.fetch_profile(user_id)
```

#### Parameters
| Parameter | Type | Description |
|---|---|---|
| `key` | `str \| tuple[str, ...]` | Cache key template with `{param}` placeholders |
| `ttl` | `int` | Time to live in seconds (defaults to `CACHE_TTL_DEFAULT`) |
| `tags` | `Iterable[str]` | Cache tags for invalidation (defaults to the key template) |
| `early_ttl` | `int` | Early expiration for cache warming |
| `refresh` | `bool` | Whether to refresh the cache on access |
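The `{param}` placeholders behave like Python `str.format` fields filled from the decorated function's arguments. A minimal sketch of how such a template could be rendered (illustrative only, not svc-infra's internal key builder):

```python
import inspect

def render_key(template: str, func, *args, **kwargs) -> str:
    """Bind call arguments to the function signature, then fill
    the {param} placeholders in the key template."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    return template.format(**bound.arguments)

def get_org_user(org_id: int, user_id: int): ...

# Positional and keyword arguments both resolve through the signature
key = render_key("org:{org_id}:user:{user_id}", get_org_user, 7, user_id=42)
assert key == "org:7:user:42"
```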
### `@cache_write` - Invalidating on Writes
Invalidate cache tags after write operations:
```python
from svc_infra.cache import cache_write

@cache_write(tags=["user:{user_id}"])
async def update_user(user_id: int, data: dict):
    return await db.update_user(user_id, data)

# Multiple tags
@cache_write(tags=["user:{user_id}", "org:{org_id}:users"])
async def update_user_org(user_id: int, org_id: int, data: dict):
    return await db.update_user(user_id, data)
```

#### Dynamic Tags
```python
from svc_infra.cache import cache_write

def get_user_tags(user_id: int, **kwargs) -> list[str]:
    return [f"user:{user_id}", f"user:{user_id}:profile"]

@cache_write(tags=get_user_tags)
async def update_user(user_id: int, data: dict):
    return await db.update_user(user_id, data)
```

### Recaching After Invalidation
Warm the cache immediately after invalidation:
```python
from svc_infra.cache import cache_read, cache_write, recache

@cache_read(key="user:{user_id}", ttl=300)
async def get_user(user_id: int):
    return await db.fetch_user(user_id)

@cache_write(
    tags=["user:{user_id}"],
    recache=[recache(get_user, include=["user_id"])],
)
async def update_user(user_id: int, data: dict):
    await db.update_user(user_id, data)
    return data
```

#### Recache Options
```python
from svc_infra.cache import recache

# Include specific parameters
recache(get_user, include=["user_id"])

# Rename parameters
recache(get_org_user, rename={"user_id": "id"})

# Add extra parameters
recache(get_user_with_options, extra={"include_deleted": False})

# Combine options
recache(
    get_user_details,
    include=["user_id"],
    extra={"expand": True},
)
```

## Resource Helper
The `resource()` helper provides entity-based caching with standardized key patterns:
```python
from svc_infra.cache import resource

# Define a resource
user = resource("user", "user_id")

@user.cache_read(suffix="profile", ttl=300)
async def get_user_profile(user_id: int):
    # Key: user:profile:{user_id}
    # Tags: ["user:{user_id}"]
    return await db.fetch_profile(user_id)

@user.cache_read(suffix="settings", ttl=600)
async def get_user_settings(user_id: int):
    # Key: user:settings:{user_id}
    return await db.fetch_settings(user_id)

@user.cache_write()
async def update_user(user_id: int, data: dict):
    # Invalidates all user:{user_id} tagged entries
    return await db.update_user(user_id, data)
```

### Resource with Recaching
```python
user = resource("user", "user_id")

@user.cache_write(
    recache=[
        (get_user_profile, lambda user_id, **_: {"user_id": user_id}),
        (get_user_settings, lambda user_id, **_: {"user_id": user_id}),
    ],
    recache_max_concurrency=3,
)
async def update_user(user_id: int, data: dict):
    return await db.update_user(user_id, data)
```

### Custom Key Templates
```python
user = resource("user", "user_id")

@user.cache_read(
    suffix="profile",
    ttl=300,
    key_template="v2:user:{user_id}:profile",  # Custom key
    tags_template=("user:{user_id}", "profiles"),  # Custom tags
)
async def get_user_profile_v2(user_id: int):
    return await db.fetch_profile(user_id)
```

## TTL Strategies
### Built-in TTL Buckets
```python
from svc_infra.cache.ttl import TTL_SHORT, TTL_DEFAULT, TTL_LONG

# Short-lived data (30 seconds) - rate limits, sessions
@cache_read(key="rate:{ip}", ttl=TTL_SHORT)
async def get_rate_limit(ip: str): ...

# Default TTL (5 minutes) - most data
@cache_read(key="user:{user_id}", ttl=TTL_DEFAULT)
async def get_user(user_id: int): ...

# Long-lived data (1 hour) - config, static content
@cache_read(key="config:{key}", ttl=TTL_LONG)
async def get_config(key: str): ...
```

### Environment-Based TTL
```bash
# Override defaults via environment
CACHE_TTL_DEFAULT=600  # 10 minutes
CACHE_TTL_SHORT=15     # 15 seconds
CACHE_TTL_LONG=7200    # 2 hours
```

### TTL by Data Type
| Data Type | Recommended TTL | Rationale |
|---|---|---|
| Session data | 30s - 60s | Needs freshness for security |
| User profiles | 5m - 15m | Rarely changes, but should reflect updates |
| Configuration | 1h - 24h | Changes infrequently |
| Rate limits | 1s - 60s | Must be accurate for enforcement |
| Search results | 5m - 30m | Can be slightly stale |
| Static content | 1h - 24h | Rarely changes |
## Caching Strategies

### Cache-Aside Pattern (Default)

The default pattern: check the cache first, fetch on a miss:
```python
@cache_read(key="user:{user_id}", ttl=300)
async def get_user(user_id: int):
    # Called only on a cache miss
    return await db.fetch_user(user_id)
```

### Write-Through Pattern
Update cache and database together:
```python
from svc_infra.cache import cache_write, recache

@cache_write(
    tags=["user:{user_id}"],
    recache=[recache(get_user, include=["user_id"])],
)
async def update_user(user_id: int, data: dict):
    # 1. Update the database
    result = await db.update_user(user_id, data)
    # 2. Old cache entries are invalidated (via tags)
    # 3. The cache is re-warmed with fresh data (via recache)
    return result
```

### Cache Stampede Prevention
Use locking to prevent multiple simultaneous cache rebuilds:
```python
from svc_infra.cache import resource

user = resource("user", "user_id")

@user.cache_read(suffix="profile", ttl=300, lock=True)
async def get_user_profile(user_id: int):
    # Only one request hits the DB; the others wait for the cache
    return await expensive_db_query(user_id)
```

### Graceful Degradation
Handle cache failures gracefully:
```python
import logging

from svc_infra.cache import get_cache

logger = logging.getLogger(__name__)

async def get_user_with_fallback(user_id: int):
    cache = get_cache()
    key = f"user:{user_id}"
    try:
        cached = await cache.get(key)
        if cached:
            return cached
    except Exception as e:
        logger.warning(f"Cache read failed: {e}")
    # Fall back to the database
    user = await db.fetch_user(user_id)
    try:
        await cache.set(key, user, ttl=300)
    except Exception as e:
        logger.warning(f"Cache write failed: {e}")
    return user
```

## Cache Invalidation
### Tag-Based Invalidation
```python
from svc_infra.cache import cache_write

# All entries tagged with "user:{user_id}" are invalidated
@cache_write(tags=["user:{user_id}"])
async def update_user(user_id: int, data: dict):
    return await db.update_user(user_id, data)

# Invalidate multiple tag patterns
@cache_write(tags=["user:{user_id}", "org:{org_id}:members"])
async def remove_user_from_org(user_id: int, org_id: int):
    return await db.remove_from_org(user_id, org_id)
```

### Manual Invalidation
```python
from svc_infra.cache import get_cache

async def invalidate_user_cache(user_id: int):
    cache = get_cache()
    # Delete a specific key
    await cache.delete(f"user:{user_id}")
    # Delete by pattern (Redis only)
    await cache.delete_match(f"user:{user_id}:*")
```

### Event-Driven Invalidation
Integrate with webhooks or message queues:
```python
from svc_infra.cache import get_cache

async def handle_user_updated_event(event: dict):
    user_id = event["user_id"]
    cache = get_cache()
    # Invalidate all user-related cache entries
    await cache.delete_match(f"user:{user_id}:*")
```

## Direct Cache Access
Access the underlying cache instance for advanced operations:
```python
from svc_infra.cache import get_cache

async def advanced_cache_ops():
    cache = get_cache()

    # Basic operations
    await cache.set("key", "value", ttl=300)
    value = await cache.get("key")
    await cache.delete("key")

    # Check existence
    exists = await cache.exists("key")

    # Set only if the key does not exist
    await cache.set("key", "value", exist=False)

    # Get and delete (pop)
    value = await cache.get_delete("key")

    # Increment/decrement
    await cache.incr("counter")
    await cache.incr("counter", 5)

    # Pattern deletion (Redis)
    await cache.delete_match("user:*:profile")
```

### App State Access
When using `add_cache(app)`:
```python
from fastapi import FastAPI, Request
from svc_infra.cache import add_cache

app = FastAPI()
add_cache(app)

@app.get("/stats")
async def get_stats(request: Request):
    cache = request.app.state.cache
    # Use the cache directly
    return await cache.get("stats")
```

## Production Recommendations
### Redis Configuration
```bash
# Production Redis settings
CACHE_URL=redis://redis.example.com:6379/0

# With authentication
CACHE_URL=redis://user:password@redis.example.com:6379/0

# With TLS
CACHE_URL=rediss://redis.example.com:6379/0
```

### Redis Cluster
For high availability, use Redis Cluster:
```bash
# Redis Cluster (comma-separated nodes)
CACHE_URL=redis://node1:6379,node2:6379,node3:6379/0
```

### Memory Limits
Configure Redis `maxmemory`:
```conf
# redis.conf
maxmemory 256mb
maxmemory-policy allkeys-lru
```

### Eviction Policies
| Policy | Use Case |
|---|---|
| `allkeys-lru` | General caching - evict the least recently used keys |
| `allkeys-lfu` | Hot data caching - evict the least frequently used keys |
| `volatile-ttl` | Evict expiring keys with the shortest TTL first |
| `noeviction` | Don't evict (returns an error when full) |
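The `allkeys-lru` policy drops whichever key was touched least recently once the store is full. The idea can be sketched in a few lines with an `OrderedDict` (purely illustrative of the policy; Redis uses an approximated LRU based on sampling):

```python
from collections import OrderedDict

class LRUCache:
    """Toy allkeys-lru: on overflow, drop the least recently used key."""
    def __init__(self, maxsize: int):
        self.maxsize = maxsize
        self.data: OrderedDict = OrderedDict()

    def get(self, key):
        if key not in self.data:
            return None
        self.data.move_to_end(key)  # mark as recently used
        return self.data[key]

    def set(self, key, value):
        self.data[key] = value
        self.data.move_to_end(key)
        if len(self.data) > self.maxsize:
            self.data.popitem(last=False)  # evict the LRU entry

cache = LRUCache(maxsize=2)
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")     # "a" is now the most recently used
cache.set("c", 3)  # evicts "b", the least recently used
assert cache.get("b") is None
assert cache.get("a") == 1
```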
### Connection Pooling
The cache backend uses connection pooling by default. For high-throughput:
```python
# The Redis connection pool is managed by cashews;
# no additional configuration is needed for most use cases.
```

### Monitoring Cache Performance
```python
from svc_infra.cache import get_cache

async def cache_health_check():
    cache = get_cache()
    try:
        await cache.ping()
        return {"status": "healthy"}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}
```

## Testing
### In-Memory Backend
Use memory backend for tests:
```python
import pytest

from svc_infra.cache import init_cache, cache_read

@pytest.fixture(autouse=True)
def setup_cache():
    init_cache(url="mem://", prefix="test", version="v1")
    yield

@cache_read(key="user:{user_id}", ttl=300)
async def get_user(user_id: int):
    return {"id": user_id, "name": "Test User"}

async def test_cache_read():
    # First call - cache miss
    result1 = await get_user(1)
    assert result1["id"] == 1
    # Second call - cache hit
    result2 = await get_user(1)
    assert result2 == result1
```

### Mocking Cache
```python
from unittest.mock import AsyncMock, patch

async def test_with_mocked_cache():
    with patch("svc_infra.cache.get_cache") as mock_cache:
        mock_cache.return_value.get = AsyncMock(return_value={"id": 1})
        mock_cache.return_value.set = AsyncMock()
        ...  # Your test code here
```

## Troubleshooting
### Stale Data

The cache returns outdated data after updates. Solutions:

- Ensure `@cache_write` is applied to all mutation endpoints
- Verify tag patterns match between read and write decorators
- Check that the TTL is appropriate for your use case
### Connection Issues

`redis.exceptions.ConnectionError: Error connecting to Redis` - Solutions:

- Verify `CACHE_URL` or `REDIS_URL` is correct
- Check that Redis is running and accessible
- Verify the network/firewall allows the connection
- Check the Redis authentication credentials
### High Memory Usage

Redis is using excessive memory. Solutions:

- Review TTL values; reduce them for frequently changing data
- Configure `maxmemory` and an eviction policy
- Audit cache key patterns for cardinality
- Use `delete_match` to clean up stale keys
### Cache Stampede

Multiple simultaneous requests hit the database on a cache miss. Solutions:

- Use `lock=True` in `resource().cache_read()`
- Implement cache warming on startup
- Use `early_ttl` for background refresh
### Key Collisions

The cache returns the wrong data. Solutions:

- Ensure cache key templates are unique
- Include all relevant parameters in the key
- Use `CACHE_VERSION` for breaking changes
- Verify `CACHE_PREFIX` is unique per environment
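A collision usually means two distinct argument sets render to the same key. The sketch below shows why omitting a relevant parameter from the template is dangerous (illustrative, using plain `str.format`):

```python
def make_key(template: str, **params) -> str:
    return template.format(**params)

# Bad: the template omits org_id, so two different orgs collide on one key
bad = "user:{user_id}"
key_for_org7 = make_key(bad, user_id=1)  # called in the context of org 7
key_for_org8 = make_key(bad, user_id=1)  # called in the context of org 8
assert key_for_org7 == key_for_org8 == "user:1"  # collision: wrong data served

# Good: every relevant parameter is in the key, so the entries stay distinct
good = "org:{org_id}:user:{user_id}"
assert make_key(good, org_id=7, user_id=1) != make_key(good, org_id=8, user_id=1)
```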
## See Also
- Jobs Guide - Background job processing
- Environment Reference - All cache environment variables
- Resilience Patterns - Circuit breakers and fallbacks
- Observability Guide - Monitoring cache performance