""" Compliance Reporting System HIPAA/GDPR compliance reporting and audit trail management Features: - HIPAA audit trail reports - GDPR compliance documentation - Clinical quality metrics tracking - Review queue performance analysis - Security incident reporting - Regulatory compliance dashboards Author: MiniMax Agent Date: 2025-10-29 Version: 1.0.0 """ import logging from typing import Dict, List, Any, Optional from datetime import datetime, timedelta from collections import defaultdict from dataclasses import dataclass, asdict from enum import Enum logger = logging.getLogger(__name__) class ComplianceStandard(Enum): """Compliance standards""" HIPAA = "HIPAA" GDPR = "GDPR" FDA = "FDA" ISO13485 = "ISO13485" @dataclass class AuditEvent: """Audit trail event""" event_id: str timestamp: str user_id: str event_type: str resource: str action: str ip_address: str success: bool details: Dict[str, Any] def to_dict(self) -> Dict[str, Any]: return asdict(self) @dataclass class ComplianceMetric: """Compliance metric""" metric_name: str value: float target: float status: str # "compliant", "warning", "non_compliant" timestamp: str def to_dict(self) -> Dict[str, Any]: return asdict(self) class ComplianceReportingSystem: """ Comprehensive compliance reporting system Generates reports for regulatory audits and quality assurance """ def __init__(self): self.audit_trail: List[AuditEvent] = [] self.compliance_metrics: Dict[str, List[ComplianceMetric]] = defaultdict(list) self.phi_access_log: List[Dict[str, Any]] = [] self.security_incidents: List[Dict[str, Any]] = [] logger.info("Compliance Reporting System initialized") def log_audit_event( self, user_id: str, event_type: str, resource: str, action: str, ip_address: str, success: bool = True, details: Optional[Dict[str, Any]] = None ) -> AuditEvent: """Log an audit event for compliance tracking""" event = AuditEvent( event_id=f"audit_{len(self.audit_trail)}_{datetime.utcnow().timestamp()}", timestamp=datetime.utcnow().isoformat(), user_id=user_id, event_type=event_type, resource=resource, action=action, ip_address=ip_address, success=success, details=details or {} ) self.audit_trail.append(event) return event def log_phi_access( self, user_id: str, document_id: str, action: str, ip_address: str, timestamp: Optional[str] = None ): """Log PHI access (HIPAA requirement)""" access_log = { "timestamp": timestamp or datetime.utcnow().isoformat(), "user_id": user_id, "document_id": document_id, "action": action, "ip_address": ip_address } self.phi_access_log.append(access_log) # Also log as audit event self.log_audit_event( user_id=user_id, event_type="PHI_ACCESS", resource=f"document:{document_id}", action=action, ip_address=ip_address, details={"document_id": document_id} ) def log_security_incident( self, incident_type: str, severity: str, description: str, user_id: Optional[str] = None, ip_address: Optional[str] = None, details: Optional[Dict[str, Any]] = None ): """Log security incident""" incident = { "timestamp": datetime.utcnow().isoformat(), "incident_type": incident_type, "severity": severity, "description": description, "user_id": user_id, "ip_address": ip_address, "details": details or {}, "resolved": False } self.security_incidents.append(incident) logger.warning(f"Security incident logged: {incident_type} (severity: {severity})") def record_compliance_metric( self, metric_name: str, value: float, target: float ): """Record a compliance metric""" # Determine status if value >= target: status = "compliant" elif value >= target * 0.9: # Within 10% of target status = "warning" else: status = "non_compliant" metric = ComplianceMetric( metric_name=metric_name, value=value, target=target, status=status, timestamp=datetime.utcnow().isoformat() ) self.compliance_metrics[metric_name].append(metric) def generate_hipaa_report( self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None ) -> Dict[str, Any]: """Generate HIPAA compliance report""" if not start_date: start_date = datetime.utcnow() - timedelta(days=30) if not end_date: end_date = datetime.utcnow() # Filter PHI access logs phi_accesses = [ log for log in self.phi_access_log if start_date <= datetime.fromisoformat(log["timestamp"]) <= end_date ] # Aggregate by user access_by_user = defaultdict(int) for access in phi_accesses: access_by_user[access["user_id"]] += 1 # Aggregate by action access_by_action = defaultdict(int) for access in phi_accesses: access_by_action[access["action"]] += 1 report = { "report_type": "HIPAA_COMPLIANCE", "period": { "start": start_date.isoformat(), "end": end_date.isoformat() }, "generated_at": datetime.utcnow().isoformat(), "summary": { "total_phi_accesses": len(phi_accesses), "unique_users": len(access_by_user), "access_by_user": dict(access_by_user), "access_by_action": dict(access_by_action) }, "audit_trail_summary": { "total_events": len([ e for e in self.audit_trail if start_date <= datetime.fromisoformat(e.timestamp) <= end_date ]), "phi_access_events": len(phi_accesses) }, "security_incidents": len([ i for i in self.security_incidents if start_date <= datetime.fromisoformat(i["timestamp"]) <= end_date ]), "compliance_status": "COMPLIANT" if len(self.security_incidents) == 0 else "REVIEW_REQUIRED" } return report def generate_gdpr_report( self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None ) -> Dict[str, Any]: """Generate GDPR compliance report""" if not start_date: start_date = datetime.utcnow() - timedelta(days=30) if not end_date: end_date = datetime.utcnow() # Filter relevant audit events audit_events = [ e for e in self.audit_trail if start_date <= datetime.fromisoformat(e.timestamp) <= end_date ] # Count data processing activities data_processing_events = [ e for e in audit_events if e.event_type in ["UPLOAD", "PROCESS", "DELETE"] ] # Count access events access_events = [ e for e in audit_events if e.event_type in ["VIEW", "DOWNLOAD", "PHI_ACCESS"] ] report = { "report_type": "GDPR_COMPLIANCE", "period": { "start": start_date.isoformat(), "end": end_date.isoformat() }, "generated_at": datetime.utcnow().isoformat(), "data_processing": { "total_processing_events": len(data_processing_events), "by_action": self._count_by_field(data_processing_events, "action") }, "data_access": { "total_access_events": len(access_events), "by_user": self._count_by_field(access_events, "user_id") }, "data_retention": { "retention_policy_days": 2555, # 7 years for medical records "current_records": len(self.phi_access_log), "oldest_record": min( [log["timestamp"] for log in self.phi_access_log], default=None ) }, "user_rights": { "access_requests": 0, # Would track actual requests "deletion_requests": 0, "portability_requests": 0 }, "compliance_status": "COMPLIANT" } return report def generate_quality_metrics_report( self, window_days: int = 30 ) -> Dict[str, Any]: """Generate clinical quality metrics report""" cutoff = datetime.utcnow() - timedelta(days=window_days) # Get recent metrics recent_metrics = {} for metric_name, metrics_list in self.compliance_metrics.items(): recent = [ m for m in metrics_list if datetime.fromisoformat(m.timestamp) > cutoff ] if recent: latest = recent[-1] recent_metrics[metric_name] = { "current_value": latest.value, "target": latest.target, "status": latest.status, "trend": self._calculate_trend(recent) } report = { "report_type": "QUALITY_METRICS", "period_days": window_days, "generated_at": datetime.utcnow().isoformat(), "metrics": recent_metrics, "overall_compliance_rate": self._calculate_overall_compliance(), "non_compliant_metrics": [ name for name, data in recent_metrics.items() if data["status"] == "non_compliant" ] } return report def generate_review_queue_report( self, window_days: int = 30 ) -> Dict[str, Any]: """Generate review queue performance report""" cutoff = datetime.utcnow() - timedelta(days=window_days) # Filter review events from audit trail review_events = [ e for e in self.audit_trail if e.event_type == "REVIEW" and datetime.fromisoformat(e.timestamp) > cutoff ] # Calculate metrics total_reviews = len(review_events) reviews_by_user = self._count_by_field(review_events, "user_id") # Calculate average turnaround time (would need actual data) avg_turnaround_hours = 24.0 # Placeholder report = { "report_type": "REVIEW_QUEUE_PERFORMANCE", "period_days": window_days, "generated_at": datetime.utcnow().isoformat(), "summary": { "total_reviews": total_reviews, "average_turnaround_hours": avg_turnaround_hours, "reviews_by_reviewer": reviews_by_user }, "performance_metrics": { "reviews_per_day": total_reviews / window_days, "target_turnaround_hours": 24.0, "turnaround_compliance": "COMPLIANT" if avg_turnaround_hours <= 24 else "NON_COMPLIANT" } } return report def generate_security_incidents_report( self, window_days: int = 30 ) -> Dict[str, Any]: """Generate security incidents report""" cutoff = datetime.utcnow() - timedelta(days=window_days) recent_incidents = [ i for i in self.security_incidents if datetime.fromisoformat(i["timestamp"]) > cutoff ] by_severity = self._count_by_field(recent_incidents, "severity") by_type = self._count_by_field(recent_incidents, "incident_type") unresolved = [i for i in recent_incidents if not i.get("resolved", False)] report = { "report_type": "SECURITY_INCIDENTS", "period_days": window_days, "generated_at": datetime.utcnow().isoformat(), "summary": { "total_incidents": len(recent_incidents), "unresolved_incidents": len(unresolved), "by_severity": by_severity, "by_type": by_type }, "critical_incidents": [ i for i in recent_incidents if i["severity"] == "high" ], "compliance_impact": "CRITICAL" if len(unresolved) > 0 and any( i["severity"] == "high" for i in unresolved ) else "ACCEPTABLE" } return report def get_compliance_dashboard(self) -> Dict[str, Any]: """Get comprehensive compliance dashboard data""" return { "timestamp": datetime.utcnow().isoformat(), "hipaa_status": self._get_hipaa_status(), "gdpr_status": self._get_gdpr_status(), "quality_metrics": self._get_quality_status(), "security_status": self._get_security_status(), "audit_trail": { "total_events": len(self.audit_trail), "phi_accesses": len(self.phi_access_log), "recent_events": len([ e for e in self.audit_trail if datetime.fromisoformat(e.timestamp) > datetime.utcnow() - timedelta(hours=24) ]) } } def _count_by_field(self, items: List[Any], field: str) -> Dict[str, int]: """Count items by a specific field""" counts = defaultdict(int) for item in items: if isinstance(item, dict): value = item.get(field, "unknown") else: value = getattr(item, field, "unknown") counts[value] += 1 return dict(counts) def _calculate_trend(self, metrics: List[ComplianceMetric]) -> str: """Calculate trend from metrics""" if len(metrics) < 2: return "stable" recent_value = metrics[-1].value previous_value = metrics[-2].value change_percent = (recent_value - previous_value) / previous_value if previous_value > 0 else 0 if change_percent > 0.05: return "improving" elif change_percent < -0.05: return "declining" else: return "stable" def _calculate_overall_compliance(self) -> float: """Calculate overall compliance rate""" all_metrics = [] for metrics_list in self.compliance_metrics.values(): if metrics_list: all_metrics.append(metrics_list[-1]) if not all_metrics: return 1.0 compliant = sum(1 for m in all_metrics if m.status == "compliant") return compliant / len(all_metrics) def _get_hipaa_status(self) -> str: """Get HIPAA compliance status""" if len(self.security_incidents) > 0: return "REVIEW_REQUIRED" return "COMPLIANT" def _get_gdpr_status(self) -> str: """Get GDPR compliance status""" # Check if audit trail is complete if len(self.audit_trail) == 0: return "NOT_CONFIGURED" return "COMPLIANT" def _get_quality_status(self) -> str: """Get quality metrics status""" compliance_rate = self._calculate_overall_compliance() if compliance_rate >= 0.95: return "EXCELLENT" elif compliance_rate >= 0.85: return "GOOD" elif compliance_rate >= 0.75: return "ACCEPTABLE" else: return "NEEDS_IMPROVEMENT" def _get_security_status(self) -> str: """Get security status""" recent_incidents = [ i for i in self.security_incidents if datetime.fromisoformat(i["timestamp"]) > datetime.utcnow() - timedelta(days=7) ] if any(i["severity"] == "high" for i in recent_incidents): return "CRITICAL" elif len(recent_incidents) > 0: return "WARNING" else: return "SECURE" # Global instance _compliance_system = None def get_compliance_system() -> ComplianceReportingSystem: """Get singleton compliance system instance""" global _compliance_system if _compliance_system is None: _compliance_system = ComplianceReportingSystem() return _compliance_system