Spaces:
Sleeping
Sleeping
| """ | |
| Production Logging Infrastructure | |
| Structured logging with medical-specific fields and compliance features | |
| Features: | |
| - JSON-structured logging for machine parsing | |
| - Medical-specific log fields (PHI anonymization, confidence scores) | |
| - Log levels with appropriate categorization | |
| - Security event logging | |
| - Compliance-ready log retention | |
| - Centralized log aggregation support | |
| Author: MiniMax Agent | |
| Date: 2025-10-29 | |
| Version: 1.0.0 | |
| """ | |
| import logging | |
| import json | |
| import hashlib | |
| from typing import Dict, Any, Optional | |
| from datetime import datetime | |
| from enum import Enum | |
| import traceback | |
| class LogLevel(Enum): | |
| """Standard log levels""" | |
| DEBUG = "DEBUG" | |
| INFO = "INFO" | |
| WARNING = "WARNING" | |
| ERROR = "ERROR" | |
| CRITICAL = "CRITICAL" | |
| class EventCategory(Enum): | |
| """Event categories for medical AI platform""" | |
| AUTHENTICATION = "authentication" | |
| AUTHORIZATION = "authorization" | |
| PHI_ACCESS = "phi_access" | |
| MODEL_INFERENCE = "model_inference" | |
| DATA_PROCESSING = "data_processing" | |
| SYSTEM_EVENT = "system_event" | |
| SECURITY_EVENT = "security_event" | |
| COMPLIANCE_EVENT = "compliance_event" | |
| PERFORMANCE_EVENT = "performance_event" | |
| ERROR_EVENT = "error_event" | |
| class MedicalLogger: | |
| """ | |
| Medical-grade structured logger with compliance features | |
| Implements HIPAA-compliant logging with PHI protection | |
| """ | |
| def __init__( | |
| self, | |
| service_name: str, | |
| environment: str = "production" | |
| ): | |
| self.service_name = service_name | |
| self.environment = environment | |
| self.logger = logging.getLogger(service_name) | |
| self.logger.setLevel(logging.DEBUG) | |
| # Setup JSON formatter | |
| self._setup_json_handler() | |
| # Track logging statistics | |
| self.log_counts = {level.value: 0 for level in LogLevel} | |
| def _setup_json_handler(self): | |
| """Setup JSON-formatted log handler""" | |
| handler = logging.StreamHandler() | |
| handler.setLevel(logging.DEBUG) | |
| # Custom formatter for JSON output | |
| formatter = logging.Formatter( | |
| '{"timestamp": "%(asctime)s", "level": "%(levelname)s", ' | |
| '"service": "%(name)s", "message": "%(message)s"}' | |
| ) | |
| handler.setFormatter(formatter) | |
| self.logger.addHandler(handler) | |
| def _anonymize_phi(self, data: Any) -> Any: | |
| """Anonymize PHI in log data""" | |
| if isinstance(data, dict): | |
| anonymized = {} | |
| phi_fields = ["patient_id", "patient_name", "ssn", "mrn", "email", "phone"] | |
| for key, value in data.items(): | |
| if any(phi_field in key.lower() for phi_field in phi_fields): | |
| # Hash PHI fields | |
| if isinstance(value, str): | |
| anonymized[key] = hashlib.sha256(value.encode()).hexdigest()[:16] | |
| else: | |
| anonymized[key] = "[REDACTED]" | |
| elif isinstance(value, (dict, list)): | |
| anonymized[key] = self._anonymize_phi(value) | |
| else: | |
| anonymized[key] = value | |
| return anonymized | |
| elif isinstance(data, list): | |
| return [self._anonymize_phi(item) for item in data] | |
| return data | |
| def _create_log_entry( | |
| self, | |
| level: LogLevel, | |
| message: str, | |
| category: EventCategory, | |
| details: Optional[Dict[str, Any]] = None, | |
| user_id: Optional[str] = None, | |
| document_id: Optional[str] = None, | |
| model_id: Optional[str] = None, | |
| confidence: Optional[float] = None, | |
| anonymize: bool = True | |
| ) -> Dict[str, Any]: | |
| """Create structured log entry""" | |
| log_entry = { | |
| "timestamp": datetime.utcnow().isoformat(), | |
| "level": level.value, | |
| "service": self.service_name, | |
| "environment": self.environment, | |
| "category": category.value, | |
| "message": message | |
| } | |
| # Add optional fields | |
| if user_id: | |
| log_entry["user_id"] = user_id | |
| if document_id: | |
| log_entry["document_id"] = document_id | |
| if model_id: | |
| log_entry["model_id"] = model_id | |
| if confidence is not None: | |
| log_entry["confidence"] = confidence | |
| if details: | |
| # Anonymize PHI if requested | |
| if anonymize: | |
| details = self._anonymize_phi(details) | |
| log_entry["details"] = details | |
| return log_entry | |
| def log( | |
| self, | |
| level: LogLevel, | |
| message: str, | |
| category: EventCategory = EventCategory.SYSTEM_EVENT, | |
| **kwargs | |
| ): | |
| """Generic log method""" | |
| log_entry = self._create_log_entry(level, message, category, **kwargs) | |
| # Increment counter | |
| self.log_counts[level.value] += 1 | |
| # Log at appropriate level | |
| if level == LogLevel.DEBUG: | |
| self.logger.debug(json.dumps(log_entry)) | |
| elif level == LogLevel.INFO: | |
| self.logger.info(json.dumps(log_entry)) | |
| elif level == LogLevel.WARNING: | |
| self.logger.warning(json.dumps(log_entry)) | |
| elif level == LogLevel.ERROR: | |
| self.logger.error(json.dumps(log_entry)) | |
| elif level == LogLevel.CRITICAL: | |
| self.logger.critical(json.dumps(log_entry)) | |
| def info(self, message: str, category: EventCategory = EventCategory.SYSTEM_EVENT, **kwargs): | |
| """Log info message""" | |
| self.log(LogLevel.INFO, message, category, **kwargs) | |
| def warning(self, message: str, category: EventCategory = EventCategory.SYSTEM_EVENT, **kwargs): | |
| """Log warning message""" | |
| self.log(LogLevel.WARNING, message, category, **kwargs) | |
| def error(self, message: str, category: EventCategory = EventCategory.ERROR_EVENT, **kwargs): | |
| """Log error message""" | |
| self.log(LogLevel.ERROR, message, category, **kwargs) | |
| def critical(self, message: str, category: EventCategory = EventCategory.ERROR_EVENT, **kwargs): | |
| """Log critical message""" | |
| self.log(LogLevel.CRITICAL, message, category, **kwargs) | |
| def debug(self, message: str, category: EventCategory = EventCategory.SYSTEM_EVENT, **kwargs): | |
| """Log debug message""" | |
| self.log(LogLevel.DEBUG, message, category, **kwargs) | |
| def log_authentication( | |
| self, | |
| user_id: str, | |
| success: bool, | |
| ip_address: str, | |
| details: Optional[Dict[str, Any]] = None | |
| ): | |
| """Log authentication event""" | |
| message = f"Authentication {'successful' if success else 'failed'} for user {user_id}" | |
| self.log( | |
| LogLevel.INFO if success else LogLevel.WARNING, | |
| message, | |
| EventCategory.AUTHENTICATION, | |
| user_id=user_id, | |
| details={ | |
| "ip_address": ip_address, | |
| "success": success, | |
| **(details or {}) | |
| } | |
| ) | |
| def log_phi_access( | |
| self, | |
| user_id: str, | |
| document_id: str, | |
| action: str, | |
| ip_address: str, | |
| details: Optional[Dict[str, Any]] = None | |
| ): | |
| """Log PHI access event (HIPAA requirement)""" | |
| message = f"PHI access: {action} on document {document_id} by user {user_id}" | |
| self.log( | |
| LogLevel.INFO, | |
| message, | |
| EventCategory.PHI_ACCESS, | |
| user_id=user_id, | |
| document_id=document_id, | |
| details={ | |
| "action": action, | |
| "ip_address": ip_address, | |
| **(details or {}) | |
| }, | |
| anonymize=False # PHI access logs must be complete | |
| ) | |
| def log_model_inference( | |
| self, | |
| model_id: str, | |
| document_id: str, | |
| confidence: float, | |
| duration_seconds: float, | |
| success: bool, | |
| details: Optional[Dict[str, Any]] = None | |
| ): | |
| """Log model inference event""" | |
| message = f"Model inference: {model_id} on {document_id} ({'success' if success else 'failed'})" | |
| self.log( | |
| LogLevel.INFO, | |
| message, | |
| EventCategory.MODEL_INFERENCE, | |
| document_id=document_id, | |
| model_id=model_id, | |
| confidence=confidence, | |
| details={ | |
| "duration_seconds": duration_seconds, | |
| "success": success, | |
| **(details or {}) | |
| } | |
| ) | |
| def log_security_event( | |
| self, | |
| event_type: str, | |
| severity: str, | |
| user_id: Optional[str] = None, | |
| ip_address: Optional[str] = None, | |
| details: Optional[Dict[str, Any]] = None | |
| ): | |
| """Log security event""" | |
| message = f"Security event: {event_type} (severity: {severity})" | |
| level = LogLevel.CRITICAL if severity == "high" else LogLevel.WARNING | |
| self.log( | |
| level, | |
| message, | |
| EventCategory.SECURITY_EVENT, | |
| user_id=user_id, | |
| details={ | |
| "event_type": event_type, | |
| "severity": severity, | |
| "ip_address": ip_address, | |
| **(details or {}) | |
| } | |
| ) | |
| def log_exception( | |
| self, | |
| exception: Exception, | |
| context: str, | |
| user_id: Optional[str] = None, | |
| document_id: Optional[str] = None | |
| ): | |
| """Log exception with stack trace""" | |
| message = f"Exception in {context}: {str(exception)}" | |
| self.log( | |
| LogLevel.ERROR, | |
| message, | |
| EventCategory.ERROR_EVENT, | |
| user_id=user_id, | |
| document_id=document_id, | |
| details={ | |
| "exception_type": type(exception).__name__, | |
| "exception_message": str(exception), | |
| "stack_trace": traceback.format_exc(), | |
| "context": context | |
| } | |
| ) | |
| def get_log_statistics(self) -> Dict[str, int]: | |
| """Get logging statistics""" | |
| return dict(self.log_counts) | |
| # Global logger instance | |
| _medical_logger = None | |
| def get_medical_logger(service_name: str = "medical_ai_platform") -> MedicalLogger: | |
| """Get singleton medical logger instance""" | |
| global _medical_logger | |
| if _medical_logger is None: | |
| _medical_logger = MedicalLogger(service_name) | |
| return _medical_logger | |