"""
Security Module - HIPAA/GDPR Compliance Features

Implements authentication, authorization, audit logging, and encryption.

NOTE: several pieces are explicitly demo-grade (anonymous access is allowed,
encryption is a pass-through, the ``require_auth`` decorator only logs).
Each such place is marked in the code below.
"""

import hashlib
import ipaddress
import json
import logging
import os
import secrets
from datetime import datetime, timedelta
from functools import wraps
from typing import Any, Dict, List, Optional

import jwt
from fastapi import Depends, HTTPException, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

logger = logging.getLogger(__name__)

# Security configuration
# Prefer a key supplied through the SECRET_KEY environment variable so tokens
# stay valid across restarts and across worker processes. When it is unset we
# fall back to a random per-process key (the original demo behavior).
SECRET_KEY = os.environ.get("SECRET_KEY") or secrets.token_urlsafe(32)
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Size of each random chunk written while wiping a file in secure_delete().
_WIPE_CHUNK_SIZE = 1024 * 1024


class AuditLogger:
    """
    HIPAA-compliant audit logging

    Tracks all access to PHI (Protected Health Information). Entries are
    emitted through the module logger as JSON; ``audit_log_path`` is recorded
    on the instance but nothing in this class writes to it directly.
    """

    def __init__(self):
        self.audit_log_path = "logs/audit.log"
        logger.info("Audit Logger initialized")

    def log_access(
        self,
        user_id: str,
        action: str,
        resource: str,
        ip_address: str,
        status: str,
        details: Optional[Dict[str, Any]] = None
    ):
        """
        Log access to medical data.

        Args:
            user_id: Identifier of the acting user.
            action: Short action label (e.g. ``"API_ACCESS"``).
            resource: Resource that was touched (path, document id, ...).
            ip_address: Client IP; it is masked before being logged.
            status: Outcome label (e.g. ``"SUCCESS"``).
            details: Optional extra context; defaults to an empty dict.

        Logging is best-effort: any failure is logged as an error and
        swallowed so that auditing never breaks the calling request.
        """
        try:
            audit_entry = {
                "timestamp": datetime.utcnow().isoformat(),
                "user_id": user_id,
                "action": action,
                "resource": resource,
                "ip_address": self._anonymize_ip(ip_address),
                "status": status,
                "details": details or {}
            }

            # default=str: a non-JSON value inside `details` (datetime, UUID,
            # ...) must not cause the whole audit entry to be dropped.
            logger.info("AUDIT: %s", json.dumps(audit_entry, default=str))

            # In production, also store in database for long-term retention
        except Exception as e:
            logger.error("Audit logging failed: %s", e)

    def _anonymize_ip(self, ip_address: str) -> str:
        """
        Anonymize IP address for GDPR compliance.

        IPv4: the last octet is replaced with ``xxx``.
        IPv6: the address is expanded to its full 8-group form, the first
        four groups are kept and the rest replaced with ``xxxx``. Expanding
        first matters: splitting a compressed address such as
        ``2001:db8::1`` on ``:`` would otherwise leave the whole address in
        the output.

        Returns:
            The masked address, or ``"unknown"`` when the input is not a
            parseable IP address (e.g. the ``"unknown"`` placeholder or a
            hostname).
        """
        try:
            parsed = ipaddress.ip_address(ip_address.strip())
        except (AttributeError, ValueError):
            # AttributeError covers None / non-string input.
            return "unknown"

        if parsed.version == 6:
            groups = parsed.exploded.split(':')
            return ':'.join(groups[:4]) + ':xxxx'

        octets = str(parsed).split('.')
        return '.'.join(octets[:3]) + '.xxx'

    def log_phi_access(
        self,
        user_id: str,
        document_id: str,
        action: str,
        ip_address: str
    ):
        """
        Specific logging for PHI access.

        Delegates to ``log_access`` with the action prefixed by ``PHI_``,
        the resource formatted as ``document:<id>``, status ``SUCCESS`` and
        a ``phi_accessed`` marker in the details.
        """
        self.log_access(
            user_id=user_id,
            action=f"PHI_{action}",
            resource=f"document:{document_id}",
            ip_address=ip_address,
            status="SUCCESS",
            details={"phi_accessed": True}
        )


