"""
Production Logging Infrastructure
Structured logging with medical-specific fields and compliance features

Features:
- JSON-structured logging for machine parsing
- Medical-specific log fields (PHI anonymization, confidence scores)
- Log levels with appropriate categorization
- Security event logging
- Compliance-ready log retention
- Centralized log aggregation support

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""

import hashlib
import json
import logging
import traceback
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, Optional


class LogLevel(Enum):
    """Standard log levels.

    Each member's value is its own upper-case name, which is also the name of
    the corresponding level constant in the standard ``logging`` module.
    """

    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


class EventCategory(Enum):
    """Event categories for the medical AI platform.

    The member value is the lower-case string written to the ``category``
    field of each structured log entry.
    """

    AUTHENTICATION = "authentication"
    AUTHORIZATION = "authorization"
    PHI_ACCESS = "phi_access"
    MODEL_INFERENCE = "model_inference"
    DATA_PROCESSING = "data_processing"
    SYSTEM_EVENT = "system_event"
    SECURITY_EVENT = "security_event"
    COMPLIANCE_EVENT = "compliance_event"
    PERFORMANCE_EVENT = "performance_event"
    ERROR_EVENT = "error_event"


class MedicalLogger:
    """Structured JSON logger with PHI-aware helpers.

    Wraps the standard-library logger named after ``service_name``. Every
    record is emitted as one JSON object per line containing the timestamp,
    level, service, environment, category and message, plus optional
    identifiers and a ``details`` mapping. By default, values in ``details``
    whose key looks like a PHI field are hashed or redacted before output.

    Attributes:
        service_name: Name of the emitting service; also the logger name.
        environment: Deployment environment label written to every entry.
        logger: The underlying ``logging.Logger``.
        log_counts: Number of entries logged through this instance, keyed by
            level name.
    """

    # Substrings that mark a dictionary key as PHI (matched case-insensitively).
    _PHI_FIELDS = ("patient_id", "patient_name", "ssn", "mrn", "email", "phone")

    # Attribute set on handlers installed by this class so they can be
    # recognised on later instantiations.
    _HANDLER_MARKER = "_medical_json_handler"

    def __init__(
        self,
        service_name: str,
        environment: str = "production"
    ):
        """Create the logger and make sure its JSON stream handler exists.

        Args:
            service_name: Logger name; written to the ``service`` field.
            environment: Written to the ``environment`` field.
        """
        self.service_name = service_name
        self.environment = environment
        self.logger = logging.getLogger(service_name)
        self.logger.setLevel(logging.DEBUG)

        self._setup_json_handler()

        self.log_counts = {level.value: 0 for level in LogLevel}

    def _setup_json_handler(self) -> None:
        """Attach a stream handler that writes each JSON entry on its own line.

        ``logging.getLogger`` returns the same object for the same name, so a
        second ``MedicalLogger`` for one service would otherwise stack another
        handler and duplicate every record; an already-installed handler is
        therefore reused.
        """
        for existing in self.logger.handlers:
            if getattr(existing, self._HANDLER_MARKER, False):
                return

        handler = logging.StreamHandler()
        handler.setLevel(logging.DEBUG)
        # The message is already a complete JSON document (see ``log``), so it
        # is emitted verbatim. Embedding it inside another JSON template would
        # produce unescaped nested quotes, i.e. invalid JSON.
        handler.setFormatter(logging.Formatter("%(message)s"))
        setattr(handler, self._HANDLER_MARKER, True)

        self.logger.addHandler(handler)

    def _anonymize_phi(self, data: Any) -> Any:
        """Return a copy of ``data`` with PHI-looking values masked.

        Dictionaries, lists and tuples are walked recursively. For a
        dictionary key containing one of ``_PHI_FIELDS`` (case-insensitive
        substring match), a string value is replaced by the first 16 hex
        characters of its SHA-256 digest and any other value by
        ``"[REDACTED]"``. Everything else is returned unchanged.

        Args:
            data: Arbitrary value, typically the ``details`` mapping.

        Returns:
            The anonymized structure; tuples come back as lists, which is
            also how they are serialized to JSON.
        """
        if isinstance(data, dict):
            anonymized = {}
            for key, value in data.items():
                # str() so that non-string keys (e.g. ints) cannot raise here.
                key_text = str(key).lower()
                if any(phi_field in key_text for phi_field in self._PHI_FIELDS):
                    if isinstance(value, str):
                        # NOTE(review): an unsalted, truncated digest of a
                        # low-entropy identifier (SSN, phone) can be recovered
                        # by brute force; consider a keyed hash (hmac).
                        anonymized[key] = hashlib.sha256(value.encode()).hexdigest()[:16]
                    else:
                        anonymized[key] = "[REDACTED]"
                elif isinstance(value, (dict, list, tuple)):
                    anonymized[key] = self._anonymize_phi(value)
                else:
                    anonymized[key] = value
            return anonymized

        if isinstance(data, (list, tuple)):
            return [self._anonymize_phi(item) for item in data]

        return data

    def _create_log_entry(
        self,
        level: LogLevel,
        message: str,
        category: EventCategory,
        details: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        document_id: Optional[str] = None,
        model_id: Optional[str] = None,
        confidence: Optional[float] = None,
        anonymize: bool = True
    ) -> Dict[str, Any]:
        """Build the dictionary that is serialized for one log record.

        Args:
            level: Severity of the entry.
            message: Human-readable message.
            category: Event category.
            details: Extra fields; omitted from the entry when empty.
            user_id: Included only when truthy.
            document_id: Included only when truthy.
            model_id: Included only when truthy.
            confidence: Included whenever it is not ``None`` (so 0.0 is kept).
            anonymize: When true, ``details`` is passed through
                ``_anonymize_phi`` first.

        Returns:
            The structured entry.
        """
        log_entry = {
            # Timezone-aware UTC timestamp: the ISO string carries an explicit
            # "+00:00" offset instead of being an ambiguous naive value.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": level.value,
            "service": self.service_name,
            "environment": self.environment,
            "category": category.value,
            "message": message
        }

        if user_id:
            log_entry["user_id"] = user_id

        if document_id:
            log_entry["document_id"] = document_id

        if model_id:
            log_entry["model_id"] = model_id

        if confidence is not None:
            log_entry["confidence"] = confidence

        if details:
            if anonymize:
                details = self._anonymize_phi(details)
            log_entry["details"] = details

        return log_entry

    def log(
        self,
        level: LogLevel,
        message: str,
        category: EventCategory = EventCategory.SYSTEM_EVENT,
        **kwargs
    ):
        """Build, count and emit one structured entry.

        Args:
            level: Severity of the entry.
            message: Human-readable message.
            category: Event category.
            **kwargs: Forwarded to ``_create_log_entry`` (``details``,
                ``user_id``, ``document_id``, ``model_id``, ``confidence``,
                ``anonymize``).
        """
        log_entry = self._create_log_entry(level, message, category, **kwargs)

        self.log_counts[level.value] += 1

        # default=str: a value json cannot encode (datetime, Decimal, ...) is
        # stringified rather than letting a logging call raise TypeError.
        payload = json.dumps(log_entry, default=str)

        # LogLevel values are exactly the names of the logging level constants.
        self.logger.log(getattr(logging, level.value), payload)

    def info(self, message: str, category: EventCategory = EventCategory.SYSTEM_EVENT, **kwargs):
        """Log info message"""
        self.log(LogLevel.INFO, message, category, **kwargs)

    def warning(self, message: str, category: EventCategory = EventCategory.SYSTEM_EVENT, **kwargs):
        """Log warning message"""
        self.log(LogLevel.WARNING, message, category, **kwargs)

    def error(self, message: str, category: EventCategory = EventCategory.ERROR_EVENT, **kwargs):
        """Log error message"""
        self.log(LogLevel.ERROR, message, category, **kwargs)

    def critical(self, message: str, category: EventCategory = EventCategory.ERROR_EVENT, **kwargs):
        """Log critical message"""
        self.log(LogLevel.CRITICAL, message, category, **kwargs)

    def debug(self, message: str, category: EventCategory = EventCategory.SYSTEM_EVENT, **kwargs):
        """Log debug message"""
        self.log(LogLevel.DEBUG, message, category, **kwargs)

    def log_authentication(
        self,
        user_id: str,
        success: bool,
        ip_address: str,
        details: Optional[Dict[str, Any]] = None
    ):
        """Log an authentication attempt.

        Successful attempts are logged at INFO, failed ones at WARNING.

        Args:
            user_id: Identifier of the user who attempted to authenticate.
            success: Whether authentication succeeded.
            ip_address: Source address of the attempt.
            details: Extra fields merged into the entry's details; keys given
                here override ``ip_address`` / ``success``.
        """
        message = f"Authentication {'successful' if success else 'failed'} for user {user_id}"

        self.log(
            LogLevel.INFO if success else LogLevel.WARNING,
            message,
            EventCategory.AUTHENTICATION,
            user_id=user_id,
            details={
                "ip_address": ip_address,
                "success": success,
                **(details or {})
            }
        )

    def log_phi_access(
        self,
        user_id: str,
        document_id: str,
        action: str,
        ip_address: str,
        details: Optional[Dict[str, Any]] = None
    ):
        """Log a PHI access event at INFO level.

        Args:
            user_id: Identifier of the accessing user.
            document_id: Identifier of the accessed document.
            action: What was done to the document.
            ip_address: Source address of the access.
            details: Extra fields merged into the entry's details; keys given
                here override ``action`` / ``ip_address``.
        """
        message = f"PHI access: {action} on document {document_id} by user {user_id}"

        self.log(
            LogLevel.INFO,
            message,
            EventCategory.PHI_ACCESS,
            user_id=user_id,
            document_id=document_id,
            details={
                "action": action,
                "ip_address": ip_address,
                **(details or {})
            },
            # Details are written exactly as given, without PHI masking.
            # NOTE(review): presumably intentional so audit records stay
            # complete - confirm callers never pass raw patient identifiers.
            anonymize=False
        )

    def log_model_inference(
        self,
        model_id: str,
        document_id: str,
        confidence: float,
        duration_seconds: float,
        success: bool,
        details: Optional[Dict[str, Any]] = None
    ):
        """Log a model inference event at INFO level.

        Args:
            model_id: Identifier of the model that ran.
            document_id: Identifier of the processed document.
            confidence: Confidence score recorded on the entry.
            duration_seconds: Inference duration recorded in the details.
            success: Whether the inference succeeded.
            details: Extra fields merged into the entry's details; keys given
                here override ``duration_seconds`` / ``success``.
        """
        message = f"Model inference: {model_id} on {document_id} ({'success' if success else 'failed'})"

        self.log(
            LogLevel.INFO,
            message,
            EventCategory.MODEL_INFERENCE,
            document_id=document_id,
            model_id=model_id,
            confidence=confidence,
            details={
                "duration_seconds": duration_seconds,
                "success": success,
                **(details or {})
            }
        )

    def log_security_event(
        self,
        event_type: str,
        severity: str,
        user_id: Optional[str] = None,
        ip_address: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None
    ):
        """Log a security event.

        Severities ``"high"`` and ``"critical"`` (compared case-insensitively)
        are logged at CRITICAL; every other severity at WARNING.

        Args:
            event_type: Short name of the event.
            severity: Severity label supplied by the caller.
            user_id: Identifier of the user involved, if any.
            ip_address: Source address, if known.
            details: Extra fields merged into the entry's details; keys given
                here override ``event_type`` / ``severity`` / ``ip_address``.
        """
        message = f"Security event: {event_type} (severity: {severity})"

        # Normalized so that "HIGH" or "critical" is not silently downgraded
        # to WARNING by an exact, case-sensitive comparison.
        is_severe = str(severity).lower() in ("high", "critical")
        level = LogLevel.CRITICAL if is_severe else LogLevel.WARNING

        self.log(
            level,
            message,
            EventCategory.SECURITY_EVENT,
            user_id=user_id,
            details={
                "event_type": event_type,
                "severity": severity,
                "ip_address": ip_address,
                **(details or {})
            }
        )

    def log_exception(
        self,
        exception: Exception,
        context: str,
        user_id: Optional[str] = None,
        document_id: Optional[str] = None
    ):
        """Log an exception at ERROR level together with its stack trace.

        Args:
            exception: The exception to record.
            context: Description of where the exception occurred.
            user_id: Identifier of the user involved, if any.
            document_id: Identifier of the document involved, if any.
        """
        message = f"Exception in {context}: {str(exception)}"

        # Format the traceback of the exception that was passed in rather than
        # whatever exception is currently being handled; this also works when
        # the method is called outside an ``except`` block.
        stack_trace = "".join(
            traceback.format_exception(type(exception), exception, exception.__traceback__)
        )

        self.log(
            LogLevel.ERROR,
            message,
            EventCategory.ERROR_EVENT,
            user_id=user_id,
            document_id=document_id,
            details={
                "exception_type": type(exception).__name__,
                "exception_message": str(exception),
                "stack_trace": stack_trace,
                "context": context
            }
        )

    def get_log_statistics(self) -> Dict[str, int]:
        """Return a copy of the per-level entry counts for this instance."""
        return dict(self.log_counts)


# Module-wide MedicalLogger instance, created lazily by get_medical_logger().
_medical_logger = None


def get_medical_logger(service_name: str = "medical_ai_platform") -> MedicalLogger:
    """Return the shared MedicalLogger, creating it on first use.

    Args:
        service_name: Service name for the logger. Only the first call's value
            is used; once the instance exists, later calls return it unchanged
            regardless of the name they pass.

    Returns:
        The module-wide ``MedicalLogger`` instance.
    """
    global _medical_logger
    if _medical_logger is None:
        _medical_logger = MedicalLogger(service_name)
    return _medical_logger