Spaces:
Sleeping
Sleeping
| """ | |
| HIPAA Data Retention Policies & Auto-Purge | |
| Provides: | |
| - Configurable retention periods for intake sessions and audit logs | |
| - Scheduled background auto-purge of expired data | |
| - Manual purge trigger for admin use | |
| - Purge logging for compliance audit trail | |
| """ | |
| import asyncio | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Optional | |
| from sqlalchemy import delete, func, select | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from app.config import settings | |
| from app.db.database import AsyncSessionLocal | |
| from app.db.models import AuditLog, IntakeSession | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Handle to the background auto-purge task; None while no scheduler task is
# being tracked (set by start_purge_scheduler, reset by stop_purge_scheduler).
_purge_task: Optional[asyncio.Task] = None
async def purge_expired_sessions(db: AsyncSession) -> int:
    """Delete intake sessions older than the configured retention period.

    The cutoff is ``datetime.utcnow()`` minus
    ``settings.retention_sessions_days`` days; every ``IntakeSession`` whose
    ``created_at`` is earlier than the cutoff is deleted and the session is
    committed.

    Args:
        db: Async database session used to execute and commit the delete.

    Returns:
        The number of intake sessions actually deleted. ``0`` when retention
        is disabled (``retention_sessions_days <= 0``) or nothing had expired.
    """
    if settings.retention_sessions_days <= 0:
        return 0  # Retention disabled (keep forever)

    cutoff = datetime.utcnow() - timedelta(days=settings.retention_sessions_days)

    # Take the count from the DELETE itself instead of a separate SELECT
    # COUNT: a count-then-delete pair can disagree when rows change between
    # the two statements, which would make the compliance log inaccurate.
    result = await db.execute(
        delete(IntakeSession).where(IntakeSession.created_at < cutoff)
    )
    await db.commit()

    # rowcount may be None or -1 on drivers that cannot report it.
    count = max(result.rowcount or 0, 0)
    if count > 0:
        logger.info(f"Purged {count} expired intake sessions (older than {settings.retention_sessions_days} days)")
    return count
async def purge_expired_audit_logs(db: AsyncSession) -> int:
    """Delete audit logs older than the configured retention period.

    The cutoff is ``datetime.utcnow()`` minus
    ``settings.retention_audit_logs_days`` days; every ``AuditLog`` whose
    ``timestamp`` is earlier than the cutoff is deleted and the session is
    committed.

    Note: HIPAA requires minimum 6 years for audit logs.

    Args:
        db: Async database session used to execute and commit the delete.

    Returns:
        The number of audit log rows actually deleted. ``0`` when retention
        is disabled (``retention_audit_logs_days <= 0``) or nothing had
        expired.
    """
    if settings.retention_audit_logs_days <= 0:
        return 0  # Retention disabled (keep forever)

    cutoff = datetime.utcnow() - timedelta(days=settings.retention_audit_logs_days)

    # Take the count from the DELETE itself instead of a separate SELECT
    # COUNT: a count-then-delete pair can disagree when rows change between
    # the two statements, which would make the compliance log inaccurate.
    result = await db.execute(
        delete(AuditLog).where(AuditLog.timestamp < cutoff)
    )
    await db.commit()

    # rowcount may be None or -1 on drivers that cannot report it.
    count = max(result.rowcount or 0, 0)
    if count > 0:
        logger.info(f"Purged {count} expired audit logs (older than {settings.retention_audit_logs_days} days)")
    return count
async def _rollback_after_failure(db) -> None:
    """Roll back *db* after a failed purge step, logging (not raising) on error."""
    try:
        await db.rollback()
    except Exception as exc:
        logger.error(f"Rollback after failed purge step failed: {exc}")


async def run_purge() -> dict:
    """Execute a full purge cycle and return results.

    Runs the intake-session purge and then the audit-log purge on one
    database session. The two steps are independent: a failure in one is
    logged and recorded, and the other still runs.

    Returns:
        A dict with keys ``"sessions_purged"`` and ``"audit_logs_purged"``
        (row counts, ``0`` for a step that failed) and ``"errors"`` (a list
        of ``"<step>: <exception>"`` strings, empty on full success).
    """
    results = {"sessions_purged": 0, "audit_logs_purged": 0, "errors": []}
    async with AsyncSessionLocal() as db:
        try:
            results["sessions_purged"] = await purge_expired_sessions(db)
        except Exception as exc:
            logger.error(f"Session purge failed: {exc}")
            results["errors"].append(f"session_purge: {exc}")
            # Reset the session so the failed transaction does not poison
            # the audit-log purge that reuses the same session.
            await _rollback_after_failure(db)
        try:
            results["audit_logs_purged"] = await purge_expired_audit_logs(db)
        except Exception as exc:
            logger.error(f"Audit log purge failed: {exc}")
            results["errors"].append(f"audit_log_purge: {exc}")
            # Leave the session clean before it is closed.
            await _rollback_after_failure(db)
    return results