class SecurityManager:
    """
    Manages authentication, authorization, and encryption
    """

    def __init__(self):
        self.audit_logger = AuditLogger()
        self.security_bearer = HTTPBearer(auto_error=False)
        logger.info("Security Manager initialized")

    def create_access_token(self, user_id: str, email: str) -> str:
        """
        Create JWT access token.

        The token carries ``sub`` (user id), ``email``, ``iat`` and an
        ``exp`` set ``ACCESS_TOKEN_EXPIRE_MINUTES`` after issuance, signed
        with ``SECRET_KEY`` using ``ALGORITHM``.
        """
        # Take a single timestamp so iat and exp are exactly consistent.
        issued_at = datetime.utcnow()
        payload = {
            "sub": user_id,
            "email": email,
            "exp": issued_at + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
            "iat": issued_at
        }
        return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

    def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
        """
        Verify and decode JWT token.

        Returns:
            The decoded payload, or ``None`` when the token is expired or
            otherwise invalid (a warning is logged in both cases).
        """
        try:
            return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except jwt.ExpiredSignatureError:
            logger.warning("Token expired")
            return None
        except jwt.InvalidTokenError as e:
            # PyJWT has no `jwt.JWTError`; InvalidTokenError is the base
            # class for its decode/validation failures.
            logger.warning("Token verification failed: %s", e)
            return None

    async def get_current_user(
        self,
        request: Request,
        credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False))
    ) -> Dict[str, Any]:
        """
        FastAPI dependency for protected routes.

        Validates JWT token and returns user info as a dict with
        ``user_id``, ``email`` and ``is_anonymous``.

        Raises:
            HTTPException: 401 when credentials are supplied but the token
                is invalid or expired.
        """
        client_ip = request.client.host if request.client else "unknown"

        # For development/demo, allow anonymous access but log it
        if not credentials:
            logger.warning("Anonymous access - should be restricted in production")
            anonymous_user = {
                "user_id": "anonymous",
                "email": "anonymous@demo.local",
                "is_anonymous": True
            }

            # Log anonymous access
            self.audit_logger.log_access(
                user_id="anonymous",
                action="API_ACCESS",
                resource=request.url.path,
                ip_address=client_ip,
                status="WARNING_ANONYMOUS"
            )
            return anonymous_user

        # Verify token
        payload = self.verify_token(credentials.credentials)
        if not payload:
            raise HTTPException(
                status_code=401,
                detail="Invalid or expired authentication token"
            )

        user_info = {
            "user_id": payload.get("sub"),
            "email": payload.get("email"),
            "is_anonymous": False
        }

        # Log authenticated access
        self.audit_logger.log_access(
            user_id=user_info["user_id"],
            action="API_ACCESS",
            resource=request.url.path,
            ip_address=client_ip,
            status="SUCCESS"
        )
        return user_info

    def hash_phi_identifier(self, identifier: str) -> str:
        """
        Hash PHI identifiers for pseudonymization.

        Returns the hex SHA-256 digest of the UTF-8 encoded identifier.

        NOTE(review): the hash is unsalted/unkeyed, so low-entropy
        identifiers can be recovered by enumeration. Switching to a keyed
        HMAC would change every stored digest, so it is left as-is here.
        """
        return hashlib.sha256(identifier.encode()).hexdigest()

    def sanitize_response(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Remove or redact sensitive information from API responses.

        Currently only replaces the value under ``"error"`` with a generic
        message. The input dict is modified in place and also returned.
        """
        # In production, implement comprehensive PII/PHI redaction
        # For now, basic sanitization
        if "error" in data:
            # Don't expose internal error details
            data["error"] = "An error occurred during processing"
        return data


class DataEncryption:
    """
    Handles encryption of data at rest and in transit

    NOTE: in this demo state ``encrypt_data`` / ``decrypt_data`` return
    their input unchanged and only log a warning.
    """

    def __init__(self):
        # In production, use proper key management (e.g., AWS KMS, Azure Key Vault)
        self.encryption_key = self._load_or_generate_key()
        logger.info("Data Encryption initialized")

    def _load_or_generate_key(self) -> bytes:
        """Return a fresh random 32-byte key (demo stand-in for key loading)."""
        # In production, load from secure key management system
        # For demo, generate a key
        return secrets.token_bytes(32)

    def encrypt_data(self, data: bytes) -> bytes:
        """
        Placeholder for AES-256 encryption.

        Returns ``data`` unchanged and logs a warning; no encryption is
        performed.
        """
        # In production, implement proper AES-256 encryption
        # For now, return as-is (encryption would require cryptography library)
        logger.warning("Encryption not fully implemented - add cryptography library")
        return data

    def decrypt_data(self, encrypted_data: bytes) -> bytes:
        """Placeholder for decryption; returns the input unchanged."""
        logger.warning("Decryption not fully implemented - add cryptography library")
        return encrypted_data

    def secure_delete(self, file_path: str):
        """
        Securely delete files containing PHI.

        Overwrites the file's existing contents in place with random bytes,
        forces the data to disk, then removes the file. Does nothing when
        the path does not exist. Failures are logged and swallowed
        (best-effort, matching the rest of this module).
        """
        try:
            # In production, overwrite file multiple times before deletion
            if not os.path.exists(file_path):
                return

            remaining = os.path.getsize(file_path)

            # "r+b" overwrites the existing bytes; "wb" would truncate the
            # file first, leaving the old blocks untouched on disk.
            with open(file_path, 'r+b') as f:
                while remaining > 0:
                    # Chunked writes keep memory bounded for large files.
                    chunk_len = min(remaining, _WIPE_CHUNK_SIZE)
                    f.write(secrets.token_bytes(chunk_len))
                    remaining -= chunk_len
                f.flush()
                # Make sure the random data reaches the device before unlink.
                os.fsync(f.fileno())

            # Delete file
            os.remove(file_path)
            logger.info("Securely deleted file: %s", file_path)
        except Exception as e:
            logger.error("Secure deletion failed: %s", e)


class ComplianceValidator:
    """
    Validates compliance with HIPAA and GDPR requirements

    The feature map is a static, hand-maintained checklist; nothing here
    inspects the running system.
    """

    def __init__(self):
        self.required_features = {
            "encryption_at_rest": False,  # Would be True in production
            "encryption_in_transit": True,  # HTTPS enforced
            "access_logging": True,
            "user_authentication": True,  # Available but not enforced in demo
            "data_retention_policy": False,  # Would implement in production
            "right_to_erasure": False,  # GDPR - would implement in production
            "consent_management": False  # Would implement in production
        }

    def check_compliance(self) -> Dict[str, Any]:
        """
        Check current compliance status.

        Returns:
            A dict with ``compliance_score`` ("implemented/total"),
            ``percentage`` (one decimal), the raw ``features`` map,
            ``status`` ("DEMO_MODE" unless every feature is implemented,
            then "COMPLIANT") and ``recommendations``.
        """
        total_features = len(self.required_features)
        implemented_features = sum(1 for v in self.required_features.values() if v)

        # Guard the division in case the feature map has been emptied.
        if total_features:
            percentage = round((implemented_features / total_features) * 100, 1)
        else:
            percentage = 0.0

        return {
            "compliance_score": f"{implemented_features}/{total_features}",
            "percentage": percentage,
            "features": self.required_features,
            "status": "DEMO_MODE" if implemented_features < total_features else "COMPLIANT",
            "recommendations": self._get_recommendations()
        }

    def _get_recommendations(self) -> List[str]:
        """Return one "Implement <Feature Name>" line per missing feature."""
        return [
            f"Implement {feature.replace('_', ' ').title()}"
            for feature, implemented in self.required_features.items()
            if not implemented
        ]


# Global security manager instance
_security_manager = None


def get_security_manager() -> SecurityManager:
    """Get singleton security manager instance (created on first call)."""
    global _security_manager
    if _security_manager is None:
        _security_manager = SecurityManager()
    return _security_manager


# Decorator for protected routes
def require_auth(func):
    """
    Decorator to protect endpoints with authentication.

    NOTE: in this demo state it does not enforce anything; it logs a
    warning naming the wrapped endpoint and then awaits it.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # In production, enforce authentication
        # For demo, log warning and allow access
        logger.warning("Protected endpoint accessed: %s", func.__name__)
        return await func(*args, **kwargs)
    return wrapper