""" Production Logging Infrastructure Structured logging with medical-specific fields and compliance features Features: - JSON-structured logging for machine parsing - Medical-specific log fields (PHI anonymization, confidence scores) - Log levels with appropriate categorization - Security event logging - Compliance-ready log retention - Centralized log aggregation support Author: MiniMax Agent Date: 2025-10-29 Version: 1.0.0 """ import logging import json import hashlib from typing import Dict, Any, Optional from datetime import datetime from enum import Enum import traceback class LogLevel(Enum): """Standard log levels""" DEBUG = "DEBUG" INFO = "INFO" WARNING = "WARNING" ERROR = "ERROR" CRITICAL = "CRITICAL" class EventCategory(Enum): """Event categories for medical AI platform""" AUTHENTICATION = "authentication" AUTHORIZATION = "authorization" PHI_ACCESS = "phi_access" MODEL_INFERENCE = "model_inference" DATA_PROCESSING = "data_processing" SYSTEM_EVENT = "system_event" SECURITY_EVENT = "security_event" COMPLIANCE_EVENT = "compliance_event" PERFORMANCE_EVENT = "performance_event" ERROR_EVENT = "error_event" class MedicalLogger: """ Medical-grade structured logger with compliance features Implements HIPAA-compliant logging with PHI protection """ def __init__( self, service_name: str, environment: str = "production" ): self.service_name = service_name self.environment = environment self.logger = logging.getLogger(service_name) self.logger.setLevel(logging.DEBUG) # Setup JSON formatter self._setup_json_handler() # Track logging statistics self.log_counts = {level.value: 0 for level in LogLevel} def _setup_json_handler(self): """Setup JSON-formatted log handler""" handler = logging.StreamHandler() handler.setLevel(logging.DEBUG) # Custom formatter for JSON output formatter = logging.Formatter( '{"timestamp": "%(asctime)s", "level": "%(levelname)s", ' '"service": "%(name)s", "message": "%(message)s"}' ) handler.setFormatter(formatter) self.logger.addHandler(handler) def _anonymize_phi(self, data: Any) -> Any: """Anonymize PHI in log data""" if isinstance(data, dict): anonymized = {} phi_fields = ["patient_id", "patient_name", "ssn", "mrn", "email", "phone"] for key, value in data.items(): if any(phi_field in key.lower() for phi_field in phi_fields): # Hash PHI fields if isinstance(value, str): anonymized[key] = hashlib.sha256(value.encode()).hexdigest()[:16] else: anonymized[key] = "[REDACTED]" elif isinstance(value, (dict, list)): anonymized[key] = self._anonymize_phi(value) else: anonymized[key] = value return anonymized elif isinstance(data, list): return [self._anonymize_phi(item) for item in data] return data def _create_log_entry( self, level: LogLevel, message: str, category: EventCategory, details: Optional[Dict[str, Any]] = None, user_id: Optional[str] = None, document_id: Optional[str] = None, model_id: Optional[str] = None, confidence: Optional[float] = None, anonymize: bool = True ) -> Dict[str, Any]: """Create structured log entry""" log_entry = { "timestamp": datetime.utcnow().isoformat(), "level": level.value, "service": self.service_name, "environment": self.environment, "category": category.value, "message": message } # Add optional fields if user_id: log_entry["user_id"] = user_id if document_id: log_entry["document_id"] = document_id if model_id: log_entry["model_id"] = model_id if confidence is not None: log_entry["confidence"] = confidence if details: # Anonymize PHI if requested if anonymize: details = self._anonymize_phi(details) log_entry["details"] = details return log_entry def log( self, level: LogLevel, message: str, category: EventCategory = EventCategory.SYSTEM_EVENT, **kwargs ): """Generic log method""" log_entry = self._create_log_entry(level, message, category, **kwargs) # Increment counter self.log_counts[level.value] += 1 # Log at appropriate level if level == LogLevel.DEBUG: self.logger.debug(json.dumps(log_entry)) elif level == LogLevel.INFO: self.logger.info(json.dumps(log_entry)) elif level == LogLevel.WARNING: self.logger.warning(json.dumps(log_entry)) elif level == LogLevel.ERROR: self.logger.error(json.dumps(log_entry)) elif level == LogLevel.CRITICAL: self.logger.critical(json.dumps(log_entry)) def info(self, message: str, category: EventCategory = EventCategory.SYSTEM_EVENT, **kwargs): """Log info message""" self.log(LogLevel.INFO, message, category, **kwargs) def warning(self, message: str, category: EventCategory = EventCategory.SYSTEM_EVENT, **kwargs): """Log warning message""" self.log(LogLevel.WARNING, message, category, **kwargs) def error(self, message: str, category: EventCategory = EventCategory.ERROR_EVENT, **kwargs): """Log error message""" self.log(LogLevel.ERROR, message, category, **kwargs) def critical(self, message: str, category: EventCategory = EventCategory.ERROR_EVENT, **kwargs): """Log critical message""" self.log(LogLevel.CRITICAL, message, category, **kwargs) def debug(self, message: str, category: EventCategory = EventCategory.SYSTEM_EVENT, **kwargs): """Log debug message""" self.log(LogLevel.DEBUG, message, category, **kwargs) def log_authentication( self, user_id: str, success: bool, ip_address: str, details: Optional[Dict[str, Any]] = None ): """Log authentication event""" message = f"Authentication {'successful' if success else 'failed'} for user {user_id}" self.log( LogLevel.INFO if success else LogLevel.WARNING, message, EventCategory.AUTHENTICATION, user_id=user_id, details={ "ip_address": ip_address, "success": success, **(details or {}) } ) def log_phi_access( self, user_id: str, document_id: str, action: str, ip_address: str, details: Optional[Dict[str, Any]] = None ): """Log PHI access event (HIPAA requirement)""" message = f"PHI access: {action} on document {document_id} by user {user_id}" self.log( LogLevel.INFO, message, EventCategory.PHI_ACCESS, user_id=user_id, document_id=document_id, details={ "action": action, "ip_address": ip_address, **(details or {}) }, anonymize=False # PHI access logs must be complete ) def log_model_inference( self, model_id: str, document_id: str, confidence: float, duration_seconds: float, success: bool, details: Optional[Dict[str, Any]] = None ): """Log model inference event""" message = f"Model inference: {model_id} on {document_id} ({'success' if success else 'failed'})" self.log( LogLevel.INFO, message, EventCategory.MODEL_INFERENCE, document_id=document_id, model_id=model_id, confidence=confidence, details={ "duration_seconds": duration_seconds, "success": success, **(details or {}) } ) def log_security_event( self, event_type: str, severity: str, user_id: Optional[str] = None, ip_address: Optional[str] = None, details: Optional[Dict[str, Any]] = None ): """Log security event""" message = f"Security event: {event_type} (severity: {severity})" level = LogLevel.CRITICAL if severity == "high" else LogLevel.WARNING self.log( level, message, EventCategory.SECURITY_EVENT, user_id=user_id, details={ "event_type": event_type, "severity": severity, "ip_address": ip_address, **(details or {}) } ) def log_exception( self, exception: Exception, context: str, user_id: Optional[str] = None, document_id: Optional[str] = None ): """Log exception with stack trace""" message = f"Exception in {context}: {str(exception)}" self.log( LogLevel.ERROR, message, EventCategory.ERROR_EVENT, user_id=user_id, document_id=document_id, details={ "exception_type": type(exception).__name__, "exception_message": str(exception), "stack_trace": traceback.format_exc(), "context": context } ) def get_log_statistics(self) -> Dict[str, int]: """Get logging statistics""" return dict(self.log_counts) # Global logger instance _medical_logger = None def get_medical_logger(service_name: str = "medical_ai_platform") -> MedicalLogger: """Get singleton medical logger instance""" global _medical_logger if _medical_logger is None: _medical_logger = MedicalLogger(service_name) return _medical_logger