# medical-report-analyzer/backend/production_logging.py
# Hosting-page residue (avatar caption): snikhilesh's picture
# Commit message: Deploy production_logging.py to backend/ directory
# Commit: bc3fc7c (verified)
"""
Production Logging Infrastructure
Structured logging with medical-specific fields and compliance features
Features:
- JSON-structured logging for machine parsing
- Medical-specific log fields (PHI anonymization, confidence scores)
- Log levels with appropriate categorization
- Security event logging
- Compliance-ready log retention
- Centralized log aggregation support
Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""
import hashlib
import json
import logging
import traceback
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, Optional
class LogLevel(Enum):
    """Severity levels accepted by the logger.

    Each member's value is the upper-case name of the level, identical to
    the member name.
    """

    # Ordered from least to most severe.
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
class EventCategory(Enum):
    """Categories used to tag log entries of the medical AI platform.

    The member value is the lower-case string written to the ``category``
    field of a log entry.
    """

    # Identity and access
    AUTHENTICATION = "authentication"
    AUTHORIZATION = "authorization"
    PHI_ACCESS = "phi_access"

    # Workload
    MODEL_INFERENCE = "model_inference"
    DATA_PROCESSING = "data_processing"

    # Platform, security and compliance
    SYSTEM_EVENT = "system_event"
    SECURITY_EVENT = "security_event"
    COMPLIANCE_EVENT = "compliance_event"
    PERFORMANCE_EVENT = "performance_event"
    ERROR_EVENT = "error_event"
class MedicalLogger:
    """
    Structured JSON logger with PHI-aware helpers.

    Every call to :meth:`log` builds a dictionary (timestamp, level, service,
    environment, category, message plus optional fields), serializes it with
    ``json.dumps`` and hands the resulting string to a standard
    ``logging.Logger`` named after the service. Each emitted line is exactly
    that JSON object.

    The instance also counts how many entries were logged per level; see
    :meth:`get_log_statistics`.
    """

    # A dictionary key is treated as PHI when it contains any of these
    # substrings (case-insensitive).
    _PHI_FIELDS = ("patient_id", "patient_name", "ssn", "mrn", "email", "phone")

    # Marker attribute set on the handler installed by this class so that a
    # second MedicalLogger for the same service does not install another one.
    _HANDLER_MARKER = "_medical_json_handler"

    def __init__(
        self,
        service_name: str,
        environment: str = "production"
    ):
        """
        Args:
            service_name: Name of the underlying ``logging`` logger; also
                written to the ``service`` field of every entry.
            environment: Written to the ``environment`` field of every entry.
        """
        self.service_name = service_name
        self.environment = environment
        self.logger = logging.getLogger(service_name)
        self.logger.setLevel(logging.DEBUG)

        # Setup JSON handler
        self._setup_json_handler()

        # Track logging statistics (entries emitted per level)
        self.log_counts = {level.value: 0 for level in LogLevel}

    def _setup_json_handler(self):
        """Attach a stream handler that emits the JSON payload unchanged.

        ``logging.getLogger`` returns the same logger object for a given
        name, so the handler is only added if this class has not already
        installed one on that logger; otherwise every record would be
        emitted once per constructed ``MedicalLogger``.
        """
        marker = self._HANDLER_MARKER
        if any(getattr(existing, marker, False) for existing in self.logger.handlers):
            return

        handler = logging.StreamHandler()
        handler.setLevel(logging.DEBUG)
        # The message is already a complete JSON document (it carries its own
        # timestamp, level and service), so it must be emitted verbatim.
        # Embedding it inside another quoted JSON template produces invalid
        # JSON because the inner quotes are not escaped.
        handler.setFormatter(logging.Formatter("%(message)s"))
        setattr(handler, marker, True)
        self.logger.addHandler(handler)

    def _anonymize_phi(self, data: Any) -> Any:
        """Return a copy of ``data`` with PHI-looking fields masked.

        Dictionaries are walked recursively, as are lists and tuples (tuples
        come back as lists). For every key containing one of
        ``_PHI_FIELDS``:

        * a string value is replaced by the first 16 hex characters of its
          SHA-256 digest;
        * any other value is replaced by ``"[REDACTED]"``.

        Values of any other type are returned unchanged.
        """
        if isinstance(data, dict):
            anonymized = {}
            for key, value in data.items():
                # Keys are not guaranteed to be strings.
                key_text = str(key).lower()
                if any(phi_field in key_text for phi_field in self._PHI_FIELDS):
                    if isinstance(value, str):
                        # NOTE(security): an unsalted, truncated hash is only
                        # pseudonymization; low-entropy values (SSN, phone)
                        # can be recovered by brute force. Consider a keyed
                        # HMAC if stronger protection is required.
                        anonymized[key] = hashlib.sha256(value.encode()).hexdigest()[:16]
                    else:
                        anonymized[key] = "[REDACTED]"
                elif isinstance(value, (dict, list, tuple)):
                    anonymized[key] = self._anonymize_phi(value)
                else:
                    anonymized[key] = value
            return anonymized
        if isinstance(data, (list, tuple)):
            return [self._anonymize_phi(item) for item in data]
        return data

    def _create_log_entry(
        self,
        level: LogLevel,
        message: str,
        category: EventCategory,
        details: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        document_id: Optional[str] = None,
        model_id: Optional[str] = None,
        confidence: Optional[float] = None,
        anonymize: bool = True
    ) -> Dict[str, Any]:
        """Build the dictionary that represents one log entry.

        Args:
            level: Severity; its value is stored under ``level``.
            message: Human-readable message.
            category: Event category; its value is stored under ``category``.
            details: Extra structured data. Omitted from the entry when empty.
            user_id: Stored under ``user_id`` when truthy.
            document_id: Stored under ``document_id`` when truthy.
            model_id: Stored under ``model_id`` when truthy.
            confidence: Stored under ``confidence`` when not ``None``.
            anonymize: When true, ``details`` is passed through
                :meth:`_anonymize_phi` first. Only ``details`` is processed;
                ``message`` and the id fields are stored as given.

        Returns:
            The log entry as a plain dictionary.
        """
        # Same naive-UTC ISO format that ``datetime.utcnow().isoformat()``
        # produced, without relying on the deprecated ``utcnow``.
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

        log_entry = {
            "timestamp": timestamp,
            "level": level.value,
            "service": self.service_name,
            "environment": self.environment,
            "category": category.value,
            "message": message
        }

        # Add optional fields
        if user_id:
            log_entry["user_id"] = user_id
        if document_id:
            log_entry["document_id"] = document_id
        if model_id:
            log_entry["model_id"] = model_id
        if confidence is not None:
            log_entry["confidence"] = confidence
        if details:
            # Anonymize PHI if requested
            if anonymize:
                details = self._anonymize_phi(details)
            log_entry["details"] = details

        return log_entry

    def log(
        self,
        level: LogLevel,
        message: str,
        category: EventCategory = EventCategory.SYSTEM_EVENT,
        **kwargs
    ):
        """Create a structured entry and emit it at ``level``.

        Args:
            level: Severity of the entry.
            message: Human-readable message.
            category: Event category.
            **kwargs: Forwarded to :meth:`_create_log_entry` (``details``,
                ``user_id``, ``document_id``, ``model_id``, ``confidence``,
                ``anonymize``).
        """
        log_entry = self._create_log_entry(level, message, category, **kwargs)

        # Increment counter
        self.log_counts[level.value] += 1

        # ``default=str`` is deliberate: a value json cannot encode natively
        # (datetime, Decimal, ...) is written as its string form instead of
        # raising TypeError out of the logging call.
        payload = json.dumps(log_entry, default=str)

        # LogLevel values are the names of the logging module's level
        # constants, so the numeric level can be looked up by name.
        self.logger.log(getattr(logging, level.value), payload)

    def info(self, message: str, category: EventCategory = EventCategory.SYSTEM_EVENT, **kwargs):
        """Log info message"""
        self.log(LogLevel.INFO, message, category, **kwargs)

    def warning(self, message: str, category: EventCategory = EventCategory.SYSTEM_EVENT, **kwargs):
        """Log warning message"""
        self.log(LogLevel.WARNING, message, category, **kwargs)

    def error(self, message: str, category: EventCategory = EventCategory.ERROR_EVENT, **kwargs):
        """Log error message"""
        self.log(LogLevel.ERROR, message, category, **kwargs)

    def critical(self, message: str, category: EventCategory = EventCategory.ERROR_EVENT, **kwargs):
        """Log critical message"""
        self.log(LogLevel.CRITICAL, message, category, **kwargs)

    def debug(self, message: str, category: EventCategory = EventCategory.SYSTEM_EVENT, **kwargs):
        """Log debug message"""
        self.log(LogLevel.DEBUG, message, category, **kwargs)

    def log_authentication(
        self,
        user_id: str,
        success: bool,
        ip_address: str,
        details: Optional[Dict[str, Any]] = None
    ):
        """Log an authentication attempt.

        Successful attempts are logged at INFO, failed ones at WARNING.

        Args:
            user_id: Identifier of the user; also embedded in the message.
            success: Whether authentication succeeded.
            ip_address: Stored under ``details["ip_address"]``.
            details: Extra fields merged into ``details``; on a key clash
                these override ``ip_address`` / ``success``.
        """
        message = f"Authentication {'successful' if success else 'failed'} for user {user_id}"
        self.log(
            LogLevel.INFO if success else LogLevel.WARNING,
            message,
            EventCategory.AUTHENTICATION,
            user_id=user_id,
            details={
                "ip_address": ip_address,
                "success": success,
                **(details or {})
            }
        )

    def log_phi_access(
        self,
        user_id: str,
        document_id: str,
        action: str,
        ip_address: str,
        details: Optional[Dict[str, Any]] = None
    ):
        """Log a PHI access event at INFO level.

        Anonymization is disabled for this entry, so anything passed in
        ``details`` is written as-is.

        Args:
            user_id: Identifier of the accessing user.
            document_id: Identifier of the accessed document.
            action: Description of the action performed.
            ip_address: Stored under ``details["ip_address"]``.
            details: Extra fields merged into ``details``; on a key clash
                these override ``action`` / ``ip_address``.
        """
        message = f"PHI access: {action} on document {document_id} by user {user_id}"
        self.log(
            LogLevel.INFO,
            message,
            EventCategory.PHI_ACCESS,
            user_id=user_id,
            document_id=document_id,
            details={
                "action": action,
                "ip_address": ip_address,
                **(details or {})
            },
            anonymize=False  # PHI access logs must be complete
        )

    def log_model_inference(
        self,
        model_id: str,
        document_id: str,
        confidence: float,
        duration_seconds: float,
        success: bool,
        details: Optional[Dict[str, Any]] = None
    ):
        """Log a model inference event at INFO level.

        Args:
            model_id: Identifier of the model.
            document_id: Identifier of the processed document.
            confidence: Stored under ``confidence``.
            duration_seconds: Stored under ``details["duration_seconds"]``.
            success: Stored under ``details["success"]`` and reflected in
                the message.
            details: Extra fields merged into ``details``; on a key clash
                these override ``duration_seconds`` / ``success``.
        """
        message = f"Model inference: {model_id} on {document_id} ({'success' if success else 'failed'})"
        self.log(
            LogLevel.INFO,
            message,
            EventCategory.MODEL_INFERENCE,
            document_id=document_id,
            model_id=model_id,
            confidence=confidence,
            details={
                "duration_seconds": duration_seconds,
                "success": success,
                **(details or {})
            }
        )

    def log_security_event(
        self,
        event_type: str,
        severity: str,
        user_id: Optional[str] = None,
        ip_address: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None
    ):
        """Log a security event.

        The entry is logged at CRITICAL when ``severity`` is ``"high"`` or
        ``"critical"`` (compared case-insensitively) and at WARNING
        otherwise.

        Args:
            event_type: Stored under ``details["event_type"]``.
            severity: Severity label; stored unchanged under
                ``details["severity"]``.
            user_id: Identifier of the user involved, if any.
            ip_address: Stored under ``details["ip_address"]`` (may be
                ``None``).
            details: Extra fields merged into ``details``; on a key clash
                these override the fixed fields.
        """
        message = f"Security event: {event_type} (severity: {severity})"
        # Escalate regardless of letter case, and also for "critical", so
        # the most severe events are never logged at WARNING only.
        escalate = str(severity).lower() in ("high", "critical")
        level = LogLevel.CRITICAL if escalate else LogLevel.WARNING
        self.log(
            level,
            message,
            EventCategory.SECURITY_EVENT,
            user_id=user_id,
            details={
                "event_type": event_type,
                "severity": severity,
                "ip_address": ip_address,
                **(details or {})
            }
        )

    def log_exception(
        self,
        exception: Exception,
        context: str,
        user_id: Optional[str] = None,
        document_id: Optional[str] = None
    ):
        """Log an exception, including its stack trace, at ERROR level.

        Args:
            exception: The exception to report.
            context: Description of where the exception occurred.
            user_id: Identifier of the user involved, if any.
            document_id: Identifier of the document involved, if any.
        """
        message = f"Exception in {context}: {str(exception)}"
        # Format the traceback of the exception that was passed in.
        # ``traceback.format_exc()`` reports whatever exception is currently
        # being handled, which is the wrong one (or "NoneType: None") when
        # this method is called outside the matching ``except`` block.
        stack_trace = "".join(
            traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )
        )
        self.log(
            LogLevel.ERROR,
            message,
            EventCategory.ERROR_EVENT,
            user_id=user_id,
            document_id=document_id,
            details={
                "exception_type": type(exception).__name__,
                "exception_message": str(exception),
                "stack_trace": stack_trace,
                "context": context
            }
        )

    def get_log_statistics(self) -> Dict[str, int]:
        """Return a copy of the per-level counters of emitted entries."""
        return dict(self.log_counts)
# Module-wide logger, created lazily on the first get_medical_logger() call
_medical_logger = None


def get_medical_logger(service_name: str = "medical_ai_platform") -> MedicalLogger:
    """Return the shared MedicalLogger, creating it on first use.

    ``service_name`` is only used by the call that creates the instance;
    later calls return the existing instance whatever name they pass.
    """
    global _medical_logger
    shared = _medical_logger
    if shared is None:
        shared = MedicalLogger(service_name)
        _medical_logger = shared
    return shared