async def get_retention_stats() -> dict:
    """Collect retention figures for the admin dashboard.

    Returns:
        A dict with ``"sessions"`` and ``"audit_logs"`` sub-dicts (each
        holding ``"total"``, ``"expired"`` and ``"retention_days"``), plus
        ``"auto_purge_enabled"`` and ``"purge_interval_hours"`` taken from
        settings. ``"expired"`` is the number of rows older than the
        retention cutoff, or ``0`` when that retention period is not
        positive.
    """

    async def _row_count(db, model, condition=None):
        # COUNT(*) over `model`, optionally restricted by `condition`.
        stmt = select(func.count()).select_from(model)
        if condition is not None:
            stmt = stmt.where(condition)
        outcome = await db.execute(stmt)
        return outcome.scalar() or 0

    async with AsyncSessionLocal() as db:
        sessions_total = await _row_count(db, IntakeSession)
        audits_total = await _row_count(db, AuditLog)

        # Rows that the next purge would remove.
        sessions_expired = 0
        if settings.retention_sessions_days > 0:
            session_cutoff = datetime.utcnow() - timedelta(
                days=settings.retention_sessions_days
            )
            sessions_expired = await _row_count(
                db, IntakeSession, IntakeSession.created_at < session_cutoff
            )

        audits_expired = 0
        if settings.retention_audit_logs_days > 0:
            audit_cutoff = datetime.utcnow() - timedelta(
                days=settings.retention_audit_logs_days
            )
            audits_expired = await _row_count(
                db, AuditLog, AuditLog.timestamp < audit_cutoff
            )

        session_stats = {
            "total": sessions_total,
            "expired": sessions_expired,
            "retention_days": settings.retention_sessions_days,
        }
        audit_stats = {
            "total": audits_total,
            "expired": audits_expired,
            "retention_days": settings.retention_audit_logs_days,
        }
        return {
            "sessions": session_stats,
            "audit_logs": audit_stats,
            "auto_purge_enabled": settings.auto_purge_enabled,
            "purge_interval_hours": settings.auto_purge_interval_hours,
        }
async def _purge_loop():
    """Run the purge cycle forever, sleeping between cycles.

    The sleep length is ``settings.auto_purge_interval_hours`` converted to
    seconds, read once at start-up. Each iteration sleeps first, then runs a
    purge if ``settings.auto_purge_enabled`` is still true. Cancellation ends
    the loop; any other exception is logged and the loop continues.
    """
    pause_seconds = settings.auto_purge_interval_hours * 3600
    logger.info(
        f"Auto-purge scheduler started (interval: {settings.auto_purge_interval_hours}h, "
        f"sessions: {settings.retention_sessions_days}d, "
        f"audit logs: {settings.retention_audit_logs_days}d)"
    )
    while True:
        try:
            await asyncio.sleep(pause_seconds)
            if not settings.auto_purge_enabled:
                continue
            outcome = await run_purge()
            removed_something = (
                outcome["sessions_purged"] or outcome["audit_logs_purged"]
            )
            if removed_something:
                logger.info(f"Auto-purge completed: {outcome}")
        except asyncio.CancelledError:
            logger.info("Auto-purge scheduler stopped.")
            return
        except Exception as exc:
            logger.error(f"Auto-purge cycle error: {exc}")
def start_purge_scheduler():
    """Start the background purge scheduler.

    Does nothing (beyond an info log) when ``settings.auto_purge_enabled`` is
    false, and does nothing when a scheduler task is already running.
    Otherwise schedules ``_purge_loop()`` and stores the task in the
    module-level ``_purge_task``.
    """
    global _purge_task
    if not settings.auto_purge_enabled:
        logger.info("Auto-purge is disabled by configuration.")
        return
    # A stored task that has already finished (for example, cancelled
    # elsewhere) is not "running"; only a live task blocks a new start.
    if _purge_task is not None and not _purge_task.done():
        return  # Already running
    _purge_task = asyncio.ensure_future(_purge_loop())
def stop_purge_scheduler():
    """Cancel the background purge task, if any, and forget its handle."""
    global _purge_task
    # Detach the handle first, then cancel whatever task it referred to.
    active_task, _purge_task = _purge_task, None
    if active_task is not None:
        active_task.cancel()