# medical-report-analyzer / compliance_reporting.py
# snikhilesh's picture
# Deploy backend with monitoring infrastructure - Complete Medical AI Platform
# 13d5ab4 verified
"""
Compliance Reporting System
HIPAA/GDPR compliance reporting and audit trail management
Features:
- HIPAA audit trail reports
- GDPR compliance documentation
- Clinical quality metrics tracking
- Review queue performance analysis
- Security incident reporting
- Regulatory compliance dashboards
Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""
import logging
from collections import defaultdict
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List, Optional
# Module-level logger, named after this module via __name__
logger = logging.getLogger(__name__)
class ComplianceStandard(Enum):
    """
    Enumeration of the compliance standards known to this module.

    Every member's value is the same string as its name, so a member can
    be looked up from its textual form, e.g. ``ComplianceStandard("GDPR")``.
    """

    HIPAA = "HIPAA"
    GDPR = "GDPR"
    FDA = "FDA"
    ISO13485 = "ISO13485"
@dataclass
class AuditEvent:
    """
    A single record of the audit trail.

    Fields are declared in the order expected by the generated
    ``__init__``; none of them has a default value.
    """

    event_id: str  # identifier of this event
    timestamp: str  # time of the event, kept as a string
    user_id: str  # user the event is attributed to
    event_type: str  # category label of the event
    resource: str  # resource the event refers to
    action: str  # action carried out on the resource
    ip_address: str  # address the request came from
    success: bool  # whether the action succeeded
    details: Dict[str, Any]  # free-form additional data

    def to_dict(self) -> Dict[str, Any]:
        """Return the event as a plain dictionary built by ``dataclasses.asdict``."""
        event_as_dict = asdict(self)
        return event_as_dict
@dataclass
class ComplianceMetric:
    """
    One recorded value of a named compliance metric.

    Fields are declared in the order expected by the generated
    ``__init__``; none of them has a default value.
    """

    metric_name: str  # name the metric is tracked under
    value: float  # recorded value
    target: float  # target the value is measured against
    status: str  # "compliant", "warning", "non_compliant"
    timestamp: str  # time of the recording, kept as a string

    def to_dict(self) -> Dict[str, Any]:
        """Return the metric as a plain dictionary built by ``dataclasses.asdict``."""
        metric_as_dict = asdict(self)
        return metric_as_dict
class ComplianceReportingSystem:
    """
    Comprehensive compliance reporting system.

    Collects audit events, PHI access records, security incidents and
    compliance metrics in memory and turns them into report dictionaries
    for regulatory audits and quality assurance.

    Time handling: every timestamp generated by this class is a naive
    UTC ISO-8601 string. Timestamps and period bounds supplied by callers
    may carry a UTC offset (or a trailing ``Z``); they are converted to
    naive UTC before being compared, so naive and aware values never get
    mixed in a comparison.
    """

    # Retention period shown in the GDPR report (7 years for medical records)
    RETENTION_POLICY_DAYS = 2555

    # Audit event types counted as data processing / data access (GDPR report)
    _PROCESSING_EVENT_TYPES = ("UPLOAD", "PROCESS", "DELETE")
    _ACCESS_EVENT_TYPES = ("VIEW", "DOWNLOAD", "PHI_ACCESS")

    # Severity labels (compared case-insensitively) that escalate an incident
    _CRITICAL_SEVERITIES = frozenset({"high", "critical"})

    def __init__(self):
        """Create an empty in-memory store for all compliance records."""
        self.audit_trail: List[AuditEvent] = []
        self.compliance_metrics: Dict[str, List[ComplianceMetric]] = defaultdict(list)
        self.phi_access_log: List[Dict[str, Any]] = []
        self.security_incidents: List[Dict[str, Any]] = []
        logger.info("Compliance Reporting System initialized")

    # ------------------------------------------------------------------
    # Time helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Replaces the deprecated ``datetime.utcnow()`` while keeping the
        naive-UTC representation used for all stored timestamps.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _to_naive_utc(moment: datetime) -> datetime:
        """Convert an aware datetime to naive UTC; naive values pass through."""
        if moment.utcoffset() is None:
            return moment
        return moment.astimezone(timezone.utc).replace(tzinfo=None)

    @classmethod
    def _parse_timestamp(cls, raw: str) -> datetime:
        """Parse an ISO-8601 string into a naive UTC datetime.

        Accepts an optional UTC offset and a trailing ``Z``.

        Raises:
            ValueError: if ``raw`` is not a valid ISO-8601 timestamp.
        """
        text = raw.strip()
        # ``fromisoformat`` only understands "Z" from Python 3.11 on
        if text[-1:] in ("Z", "z"):
            text = text[:-1] + "+00:00"
        return cls._to_naive_utc(datetime.fromisoformat(text))

    def _resolve_period(
        self,
        start_date: Optional[datetime],
        end_date: Optional[datetime]
    ) -> tuple:
        """Return ``(start, end)`` as naive UTC, defaulting to the last 30 days."""
        now = self._utcnow()
        start = now - timedelta(days=30) if start_date is None else self._to_naive_utc(start_date)
        end = now if end_date is None else self._to_naive_utc(end_date)
        return start, end

    def _within(self, raw_timestamp: str, start: datetime, end: datetime) -> bool:
        """Tell whether a stored timestamp lies in the closed interval [start, end]."""
        return start <= self._parse_timestamp(raw_timestamp) <= end

    def _after(self, raw_timestamp: str, cutoff: datetime) -> bool:
        """Tell whether a stored timestamp is strictly later than ``cutoff``."""
        return self._parse_timestamp(raw_timestamp) > cutoff

    @classmethod
    def _is_critical(cls, incident: Dict[str, Any]) -> bool:
        """Tell whether an incident's severity label counts as critical."""
        label = str(incident.get("severity")).strip().lower()
        return label in cls._CRITICAL_SEVERITIES

    # ------------------------------------------------------------------
    # Recording
    # ------------------------------------------------------------------

    def log_audit_event(
        self,
        user_id: str,
        event_type: str,
        resource: str,
        action: str,
        ip_address: str,
        success: bool = True,
        details: Optional[Dict[str, Any]] = None
    ) -> "AuditEvent":
        """Log an audit event for compliance tracking.

        The event is appended to ``audit_trail`` and returned. ``details``
        defaults to an empty dict.
        """
        # One clock read so that the id and the timestamp agree
        now = datetime.now(timezone.utc)
        event = AuditEvent(
            event_id=f"audit_{len(self.audit_trail)}_{now.timestamp()}",
            timestamp=now.replace(tzinfo=None).isoformat(),
            user_id=user_id,
            event_type=event_type,
            resource=resource,
            action=action,
            ip_address=ip_address,
            success=success,
            details=details or {}
        )
        self.audit_trail.append(event)
        return event

    def log_phi_access(
        self,
        user_id: str,
        document_id: str,
        action: str,
        ip_address: str,
        timestamp: Optional[str] = None
    ):
        """Log PHI access (HIPAA requirement).

        Appends a record to ``phi_access_log`` and mirrors it as a
        ``PHI_ACCESS`` audit event. ``timestamp`` is an ISO-8601 string
        (offset or ``Z`` allowed) and defaults to the current UTC time;
        it is stored exactly as given.
        """
        access_log = {
            "timestamp": timestamp or self._utcnow().isoformat(),
            "user_id": user_id,
            "document_id": document_id,
            "action": action,
            "ip_address": ip_address
        }
        self.phi_access_log.append(access_log)
        # Also log as audit event
        self.log_audit_event(
            user_id=user_id,
            event_type="PHI_ACCESS",
            resource=f"document:{document_id}",
            action=action,
            ip_address=ip_address,
            details={"document_id": document_id}
        )

    def log_security_incident(
        self,
        incident_type: str,
        severity: str,
        description: str,
        user_id: Optional[str] = None,
        ip_address: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None
    ):
        """Log security incident.

        The incident is stored with ``resolved`` set to ``False`` and a
        warning is written to the module logger.
        """
        incident = {
            "timestamp": self._utcnow().isoformat(),
            "incident_type": incident_type,
            "severity": severity,
            "description": description,
            "user_id": user_id,
            "ip_address": ip_address,
            "details": details or {},
            "resolved": False
        }
        self.security_incidents.append(incident)
        logger.warning(
            "Security incident logged: %s (severity: %s)", incident_type, severity
        )

    def record_compliance_metric(
        self,
        metric_name: str,
        value: float,
        target: float
    ):
        """Record a compliance metric.

        Status is ``"compliant"`` when ``value`` reaches ``target``,
        ``"warning"`` when it is within 10% below the target, and
        ``"non_compliant"`` otherwise.
        """
        # 10% below the target; for a negative target that is target * 1.1
        warning_floor = target * 0.9 if target >= 0 else target * 1.1
        if value >= target:
            status = "compliant"
        elif value >= warning_floor:
            status = "warning"
        else:
            status = "non_compliant"
        metric = ComplianceMetric(
            metric_name=metric_name,
            value=value,
            target=target,
            status=status,
            timestamp=self._utcnow().isoformat()
        )
        self.compliance_metrics[metric_name].append(metric)

    # ------------------------------------------------------------------
    # Reports
    # ------------------------------------------------------------------

    def generate_hipaa_report(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Generate HIPAA compliance report.

        Args:
            start_date: start of the period (default: 30 days ago, UTC).
            end_date: end of the period (default: now, UTC).

        Returns:
            A dict with PHI access counts (total, per user, per action),
            audit trail and security incident counts for the period, and
            a ``compliance_status`` that is ``"REVIEW_REQUIRED"`` when at
            least one security incident falls inside the period.

        Raises:
            ValueError: if a stored timestamp is not valid ISO-8601.
        """
        start, end = self._resolve_period(start_date, end_date)
        phi_accesses = [
            log for log in self.phi_access_log
            if self._within(log["timestamp"], start, end)
        ]
        period_events = [
            e for e in self.audit_trail
            if self._within(e.timestamp, start, end)
        ]
        period_incidents = [
            i for i in self.security_incidents
            if self._within(i["timestamp"], start, end)
        ]
        access_by_user = self._count_by_field(phi_accesses, "user_id")
        access_by_action = self._count_by_field(phi_accesses, "action")
        report = {
            "report_type": "HIPAA_COMPLIANCE",
            "period": {
                "start": start.isoformat(),
                "end": end.isoformat()
            },
            "generated_at": self._utcnow().isoformat(),
            "summary": {
                "total_phi_accesses": len(phi_accesses),
                "unique_users": len(access_by_user),
                "access_by_user": access_by_user,
                "access_by_action": access_by_action
            },
            "audit_trail_summary": {
                "total_events": len(period_events),
                "phi_access_events": len(phi_accesses)
            },
            "security_incidents": len(period_incidents),
            # Judged on the same period as every other figure in the report
            "compliance_status": "REVIEW_REQUIRED" if period_incidents else "COMPLIANT"
        }
        return report

    def generate_gdpr_report(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Generate GDPR compliance report.

        Args:
            start_date: start of the period (default: 30 days ago, UTC).
            end_date: end of the period (default: now, UTC).

        Returns:
            A dict with data processing and data access counts for the
            period, retention information over the whole PHI access log,
            and placeholder user-rights counters (always 0 here).

        Raises:
            ValueError: if a stored timestamp is not valid ISO-8601.
        """
        start, end = self._resolve_period(start_date, end_date)
        audit_events = [
            e for e in self.audit_trail
            if self._within(e.timestamp, start, end)
        ]
        data_processing_events = [
            e for e in audit_events
            if e.event_type in self._PROCESSING_EVENT_TYPES
        ]
        access_events = [
            e for e in audit_events
            if e.event_type in self._ACCESS_EVENT_TYPES
        ]
        # Pick the oldest record by parsed time, not by string order, so
        # that timestamps written with different offsets compare correctly
        oldest_record = min(
            (log["timestamp"] for log in self.phi_access_log),
            key=self._parse_timestamp,
            default=None
        )
        report = {
            "report_type": "GDPR_COMPLIANCE",
            "period": {
                "start": start.isoformat(),
                "end": end.isoformat()
            },
            "generated_at": self._utcnow().isoformat(),
            "data_processing": {
                "total_processing_events": len(data_processing_events),
                "by_action": self._count_by_field(data_processing_events, "action")
            },
            "data_access": {
                "total_access_events": len(access_events),
                "by_user": self._count_by_field(access_events, "user_id")
            },
            "data_retention": {
                "retention_policy_days": self.RETENTION_POLICY_DAYS,
                "current_records": len(self.phi_access_log),
                "oldest_record": oldest_record
            },
            "user_rights": {
                "access_requests": 0,  # Would track actual requests
                "deletion_requests": 0,
                "portability_requests": 0
            },
            "compliance_status": "COMPLIANT"
        }
        return report

    def generate_quality_metrics_report(
        self,
        window_days: int = 30
    ) -> Dict[str, Any]:
        """Generate clinical quality metrics report.

        For every metric recorded within the last ``window_days`` days the
        report holds the latest value, target, status and trend. The
        ``overall_compliance_rate`` is computed from the latest entry of
        every metric, whether or not it falls inside the window.
        """
        cutoff = self._utcnow() - timedelta(days=window_days)
        recent_metrics = {}
        for metric_name, metrics_list in self.compliance_metrics.items():
            recent = [m for m in metrics_list if self._after(m.timestamp, cutoff)]
            if not recent:
                continue
            latest = recent[-1]  # entries are stored in recording order
            recent_metrics[metric_name] = {
                "current_value": latest.value,
                "target": latest.target,
                "status": latest.status,
                "trend": self._calculate_trend(recent)
            }
        report = {
            "report_type": "QUALITY_METRICS",
            "period_days": window_days,
            "generated_at": self._utcnow().isoformat(),
            "metrics": recent_metrics,
            "overall_compliance_rate": self._calculate_overall_compliance(),
            "non_compliant_metrics": [
                name for name, data in recent_metrics.items()
                if data["status"] == "non_compliant"
            ]
        }
        return report

    def generate_review_queue_report(
        self,
        window_days: int = 30
    ) -> Dict[str, Any]:
        """Generate review queue performance report.

        Counts ``REVIEW`` audit events of the last ``window_days`` days.
        The average turnaround is a fixed placeholder (24 hours) because
        no turnaround data is recorded. ``reviews_per_day`` is 0.0 when
        ``window_days`` is not positive.
        """
        cutoff = self._utcnow() - timedelta(days=window_days)
        review_events = [
            e for e in self.audit_trail
            if e.event_type == "REVIEW" and self._after(e.timestamp, cutoff)
        ]
        total_reviews = len(review_events)
        reviews_by_user = self._count_by_field(review_events, "user_id")
        # Calculate average turnaround time (would need actual data)
        avg_turnaround_hours = 24.0  # Placeholder
        # A zero-length window has no meaningful daily rate; avoid dividing by 0
        reviews_per_day = total_reviews / window_days if window_days > 0 else 0.0
        report = {
            "report_type": "REVIEW_QUEUE_PERFORMANCE",
            "period_days": window_days,
            "generated_at": self._utcnow().isoformat(),
            "summary": {
                "total_reviews": total_reviews,
                "average_turnaround_hours": avg_turnaround_hours,
                "reviews_by_reviewer": reviews_by_user
            },
            "performance_metrics": {
                "reviews_per_day": reviews_per_day,
                "target_turnaround_hours": 24.0,
                "turnaround_compliance": "COMPLIANT" if avg_turnaround_hours <= 24 else "NON_COMPLIANT"
            }
        }
        return report

    def generate_security_incidents_report(
        self,
        window_days: int = 30
    ) -> Dict[str, Any]:
        """Generate security incidents report.

        Covers incidents of the last ``window_days`` days. Incidents whose
        severity is "high" or "critical" (any letter case) are listed under
        ``critical_incidents``; ``compliance_impact`` is ``"CRITICAL"``
        when at least one of them is still unresolved.
        """
        cutoff = self._utcnow() - timedelta(days=window_days)
        recent_incidents = [
            i for i in self.security_incidents
            if self._after(i["timestamp"], cutoff)
        ]
        unresolved = [i for i in recent_incidents if not i.get("resolved", False)]
        report = {
            "report_type": "SECURITY_INCIDENTS",
            "period_days": window_days,
            "generated_at": self._utcnow().isoformat(),
            "summary": {
                "total_incidents": len(recent_incidents),
                "unresolved_incidents": len(unresolved),
                "by_severity": self._count_by_field(recent_incidents, "severity"),
                "by_type": self._count_by_field(recent_incidents, "incident_type")
            },
            "critical_incidents": [
                i for i in recent_incidents if self._is_critical(i)
            ],
            "compliance_impact": "CRITICAL" if any(
                self._is_critical(i) for i in unresolved
            ) else "ACCEPTABLE"
        }
        return report

    def get_compliance_dashboard(self) -> Dict[str, Any]:
        """Get comprehensive compliance dashboard data.

        Combines the HIPAA, GDPR, quality and security status strings with
        audit trail totals; ``recent_events`` counts the last 24 hours.
        """
        now = self._utcnow()
        last_day = now - timedelta(hours=24)
        return {
            "timestamp": now.isoformat(),
            "hipaa_status": self._get_hipaa_status(),
            "gdpr_status": self._get_gdpr_status(),
            "quality_metrics": self._get_quality_status(),
            "security_status": self._get_security_status(),
            "audit_trail": {
                "total_events": len(self.audit_trail),
                "phi_accesses": len(self.phi_access_log),
                "recent_events": sum(
                    1 for e in self.audit_trail if self._after(e.timestamp, last_day)
                )
            }
        }

    # ------------------------------------------------------------------
    # Internal calculations
    # ------------------------------------------------------------------

    def _count_by_field(self, items: List[Any], field: str) -> Dict[str, int]:
        """Count items by a specific field.

        Items may be dicts (looked up by key) or objects (looked up by
        attribute); a missing field is counted under ``"unknown"``.
        """
        counts = defaultdict(int)
        for item in items:
            if isinstance(item, dict):
                value = item.get(field, "unknown")
            else:
                value = getattr(item, field, "unknown")
            counts[value] += 1
        return dict(counts)

    def _calculate_trend(self, metrics: List["ComplianceMetric"]) -> str:
        """Calculate trend from the last two metric values.

        Returns ``"improving"`` / ``"declining"`` when the latest value
        moved by more than 5% relative to the previous one, otherwise
        ``"stable"``. Fewer than two entries yield ``"stable"``.
        """
        if len(metrics) < 2:
            return "stable"
        recent_value = metrics[-1].value
        previous_value = metrics[-2].value
        delta = recent_value - previous_value
        if previous_value == 0:
            # A relative change from zero is undefined; use the direction
            if delta > 0:
                return "improving"
            if delta < 0:
                return "declining"
            return "stable"
        # abs() keeps the sign of the change meaningful for negative values
        change_percent = delta / abs(previous_value)
        if change_percent > 0.05:
            return "improving"
        if change_percent < -0.05:
            return "declining"
        return "stable"

    def _calculate_overall_compliance(self) -> float:
        """Calculate overall compliance rate.

        The rate is the share of metrics whose latest entry is
        ``"compliant"``; with no metrics recorded it is 1.0.
        """
        latest_entries = [
            metrics_list[-1]
            for metrics_list in self.compliance_metrics.values()
            if metrics_list
        ]
        if not latest_entries:
            return 1.0
        compliant = sum(1 for m in latest_entries if m.status == "compliant")
        return compliant / len(latest_entries)

    def _get_hipaa_status(self) -> str:
        """Get HIPAA compliance status (any logged incident requires review)."""
        if self.security_incidents:
            return "REVIEW_REQUIRED"
        return "COMPLIANT"

    def _get_gdpr_status(self) -> str:
        """Get GDPR compliance status (``NOT_CONFIGURED`` until an event is logged)."""
        # Check if audit trail is complete
        if not self.audit_trail:
            return "NOT_CONFIGURED"
        return "COMPLIANT"

    def _get_quality_status(self) -> str:
        """Get quality metrics status from the overall compliance rate."""
        compliance_rate = self._calculate_overall_compliance()
        if compliance_rate >= 0.95:
            return "EXCELLENT"
        if compliance_rate >= 0.85:
            return "GOOD"
        if compliance_rate >= 0.75:
            return "ACCEPTABLE"
        return "NEEDS_IMPROVEMENT"

    def _get_security_status(self) -> str:
        """Get security status from the incidents of the last 7 days."""
        cutoff = self._utcnow() - timedelta(days=7)
        recent_incidents = [
            i for i in self.security_incidents
            if self._after(i["timestamp"], cutoff)
        ]
        if any(self._is_critical(i) for i in recent_incidents):
            return "CRITICAL"
        if recent_incidents:
            return "WARNING"
        return "SECURE"
# Module-wide instance, created on first request by get_compliance_system()
_compliance_system = None


def get_compliance_system() -> ComplianceReportingSystem:
    """Return the shared ComplianceReportingSystem, creating it on first use."""
    global _compliance_system
    if _compliance_system is not None:
        return _compliance_system
    _compliance_system = ComplianceReportingSystem()
    return _compliance_system