svc-infra provides comprehensive data lifecycle management: fixtures for reference data, retention policies for cleanup, GDPR erasure workflows, and backup verification.
Quick Start
from fastapi import FastAPI
from svc_infra.data.add import add_data_lifecycle
from svc_infra.data.fixtures import make_on_load_fixtures
from svc_infra.data.retention import RetentionPolicy
app = FastAPI()
# Auto-migrate and load fixtures on startup
add_data_lifecycle(
app,
auto_migrate=True,
on_load_fixtures=make_on_load_fixtures(
load_default_categories,
load_admin_users,
run_once_file=".fixtures_loaded",
),
)

Fixtures (Reference Data)
Load initial/reference data on application startup with idempotency support.
Basic Usage
from svc_infra.data.fixtures import run_fixtures, make_on_load_fixtures
# Define fixture loaders
async def load_categories(session):
"""Load default product categories."""
defaults = [
{"id": "electronics", "name": "Electronics"},
{"id": "clothing", "name": "Clothing"},
{"id": "home", "name": "Home & Garden"},
]
for cat in defaults:
await session.merge(Category(**cat))
await session.commit()
async def load_admin_user(session):
"""Create default admin user."""
admin = await session.get(User, "admin")
if not admin:
admin = User(id="admin", email="admin@example.com", role="admin")
session.add(admin)
await session.commit()
# Run manually
await run_fixtures([load_categories, load_admin_user])

Idempotent Fixtures
Use a sentinel file to ensure fixtures run only once:
await run_fixtures(
[load_categories, load_admin_user],
run_once_file=".fixtures_loaded",
)
# Creates .fixtures_loaded on success
# Subsequent calls skip if file exists

Startup Integration
from svc_infra.data.fixtures import make_on_load_fixtures
from svc_infra.data.add import add_data_lifecycle
# Create startup fixture loader
on_load = make_on_load_fixtures(
load_categories,
load_admin_user,
load_feature_flags,
run_once_file=".fixtures_loaded",
)
add_data_lifecycle(app, on_load_fixtures=on_load)

Environment-Specific Fixtures
import os
def load_fixtures_for_env():
"""Load fixtures based on environment."""
env = os.getenv("ENV", "development")
loaders = [load_default_config]
if env == "development":
loaders.extend([load_test_users, load_sample_data])
elif env == "staging":
loaders.extend([load_qa_users])
# Production: only default config
return loaders
add_data_lifecycle(
app,
on_load_fixtures=make_on_load_fixtures(
*load_fixtures_for_env(),
run_once_file=f".fixtures_{os.getenv('ENV', 'dev')}_loaded",
),
)

Fixture Best Practices
# Good: Idempotent fixtures using merge/upsert
async def load_settings(session):
for setting in DEFAULT_SETTINGS:
await session.merge(Setting(**setting))
await session.commit()
# Good: Check before insert
async def load_admin(session):
if not await session.get(User, "admin"):
session.add(User(id="admin", ...))
await session.commit()
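# Also good (a sketch, assuming a PostgreSQL backend): a dialect-level upsert
# is atomic and avoids the read-then-insert race of the check above
from sqlalchemy.dialects.postgresql import insert as pg_insert

async def load_categories_upsert(session):
    stmt = (
        pg_insert(Category)
        .values([{"id": "electronics", "name": "Electronics"}])
        .on_conflict_do_nothing(index_elements=["id"])
    )
    await session.execute(stmt)
    await session.commit()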
# Bad: Will fail on duplicate
async def load_data(session):
session.add(Category(id="electronics", ...)) # Fails if exists!
await session.commit()

Retention Policies
Automatically clean up old data based on configurable retention periods.
Defining Policies
from svc_infra.data.retention import RetentionPolicy
# Soft delete logs older than 30 days
log_retention = RetentionPolicy(
name="audit-logs",
model=AuditLog,
older_than_days=30,
soft_delete_field="deleted_at", # Column to set timestamp
hard_delete=False,
)
# Hard delete temporary data older than 7 days
temp_retention = RetentionPolicy(
name="temp-files",
model=TemporaryUpload,
older_than_days=7,
soft_delete_field=None,
hard_delete=True, # Actually DELETE rows
)
# With extra filtering conditions
session_retention = RetentionPolicy(
name="expired-sessions",
model=Session,
older_than_days=90,
extra_where=[Session.is_active == False],
hard_delete=True,
)

Running Purges
from svc_infra.data.retention import run_retention_purge
# Manual execution
async def purge_old_data(session):
policies = [log_retention, temp_retention, session_retention]
affected = await run_retention_purge(session, policies)
print(f"Purged {affected} rows")Scheduled Purges
Integrate with the jobs system for automatic cleanup:
from svc_infra.jobs import JobQueue
async def retention_job():
"""Scheduled job to run retention purges."""
async with get_session() as session:
affected = await run_retention_purge(session, [
log_retention,
temp_retention,
session_retention,
])
logger.info(f"Retention purge: {affected} rows affected")
return affected
# Register with the job scheduler (queue is your app's JobQueue instance)
queue.schedule(
name="retention-purge",
handler=retention_job,
interval_hours=6,
)

Soft Delete Pattern
Models should include soft delete columns:
from sqlalchemy import Boolean, Column, DateTime, Integer, String, func
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass
class SoftDeleteMixin:
deleted_at = Column(DateTime, nullable=True, index=True)
is_active = Column(Boolean, default=True, index=True)
class AuditLog(Base, SoftDeleteMixin):
__tablename__ = "audit_logs"
id = Column(Integer, primary_key=True)
action = Column(String)
created_at = Column(DateTime, server_default=func.now())

Retention in CRUD Repositories
from svc_infra.db.sql import SqlRepository
class AuditLogRepository(SqlRepository):
model = AuditLog
async def list_active(self, session, *, limit: int, offset: int):
"""List only non-deleted logs."""
return await self.list(
session,
limit=limit,
offset=offset,
where=[AuditLog.deleted_at.is_(None)],
)

GDPR Erasure
Comply with data subject deletion requests using composable erasure plans.
Defining Erasure Steps
from sqlalchemy import delete, update
from svc_infra.data.erasure import ErasureStep, ErasurePlan
# Individual erasure operations
async def erase_user_profile(session, principal_id: str) -> int:
"""Delete user profile data."""
stmt = delete(UserProfile).where(UserProfile.user_id == principal_id)
result = await session.execute(stmt)
return result.rowcount
async def erase_user_orders(session, principal_id: str) -> int:
"""Anonymize order records (required for accounting)."""
stmt = (
update(Order)
.where(Order.user_id == principal_id)
.values(
user_id=None,
email="[REDACTED]",
name="[REDACTED]",
address="[REDACTED]",
)
)
result = await session.execute(stmt)
return result.rowcount
async def erase_user_comments(session, principal_id: str) -> int:
"""Delete all user comments."""
stmt = delete(Comment).where(Comment.author_id == principal_id)
result = await session.execute(stmt)
return result.rowcount
async def erase_user_account(session, principal_id: str) -> int:
"""Finally delete the user account."""
stmt = delete(User).where(User.id == principal_id)
result = await session.execute(stmt)
return result.rowcount

Composing Erasure Plans
from svc_infra.data.erasure import ErasureStep, ErasurePlan
user_erasure_plan = ErasurePlan(steps=[
# Order matters! Delete dependencies first
ErasureStep(name="comments", run=erase_user_comments),
ErasureStep(name="orders", run=erase_user_orders), # Anonymize, not delete
ErasureStep(name="profile", run=erase_user_profile),
ErasureStep(name="account", run=erase_user_account),
])

Executing Erasure
from datetime import datetime
from svc_infra.data.erasure import run_erasure
async def handle_deletion_request(user_id: str):
"""Process GDPR Article 17 deletion request."""
async with get_session() as session:
# Run erasure with audit callback
affected = await run_erasure(
session,
principal_id=user_id,
plan=user_erasure_plan,
on_audit=log_erasure_event,
)
await session.commit()
logger.info(f"Erased data for user {user_id}: {affected} rows affected")
return affected
def log_erasure_event(event: str, context: dict):
"""Audit callback for compliance logging."""
audit_logger.info(
event,
extra={
"principal_id": context["principal_id"],
"affected_rows": context["affected"],
"timestamp": datetime.utcnow().isoformat(),
},
)

API Endpoint
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
router = APIRouter()
@router.post("/gdpr/erasure/{user_id}")
async def request_erasure(
user_id: str,
background_tasks: BackgroundTasks,
current_user: User = Depends(require_admin),
):
"""GDPR Article 17 - Right to Erasure."""
# Verify the request (user exists, can be deleted, etc.)
user = await get_user(user_id)
if not user:
raise HTTPException(404, "User not found")
# Queue erasure as background task
background_tasks.add_task(handle_deletion_request, user_id)
return {"status": "accepted", "message": "Erasure request queued"}Cascade Handling
For complex relationships, order steps carefully:
# Schema:
# User -> Orders -> OrderItems
# User -> Comments
# User -> UserProfile
cascade_erasure_plan = ErasurePlan(steps=[
# Level 3: Deepest dependencies first
ErasureStep(name="order_items", run=erase_order_items_by_user),
# Level 2: Parent of order_items
ErasureStep(name="orders", run=erase_user_orders),
ErasureStep(name="comments", run=erase_user_comments),
ErasureStep(name="profile", run=erase_user_profile),
# Level 1: Top-level entity last
ErasureStep(name="account", run=erase_user_account),
])

Backup Verification
Ensure backup health with automated verification.
Basic Verification
from datetime import datetime
from svc_infra.data.backup import verify_backups, BackupHealthReport
# Simple check
report = verify_backups(
last_success=datetime(2024, 1, 15, 12, 0), # Last successful backup
retention_days=1, # Expect daily backups
)
if report.ok:
print("Backups are healthy")
else:
print(f"Backup issue: {report.message}")Custom Backup Checker
from datetime import datetime, timezone
from svc_infra.data.backup import BackupHealthReport
async def check_s3_backups() -> BackupHealthReport:
"""Check backup health in S3."""
try:
# List recent backups
response = s3.list_objects_v2(
Bucket="my-backups",
Prefix="db-backups/",
)
if not response.get("Contents"):
return BackupHealthReport(
ok=False,
last_success=None,
retention_days=1,
message="No backups found",
)
# Find most recent
latest = max(response["Contents"], key=lambda x: x["LastModified"])
last_success = latest["LastModified"]
# Check if recent enough (S3 LastModified is timezone-aware)
age_hours = (datetime.now(timezone.utc) - last_success).total_seconds() / 3600
return BackupHealthReport(
ok=age_hours < 24,
last_success=last_success,
retention_days=1,
message=f"Latest backup: {age_hours:.1f} hours ago",
)
except Exception as e:
return BackupHealthReport(
ok=False,
last_success=None,
retention_days=1,
message=str(e),
)

Scheduled Verification Job
from svc_infra.data.backup import make_backup_verification_job
def alert_on_backup_failure(report: BackupHealthReport):
"""Send alert if backups are unhealthy."""
if not report.ok:
send_slack_alert(
channel="#ops-alerts",
message=f"🚨 Backup verification failed: {report.message}",
)
backup_job = make_backup_verification_job(
checker=check_s3_backups,
on_report=alert_on_backup_failure,
)
# Schedule with jobs runner
queue.schedule(
name="backup-verify",
handler=backup_job,
interval_hours=12,
)

Scheduling Integration
Use the jobs system for automated lifecycle management.
JOBS_SCHEDULE_JSON Configuration
export JOBS_SCHEDULE_JSON='[
{"name": "retention-purge", "interval": "6h", "handler": "app.jobs:run_retention"},
{"name": "backup-verify", "interval": "12h", "handler": "app.jobs:verify_backups_job"},
{"name": "session-cleanup", "interval": "1h", "handler": "app.jobs:cleanup_sessions"}
]'

Job Handler Implementation
# app/jobs.py
from datetime import datetime, timedelta
from sqlalchemy import delete
from svc_infra.data.retention import run_retention_purge
from svc_infra.data.backup import verify_backups
async def run_retention():
"""Scheduled retention purge."""
async with get_session() as session:
return await run_retention_purge(session, [
audit_log_retention,
temp_file_retention,
old_notification_retention,
])
def verify_backups_job():
"""Scheduled backup verification."""
report = check_backup_status()
if not report.ok:
send_ops_alert(f"Backup unhealthy: {report.message}")
return report
async def cleanup_sessions():
"""Clean up expired sessions."""
async with get_session() as session:
# Delete sessions older than 30 days
cutoff = datetime.utcnow() - timedelta(days=30)
stmt = delete(Session).where(Session.expires_at < cutoff)
result = await session.execute(stmt)
await session.commit()
return result.rowcount

Compliance
GDPR Checklist
- Right to Access (Article 15) — Export user data on request
- Right to Erasure (Article 17) — Delete user data with ErasurePlan
- Right to Portability (Article 20) — Export data in machine-readable format
- Data Retention — Define and enforce RetentionPolicy for all PII
- Audit Trails — Log all erasure operations with the on_audit callback
- Consent Records — Maintain proof of consent with timestamps (see the sketch below)
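The consent item has no helper shown in this guide; a minimal sketch of a consent table, assuming the same SQLAlchemy setup as the soft-delete models above (model and column names are illustrative):

from sqlalchemy import Column, DateTime, String, func

class ConsentRecord(Base):
    """Proof of consent: who consented to what, when, and via which flow."""
    __tablename__ = "consent_records"
    id = Column(String, primary_key=True)
    user_id = Column(String, index=True, nullable=False)
    purpose = Column(String, nullable=False)      # e.g. "marketing_emails"
    granted_at = Column(DateTime, server_default=func.now())
    revoked_at = Column(DateTime, nullable=True)  # NULL while consent stands
    source = Column(String)                       # e.g. "signup_form_v2"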
Data Subject Request Workflow
@router.post("/gdpr/export/{user_id}")
async def export_user_data(user_id: str):
"""GDPR Article 20 - Data Portability."""
async with get_session() as session:
user = await get_user(session, user_id)
orders = await get_user_orders(session, user_id)
comments = await get_user_comments(session, user_id)
return {
"user": user.to_dict(),
"orders": [o.to_dict() for o in orders],
"comments": [c.to_dict() for c in comments],
"exported_at": datetime.utcnow().isoformat(),
}
@router.post("/gdpr/rectify/{user_id}")
async def rectify_user_data(
user_id: str,
updates: UserUpdate,
):
"""GDPR Article 16 - Right to Rectification."""
async with get_session() as session:
user = await get_user(session, user_id)
for key, value in updates.dict(exclude_unset=True).items():
setattr(user, key, value)
await session.commit()
# Audit log
audit_logger.info("user_data_rectified", extra={
"user_id": user_id,
"fields": list(updates.dict(exclude_unset=True).keys()),
})
return user

Audit Logging
import structlog
audit_logger = structlog.get_logger("audit")
def log_data_lifecycle_event(event: str, context: dict):
"""Standard audit log format for compliance."""
audit_logger.info(
event,
principal_id=context.get("principal_id"),
affected_rows=context.get("affected"),
action_type=context.get("action_type"),
timestamp=datetime.utcnow().isoformat(),
operator=context.get("operator"), # Admin who initiated
)

Production Recommendations
Retention Scheduling
| Data Type | Retention | Schedule | Strategy |
|---|---|---|---|
| Audit logs | 90 days | Weekly | Soft delete |
| Temp files | 7 days | Daily | Hard delete |
| Sessions | 30 days | Hourly | Hard delete |
| Analytics | 365 days | Monthly | Archive |
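As a sketch, the first three rows of the table might translate into policies like these (model names are illustrative; the analytics archive strategy needs a custom job, since RetentionPolicy only soft- or hard-deletes):

audit_log_retention = RetentionPolicy(
    name="audit-logs",
    model=AuditLog,
    older_than_days=90,
    soft_delete_field="deleted_at",
    hard_delete=False,
)
temp_file_retention = RetentionPolicy(
    name="temp-files",
    model=TemporaryUpload,
    older_than_days=7,
    soft_delete_field=None,
    hard_delete=True,
)
session_retention = RetentionPolicy(
    name="sessions",
    model=Session,
    older_than_days=30,
    soft_delete_field=None,
    hard_delete=True,
)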
Erasure Performance
from sqlalchemy import delete, select

# Good: Batch delete via an id subquery (delete() has no portable .limit())
async def erase_in_batches(session, user_id: str, batch_size: int = 1000):
    total = 0
    while True:
        batch_ids = (
            select(Comment.id)
            .where(Comment.author_id == user_id)
            .limit(batch_size)
        )
        stmt = delete(Comment).where(Comment.id.in_(batch_ids))
        result = await session.execute(stmt)
        if result.rowcount == 0:
            break
        total += result.rowcount
        await session.commit()  # Commit each batch to keep locks short
    return total
# Bad: Single large delete (can lock tables)
async def erase_all_at_once(session, user_id: str):
stmt = delete(Comment).where(Comment.author_id == user_id)
result = await session.execute(stmt) # May timeout!
return result.rowcount

Backup Verification
- Run verification jobs every 12 hours minimum
- Alert immediately on any failure
- Check both backup existence AND restorability (see the sketch after this list)
- Maintain backup metadata (size, duration, checksum)
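Restorability checks are deployment-specific; a minimal probe, assuming pg_dump custom-format archives stored in S3 (the s3 client and BUCKET constant are illustrative):

import subprocess
import tempfile

def probe_restorability(key: str) -> bool:
    """Download a dump and check that pg_restore can read its table of contents."""
    with tempfile.NamedTemporaryFile(suffix=".dump") as tmp:
        s3.download_fileobj(BUCKET, key, tmp)
        tmp.flush()
        result = subprocess.run(["pg_restore", "--list", tmp.name], capture_output=True)
        return result.returncode == 0

This only proves the archive is readable; restoring into a scratch database is the stronger check.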
Troubleshooting
Fixtures Not Running
Symptom: Expected data not present after startup.
Diagnosis:
# Check sentinel file
ls -la .fixtures_loaded
# Check logs for fixture execution
grep "fixture" logs/app.logSolutions:
- Delete sentinel file to re-run:
rm .fixtures_loaded - Verify fixture functions are async-compatible
- Check for exceptions in fixture code
- Ensure database connection is available during startup
Retention Not Purging
Symptom: Old data not being deleted.
Diagnosis:
# Check policy configuration (e.g. policy = log_retention)
print(f"Cutoff: {policy.older_than_days} days")
print(f"Model has created_at: {hasattr(policy.model, 'created_at')}")
print(f"Soft delete field: {policy.soft_delete_field}")

Solutions:
- Verify the created_at column exists on the model
- Check that extra_where conditions aren't too restrictive
- Confirm the job is actually being scheduled
- Review database permissions for DELETE operations
Erasure Failing
Symptom: GDPR erasure requests fail or are incomplete.
Diagnosis:
# Test each step individually
for step in erasure_plan.steps:
try:
result = await step.run(session, user_id)
print(f"{step.name}: {result} rows")
except Exception as e:
print(f"{step.name}: FAILED - {e}")Solutions:
- Check foreign key constraints — delete in correct order
- Verify user_id is correct type (string vs UUID)
- Add missing cascade delete rules
- Check database transaction isolation level
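Where business rules allow hard deletion of children, a database-level cascade can replace a separate erasure step; a sketch, assuming the string user ids used earlier:

from sqlalchemy import Column, ForeignKey, Integer, String

class Comment(Base):
    __tablename__ = "comments"
    id = Column(Integer, primary_key=True)
    # Child rows are deleted automatically when the parent user row is deleted
    author_id = Column(String, ForeignKey("users.id", ondelete="CASCADE"), index=True)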
API Reference
run_fixtures
async def run_fixtures(
loaders: Iterable[Callable[[], None | Awaitable[None]]],
*,
run_once_file: str | None = None,
) -> None:
"""Run fixture loaders with optional idempotency."""RetentionPolicy
@dataclass(frozen=True)
class RetentionPolicy:
name: str # Policy identifier
model: Any # SQLAlchemy model
older_than_days: int # Age threshold
soft_delete_field: str | None # Column for soft delete
extra_where: Sequence[Any] | None # Additional filters
hard_delete: bool # If True, DELETE rows

run_retention_purge
async def run_retention_purge(
session: AsyncSession,
policies: Iterable[RetentionPolicy],
) -> int:
"""Execute retention policies, return total affected rows."""ErasurePlan
@dataclass(frozen=True)
class ErasureStep:
name: str
run: Callable[[Session, str], Awaitable[int] | int]
@dataclass(frozen=True)
class ErasurePlan:
steps: Iterable[ErasureStep]

run_erasure
async def run_erasure(
session: AsyncSession,
principal_id: str,
plan: ErasurePlan,
*,
on_audit: Callable[[str, dict], None] | None = None,
) -> int:
"""Execute erasure plan with optional audit callback."""verify_backups
def verify_backups(
*,
last_success: datetime | None = None,
retention_days: int | None = None,
) -> BackupHealthReport:
"""Return backup health report."""
@dataclass(frozen=True)
class BackupHealthReport:
ok: bool
last_success: datetime | None
retention_days: int | None
message: str

add_data_lifecycle
def add_data_lifecycle(
app: FastAPI,
*,
auto_migrate: bool = True,
database_url: str | None = None,
discover_packages: list[str] | None = None,
with_payments: bool | None = None,
on_load_fixtures: Callable[[], None] | None = None,
retention_jobs: Iterable[Callable] | None = None,
erasure_job: Callable[[str], None] | None = None,
) -> None:
"""Wire data lifecycle conveniences on app startup."""See Also
- Database Guide — SQL session and repository patterns
- Jobs Guide — Background job scheduling
- Auth Guide — User management
- CLI Reference — Database migration commands