|
|
""" |
|
|
Compliance Reporting System |
|
|
HIPAA/GDPR compliance reporting and audit trail management |
|
|
|
|
|
Features: |
|
|
- HIPAA audit trail reports |
|
|
- GDPR compliance documentation |
|
|
- Clinical quality metrics tracking |
|
|
- Review queue performance analysis |
|
|
- Security incident reporting |
|
|
- Regulatory compliance dashboards |
|
|
|
|
|
Author: MiniMax Agent |
|
|
Date: 2025-10-29 |
|
|
Version: 1.0.0 |
|
|
""" |
|
|
|
|
|
import logging |
|
|
from typing import Dict, List, Any, Optional |
|
|
from datetime import datetime, timedelta |
|
|
from collections import defaultdict |
|
|
from dataclasses import dataclass, asdict |
|
|
from enum import Enum |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class ComplianceStandard(Enum): |
|
|
"""Compliance standards""" |
|
|
HIPAA = "HIPAA" |
|
|
GDPR = "GDPR" |
|
|
FDA = "FDA" |
|
|
ISO13485 = "ISO13485" |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class AuditEvent: |
|
|
"""Audit trail event""" |
|
|
event_id: str |
|
|
timestamp: str |
|
|
user_id: str |
|
|
event_type: str |
|
|
resource: str |
|
|
action: str |
|
|
ip_address: str |
|
|
success: bool |
|
|
details: Dict[str, Any] |
|
|
|
|
|
def to_dict(self) -> Dict[str, Any]: |
|
|
return asdict(self) |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class ComplianceMetric: |
|
|
"""Compliance metric""" |
|
|
metric_name: str |
|
|
value: float |
|
|
target: float |
|
|
status: str |
|
|
timestamp: str |
|
|
|
|
|
def to_dict(self) -> Dict[str, Any]: |
|
|
return asdict(self) |
|
|
|
|
|
|
|
|
class ComplianceReportingSystem: |
|
|
""" |
|
|
Comprehensive compliance reporting system |
|
|
Generates reports for regulatory audits and quality assurance |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
self.audit_trail: List[AuditEvent] = [] |
|
|
self.compliance_metrics: Dict[str, List[ComplianceMetric]] = defaultdict(list) |
|
|
self.phi_access_log: List[Dict[str, Any]] = [] |
|
|
self.security_incidents: List[Dict[str, Any]] = [] |
|
|
|
|
|
logger.info("Compliance Reporting System initialized") |
|
|
|
|
|
def log_audit_event( |
|
|
self, |
|
|
user_id: str, |
|
|
event_type: str, |
|
|
resource: str, |
|
|
action: str, |
|
|
ip_address: str, |
|
|
success: bool = True, |
|
|
details: Optional[Dict[str, Any]] = None |
|
|
) -> AuditEvent: |
|
|
"""Log an audit event for compliance tracking""" |
|
|
|
|
|
event = AuditEvent( |
|
|
event_id=f"audit_{len(self.audit_trail)}_{datetime.utcnow().timestamp()}", |
|
|
timestamp=datetime.utcnow().isoformat(), |
|
|
user_id=user_id, |
|
|
event_type=event_type, |
|
|
resource=resource, |
|
|
action=action, |
|
|
ip_address=ip_address, |
|
|
success=success, |
|
|
details=details or {} |
|
|
) |
|
|
|
|
|
self.audit_trail.append(event) |
|
|
|
|
|
return event |
|
|
|
|
|
def log_phi_access( |
|
|
self, |
|
|
user_id: str, |
|
|
document_id: str, |
|
|
action: str, |
|
|
ip_address: str, |
|
|
timestamp: Optional[str] = None |
|
|
): |
|
|
"""Log PHI access (HIPAA requirement)""" |
|
|
|
|
|
access_log = { |
|
|
"timestamp": timestamp or datetime.utcnow().isoformat(), |
|
|
"user_id": user_id, |
|
|
"document_id": document_id, |
|
|
"action": action, |
|
|
"ip_address": ip_address |
|
|
} |
|
|
|
|
|
self.phi_access_log.append(access_log) |
|
|
|
|
|
|
|
|
self.log_audit_event( |
|
|
user_id=user_id, |
|
|
event_type="PHI_ACCESS", |
|
|
resource=f"document:{document_id}", |
|
|
action=action, |
|
|
ip_address=ip_address, |
|
|
details={"document_id": document_id} |
|
|
) |
|
|
|
|
|
def log_security_incident( |
|
|
self, |
|
|
incident_type: str, |
|
|
severity: str, |
|
|
description: str, |
|
|
user_id: Optional[str] = None, |
|
|
ip_address: Optional[str] = None, |
|
|
details: Optional[Dict[str, Any]] = None |
|
|
): |
|
|
"""Log security incident""" |
|
|
|
|
|
incident = { |
|
|
"timestamp": datetime.utcnow().isoformat(), |
|
|
"incident_type": incident_type, |
|
|
"severity": severity, |
|
|
"description": description, |
|
|
"user_id": user_id, |
|
|
"ip_address": ip_address, |
|
|
"details": details or {}, |
|
|
"resolved": False |
|
|
} |
|
|
|
|
|
self.security_incidents.append(incident) |
|
|
|
|
|
logger.warning(f"Security incident logged: {incident_type} (severity: {severity})") |
|
|
|
|
|
def record_compliance_metric( |
|
|
self, |
|
|
metric_name: str, |
|
|
value: float, |
|
|
target: float |
|
|
): |
|
|
"""Record a compliance metric""" |
|
|
|
|
|
|
|
|
if value >= target: |
|
|
status = "compliant" |
|
|
elif value >= target * 0.9: |
|
|
status = "warning" |
|
|
else: |
|
|
status = "non_compliant" |
|
|
|
|
|
metric = ComplianceMetric( |
|
|
metric_name=metric_name, |
|
|
value=value, |
|
|
target=target, |
|
|
status=status, |
|
|
timestamp=datetime.utcnow().isoformat() |
|
|
) |
|
|
|
|
|
self.compliance_metrics[metric_name].append(metric) |
|
|
|
|
|
def generate_hipaa_report( |
|
|
self, |
|
|
start_date: Optional[datetime] = None, |
|
|
end_date: Optional[datetime] = None |
|
|
) -> Dict[str, Any]: |
|
|
"""Generate HIPAA compliance report""" |
|
|
|
|
|
if not start_date: |
|
|
start_date = datetime.utcnow() - timedelta(days=30) |
|
|
if not end_date: |
|
|
end_date = datetime.utcnow() |
|
|
|
|
|
|
|
|
phi_accesses = [ |
|
|
log for log in self.phi_access_log |
|
|
if start_date <= datetime.fromisoformat(log["timestamp"]) <= end_date |
|
|
] |
|
|
|
|
|
|
|
|
access_by_user = defaultdict(int) |
|
|
for access in phi_accesses: |
|
|
access_by_user[access["user_id"]] += 1 |
|
|
|
|
|
|
|
|
access_by_action = defaultdict(int) |
|
|
for access in phi_accesses: |
|
|
access_by_action[access["action"]] += 1 |
|
|
|
|
|
report = { |
|
|
"report_type": "HIPAA_COMPLIANCE", |
|
|
"period": { |
|
|
"start": start_date.isoformat(), |
|
|
"end": end_date.isoformat() |
|
|
}, |
|
|
"generated_at": datetime.utcnow().isoformat(), |
|
|
"summary": { |
|
|
"total_phi_accesses": len(phi_accesses), |
|
|
"unique_users": len(access_by_user), |
|
|
"access_by_user": dict(access_by_user), |
|
|
"access_by_action": dict(access_by_action) |
|
|
}, |
|
|
"audit_trail_summary": { |
|
|
"total_events": len([ |
|
|
e for e in self.audit_trail |
|
|
if start_date <= datetime.fromisoformat(e.timestamp) <= end_date |
|
|
]), |
|
|
"phi_access_events": len(phi_accesses) |
|
|
}, |
|
|
"security_incidents": len([ |
|
|
i for i in self.security_incidents |
|
|
if start_date <= datetime.fromisoformat(i["timestamp"]) <= end_date |
|
|
]), |
|
|
"compliance_status": "COMPLIANT" if len(self.security_incidents) == 0 else "REVIEW_REQUIRED" |
|
|
} |
|
|
|
|
|
return report |
|
|
|
|
|
def generate_gdpr_report( |
|
|
self, |
|
|
start_date: Optional[datetime] = None, |
|
|
end_date: Optional[datetime] = None |
|
|
) -> Dict[str, Any]: |
|
|
"""Generate GDPR compliance report""" |
|
|
|
|
|
if not start_date: |
|
|
start_date = datetime.utcnow() - timedelta(days=30) |
|
|
if not end_date: |
|
|
end_date = datetime.utcnow() |
|
|
|
|
|
|
|
|
audit_events = [ |
|
|
e for e in self.audit_trail |
|
|
if start_date <= datetime.fromisoformat(e.timestamp) <= end_date |
|
|
] |
|
|
|
|
|
|
|
|
data_processing_events = [ |
|
|
e for e in audit_events |
|
|
if e.event_type in ["UPLOAD", "PROCESS", "DELETE"] |
|
|
] |
|
|
|
|
|
|
|
|
access_events = [ |
|
|
e for e in audit_events |
|
|
if e.event_type in ["VIEW", "DOWNLOAD", "PHI_ACCESS"] |
|
|
] |
|
|
|
|
|
report = { |
|
|
"report_type": "GDPR_COMPLIANCE", |
|
|
"period": { |
|
|
"start": start_date.isoformat(), |
|
|
"end": end_date.isoformat() |
|
|
}, |
|
|
"generated_at": datetime.utcnow().isoformat(), |
|
|
"data_processing": { |
|
|
"total_processing_events": len(data_processing_events), |
|
|
"by_action": self._count_by_field(data_processing_events, "action") |
|
|
}, |
|
|
"data_access": { |
|
|
"total_access_events": len(access_events), |
|
|
"by_user": self._count_by_field(access_events, "user_id") |
|
|
}, |
|
|
"data_retention": { |
|
|
"retention_policy_days": 2555, |
|
|
"current_records": len(self.phi_access_log), |
|
|
"oldest_record": min( |
|
|
[log["timestamp"] for log in self.phi_access_log], |
|
|
default=None |
|
|
) |
|
|
}, |
|
|
"user_rights": { |
|
|
"access_requests": 0, |
|
|
"deletion_requests": 0, |
|
|
"portability_requests": 0 |
|
|
}, |
|
|
"compliance_status": "COMPLIANT" |
|
|
} |
|
|
|
|
|
return report |
|
|
|
|
|
def generate_quality_metrics_report( |
|
|
self, |
|
|
window_days: int = 30 |
|
|
) -> Dict[str, Any]: |
|
|
"""Generate clinical quality metrics report""" |
|
|
|
|
|
cutoff = datetime.utcnow() - timedelta(days=window_days) |
|
|
|
|
|
|
|
|
recent_metrics = {} |
|
|
for metric_name, metrics_list in self.compliance_metrics.items(): |
|
|
recent = [ |
|
|
m for m in metrics_list |
|
|
if datetime.fromisoformat(m.timestamp) > cutoff |
|
|
] |
|
|
|
|
|
if recent: |
|
|
latest = recent[-1] |
|
|
recent_metrics[metric_name] = { |
|
|
"current_value": latest.value, |
|
|
"target": latest.target, |
|
|
"status": latest.status, |
|
|
"trend": self._calculate_trend(recent) |
|
|
} |
|
|
|
|
|
report = { |
|
|
"report_type": "QUALITY_METRICS", |
|
|
"period_days": window_days, |
|
|
"generated_at": datetime.utcnow().isoformat(), |
|
|
"metrics": recent_metrics, |
|
|
"overall_compliance_rate": self._calculate_overall_compliance(), |
|
|
"non_compliant_metrics": [ |
|
|
name for name, data in recent_metrics.items() |
|
|
if data["status"] == "non_compliant" |
|
|
] |
|
|
} |
|
|
|
|
|
return report |
|
|
|
|
|
def generate_review_queue_report( |
|
|
self, |
|
|
window_days: int = 30 |
|
|
) -> Dict[str, Any]: |
|
|
"""Generate review queue performance report""" |
|
|
|
|
|
cutoff = datetime.utcnow() - timedelta(days=window_days) |
|
|
|
|
|
|
|
|
review_events = [ |
|
|
e for e in self.audit_trail |
|
|
if e.event_type == "REVIEW" and |
|
|
datetime.fromisoformat(e.timestamp) > cutoff |
|
|
] |
|
|
|
|
|
|
|
|
total_reviews = len(review_events) |
|
|
reviews_by_user = self._count_by_field(review_events, "user_id") |
|
|
|
|
|
|
|
|
avg_turnaround_hours = 24.0 |
|
|
|
|
|
report = { |
|
|
"report_type": "REVIEW_QUEUE_PERFORMANCE", |
|
|
"period_days": window_days, |
|
|
"generated_at": datetime.utcnow().isoformat(), |
|
|
"summary": { |
|
|
"total_reviews": total_reviews, |
|
|
"average_turnaround_hours": avg_turnaround_hours, |
|
|
"reviews_by_reviewer": reviews_by_user |
|
|
}, |
|
|
"performance_metrics": { |
|
|
"reviews_per_day": total_reviews / window_days, |
|
|
"target_turnaround_hours": 24.0, |
|
|
"turnaround_compliance": "COMPLIANT" if avg_turnaround_hours <= 24 else "NON_COMPLIANT" |
|
|
} |
|
|
} |
|
|
|
|
|
return report |
|
|
|
|
|
def generate_security_incidents_report( |
|
|
self, |
|
|
window_days: int = 30 |
|
|
) -> Dict[str, Any]: |
|
|
"""Generate security incidents report""" |
|
|
|
|
|
cutoff = datetime.utcnow() - timedelta(days=window_days) |
|
|
|
|
|
recent_incidents = [ |
|
|
i for i in self.security_incidents |
|
|
if datetime.fromisoformat(i["timestamp"]) > cutoff |
|
|
] |
|
|
|
|
|
by_severity = self._count_by_field(recent_incidents, "severity") |
|
|
by_type = self._count_by_field(recent_incidents, "incident_type") |
|
|
|
|
|
unresolved = [i for i in recent_incidents if not i.get("resolved", False)] |
|
|
|
|
|
report = { |
|
|
"report_type": "SECURITY_INCIDENTS", |
|
|
"period_days": window_days, |
|
|
"generated_at": datetime.utcnow().isoformat(), |
|
|
"summary": { |
|
|
"total_incidents": len(recent_incidents), |
|
|
"unresolved_incidents": len(unresolved), |
|
|
"by_severity": by_severity, |
|
|
"by_type": by_type |
|
|
}, |
|
|
"critical_incidents": [ |
|
|
i for i in recent_incidents |
|
|
if i["severity"] == "high" |
|
|
], |
|
|
"compliance_impact": "CRITICAL" if len(unresolved) > 0 and any( |
|
|
i["severity"] == "high" for i in unresolved |
|
|
) else "ACCEPTABLE" |
|
|
} |
|
|
|
|
|
return report |
|
|
|
|
|
def get_compliance_dashboard(self) -> Dict[str, Any]: |
|
|
"""Get comprehensive compliance dashboard data""" |
|
|
|
|
|
return { |
|
|
"timestamp": datetime.utcnow().isoformat(), |
|
|
"hipaa_status": self._get_hipaa_status(), |
|
|
"gdpr_status": self._get_gdpr_status(), |
|
|
"quality_metrics": self._get_quality_status(), |
|
|
"security_status": self._get_security_status(), |
|
|
"audit_trail": { |
|
|
"total_events": len(self.audit_trail), |
|
|
"phi_accesses": len(self.phi_access_log), |
|
|
"recent_events": len([ |
|
|
e for e in self.audit_trail |
|
|
if datetime.fromisoformat(e.timestamp) > datetime.utcnow() - timedelta(hours=24) |
|
|
]) |
|
|
} |
|
|
} |
|
|
|
|
|
def _count_by_field(self, items: List[Any], field: str) -> Dict[str, int]: |
|
|
"""Count items by a specific field""" |
|
|
counts = defaultdict(int) |
|
|
for item in items: |
|
|
if isinstance(item, dict): |
|
|
value = item.get(field, "unknown") |
|
|
else: |
|
|
value = getattr(item, field, "unknown") |
|
|
counts[value] += 1 |
|
|
return dict(counts) |
|
|
|
|
|
def _calculate_trend(self, metrics: List[ComplianceMetric]) -> str: |
|
|
"""Calculate trend from metrics""" |
|
|
if len(metrics) < 2: |
|
|
return "stable" |
|
|
|
|
|
recent_value = metrics[-1].value |
|
|
previous_value = metrics[-2].value |
|
|
|
|
|
change_percent = (recent_value - previous_value) / previous_value if previous_value > 0 else 0 |
|
|
|
|
|
if change_percent > 0.05: |
|
|
return "improving" |
|
|
elif change_percent < -0.05: |
|
|
return "declining" |
|
|
else: |
|
|
return "stable" |
|
|
|
|
|
def _calculate_overall_compliance(self) -> float: |
|
|
"""Calculate overall compliance rate""" |
|
|
all_metrics = [] |
|
|
for metrics_list in self.compliance_metrics.values(): |
|
|
if metrics_list: |
|
|
all_metrics.append(metrics_list[-1]) |
|
|
|
|
|
if not all_metrics: |
|
|
return 1.0 |
|
|
|
|
|
compliant = sum(1 for m in all_metrics if m.status == "compliant") |
|
|
return compliant / len(all_metrics) |
|
|
|
|
|
def _get_hipaa_status(self) -> str: |
|
|
"""Get HIPAA compliance status""" |
|
|
if len(self.security_incidents) > 0: |
|
|
return "REVIEW_REQUIRED" |
|
|
return "COMPLIANT" |
|
|
|
|
|
def _get_gdpr_status(self) -> str: |
|
|
"""Get GDPR compliance status""" |
|
|
|
|
|
if len(self.audit_trail) == 0: |
|
|
return "NOT_CONFIGURED" |
|
|
return "COMPLIANT" |
|
|
|
|
|
def _get_quality_status(self) -> str: |
|
|
"""Get quality metrics status""" |
|
|
compliance_rate = self._calculate_overall_compliance() |
|
|
|
|
|
if compliance_rate >= 0.95: |
|
|
return "EXCELLENT" |
|
|
elif compliance_rate >= 0.85: |
|
|
return "GOOD" |
|
|
elif compliance_rate >= 0.75: |
|
|
return "ACCEPTABLE" |
|
|
else: |
|
|
return "NEEDS_IMPROVEMENT" |
|
|
|
|
|
def _get_security_status(self) -> str: |
|
|
"""Get security status""" |
|
|
recent_incidents = [ |
|
|
i for i in self.security_incidents |
|
|
if datetime.fromisoformat(i["timestamp"]) > datetime.utcnow() - timedelta(days=7) |
|
|
] |
|
|
|
|
|
if any(i["severity"] == "high" for i in recent_incidents): |
|
|
return "CRITICAL" |
|
|
elif len(recent_incidents) > 0: |
|
|
return "WARNING" |
|
|
else: |
|
|
return "SECURE" |
|
|
|
|
|
|
|
|
|
|
|
_compliance_system = None |
|
|
|
|
|
|
|
|
def get_compliance_system() -> ComplianceReportingSystem: |
|
|
"""Get singleton compliance system instance""" |
|
|
global _compliance_system |
|
|
if _compliance_system is None: |
|
|
_compliance_system = ComplianceReportingSystem() |
|
|
return _compliance_system |
|
|
|