snikhilesh commited on
Commit
da8d026
·
verified ·
1 Parent(s): 671569c

Deploy confidence_gating_system.py to backend/ directory

Browse files
Files changed (1) hide show
  1. backend/confidence_gating_system.py +621 -0
backend/confidence_gating_system.py ADDED
@@ -0,0 +1,621 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Confidence Gating and Validation System - Phase 4
3
+ Implements composite confidence scoring, thresholds, and human review queue management.
4
+
5
+ This module builds on the preprocessing pipeline and model routing to provide intelligent
6
+ confidence-based gating, validation workflows, and review queue management for medical AI.
7
+
8
+ Author: MiniMax Agent
9
+ Date: 2025-10-29
10
+ Version: 1.0.0
11
+ """
12
+
13
+ import os
14
+ import logging
15
+ import asyncio
16
+ import time
17
+ import json
18
+ import hashlib
19
+ from typing import Dict, List, Optional, Any, Tuple, Union
20
+ from dataclasses import dataclass, asdict
21
+ from datetime import datetime, timedelta
22
+ from enum import Enum
23
+ from pathlib import Path
24
+
25
+ # Import existing components
26
+ from medical_schemas import ConfidenceScore, ValidationResult, MedicalDocumentMetadata
27
+ from specialized_model_router import SpecializedModelRouter, ModelInferenceResult
28
+ from preprocessing_pipeline import PreprocessingPipeline, ProcessingResult
29
+
30
+ logger = logging.getLogger(__name__)
31
+
32
+
33
+ class ReviewPriority(Enum):
34
+ """Priority levels for human review"""
35
+ CRITICAL = "critical" # <0.60 confidence - immediate manual review required
36
+ HIGH = "high" # 0.60-0.75 confidence - review recommended within 1 hour
37
+ MEDIUM = "medium" # 0.75-0.85 confidence - review recommended within 4 hours
38
+ LOW = "low" # 0.85-0.95 confidence - optional review for quality assurance
39
+ NONE = "none" # ≥0.95 confidence - auto-approve, audit only
40
+
41
+
42
+ class ValidationDecision(Enum):
43
+ """Final validation decisions"""
44
+ AUTO_APPROVE = "auto_approve" # ≥0.85 confidence - automatically approved
45
+ REVIEW_RECOMMENDED = "review_recommended" # 0.60-0.85 confidence - human review recommended
46
+ MANUAL_REQUIRED = "manual_required" # <0.60 confidence - manual review required
47
+ BLOCKED = "blocked" # Critical errors - processing blocked
48
+
49
+
50
+ @dataclass
51
+ class ReviewQueueItem:
52
+ """Item in the human review queue"""
53
+ item_id: str
54
+ document_id: str
55
+ priority: ReviewPriority
56
+ confidence_score: ConfidenceScore
57
+ processing_result: ProcessingResult
58
+ model_inference: ModelInferenceResult
59
+ review_decision: ValidationDecision
60
+ created_timestamp: datetime
61
+ review_deadline: datetime
62
+ assigned_reviewer: Optional[str] = None
63
+ review_notes: Optional[str] = None
64
+ reviewer_decision: Optional[str] = None
65
+ reviewed_timestamp: Optional[datetime] = None
66
+ escalated: bool = False
67
+
68
+
69
+ @dataclass
70
+ class AuditLogEntry:
71
+ """Audit log entry for compliance tracking"""
72
+ log_id: str
73
+ document_id: str
74
+ event_type: str # "confidence_gating", "manual_review", "auto_approval", "escalation"
75
+ timestamp: datetime
76
+ user_id: Optional[str]
77
+ confidence_scores: Dict[str, float]
78
+ decision: str
79
+ reasoning: str
80
+ metadata: Dict[str, Any]
81
+
82
+
83
+ class ConfidenceGatingSystem:
84
+ """Main confidence gating and validation system"""
85
+
86
+ def __init__(self,
87
+ preprocessing_pipeline: Optional[PreprocessingPipeline] = None,
88
+ model_router: Optional[SpecializedModelRouter] = None,
89
+ review_queue_path: str = "/tmp/review_queue",
90
+ audit_log_path: str = "/tmp/audit_logs"):
91
+ """Initialize confidence gating system"""
92
+
93
+ self.preprocessing_pipeline = preprocessing_pipeline or PreprocessingPipeline()
94
+ self.model_router = model_router or SpecializedModelRouter()
95
+
96
+ # Queue and logging setup
97
+ self.review_queue_path = Path(review_queue_path)
98
+ self.audit_log_path = Path(audit_log_path)
99
+ self.review_queue_path.mkdir(exist_ok=True)
100
+ self.audit_log_path.mkdir(exist_ok=True)
101
+
102
+ # Review queue storage
103
+ self.review_queue: Dict[str, ReviewQueueItem] = {}
104
+ self.load_review_queue()
105
+
106
+ # Confidence thresholds
107
+ self.confidence_thresholds = {
108
+ "auto_approve": 0.85,
109
+ "review_recommended": 0.60,
110
+ "manual_required": 0.0
111
+ }
112
+
113
+ # Review deadlines by priority
114
+ self.review_deadlines = {
115
+ ReviewPriority.CRITICAL: timedelta(minutes=30),
116
+ ReviewPriority.HIGH: timedelta(hours=1),
117
+ ReviewPriority.MEDIUM: timedelta(hours=4),
118
+ ReviewPriority.LOW: timedelta(hours=24),
119
+ ReviewPriority.NONE: timedelta(days=7) # Audit only
120
+ }
121
+
122
+ # Statistics tracking
123
+ self.stats = {
124
+ "total_processed": 0,
125
+ "auto_approved": 0,
126
+ "review_recommended": 0,
127
+ "manual_required": 0,
128
+ "blocked": 0,
129
+ "average_confidence": 0.0,
130
+ "processing_times": [],
131
+ "reviewer_performance": {}
132
+ }
133
+
134
+ logger.info("Confidence Gating System initialized")
135
+
136
+ async def process_document(self, file_path: Path, user_id: Optional[str] = None) -> Dict[str, Any]:
137
+ """Main document processing with confidence gating"""
138
+ start_time = time.time()
139
+ document_id = self._generate_document_id(file_path)
140
+
141
+ try:
142
+ logger.info(f"Processing document {document_id}: {file_path.name}")
143
+
144
+ # Stage 1: Preprocessing pipeline
145
+ preprocessing_result = await self.preprocessing_pipeline.process_file(file_path)
146
+ if not preprocessing_result:
147
+ return self._create_error_response(document_id, "Preprocessing failed")
148
+
149
+ # Stage 2: Model inference
150
+ model_result = await self.model_router.route_and_infer(preprocessing_result)
151
+ if not model_result:
152
+ return self._create_error_response(document_id, "Model inference failed")
153
+
154
+ # Stage 3: Composite confidence calculation
155
+ composite_confidence = self._calculate_composite_confidence(
156
+ preprocessing_result, model_result
157
+ )
158
+
159
+ # Stage 4: Confidence gating decision
160
+ validation_decision = self._make_validation_decision(composite_confidence)
161
+
162
+ # Stage 5: Handle based on decision
163
+ if validation_decision == ValidationDecision.AUTO_APPROVE:
164
+ response = await self._handle_auto_approval(
165
+ document_id, preprocessing_result, model_result, composite_confidence, user_id
166
+ )
167
+ elif validation_decision in [ValidationDecision.REVIEW_RECOMMENDED, ValidationDecision.MANUAL_REQUIRED]:
168
+ response = await self._handle_review_required(
169
+ document_id, preprocessing_result, model_result, composite_confidence,
170
+ validation_decision, user_id
171
+ )
172
+ else: # BLOCKED
173
+ response = await self._handle_blocked(
174
+ document_id, preprocessing_result, model_result, composite_confidence, user_id
175
+ )
176
+
177
+ # Update statistics
178
+ processing_time = time.time() - start_time
179
+ self._update_statistics(validation_decision, composite_confidence, processing_time)
180
+
181
+ return response
182
+
183
+ except Exception as e:
184
+ logger.error(f"Document processing error for {document_id}: {str(e)}")
185
+ return self._create_error_response(document_id, f"Processing error: {str(e)}")
186
+
187
+ def _calculate_composite_confidence(self,
188
+ preprocessing_result: ProcessingResult,
189
+ model_result: ModelInferenceResult) -> ConfidenceScore:
190
+ """Calculate composite confidence from all pipeline stages"""
191
+
192
+ # Extract individual confidence components
193
+ extraction_confidence = preprocessing_result.validation_result.compliance_score
194
+ model_confidence = model_result.confidence_score
195
+
196
+ # Calculate data quality based on multiple factors
197
+ data_quality_factors = []
198
+
199
+ # Factor 1: File detection confidence
200
+ if hasattr(preprocessing_result, 'file_detection'):
201
+ data_quality_factors.append(preprocessing_result.file_detection.confidence)
202
+
203
+ # Factor 2: PHI removal completeness (higher score = better quality)
204
+ if hasattr(preprocessing_result, 'phi_result'):
205
+ phi_completeness = 1.0 - (len(preprocessing_result.phi_result.redactions) / 100) # Normalize
206
+ data_quality_factors.append(max(0.0, min(1.0, phi_completeness)))
207
+
208
+ # Factor 3: Processing errors (fewer errors = higher quality)
209
+ processing_errors = len(model_result.errors) if model_result.errors else 0
210
+ error_factor = max(0.0, 1.0 - (processing_errors * 0.1)) # Each error reduces quality by 10%
211
+ data_quality_factors.append(error_factor)
212
+
213
+ # Factor 4: Model processing time (reasonable time = higher quality)
214
+ time_factor = 1.0
215
+ if model_result.processing_time > 0:
216
+ # Optimal processing time is 1-10 seconds
217
+ if 1.0 <= model_result.processing_time <= 10.0:
218
+ time_factor = 1.0
219
+ elif model_result.processing_time < 1.0:
220
+ time_factor = 0.8 # Too fast might indicate incomplete processing
221
+ else:
222
+ time_factor = max(0.5, 1.0 - ((model_result.processing_time - 10.0) / 50.0))
223
+
224
+ data_quality_factors.append(time_factor)
225
+
226
+ # Calculate average data quality
227
+ data_quality = sum(data_quality_factors) / len(data_quality_factors) if data_quality_factors else 0.5
228
+ data_quality = max(0.0, min(1.0, data_quality)) # Ensure 0-1 range
229
+
230
+ # Create composite confidence score
231
+ composite_confidence = ConfidenceScore(
232
+ extraction_confidence=extraction_confidence,
233
+ model_confidence=model_confidence,
234
+ data_quality=data_quality
235
+ )
236
+
237
+ logger.info(f"Composite confidence calculated: {composite_confidence.overall_confidence:.3f}")
238
+ logger.info(f" - Extraction: {extraction_confidence:.3f}")
239
+ logger.info(f" - Model: {model_confidence:.3f}")
240
+ logger.info(f" - Data Quality: {data_quality:.3f}")
241
+
242
+ return composite_confidence
243
+
244
+ def _make_validation_decision(self, confidence: ConfidenceScore) -> ValidationDecision:
245
+ """Make validation decision based on confidence thresholds"""
246
+ overall_confidence = confidence.overall_confidence
247
+
248
+ if overall_confidence >= self.confidence_thresholds["auto_approve"]:
249
+ return ValidationDecision.AUTO_APPROVE
250
+ elif overall_confidence >= self.confidence_thresholds["review_recommended"]:
251
+ return ValidationDecision.REVIEW_RECOMMENDED
252
+ elif overall_confidence >= self.confidence_thresholds["manual_required"]:
253
+ return ValidationDecision.MANUAL_REQUIRED
254
+ else:
255
+ return ValidationDecision.BLOCKED
256
+
257
+ def _determine_review_priority(self, confidence: ConfidenceScore) -> ReviewPriority:
258
+ """Determine review priority based on confidence score"""
259
+ overall = confidence.overall_confidence
260
+
261
+ if overall < 0.60:
262
+ return ReviewPriority.CRITICAL
263
+ elif overall < 0.70:
264
+ return ReviewPriority.HIGH
265
+ elif overall < 0.80:
266
+ return ReviewPriority.MEDIUM
267
+ elif overall < 0.90:
268
+ return ReviewPriority.LOW
269
+ else:
270
+ return ReviewPriority.NONE
271
+
272
+ async def _handle_auto_approval(self, document_id: str, preprocessing_result: ProcessingResult,
273
+ model_result: ModelInferenceResult, confidence: ConfidenceScore,
274
+ user_id: Optional[str]) -> Dict[str, Any]:
275
+ """Handle auto-approved documents"""
276
+
277
+ # Log the auto-approval
278
+ await self._log_audit_event(
279
+ document_id=document_id,
280
+ event_type="auto_approval",
281
+ user_id=user_id,
282
+ confidence_scores={
283
+ "extraction": confidence.extraction_confidence,
284
+ "model": confidence.model_confidence,
285
+ "data_quality": confidence.data_quality,
286
+ "overall": confidence.overall_confidence
287
+ },
288
+ decision="auto_approved",
289
+ reasoning=f"Confidence score {confidence.overall_confidence:.3f} meets auto-approval threshold (≥{self.confidence_thresholds['auto_approve']})"
290
+ )
291
+
292
+ return {
293
+ "document_id": document_id,
294
+ "status": "auto_approved",
295
+ "confidence": confidence.overall_confidence,
296
+ "decision": "auto_approve",
297
+ "reasoning": "High confidence - automatically approved",
298
+ "processing_result": {
299
+ "extraction_data": preprocessing_result.extraction_result,
300
+ "model_output": model_result.output_data,
301
+ "confidence_breakdown": {
302
+ "extraction": confidence.extraction_confidence,
303
+ "model": confidence.model_confidence,
304
+ "data_quality": confidence.data_quality
305
+ }
306
+ },
307
+ "requires_review": False,
308
+ "review_queue_id": None
309
+ }
310
+
311
+ async def _handle_review_required(self, document_id: str, preprocessing_result: ProcessingResult,
312
+ model_result: ModelInferenceResult, confidence: ConfidenceScore,
313
+ decision: ValidationDecision, user_id: Optional[str]) -> Dict[str, Any]:
314
+ """Handle documents requiring review"""
315
+
316
+ # Determine review priority
317
+ priority = self._determine_review_priority(confidence)
318
+
319
+ # Calculate review deadline
320
+ deadline = datetime.now() + self.review_deadlines[priority]
321
+
322
+ # Create review queue item
323
+ queue_item = ReviewQueueItem(
324
+ item_id=self._generate_queue_id(),
325
+ document_id=document_id,
326
+ priority=priority,
327
+ confidence_score=confidence,
328
+ processing_result=preprocessing_result,
329
+ model_inference=model_result,
330
+ review_decision=decision,
331
+ created_timestamp=datetime.now(),
332
+ review_deadline=deadline
333
+ )
334
+
335
+ # Add to review queue
336
+ self.review_queue[queue_item.item_id] = queue_item
337
+ await self._save_review_queue()
338
+
339
+ # Log the review requirement
340
+ await self._log_audit_event(
341
+ document_id=document_id,
342
+ event_type="review_required",
343
+ user_id=user_id,
344
+ confidence_scores={
345
+ "extraction": confidence.extraction_confidence,
346
+ "model": confidence.model_confidence,
347
+ "data_quality": confidence.data_quality,
348
+ "overall": confidence.overall_confidence
349
+ },
350
+ decision=decision.value,
351
+ reasoning=f"Confidence score {confidence.overall_confidence:.3f} requires review (threshold: {self.confidence_thresholds['review_recommended']}-{self.confidence_thresholds['auto_approve']})"
352
+ )
353
+
354
+ return {
355
+ "document_id": document_id,
356
+ "status": "review_required",
357
+ "confidence": confidence.overall_confidence,
358
+ "decision": decision.value,
359
+ "reasoning": self._get_review_reasoning(confidence, decision),
360
+ "review_queue_id": queue_item.item_id,
361
+ "priority": priority.value,
362
+ "review_deadline": deadline.isoformat(),
363
+ "processing_result": {
364
+ "extraction_data": preprocessing_result.extraction_result,
365
+ "model_output": model_result.output_data,
366
+ "confidence_breakdown": {
367
+ "extraction": confidence.extraction_confidence,
368
+ "model": confidence.model_confidence,
369
+ "data_quality": confidence.data_quality
370
+ },
371
+ "warnings": model_result.warnings
372
+ },
373
+ "requires_review": True
374
+ }
375
+
376
+ async def _handle_blocked(self, document_id: str, preprocessing_result: ProcessingResult,
377
+ model_result: ModelInferenceResult, confidence: ConfidenceScore,
378
+ user_id: Optional[str]) -> Dict[str, Any]:
379
+ """Handle blocked documents"""
380
+
381
+ # Log the blocking
382
+ await self._log_audit_event(
383
+ document_id=document_id,
384
+ event_type="blocked",
385
+ user_id=user_id,
386
+ confidence_scores={
387
+ "extraction": confidence.extraction_confidence,
388
+ "model": confidence.model_confidence,
389
+ "data_quality": confidence.data_quality,
390
+ "overall": confidence.overall_confidence
391
+ },
392
+ decision="blocked",
393
+ reasoning=f"Confidence score {confidence.overall_confidence:.3f} below acceptable threshold ({self.confidence_thresholds['manual_required']})"
394
+ )
395
+
396
+ return {
397
+ "document_id": document_id,
398
+ "status": "blocked",
399
+ "confidence": confidence.overall_confidence,
400
+ "decision": "blocked",
401
+ "reasoning": "Confidence too low for processing - manual intervention required",
402
+ "errors": model_result.errors,
403
+ "warnings": model_result.warnings,
404
+ "requires_review": True,
405
+ "escalate_immediately": True
406
+ }
407
+
408
+ def _get_review_reasoning(self, confidence: ConfidenceScore, decision: ValidationDecision) -> str:
409
+ """Generate human-readable reasoning for review requirement"""
410
+ overall = confidence.overall_confidence
411
+
412
+ reasons = []
413
+
414
+ if confidence.extraction_confidence < 0.80:
415
+ reasons.append(f"Low extraction confidence ({confidence.extraction_confidence:.3f})")
416
+
417
+ if confidence.model_confidence < 0.80:
418
+ reasons.append(f"Low model confidence ({confidence.model_confidence:.3f})")
419
+
420
+ if confidence.data_quality < 0.80:
421
+ reasons.append(f"Poor data quality ({confidence.data_quality:.3f})")
422
+
423
+ if decision == ValidationDecision.REVIEW_RECOMMENDED:
424
+ base_reason = f"Medium confidence ({overall:.3f}) - review recommended for quality assurance"
425
+ else:
426
+ base_reason = f"Low confidence ({overall:.3f}) - manual review required"
427
+
428
+ if reasons:
429
+ return f"{base_reason}. Issues: {', '.join(reasons)}"
430
+ else:
431
+ return base_reason
432
+
433
+ def get_review_queue_status(self) -> Dict[str, Any]:
434
+ """Get current review queue status"""
435
+ now = datetime.now()
436
+
437
+ # Categorize queue items
438
+ by_priority = {priority: [] for priority in ReviewPriority}
439
+ overdue = []
440
+ pending_count = 0
441
+
442
+ for item in self.review_queue.values():
443
+ if not item.reviewed_timestamp: # Still pending
444
+ pending_count += 1
445
+ by_priority[item.priority].append(item)
446
+
447
+ if now > item.review_deadline:
448
+ overdue.append(item)
449
+
450
+ return {
451
+ "total_pending": pending_count,
452
+ "by_priority": {
453
+ priority.value: len(items) for priority, items in by_priority.items()
454
+ },
455
+ "overdue_count": len(overdue),
456
+ "overdue_items": [
457
+ {
458
+ "item_id": item.item_id,
459
+ "document_id": item.document_id,
460
+ "priority": item.priority.value,
461
+ "overdue_hours": (now - item.review_deadline).total_seconds() / 3600
462
+ }
463
+ for item in overdue
464
+ ],
465
+ "queue_health": "healthy" if len(overdue) == 0 else "degraded" if len(overdue) < 5 else "critical"
466
+ }
467
+
468
+ async def _log_audit_event(self, document_id: str, event_type: str, user_id: Optional[str],
469
+ confidence_scores: Dict[str, float], decision: str, reasoning: str):
470
+ """Log audit event for compliance"""
471
+
472
+ log_entry = AuditLogEntry(
473
+ log_id=self._generate_log_id(),
474
+ document_id=document_id,
475
+ event_type=event_type,
476
+ timestamp=datetime.now(),
477
+ user_id=user_id,
478
+ confidence_scores=confidence_scores,
479
+ decision=decision,
480
+ reasoning=reasoning,
481
+ metadata={}
482
+ )
483
+
484
+ # Save to audit log file
485
+ log_file = self.audit_log_path / f"audit_{datetime.now().strftime('%Y%m%d')}.jsonl"
486
+ with open(log_file, 'a') as f:
487
+ f.write(json.dumps(asdict(log_entry), default=str) + '\n')
488
+
489
+ def _generate_document_id(self, file_path: Path) -> str:
490
+ """Generate unique document ID"""
491
+ content_hash = hashlib.sha256(str(file_path).encode()).hexdigest()[:8]
492
+ timestamp = int(time.time())
493
+ return f"doc_{timestamp}_{content_hash}"
494
+
495
+ def _generate_queue_id(self) -> str:
496
+ """Generate unique review queue ID"""
497
+ timestamp = int(time.time() * 1000) # Milliseconds for uniqueness
498
+ return f"queue_{timestamp}"
499
+
500
+ def _generate_log_id(self) -> str:
501
+ """Generate unique log ID"""
502
+ timestamp = int(time.time() * 1000)
503
+ return f"log_{timestamp}"
504
+
505
+ def _create_error_response(self, document_id: str, error_message: str) -> Dict[str, Any]:
506
+ """Create standardized error response"""
507
+ return {
508
+ "document_id": document_id,
509
+ "status": "error",
510
+ "confidence": 0.0,
511
+ "decision": "blocked",
512
+ "reasoning": error_message,
513
+ "requires_review": True,
514
+ "escalate_immediately": True,
515
+ "error": error_message
516
+ }
517
+
518
+ def load_review_queue(self):
519
+ """Load review queue from persistent storage"""
520
+ queue_file = self.review_queue_path / "review_queue.json"
521
+ if queue_file.exists():
522
+ try:
523
+ with open(queue_file, 'r') as f:
524
+ queue_data = json.load(f)
525
+ # Convert back to ReviewQueueItem objects
526
+ for item_id, item_data in queue_data.items():
527
+ # Handle datetime conversion
528
+ item_data['created_timestamp'] = datetime.fromisoformat(item_data['created_timestamp'])
529
+ item_data['review_deadline'] = datetime.fromisoformat(item_data['review_deadline'])
530
+ if item_data.get('reviewed_timestamp'):
531
+ item_data['reviewed_timestamp'] = datetime.fromisoformat(item_data['reviewed_timestamp'])
532
+ # Recreate objects (simplified for now)
533
+ self.review_queue[item_id] = item_data
534
+ logger.info(f"Loaded {len(self.review_queue)} items from review queue")
535
+ except Exception as e:
536
+ logger.error(f"Failed to load review queue: {e}")
537
+
538
+ async def _save_review_queue(self):
539
+ """Save review queue to persistent storage"""
540
+ queue_file = self.review_queue_path / "review_queue.json"
541
+ try:
542
+ # Convert to JSON-serializable format
543
+ queue_data = {}
544
+ for item_id, item in self.review_queue.items():
545
+ if isinstance(item, ReviewQueueItem):
546
+ queue_data[item_id] = asdict(item)
547
+ else:
548
+ queue_data[item_id] = item
549
+
550
+ with open(queue_file, 'w') as f:
551
+ json.dump(queue_data, f, indent=2, default=str)
552
+ except Exception as e:
553
+ logger.error(f"Failed to save review queue: {e}")
554
+
555
+ def _update_statistics(self, decision: ValidationDecision, confidence: ConfidenceScore, processing_time: float):
556
+ """Update system statistics"""
557
+ self.stats["total_processed"] += 1
558
+
559
+ if decision == ValidationDecision.AUTO_APPROVE:
560
+ self.stats["auto_approved"] += 1
561
+ elif decision == ValidationDecision.REVIEW_RECOMMENDED:
562
+ self.stats["review_recommended"] += 1
563
+ elif decision == ValidationDecision.MANUAL_REQUIRED:
564
+ self.stats["manual_required"] += 1
565
+ elif decision == ValidationDecision.BLOCKED:
566
+ self.stats["blocked"] += 1
567
+
568
+ # Update average confidence
569
+ total_confidence = self.stats["average_confidence"] * (self.stats["total_processed"] - 1)
570
+ self.stats["average_confidence"] = (total_confidence + confidence.overall_confidence) / self.stats["total_processed"]
571
+
572
+ # Track processing times
573
+ self.stats["processing_times"].append(processing_time)
574
+ if len(self.stats["processing_times"]) > 1000: # Keep last 1000 times
575
+ self.stats["processing_times"] = self.stats["processing_times"][-1000:]
576
+
577
+ def get_system_statistics(self) -> Dict[str, Any]:
578
+ """Get comprehensive system statistics"""
579
+ if self.stats["total_processed"] == 0:
580
+ return {"total_processed": 0, "status": "no_data"}
581
+
582
+ return {
583
+ "total_processed": self.stats["total_processed"],
584
+ "distribution": {
585
+ "auto_approved": {
586
+ "count": self.stats["auto_approved"],
587
+ "percentage": (self.stats["auto_approved"] / self.stats["total_processed"]) * 100
588
+ },
589
+ "review_recommended": {
590
+ "count": self.stats["review_recommended"],
591
+ "percentage": (self.stats["review_recommended"] / self.stats["total_processed"]) * 100
592
+ },
593
+ "manual_required": {
594
+ "count": self.stats["manual_required"],
595
+ "percentage": (self.stats["manual_required"] / self.stats["total_processed"]) * 100
596
+ },
597
+ "blocked": {
598
+ "count": self.stats["blocked"],
599
+ "percentage": (self.stats["blocked"] / self.stats["total_processed"]) * 100
600
+ }
601
+ },
602
+ "confidence_metrics": {
603
+ "average_confidence": self.stats["average_confidence"],
604
+ "success_rate": ((self.stats["auto_approved"] + self.stats["review_recommended"]) / self.stats["total_processed"]) * 100
605
+ },
606
+ "performance_metrics": {
607
+ "average_processing_time": sum(self.stats["processing_times"]) / len(self.stats["processing_times"]) if self.stats["processing_times"] else 0,
608
+ "median_processing_time": sorted(self.stats["processing_times"])[len(self.stats["processing_times"])//2] if self.stats["processing_times"] else 0
609
+ },
610
+ "system_health": "healthy" if self.stats["blocked"] / self.stats["total_processed"] < 0.1 else "degraded"
611
+ }
612
+
613
+
614
+ # Export main classes
615
+ __all__ = [
616
+ "ConfidenceGatingSystem",
617
+ "ReviewQueueItem",
618
+ "AuditLogEntry",
619
+ "ValidationDecision",
620
+ "ReviewPriority"
621
+ ]