Spaces:
Sleeping
Sleeping
"""
Compliance Reporting System
HIPAA/GDPR compliance reporting and audit trail management

Features:
- HIPAA audit trail reports
- GDPR compliance documentation
- Clinical quality metrics tracking
- Review queue performance analysis
- Security incident reporting
- Regulatory compliance dashboards

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""
import logging
from collections import defaultdict
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ComplianceStandard(Enum):
    """Closed set of compliance standards; each member's value is its own name."""

    # Privacy / data-protection regimes.
    HIPAA = "HIPAA"
    GDPR = "GDPR"

    # Device / quality-system regimes.
    FDA = "FDA"
    ISO13485 = "ISO13485"
@dataclass
class AuditEvent:
    """
    Audit trail event.

    Declared as a dataclass so that it can be constructed with keyword
    arguments and serialised with ``dataclasses.asdict``; without the
    decorator both operations raise ``TypeError``.
    """

    event_id: str
    timestamp: str  # stored as a string; report code parses it as ISO-8601
    user_id: str
    event_type: str
    resource: str
    action: str
    ip_address: str
    success: bool
    details: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Return the event as a plain dict (nested containers are copied)."""
        return asdict(self)
@dataclass
class ComplianceMetric:
    """
    Compliance metric sample.

    Declared as a dataclass so that it can be constructed with keyword
    arguments and serialised with ``dataclasses.asdict``; without the
    decorator both operations raise ``TypeError``.
    """

    metric_name: str
    value: float
    target: float
    status: str  # "compliant", "warning", "non_compliant"
    timestamp: str  # stored as a string; report code parses it as ISO-8601

    def to_dict(self) -> Dict[str, Any]:
        """Return the metric as a plain dict."""
        return asdict(self)
class ComplianceReportingSystem:
    """
    Comprehensive compliance reporting system.

    Generates reports for regulatory audits and quality assurance from four
    collections held on the instance: the audit trail, the PHI access log,
    the security incident list and the per-metric compliance history.

    Timestamps written by this class are ISO-8601 strings of naive UTC
    datetimes. Timestamps and period bounds supplied by callers may be
    timezone-aware; they are converted to naive UTC before being compared.
    """

    def __init__(self):
        """Create the empty stores and log that the system is initialized."""
        self.audit_trail: List[AuditEvent] = []
        self.compliance_metrics: Dict[str, List[ComplianceMetric]] = defaultdict(list)
        self.phi_access_log: List[Dict[str, Any]] = []
        self.security_incidents: List[Dict[str, Any]] = []
        logger.info("Compliance Reporting System initialized")

    # ------------------------------------------------------------------
    # Time helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Equivalent to the deprecated ``datetime.utcnow()``, so stored
        timestamps keep their offset-free ISO format.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _naive_utc(moment: datetime) -> datetime:
        """Convert an aware datetime to naive UTC; return naive input as is."""
        if moment.tzinfo is None:
            return moment
        return moment.astimezone(timezone.utc).replace(tzinfo=None)

    @classmethod
    def _parse_timestamp(cls, value: str) -> datetime:
        """Parse an ISO-8601 string into a naive UTC datetime.

        Raises:
            ValueError: if ``value`` is not a valid ISO-8601 timestamp.
        """
        # A trailing "Z" is only accepted by fromisoformat on newer Python
        # versions, so spell the UTC offset out explicitly.
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return cls._naive_utc(datetime.fromisoformat(value))

    def _resolve_period(
        self,
        start_date: Optional[datetime],
        end_date: Optional[datetime]
    ) -> Tuple[datetime, datetime]:
        """Fill in the default 30-day period and normalise both bounds.

        A missing start defaults to 30 days before now and a missing end
        defaults to now; each default is independent of the other bound.
        """
        now = self._utcnow()
        start = now - timedelta(days=30) if start_date is None else self._naive_utc(start_date)
        end = now if end_date is None else self._naive_utc(end_date)
        return start, end

    def _in_period(self, raw_timestamp: str, start: datetime, end: datetime) -> bool:
        """Return True when the timestamp lies in the inclusive [start, end] range."""
        return start <= self._parse_timestamp(raw_timestamp) <= end

    # ------------------------------------------------------------------
    # Recording
    # ------------------------------------------------------------------

    def log_audit_event(
        self,
        user_id: str,
        event_type: str,
        resource: str,
        action: str,
        ip_address: str,
        success: bool = True,
        details: Optional[Dict[str, Any]] = None
    ) -> "AuditEvent":
        """Log an audit event for compliance tracking.

        The event id combines the current length of the audit trail with the
        POSIX timestamp of the moment of logging.

        Returns:
            The ``AuditEvent`` that was appended to ``self.audit_trail``.
        """
        # Read the clock once so the id and the timestamp describe the same
        # instant; using an aware datetime makes .timestamp() a true epoch
        # value instead of one interpreted in local time.
        now = datetime.now(timezone.utc)
        event = AuditEvent(
            event_id=f"audit_{len(self.audit_trail)}_{now.timestamp()}",
            timestamp=now.replace(tzinfo=None).isoformat(),
            user_id=user_id,
            event_type=event_type,
            resource=resource,
            action=action,
            ip_address=ip_address,
            success=success,
            details=details or {}
        )
        self.audit_trail.append(event)
        return event

    def log_phi_access(
        self,
        user_id: str,
        document_id: str,
        action: str,
        ip_address: str,
        timestamp: Optional[str] = None
    ):
        """Log PHI access (HIPAA requirement).

        Appends an entry to ``self.phi_access_log`` and mirrors it into the
        audit trail as a ``PHI_ACCESS`` event.

        Args:
            timestamp: ISO-8601 time of the access; defaults to now (UTC).
                Only the PHI log entry uses this value; the mirrored audit
                event is stamped with the time of logging.
        """
        access_log = {
            "timestamp": timestamp or self._utcnow().isoformat(),
            "user_id": user_id,
            "document_id": document_id,
            "action": action,
            "ip_address": ip_address
        }
        self.phi_access_log.append(access_log)

        # Also log as audit event
        self.log_audit_event(
            user_id=user_id,
            event_type="PHI_ACCESS",
            resource=f"document:{document_id}",
            action=action,
            ip_address=ip_address,
            details={"document_id": document_id}
        )

    def log_security_incident(
        self,
        incident_type: str,
        severity: str,
        description: str,
        user_id: Optional[str] = None,
        ip_address: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None
    ):
        """Log security incident.

        The incident is stored unresolved (``"resolved": False``) and a
        warning is emitted on the module logger.
        """
        incident = {
            "timestamp": self._utcnow().isoformat(),
            "incident_type": incident_type,
            "severity": severity,
            "description": description,
            "user_id": user_id,
            "ip_address": ip_address,
            "details": details or {},
            "resolved": False
        }
        self.security_incidents.append(incident)
        logger.warning(f"Security incident logged: {incident_type} (severity: {severity})")

    def record_compliance_metric(
        self,
        metric_name: str,
        value: float,
        target: float
    ):
        """Record a compliance metric.

        Status is "compliant" when ``value >= target``, "warning" when the
        value is at least 90% of the target, otherwise "non_compliant".
        """
        if value >= target:
            status = "compliant"
        elif value >= target * 0.9:  # Within 10% of target
            status = "warning"
        else:
            status = "non_compliant"

        metric = ComplianceMetric(
            metric_name=metric_name,
            value=value,
            target=target,
            status=status,
            timestamp=self._utcnow().isoformat()
        )
        self.compliance_metrics[metric_name].append(metric)

    # ------------------------------------------------------------------
    # Reports
    # ------------------------------------------------------------------

    def generate_hipaa_report(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Generate HIPAA compliance report.

        Args:
            start_date: Inclusive start of the period (default: 30 days ago).
            end_date: Inclusive end of the period (default: now).

        Returns:
            A dict with PHI access counts, audit trail counts, the number of
            security incidents in the period and a ``compliance_status`` of
            "COMPLIANT" or "REVIEW_REQUIRED".
        """
        start_date, end_date = self._resolve_period(start_date, end_date)

        phi_accesses = [
            entry for entry in self.phi_access_log
            if self._in_period(entry["timestamp"], start_date, end_date)
        ]
        events_in_period = sum(
            1 for event in self.audit_trail
            if self._in_period(event.timestamp, start_date, end_date)
        )
        incidents_in_period = sum(
            1 for incident in self.security_incidents
            if self._in_period(incident["timestamp"], start_date, end_date)
        )

        access_by_user = self._count_by_field(phi_accesses, "user_id")
        access_by_action = self._count_by_field(phi_accesses, "action")

        return {
            "report_type": "HIPAA_COMPLIANCE",
            "period": {
                "start": start_date.isoformat(),
                "end": end_date.isoformat()
            },
            "generated_at": self._utcnow().isoformat(),
            "summary": {
                "total_phi_accesses": len(phi_accesses),
                "unique_users": len(access_by_user),
                "access_by_user": access_by_user,
                "access_by_action": access_by_action
            },
            "audit_trail_summary": {
                "total_events": events_in_period,
                "phi_access_events": len(phi_accesses)
            },
            "security_incidents": incidents_in_period,
            # The status is scoped to the reporting period so it always
            # agrees with the incident count reported next to it.
            "compliance_status": "COMPLIANT" if incidents_in_period == 0 else "REVIEW_REQUIRED"
        }

    def generate_gdpr_report(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Generate GDPR compliance report.

        Args:
            start_date: Inclusive start of the period (default: 30 days ago).
            end_date: Inclusive end of the period (default: now).

        Returns:
            A dict with processing and access event counts for the period,
            retention information over the whole PHI access log, placeholder
            user-rights counters and a fixed "COMPLIANT" status.
        """
        start_date, end_date = self._resolve_period(start_date, end_date)

        audit_events = [
            event for event in self.audit_trail
            if self._in_period(event.timestamp, start_date, end_date)
        ]
        data_processing_events = [
            event for event in audit_events
            if event.event_type in ("UPLOAD", "PROCESS", "DELETE")
        ]
        access_events = [
            event for event in audit_events
            if event.event_type in ("VIEW", "DOWNLOAD", "PHI_ACCESS")
        ]

        # Compare parsed instants rather than raw strings so entries logged
        # with an explicit UTC offset are ordered correctly.
        oldest_record = min(
            (entry["timestamp"] for entry in self.phi_access_log),
            key=self._parse_timestamp,
            default=None
        )

        return {
            "report_type": "GDPR_COMPLIANCE",
            "period": {
                "start": start_date.isoformat(),
                "end": end_date.isoformat()
            },
            "generated_at": self._utcnow().isoformat(),
            "data_processing": {
                "total_processing_events": len(data_processing_events),
                "by_action": self._count_by_field(data_processing_events, "action")
            },
            "data_access": {
                "total_access_events": len(access_events),
                "by_user": self._count_by_field(access_events, "user_id")
            },
            "data_retention": {
                "retention_policy_days": 2555,  # 7 years for medical records
                "current_records": len(self.phi_access_log),
                "oldest_record": oldest_record
            },
            "user_rights": {
                "access_requests": 0,  # Would track actual requests
                "deletion_requests": 0,
                "portability_requests": 0
            },
            "compliance_status": "COMPLIANT"
        }

    def generate_quality_metrics_report(
        self,
        window_days: int = 30
    ) -> Dict[str, Any]:
        """Generate clinical quality metrics report.

        For every metric with at least one sample newer than the window
        cutoff, reports the latest value, target, status and trend. The
        overall compliance rate is computed from the latest sample of every
        metric, regardless of the window.
        """
        cutoff = self._utcnow() - timedelta(days=window_days)

        recent_metrics = {}
        for metric_name, history in self.compliance_metrics.items():
            recent = [
                sample for sample in history
                if self._parse_timestamp(sample.timestamp) > cutoff
            ]
            if not recent:
                continue
            latest = recent[-1]
            recent_metrics[metric_name] = {
                "current_value": latest.value,
                "target": latest.target,
                "status": latest.status,
                "trend": self._calculate_trend(recent)
            }

        return {
            "report_type": "QUALITY_METRICS",
            "period_days": window_days,
            "generated_at": self._utcnow().isoformat(),
            "metrics": recent_metrics,
            "overall_compliance_rate": self._calculate_overall_compliance(),
            "non_compliant_metrics": [
                name for name, data in recent_metrics.items()
                if data["status"] == "non_compliant"
            ]
        }

    def generate_review_queue_report(
        self,
        window_days: int = 30
    ) -> Dict[str, Any]:
        """Generate review queue performance report.

        Counts audit events of type "REVIEW" newer than the window cutoff.
        The average turnaround is a fixed placeholder of 24 hours.
        """
        cutoff = self._utcnow() - timedelta(days=window_days)

        review_events = [
            event for event in self.audit_trail
            if event.event_type == "REVIEW"
            and self._parse_timestamp(event.timestamp) > cutoff
        ]
        total_reviews = len(review_events)
        reviews_by_user = self._count_by_field(review_events, "user_id")

        # Calculate average turnaround time (would need actual data)
        avg_turnaround_hours = 24.0  # Placeholder

        # A zero or negative window has no meaningful daily rate; report 0.0
        # instead of raising ZeroDivisionError.
        reviews_per_day = total_reviews / window_days if window_days > 0 else 0.0

        return {
            "report_type": "REVIEW_QUEUE_PERFORMANCE",
            "period_days": window_days,
            "generated_at": self._utcnow().isoformat(),
            "summary": {
                "total_reviews": total_reviews,
                "average_turnaround_hours": avg_turnaround_hours,
                "reviews_by_reviewer": reviews_by_user
            },
            "performance_metrics": {
                "reviews_per_day": reviews_per_day,
                "target_turnaround_hours": 24.0,
                "turnaround_compliance": "COMPLIANT" if avg_turnaround_hours <= 24 else "NON_COMPLIANT"
            }
        }

    def generate_security_incidents_report(
        self,
        window_days: int = 30
    ) -> Dict[str, Any]:
        """Generate security incidents report.

        Covers incidents newer than the window cutoff. ``compliance_impact``
        is "CRITICAL" when any unresolved incident in the window has
        severity "high", otherwise "ACCEPTABLE".
        """
        cutoff = self._utcnow() - timedelta(days=window_days)

        recent_incidents = [
            incident for incident in self.security_incidents
            if self._parse_timestamp(incident["timestamp"]) > cutoff
        ]
        unresolved = [
            incident for incident in recent_incidents
            if not incident.get("resolved", False)
        ]
        has_open_high = any(incident["severity"] == "high" for incident in unresolved)

        return {
            "report_type": "SECURITY_INCIDENTS",
            "period_days": window_days,
            "generated_at": self._utcnow().isoformat(),
            "summary": {
                "total_incidents": len(recent_incidents),
                "unresolved_incidents": len(unresolved),
                "by_severity": self._count_by_field(recent_incidents, "severity"),
                "by_type": self._count_by_field(recent_incidents, "incident_type")
            },
            "critical_incidents": [
                incident for incident in recent_incidents
                if incident["severity"] == "high"
            ],
            "compliance_impact": "CRITICAL" if has_open_high else "ACCEPTABLE"
        }

    def get_compliance_dashboard(self) -> Dict[str, Any]:
        """Get comprehensive compliance dashboard data.

        Returns the four status strings plus audit trail totals and the
        number of audit events logged in the last 24 hours.
        """
        now = self._utcnow()
        day_ago = now - timedelta(hours=24)
        return {
            "timestamp": now.isoformat(),
            "hipaa_status": self._get_hipaa_status(),
            "gdpr_status": self._get_gdpr_status(),
            "quality_metrics": self._get_quality_status(),
            "security_status": self._get_security_status(),
            "audit_trail": {
                "total_events": len(self.audit_trail),
                "phi_accesses": len(self.phi_access_log),
                "recent_events": sum(
                    1 for event in self.audit_trail
                    if self._parse_timestamp(event.timestamp) > day_ago
                )
            }
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _count_by_field(self, items: List[Any], field: str) -> Dict[str, int]:
        """Count items by a specific field.

        Dict items are read by key and other objects by attribute; a missing
        field is counted under "unknown".
        """
        counts = defaultdict(int)
        for item in items:
            if isinstance(item, dict):
                value = item.get(field, "unknown")
            else:
                value = getattr(item, field, "unknown")
            counts[value] += 1
        return dict(counts)

    def _calculate_trend(self, metrics: List["ComplianceMetric"]) -> str:
        """Calculate trend from the last two samples.

        Returns "improving" or "declining" when the latest value moved by
        more than 5% relative to the previous one, otherwise "stable".
        Fewer than two samples is always "stable".
        """
        if len(metrics) < 2:
            return "stable"

        latest = metrics[-1].value
        previous = metrics[-2].value

        if previous == 0:
            # Relative change is undefined from a zero baseline; fall back to
            # the direction of the move instead of calling it "stable".
            if latest > 0:
                return "improving"
            if latest < 0:
                return "declining"
            return "stable"

        # abs() keeps the sign of the change meaningful for negative baselines.
        change_percent = (latest - previous) / abs(previous)
        if change_percent > 0.05:
            return "improving"
        if change_percent < -0.05:
            return "declining"
        return "stable"

    def _calculate_overall_compliance(self) -> float:
        """Calculate overall compliance rate.

        Returns the fraction of metrics whose latest sample is "compliant",
        or 1.0 when no metric has been recorded.
        """
        latest_samples = [
            history[-1] for history in self.compliance_metrics.values() if history
        ]
        if not latest_samples:
            return 1.0
        compliant = sum(1 for sample in latest_samples if sample.status == "compliant")
        return compliant / len(latest_samples)

    def _get_hipaa_status(self) -> str:
        """Get HIPAA compliance status ("REVIEW_REQUIRED" if any incident is logged)."""
        if self.security_incidents:
            return "REVIEW_REQUIRED"
        return "COMPLIANT"

    def _get_gdpr_status(self) -> str:
        """Get GDPR compliance status ("NOT_CONFIGURED" while the audit trail is empty)."""
        if not self.audit_trail:
            return "NOT_CONFIGURED"
        return "COMPLIANT"

    def _get_quality_status(self) -> str:
        """Get quality metrics status from the overall compliance rate."""
        compliance_rate = self._calculate_overall_compliance()
        if compliance_rate >= 0.95:
            return "EXCELLENT"
        if compliance_rate >= 0.85:
            return "GOOD"
        if compliance_rate >= 0.75:
            return "ACCEPTABLE"
        return "NEEDS_IMPROVEMENT"

    def _get_security_status(self) -> str:
        """Get security status from incidents logged in the last 7 days."""
        week_ago = self._utcnow() - timedelta(days=7)
        recent_incidents = [
            incident for incident in self.security_incidents
            if self._parse_timestamp(incident["timestamp"]) > week_ago
        ]
        if any(incident["severity"] == "high" for incident in recent_incidents):
            return "CRITICAL"
        if recent_incidents:
            return "WARNING"
        return "SECURE"
# Module-wide instance, created on the first call to get_compliance_system().
_compliance_system = None


def get_compliance_system() -> ComplianceReportingSystem:
    """Return the shared ComplianceReportingSystem, creating it on first use."""
    global _compliance_system
    if _compliance_system is not None:
        return _compliance_system
    _compliance_system = ComplianceReportingSystem()
    return _compliance_system