snikhilesh commited on
Commit
671569c
·
verified ·
1 Parent(s): 07eb542

Deploy compliance_reporting.py to backend/ directory

Browse files
Files changed (1) hide show
  1. backend/compliance_reporting.py +538 -0
backend/compliance_reporting.py ADDED
@@ -0,0 +1,538 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Compliance Reporting System
3
+ HIPAA/GDPR compliance reporting and audit trail management
4
+
5
+ Features:
6
+ - HIPAA audit trail reports
7
+ - GDPR compliance documentation
8
+ - Clinical quality metrics tracking
9
+ - Review queue performance analysis
10
+ - Security incident reporting
11
+ - Regulatory compliance dashboards
12
+
13
+ Author: MiniMax Agent
14
+ Date: 2025-10-29
15
+ Version: 1.0.0
16
+ """
17
+
18
+ import logging
19
+ from typing import Dict, List, Any, Optional
20
+ from datetime import datetime, timedelta
21
+ from collections import defaultdict
22
+ from dataclasses import dataclass, asdict
23
+ from enum import Enum
24
+
25
+ logger = logging.getLogger(__name__)
26
+
27
+
28
+ class ComplianceStandard(Enum):
29
+ """Compliance standards"""
30
+ HIPAA = "HIPAA"
31
+ GDPR = "GDPR"
32
+ FDA = "FDA"
33
+ ISO13485 = "ISO13485"
34
+
35
+
36
+ @dataclass
37
+ class AuditEvent:
38
+ """Audit trail event"""
39
+ event_id: str
40
+ timestamp: str
41
+ user_id: str
42
+ event_type: str
43
+ resource: str
44
+ action: str
45
+ ip_address: str
46
+ success: bool
47
+ details: Dict[str, Any]
48
+
49
+ def to_dict(self) -> Dict[str, Any]:
50
+ return asdict(self)
51
+
52
+
53
+ @dataclass
54
+ class ComplianceMetric:
55
+ """Compliance metric"""
56
+ metric_name: str
57
+ value: float
58
+ target: float
59
+ status: str # "compliant", "warning", "non_compliant"
60
+ timestamp: str
61
+
62
+ def to_dict(self) -> Dict[str, Any]:
63
+ return asdict(self)
64
+
65
+
66
+ class ComplianceReportingSystem:
67
+ """
68
+ Comprehensive compliance reporting system
69
+ Generates reports for regulatory audits and quality assurance
70
+ """
71
+
72
+ def __init__(self):
73
+ self.audit_trail: List[AuditEvent] = []
74
+ self.compliance_metrics: Dict[str, List[ComplianceMetric]] = defaultdict(list)
75
+ self.phi_access_log: List[Dict[str, Any]] = []
76
+ self.security_incidents: List[Dict[str, Any]] = []
77
+
78
+ logger.info("Compliance Reporting System initialized")
79
+
80
+ def log_audit_event(
81
+ self,
82
+ user_id: str,
83
+ event_type: str,
84
+ resource: str,
85
+ action: str,
86
+ ip_address: str,
87
+ success: bool = True,
88
+ details: Optional[Dict[str, Any]] = None
89
+ ) -> AuditEvent:
90
+ """Log an audit event for compliance tracking"""
91
+
92
+ event = AuditEvent(
93
+ event_id=f"audit_{len(self.audit_trail)}_{datetime.utcnow().timestamp()}",
94
+ timestamp=datetime.utcnow().isoformat(),
95
+ user_id=user_id,
96
+ event_type=event_type,
97
+ resource=resource,
98
+ action=action,
99
+ ip_address=ip_address,
100
+ success=success,
101
+ details=details or {}
102
+ )
103
+
104
+ self.audit_trail.append(event)
105
+
106
+ return event
107
+
108
+ def log_phi_access(
109
+ self,
110
+ user_id: str,
111
+ document_id: str,
112
+ action: str,
113
+ ip_address: str,
114
+ timestamp: Optional[str] = None
115
+ ):
116
+ """Log PHI access (HIPAA requirement)"""
117
+
118
+ access_log = {
119
+ "timestamp": timestamp or datetime.utcnow().isoformat(),
120
+ "user_id": user_id,
121
+ "document_id": document_id,
122
+ "action": action,
123
+ "ip_address": ip_address
124
+ }
125
+
126
+ self.phi_access_log.append(access_log)
127
+
128
+ # Also log as audit event
129
+ self.log_audit_event(
130
+ user_id=user_id,
131
+ event_type="PHI_ACCESS",
132
+ resource=f"document:{document_id}",
133
+ action=action,
134
+ ip_address=ip_address,
135
+ details={"document_id": document_id}
136
+ )
137
+
138
+ def log_security_incident(
139
+ self,
140
+ incident_type: str,
141
+ severity: str,
142
+ description: str,
143
+ user_id: Optional[str] = None,
144
+ ip_address: Optional[str] = None,
145
+ details: Optional[Dict[str, Any]] = None
146
+ ):
147
+ """Log security incident"""
148
+
149
+ incident = {
150
+ "timestamp": datetime.utcnow().isoformat(),
151
+ "incident_type": incident_type,
152
+ "severity": severity,
153
+ "description": description,
154
+ "user_id": user_id,
155
+ "ip_address": ip_address,
156
+ "details": details or {},
157
+ "resolved": False
158
+ }
159
+
160
+ self.security_incidents.append(incident)
161
+
162
+ logger.warning(f"Security incident logged: {incident_type} (severity: {severity})")
163
+
164
+ def record_compliance_metric(
165
+ self,
166
+ metric_name: str,
167
+ value: float,
168
+ target: float
169
+ ):
170
+ """Record a compliance metric"""
171
+
172
+ # Determine status
173
+ if value >= target:
174
+ status = "compliant"
175
+ elif value >= target * 0.9: # Within 10% of target
176
+ status = "warning"
177
+ else:
178
+ status = "non_compliant"
179
+
180
+ metric = ComplianceMetric(
181
+ metric_name=metric_name,
182
+ value=value,
183
+ target=target,
184
+ status=status,
185
+ timestamp=datetime.utcnow().isoformat()
186
+ )
187
+
188
+ self.compliance_metrics[metric_name].append(metric)
189
+
190
+ def generate_hipaa_report(
191
+ self,
192
+ start_date: Optional[datetime] = None,
193
+ end_date: Optional[datetime] = None
194
+ ) -> Dict[str, Any]:
195
+ """Generate HIPAA compliance report"""
196
+
197
+ if not start_date:
198
+ start_date = datetime.utcnow() - timedelta(days=30)
199
+ if not end_date:
200
+ end_date = datetime.utcnow()
201
+
202
+ # Filter PHI access logs
203
+ phi_accesses = [
204
+ log for log in self.phi_access_log
205
+ if start_date <= datetime.fromisoformat(log["timestamp"]) <= end_date
206
+ ]
207
+
208
+ # Aggregate by user
209
+ access_by_user = defaultdict(int)
210
+ for access in phi_accesses:
211
+ access_by_user[access["user_id"]] += 1
212
+
213
+ # Aggregate by action
214
+ access_by_action = defaultdict(int)
215
+ for access in phi_accesses:
216
+ access_by_action[access["action"]] += 1
217
+
218
+ report = {
219
+ "report_type": "HIPAA_COMPLIANCE",
220
+ "period": {
221
+ "start": start_date.isoformat(),
222
+ "end": end_date.isoformat()
223
+ },
224
+ "generated_at": datetime.utcnow().isoformat(),
225
+ "summary": {
226
+ "total_phi_accesses": len(phi_accesses),
227
+ "unique_users": len(access_by_user),
228
+ "access_by_user": dict(access_by_user),
229
+ "access_by_action": dict(access_by_action)
230
+ },
231
+ "audit_trail_summary": {
232
+ "total_events": len([
233
+ e for e in self.audit_trail
234
+ if start_date <= datetime.fromisoformat(e.timestamp) <= end_date
235
+ ]),
236
+ "phi_access_events": len(phi_accesses)
237
+ },
238
+ "security_incidents": len([
239
+ i for i in self.security_incidents
240
+ if start_date <= datetime.fromisoformat(i["timestamp"]) <= end_date
241
+ ]),
242
+ "compliance_status": "COMPLIANT" if len(self.security_incidents) == 0 else "REVIEW_REQUIRED"
243
+ }
244
+
245
+ return report
246
+
247
+ def generate_gdpr_report(
248
+ self,
249
+ start_date: Optional[datetime] = None,
250
+ end_date: Optional[datetime] = None
251
+ ) -> Dict[str, Any]:
252
+ """Generate GDPR compliance report"""
253
+
254
+ if not start_date:
255
+ start_date = datetime.utcnow() - timedelta(days=30)
256
+ if not end_date:
257
+ end_date = datetime.utcnow()
258
+
259
+ # Filter relevant audit events
260
+ audit_events = [
261
+ e for e in self.audit_trail
262
+ if start_date <= datetime.fromisoformat(e.timestamp) <= end_date
263
+ ]
264
+
265
+ # Count data processing activities
266
+ data_processing_events = [
267
+ e for e in audit_events
268
+ if e.event_type in ["UPLOAD", "PROCESS", "DELETE"]
269
+ ]
270
+
271
+ # Count access events
272
+ access_events = [
273
+ e for e in audit_events
274
+ if e.event_type in ["VIEW", "DOWNLOAD", "PHI_ACCESS"]
275
+ ]
276
+
277
+ report = {
278
+ "report_type": "GDPR_COMPLIANCE",
279
+ "period": {
280
+ "start": start_date.isoformat(),
281
+ "end": end_date.isoformat()
282
+ },
283
+ "generated_at": datetime.utcnow().isoformat(),
284
+ "data_processing": {
285
+ "total_processing_events": len(data_processing_events),
286
+ "by_action": self._count_by_field(data_processing_events, "action")
287
+ },
288
+ "data_access": {
289
+ "total_access_events": len(access_events),
290
+ "by_user": self._count_by_field(access_events, "user_id")
291
+ },
292
+ "data_retention": {
293
+ "retention_policy_days": 2555, # 7 years for medical records
294
+ "current_records": len(self.phi_access_log),
295
+ "oldest_record": min(
296
+ [log["timestamp"] for log in self.phi_access_log],
297
+ default=None
298
+ )
299
+ },
300
+ "user_rights": {
301
+ "access_requests": 0, # Would track actual requests
302
+ "deletion_requests": 0,
303
+ "portability_requests": 0
304
+ },
305
+ "compliance_status": "COMPLIANT"
306
+ }
307
+
308
+ return report
309
+
310
+ def generate_quality_metrics_report(
311
+ self,
312
+ window_days: int = 30
313
+ ) -> Dict[str, Any]:
314
+ """Generate clinical quality metrics report"""
315
+
316
+ cutoff = datetime.utcnow() - timedelta(days=window_days)
317
+
318
+ # Get recent metrics
319
+ recent_metrics = {}
320
+ for metric_name, metrics_list in self.compliance_metrics.items():
321
+ recent = [
322
+ m for m in metrics_list
323
+ if datetime.fromisoformat(m.timestamp) > cutoff
324
+ ]
325
+
326
+ if recent:
327
+ latest = recent[-1]
328
+ recent_metrics[metric_name] = {
329
+ "current_value": latest.value,
330
+ "target": latest.target,
331
+ "status": latest.status,
332
+ "trend": self._calculate_trend(recent)
333
+ }
334
+
335
+ report = {
336
+ "report_type": "QUALITY_METRICS",
337
+ "period_days": window_days,
338
+ "generated_at": datetime.utcnow().isoformat(),
339
+ "metrics": recent_metrics,
340
+ "overall_compliance_rate": self._calculate_overall_compliance(),
341
+ "non_compliant_metrics": [
342
+ name for name, data in recent_metrics.items()
343
+ if data["status"] == "non_compliant"
344
+ ]
345
+ }
346
+
347
+ return report
348
+
349
+ def generate_review_queue_report(
350
+ self,
351
+ window_days: int = 30
352
+ ) -> Dict[str, Any]:
353
+ """Generate review queue performance report"""
354
+
355
+ cutoff = datetime.utcnow() - timedelta(days=window_days)
356
+
357
+ # Filter review events from audit trail
358
+ review_events = [
359
+ e for e in self.audit_trail
360
+ if e.event_type == "REVIEW" and
361
+ datetime.fromisoformat(e.timestamp) > cutoff
362
+ ]
363
+
364
+ # Calculate metrics
365
+ total_reviews = len(review_events)
366
+ reviews_by_user = self._count_by_field(review_events, "user_id")
367
+
368
+ # Calculate average turnaround time (would need actual data)
369
+ avg_turnaround_hours = 24.0 # Placeholder
370
+
371
+ report = {
372
+ "report_type": "REVIEW_QUEUE_PERFORMANCE",
373
+ "period_days": window_days,
374
+ "generated_at": datetime.utcnow().isoformat(),
375
+ "summary": {
376
+ "total_reviews": total_reviews,
377
+ "average_turnaround_hours": avg_turnaround_hours,
378
+ "reviews_by_reviewer": reviews_by_user
379
+ },
380
+ "performance_metrics": {
381
+ "reviews_per_day": total_reviews / window_days,
382
+ "target_turnaround_hours": 24.0,
383
+ "turnaround_compliance": "COMPLIANT" if avg_turnaround_hours <= 24 else "NON_COMPLIANT"
384
+ }
385
+ }
386
+
387
+ return report
388
+
389
+ def generate_security_incidents_report(
390
+ self,
391
+ window_days: int = 30
392
+ ) -> Dict[str, Any]:
393
+ """Generate security incidents report"""
394
+
395
+ cutoff = datetime.utcnow() - timedelta(days=window_days)
396
+
397
+ recent_incidents = [
398
+ i for i in self.security_incidents
399
+ if datetime.fromisoformat(i["timestamp"]) > cutoff
400
+ ]
401
+
402
+ by_severity = self._count_by_field(recent_incidents, "severity")
403
+ by_type = self._count_by_field(recent_incidents, "incident_type")
404
+
405
+ unresolved = [i for i in recent_incidents if not i.get("resolved", False)]
406
+
407
+ report = {
408
+ "report_type": "SECURITY_INCIDENTS",
409
+ "period_days": window_days,
410
+ "generated_at": datetime.utcnow().isoformat(),
411
+ "summary": {
412
+ "total_incidents": len(recent_incidents),
413
+ "unresolved_incidents": len(unresolved),
414
+ "by_severity": by_severity,
415
+ "by_type": by_type
416
+ },
417
+ "critical_incidents": [
418
+ i for i in recent_incidents
419
+ if i["severity"] == "high"
420
+ ],
421
+ "compliance_impact": "CRITICAL" if len(unresolved) > 0 and any(
422
+ i["severity"] == "high" for i in unresolved
423
+ ) else "ACCEPTABLE"
424
+ }
425
+
426
+ return report
427
+
428
+ def get_compliance_dashboard(self) -> Dict[str, Any]:
429
+ """Get comprehensive compliance dashboard data"""
430
+
431
+ return {
432
+ "timestamp": datetime.utcnow().isoformat(),
433
+ "hipaa_status": self._get_hipaa_status(),
434
+ "gdpr_status": self._get_gdpr_status(),
435
+ "quality_metrics": self._get_quality_status(),
436
+ "security_status": self._get_security_status(),
437
+ "audit_trail": {
438
+ "total_events": len(self.audit_trail),
439
+ "phi_accesses": len(self.phi_access_log),
440
+ "recent_events": len([
441
+ e for e in self.audit_trail
442
+ if datetime.fromisoformat(e.timestamp) > datetime.utcnow() - timedelta(hours=24)
443
+ ])
444
+ }
445
+ }
446
+
447
+ def _count_by_field(self, items: List[Any], field: str) -> Dict[str, int]:
448
+ """Count items by a specific field"""
449
+ counts = defaultdict(int)
450
+ for item in items:
451
+ if isinstance(item, dict):
452
+ value = item.get(field, "unknown")
453
+ else:
454
+ value = getattr(item, field, "unknown")
455
+ counts[value] += 1
456
+ return dict(counts)
457
+
458
+ def _calculate_trend(self, metrics: List[ComplianceMetric]) -> str:
459
+ """Calculate trend from metrics"""
460
+ if len(metrics) < 2:
461
+ return "stable"
462
+
463
+ recent_value = metrics[-1].value
464
+ previous_value = metrics[-2].value
465
+
466
+ change_percent = (recent_value - previous_value) / previous_value if previous_value > 0 else 0
467
+
468
+ if change_percent > 0.05:
469
+ return "improving"
470
+ elif change_percent < -0.05:
471
+ return "declining"
472
+ else:
473
+ return "stable"
474
+
475
+ def _calculate_overall_compliance(self) -> float:
476
+ """Calculate overall compliance rate"""
477
+ all_metrics = []
478
+ for metrics_list in self.compliance_metrics.values():
479
+ if metrics_list:
480
+ all_metrics.append(metrics_list[-1])
481
+
482
+ if not all_metrics:
483
+ return 1.0
484
+
485
+ compliant = sum(1 for m in all_metrics if m.status == "compliant")
486
+ return compliant / len(all_metrics)
487
+
488
+ def _get_hipaa_status(self) -> str:
489
+ """Get HIPAA compliance status"""
490
+ if len(self.security_incidents) > 0:
491
+ return "REVIEW_REQUIRED"
492
+ return "COMPLIANT"
493
+
494
+ def _get_gdpr_status(self) -> str:
495
+ """Get GDPR compliance status"""
496
+ # Check if audit trail is complete
497
+ if len(self.audit_trail) == 0:
498
+ return "NOT_CONFIGURED"
499
+ return "COMPLIANT"
500
+
501
+ def _get_quality_status(self) -> str:
502
+ """Get quality metrics status"""
503
+ compliance_rate = self._calculate_overall_compliance()
504
+
505
+ if compliance_rate >= 0.95:
506
+ return "EXCELLENT"
507
+ elif compliance_rate >= 0.85:
508
+ return "GOOD"
509
+ elif compliance_rate >= 0.75:
510
+ return "ACCEPTABLE"
511
+ else:
512
+ return "NEEDS_IMPROVEMENT"
513
+
514
+ def _get_security_status(self) -> str:
515
+ """Get security status"""
516
+ recent_incidents = [
517
+ i for i in self.security_incidents
518
+ if datetime.fromisoformat(i["timestamp"]) > datetime.utcnow() - timedelta(days=7)
519
+ ]
520
+
521
+ if any(i["severity"] == "high" for i in recent_incidents):
522
+ return "CRITICAL"
523
+ elif len(recent_incidents) > 0:
524
+ return "WARNING"
525
+ else:
526
+ return "SECURE"
527
+
528
+
529
+ # Global instance
530
+ _compliance_system = None
531
+
532
+
533
+ def get_compliance_system() -> ComplianceReportingSystem:
534
+ """Get singleton compliance system instance"""
535
+ global _compliance_system
536
+ if _compliance_system is None:
537
+ _compliance_system = ComplianceReportingSystem()
538
+ return _compliance_system