"""
Confidence Gating and Validation System - Phase 4

Implements composite confidence scoring, thresholds, and human review queue
management.

This module builds on the preprocessing pipeline and model routing to provide
intelligent confidence-based gating, validation workflows, and review queue
management for medical AI.

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""

import os
import logging
import asyncio
import time
import json
import hashlib
import uuid
from typing import Dict, List, Optional, Any, Tuple, Union
from dataclasses import dataclass, asdict, fields
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path

# Import existing components
from medical_schemas import ConfidenceScore, ValidationResult, MedicalDocumentMetadata
from specialized_model_router import SpecializedModelRouter, ModelInferenceResult
from preprocessing_pipeline import PreprocessingPipeline, ProcessingResult

logger = logging.getLogger(__name__)


class ReviewPriority(Enum):
    """Priority levels for human review"""
    CRITICAL = "critical"  # <0.60 confidence - immediate manual review required
    HIGH = "high"          # 0.60-0.75 confidence - review recommended within 1 hour
    MEDIUM = "medium"      # 0.75-0.85 confidence - review recommended within 4 hours
    LOW = "low"            # 0.85-0.95 confidence - optional review for quality assurance
    NONE = "none"          # ≥0.95 confidence - auto-approve, audit only


class ValidationDecision(Enum):
    """Final validation decisions"""
    AUTO_APPROVE = "auto_approve"              # ≥0.85 confidence - automatically approved
    REVIEW_RECOMMENDED = "review_recommended"  # 0.60-0.85 confidence - human review recommended
    MANUAL_REQUIRED = "manual_required"        # <0.60 confidence - manual review required
    BLOCKED = "blocked"                        # Critical errors - processing blocked


@dataclass
class ReviewQueueItem:
    """Item in the human review queue"""
    item_id: str
    document_id: str
    priority: ReviewPriority
    confidence_score: ConfidenceScore
    processing_result: ProcessingResult
    model_inference: ModelInferenceResult
    review_decision: ValidationDecision
    created_timestamp: datetime
    review_deadline: datetime
    assigned_reviewer: Optional[str] = None
    review_notes: Optional[str] = None
    reviewer_decision: Optional[str] = None
    reviewed_timestamp: Optional[datetime] = None
    escalated: bool = False


@dataclass
class AuditLogEntry:
    """Audit log entry for compliance tracking"""
    log_id: str
    document_id: str
    event_type: str  # "confidence_gating", "manual_review", "auto_approval", "escalation"
    timestamp: datetime
    user_id: Optional[str]
    confidence_scores: Dict[str, float]
    decision: str
    reasoning: str
    metadata: Dict[str, Any]


def _enum_from_stored(enum_cls, raw):
    """Rebuild an enum member from the form found in the persisted queue file.

    The queue is written with ``json.dump(..., default=str)``, which stores
    enum members as ``"ClassName.MEMBER"``. A plain value such as
    ``"critical"`` is accepted as well.

    Raises:
        KeyError: if ``raw`` matches neither a value nor a member name.
    """
    if isinstance(raw, enum_cls):
        return raw
    text = str(raw)
    try:
        return enum_cls(text)
    except ValueError:
        # "ReviewPriority.CRITICAL" -> "CRITICAL"
        return enum_cls[text.rpartition(".")[2]]


class ConfidenceGatingSystem:
    """Main confidence gating and validation system"""

    def __init__(self,
                 preprocessing_pipeline: Optional[PreprocessingPipeline] = None,
                 model_router: Optional[SpecializedModelRouter] = None,
                 review_queue_path: str = "/tmp/review_queue",
                 audit_log_path: str = "/tmp/audit_logs"):
        """Initialize confidence gating system

        Args:
            preprocessing_pipeline: Pipeline used for stage 1; a default
                ``PreprocessingPipeline()`` is created when omitted.
            model_router: Router used for stage 2; a default
                ``SpecializedModelRouter()`` is created when omitted.
            review_queue_path: Directory holding ``review_queue.json``.
            audit_log_path: Directory holding the daily ``audit_*.jsonl`` files.
        """
        self.preprocessing_pipeline = preprocessing_pipeline or PreprocessingPipeline()
        self.model_router = model_router or SpecializedModelRouter()

        # Queue and logging setup
        self.review_queue_path = Path(review_queue_path)
        self.audit_log_path = Path(audit_log_path)
        # parents=True so a nested, not-yet-existing path does not raise.
        self.review_queue_path.mkdir(parents=True, exist_ok=True)
        self.audit_log_path.mkdir(parents=True, exist_ok=True)

        # Review queue storage
        self.review_queue: Dict[str, ReviewQueueItem] = {}
        self.load_review_queue()

        # Confidence thresholds
        # NOTE(review): with "manual_required" at 0.0, BLOCKED is only
        # reachable for a negative overall confidence - confirm this is intended.
        self.confidence_thresholds = {
            "auto_approve": 0.85,
            "review_recommended": 0.60,
            "manual_required": 0.0
        }

        # Review deadlines by priority
        self.review_deadlines = {
            ReviewPriority.CRITICAL: timedelta(minutes=30),
            ReviewPriority.HIGH: timedelta(hours=1),
            ReviewPriority.MEDIUM: timedelta(hours=4),
            ReviewPriority.LOW: timedelta(hours=24),
            ReviewPriority.NONE: timedelta(days=7)  # Audit only
        }

        # Statistics tracking
        self.stats = {
            "total_processed": 0,
            "auto_approved": 0,
            "review_recommended": 0,
            "manual_required": 0,
            "blocked": 0,
            "average_confidence": 0.0,
            "processing_times": [],
            "reviewer_performance": {}
        }

        logger.info("Confidence Gating System initialized")

    async def process_document(self, file_path: Path, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Main document processing with confidence gating

        Runs preprocessing, model inference, composite confidence scoring and
        the gating decision, then dispatches to the matching handler.

        Args:
            file_path: Path of the document to process.
            user_id: Optional identifier recorded in the audit log.

        Returns:
            The handler's response dict, or a standardized error response when
            a stage returns a falsy result or any exception is raised.
        """
        start_time = time.time()
        document_id = self._generate_document_id(file_path)

        try:
            logger.info(f"Processing document {document_id}: {file_path.name}")

            # Stage 1: Preprocessing pipeline
            preprocessing_result = await self.preprocessing_pipeline.process_file(file_path)
            if not preprocessing_result:
                return self._create_error_response(document_id, "Preprocessing failed")

            # Stage 2: Model inference
            model_result = await self.model_router.route_and_infer(preprocessing_result)
            if not model_result:
                return self._create_error_response(document_id, "Model inference failed")

            # Stage 3: Composite confidence calculation
            composite_confidence = self._calculate_composite_confidence(
                preprocessing_result, model_result
            )

            # Stage 4: Confidence gating decision
            validation_decision = self._make_validation_decision(composite_confidence)

            # Stage 5: Handle based on decision
            if validation_decision == ValidationDecision.AUTO_APPROVE:
                response = await self._handle_auto_approval(
                    document_id, preprocessing_result, model_result,
                    composite_confidence, user_id
                )
            elif validation_decision in [ValidationDecision.REVIEW_RECOMMENDED,
                                         ValidationDecision.MANUAL_REQUIRED]:
                response = await self._handle_review_required(
                    document_id, preprocessing_result, model_result,
                    composite_confidence, validation_decision, user_id
                )
            else:  # BLOCKED
                response = await self._handle_blocked(
                    document_id, preprocessing_result, model_result,
                    composite_confidence, user_id
                )

            # Update statistics
            processing_time = time.time() - start_time
            self._update_statistics(validation_decision, composite_confidence, processing_time)

            return response

        except Exception as e:
            # Top-level boundary: keep the traceback in the log, return an
            # error payload instead of propagating.
            logger.exception(f"Document processing error for {document_id}: {str(e)}")
            return self._create_error_response(document_id, f"Processing error: {str(e)}")

    def _calculate_composite_confidence(self,
                                        preprocessing_result: ProcessingResult,
                                        model_result: ModelInferenceResult) -> ConfidenceScore:
        """Calculate composite confidence from all pipeline stages

        Combines the preprocessing compliance score, the model confidence and
        a data-quality figure (the mean of up to four factors, clamped to
        [0, 1]) into a ``ConfidenceScore``.
        """
        # Extract individual confidence components
        extraction_confidence = preprocessing_result.validation_result.compliance_score
        model_confidence = model_result.confidence_score

        # Calculate data quality based on multiple factors
        data_quality_factors = []

        # Factor 1: File detection confidence
        if hasattr(preprocessing_result, 'file_detection'):
            data_quality_factors.append(preprocessing_result.file_detection.confidence)

        # Factor 2: PHI removal completeness (higher score = better quality)
        if hasattr(preprocessing_result, 'phi_result'):
            phi_completeness = 1.0 - (len(preprocessing_result.phi_result.redactions) / 100)  # Normalize
            data_quality_factors.append(max(0.0, min(1.0, phi_completeness)))

        # Factor 3: Processing errors (fewer errors = higher quality)
        processing_errors = len(model_result.errors) if model_result.errors else 0
        error_factor = max(0.0, 1.0 - (processing_errors * 0.1))  # Each error reduces quality by 10%
        data_quality_factors.append(error_factor)

        # Factor 4: Model processing time (reasonable time = higher quality)
        time_factor = 1.0
        if model_result.processing_time > 0:
            # Optimal processing time is 1-10 seconds
            if 1.0 <= model_result.processing_time <= 10.0:
                time_factor = 1.0
            elif model_result.processing_time < 1.0:
                time_factor = 0.8  # Too fast might indicate incomplete processing
            else:
                time_factor = max(0.5, 1.0 - ((model_result.processing_time - 10.0) / 50.0))
        data_quality_factors.append(time_factor)

        # Calculate average data quality
        data_quality = sum(data_quality_factors) / len(data_quality_factors) if data_quality_factors else 0.5
        data_quality = max(0.0, min(1.0, data_quality))  # Ensure 0-1 range

        # Create composite confidence score
        composite_confidence = ConfidenceScore(
            extraction_confidence=extraction_confidence,
            model_confidence=model_confidence,
            data_quality=data_quality
        )

        logger.info(f"Composite confidence calculated: {composite_confidence.overall_confidence:.3f}")
        logger.info(f" - Extraction: {extraction_confidence:.3f}")
        logger.info(f" - Model: {model_confidence:.3f}")
        logger.info(f" - Data Quality: {data_quality:.3f}")

        return composite_confidence

    def _make_validation_decision(self, confidence: ConfidenceScore) -> ValidationDecision:
        """Make validation decision based on confidence thresholds"""
        overall_confidence = confidence.overall_confidence

        if overall_confidence >= self.confidence_thresholds["auto_approve"]:
            return ValidationDecision.AUTO_APPROVE
        elif overall_confidence >= self.confidence_thresholds["review_recommended"]:
            return ValidationDecision.REVIEW_RECOMMENDED
        elif overall_confidence >= self.confidence_thresholds["manual_required"]:
            return ValidationDecision.MANUAL_REQUIRED
        else:
            return ValidationDecision.BLOCKED

    def _determine_review_priority(self, confidence: ConfidenceScore) -> ReviewPriority:
        """Determine review priority based on confidence score"""
        # NOTE(review): these cut-offs (0.70 / 0.80 / 0.90) differ from the
        # ranges in the ReviewPriority comments (0.75 / 0.85 / 0.95) -
        # confirm which is authoritative before changing either.
        overall = confidence.overall_confidence

        if overall < 0.60:
            return ReviewPriority.CRITICAL
        elif overall < 0.70:
            return ReviewPriority.HIGH
        elif overall < 0.80:
            return ReviewPriority.MEDIUM
        elif overall < 0.90:
            return ReviewPriority.LOW
        else:
            return ReviewPriority.NONE

    @staticmethod
    def _audit_scores(confidence: ConfidenceScore) -> Dict[str, float]:
        """Return the four confidence figures recorded with every audit event."""
        return {
            "extraction": confidence.extraction_confidence,
            "model": confidence.model_confidence,
            "data_quality": confidence.data_quality,
            "overall": confidence.overall_confidence
        }

    async def _handle_auto_approval(self,
                                    document_id: str,
                                    preprocessing_result: ProcessingResult,
                                    model_result: ModelInferenceResult,
                                    confidence: ConfidenceScore,
                                    user_id: Optional[str]) -> Dict[str, Any]:
        """Handle auto-approved documents

        Writes an "auto_approval" audit event and returns the approval payload.
        """
        # Log the auto-approval
        await self._log_audit_event(
            document_id=document_id,
            event_type="auto_approval",
            user_id=user_id,
            confidence_scores=self._audit_scores(confidence),
            decision="auto_approved",
            reasoning=f"Confidence score {confidence.overall_confidence:.3f} meets auto-approval threshold (≥{self.confidence_thresholds['auto_approve']})"
        )

        return {
            "document_id": document_id,
            "status": "auto_approved",
            "confidence": confidence.overall_confidence,
            "decision": "auto_approve",
            "reasoning": "High confidence - automatically approved",
            "processing_result": {
                "extraction_data": preprocessing_result.extraction_result,
                "model_output": model_result.output_data,
                "confidence_breakdown": {
                    "extraction": confidence.extraction_confidence,
                    "model": confidence.model_confidence,
                    "data_quality": confidence.data_quality
                }
            },
            "requires_review": False,
            "review_queue_id": None
        }

    async def _handle_review_required(self,
                                      document_id: str,
                                      preprocessing_result: ProcessingResult,
                                      model_result: ModelInferenceResult,
                                      confidence: ConfidenceScore,
                                      decision: ValidationDecision,
                                      user_id: Optional[str]) -> Dict[str, Any]:
        """Handle documents requiring review

        Creates a queue item with a priority-derived deadline, persists the
        queue, writes a "review_required" audit event and returns the payload
        describing the pending review.
        """
        # Determine review priority
        priority = self._determine_review_priority(confidence)

        # One clock reading so created_timestamp and the deadline are
        # separated by exactly the configured interval.
        now = datetime.now()
        deadline = now + self.review_deadlines[priority]

        # Create review queue item
        queue_item = ReviewQueueItem(
            item_id=self._generate_queue_id(),
            document_id=document_id,
            priority=priority,
            confidence_score=confidence,
            processing_result=preprocessing_result,
            model_inference=model_result,
            review_decision=decision,
            created_timestamp=now,
            review_deadline=deadline
        )

        # Add to review queue
        self.review_queue[queue_item.item_id] = queue_item
        await self._save_review_queue()

        # Log the review requirement
        await self._log_audit_event(
            document_id=document_id,
            event_type="review_required",
            user_id=user_id,
            confidence_scores=self._audit_scores(confidence),
            decision=decision.value,
            reasoning=f"Confidence score {confidence.overall_confidence:.3f} requires review (threshold: {self.confidence_thresholds['review_recommended']}-{self.confidence_thresholds['auto_approve']})"
        )

        return {
            "document_id": document_id,
            "status": "review_required",
            "confidence": confidence.overall_confidence,
            "decision": decision.value,
            "reasoning": self._get_review_reasoning(confidence, decision),
            "review_queue_id": queue_item.item_id,
            "priority": priority.value,
            "review_deadline": deadline.isoformat(),
            "processing_result": {
                "extraction_data": preprocessing_result.extraction_result,
                "model_output": model_result.output_data,
                "confidence_breakdown": {
                    "extraction": confidence.extraction_confidence,
                    "model": confidence.model_confidence,
                    "data_quality": confidence.data_quality
                },
                "warnings": model_result.warnings
            },
            "requires_review": True
        }

    async def _handle_blocked(self,
                              document_id: str,
                              preprocessing_result: ProcessingResult,
                              model_result: ModelInferenceResult,
                              confidence: ConfidenceScore,
                              user_id: Optional[str]) -> Dict[str, Any]:
        """Handle blocked documents

        Writes a "blocked" audit event and returns a payload flagged for
        immediate escalation.
        """
        # Log the blocking
        await self._log_audit_event(
            document_id=document_id,
            event_type="blocked",
            user_id=user_id,
            confidence_scores=self._audit_scores(confidence),
            decision="blocked",
            reasoning=f"Confidence score {confidence.overall_confidence:.3f} below acceptable threshold ({self.confidence_thresholds['manual_required']})"
        )

        return {
            "document_id": document_id,
            "status": "blocked",
            "confidence": confidence.overall_confidence,
            "decision": "blocked",
            "reasoning": "Confidence too low for processing - manual intervention required",
            "errors": model_result.errors,
            "warnings": model_result.warnings,
            "requires_review": True,
            "escalate_immediately": True
        }

    def _get_review_reasoning(self, confidence: ConfidenceScore, decision: ValidationDecision) -> str:
        """Generate human-readable reasoning for review requirement"""
        overall = confidence.overall_confidence

        reasons = []
        if confidence.extraction_confidence < 0.80:
            reasons.append(f"Low extraction confidence ({confidence.extraction_confidence:.3f})")
        if confidence.model_confidence < 0.80:
            reasons.append(f"Low model confidence ({confidence.model_confidence:.3f})")
        if confidence.data_quality < 0.80:
            reasons.append(f"Poor data quality ({confidence.data_quality:.3f})")

        if decision == ValidationDecision.REVIEW_RECOMMENDED:
            base_reason = f"Medium confidence ({overall:.3f}) - review recommended for quality assurance"
        else:
            base_reason = f"Low confidence ({overall:.3f}) - manual review required"

        if reasons:
            return f"{base_reason}. Issues: {', '.join(reasons)}"
        else:
            return base_reason

    def get_review_queue_status(self) -> Dict[str, Any]:
        """Get current review queue status

        Returns:
            Counts of pending items (total and per priority), the overdue
            items with how many hours each is past its deadline, and a
            ``queue_health`` label: "healthy" with no overdue items,
            "degraded" with fewer than 5, otherwise "critical".
        """
        now = datetime.now()

        # Categorize queue items
        by_priority = {priority: [] for priority in ReviewPriority}
        overdue = []
        pending_count = 0

        for item in self.review_queue.values():
            if not item.reviewed_timestamp:  # Still pending
                pending_count += 1
                by_priority[item.priority].append(item)

                if now > item.review_deadline:
                    overdue.append(item)

        return {
            "total_pending": pending_count,
            "by_priority": {
                priority.value: len(items)
                for priority, items in by_priority.items()
            },
            "overdue_count": len(overdue),
            "overdue_items": [
                {
                    "item_id": item.item_id,
                    "document_id": item.document_id,
                    "priority": item.priority.value,
                    "overdue_hours": (now - item.review_deadline).total_seconds() / 3600
                }
                for item in overdue
            ],
            "queue_health": "healthy" if len(overdue) == 0 else "degraded" if len(overdue) < 5 else "critical"
        }

    async def _log_audit_event(self,
                               document_id: str,
                               event_type: str,
                               user_id: Optional[str],
                               confidence_scores: Dict[str, float],
                               decision: str,
                               reasoning: str):
        """Log audit event for compliance

        Appends one JSON line to ``audit_YYYYMMDD.jsonl`` in the audit
        directory.
        """
        log_entry = AuditLogEntry(
            log_id=self._generate_log_id(),
            document_id=document_id,
            event_type=event_type,
            timestamp=datetime.now(),
            user_id=user_id,
            confidence_scores=confidence_scores,
            decision=decision,
            reasoning=reasoning,
            metadata={}
        )

        # Save to audit log file
        log_file = self.audit_log_path / f"audit_{datetime.now().strftime('%Y%m%d')}.jsonl"
        with open(log_file, 'a', encoding="utf-8") as f:
            f.write(json.dumps(asdict(log_entry), default=str) + '\n')

    def _generate_document_id(self, file_path: Path) -> str:
        """Generate document ID from the path hash and the current second"""
        content_hash = hashlib.sha256(str(file_path).encode()).hexdigest()[:8]
        timestamp = int(time.time())
        return f"doc_{timestamp}_{content_hash}"

    def _generate_queue_id(self) -> str:
        """Generate unique review queue ID"""
        timestamp = int(time.time() * 1000)
        # The random suffix keeps two items created in the same millisecond
        # from sharing a key and overwriting each other in the queue dict.
        return f"queue_{timestamp}_{uuid.uuid4().hex[:8]}"

    def _generate_log_id(self) -> str:
        """Generate unique log ID"""
        timestamp = int(time.time() * 1000)
        # Random suffix for the same reason as in _generate_queue_id.
        return f"log_{timestamp}_{uuid.uuid4().hex[:8]}"

    def _create_error_response(self, document_id: str, error_message: str) -> Dict[str, Any]:
        """Create standardized error response"""
        return {
            "document_id": document_id,
            "status": "error",
            "confidence": 0.0,
            "decision": "blocked",
            "reasoning": error_message,
            "requires_review": True,
            "escalate_immediately": True,
            "error": error_message
        }

    def load_review_queue(self):
        """Load review queue from persistent storage

        Rebuilds each stored record as a ``ReviewQueueItem``: timestamps are
        parsed back to ``datetime`` and ``priority`` / ``review_decision``
        back to their enums. The nested ``confidence_score``,
        ``processing_result`` and ``model_inference`` values are kept as the
        plain data found in the file.

        Loading is best-effort: an unreadable file or a malformed record is
        logged and skipped, never raised.
        """
        queue_file = self.review_queue_path / "review_queue.json"

        if not queue_file.exists():
            return

        try:
            with open(queue_file, 'r', encoding="utf-8") as f:
                queue_data = json.load(f)
        except (OSError, ValueError) as e:
            logger.error(f"Failed to load review queue: {e}")
            return

        if not isinstance(queue_data, dict):
            logger.error("Failed to load review queue: unexpected top-level JSON type")
            return

        known_fields = {spec.name for spec in fields(ReviewQueueItem)}

        for item_id, item_data in queue_data.items():
            try:
                # Drop unknown keys so an older/newer file layout cannot make
                # the dataclass constructor raise.
                record = {key: value for key, value in item_data.items() if key in known_fields}

                # Handle datetime conversion
                record['created_timestamp'] = datetime.fromisoformat(item_data['created_timestamp'])
                record['review_deadline'] = datetime.fromisoformat(item_data['review_deadline'])
                if item_data.get('reviewed_timestamp'):
                    record['reviewed_timestamp'] = datetime.fromisoformat(item_data['reviewed_timestamp'])
                else:
                    record['reviewed_timestamp'] = None

                # Restore enums so .priority.value and dict lookups keyed by
                # ReviewPriority work on loaded items.
                record['priority'] = _enum_from_stored(ReviewPriority, item_data['priority'])
                record['review_decision'] = _enum_from_stored(ValidationDecision, item_data['review_decision'])

                self.review_queue[item_id] = ReviewQueueItem(**record)
            except (AttributeError, KeyError, TypeError, ValueError) as e:
                logger.error(f"Failed to load review queue item {item_id}: {e}")

        logger.info(f"Loaded {len(self.review_queue)} items from review queue")

    async def _save_review_queue(self):
        """Save review queue to persistent storage

        Failures are logged, not raised.
        """
        queue_file = self.review_queue_path / "review_queue.json"

        try:
            # Convert to JSON-serializable format
            queue_data = {}
            for item_id, item in self.review_queue.items():
                if isinstance(item, ReviewQueueItem):
                    queue_data[item_id] = asdict(item)
                else:
                    queue_data[item_id] = item

            with open(queue_file, 'w', encoding="utf-8") as f:
                json.dump(queue_data, f, indent=2, default=str)

        except Exception as e:
            logger.error(f"Failed to save review queue: {e}")

    def _update_statistics(self,
                           decision: ValidationDecision,
                           confidence: ConfidenceScore,
                           processing_time: float):
        """Update system statistics"""
        self.stats["total_processed"] += 1

        if decision == ValidationDecision.AUTO_APPROVE:
            self.stats["auto_approved"] += 1
        elif decision == ValidationDecision.REVIEW_RECOMMENDED:
            self.stats["review_recommended"] += 1
        elif decision == ValidationDecision.MANUAL_REQUIRED:
            self.stats["manual_required"] += 1
        elif decision == ValidationDecision.BLOCKED:
            self.stats["blocked"] += 1

        # Update average confidence
        total_confidence = self.stats["average_confidence"] * (self.stats["total_processed"] - 1)
        self.stats["average_confidence"] = (total_confidence + confidence.overall_confidence) / self.stats["total_processed"]

        # Track processing times
        self.stats["processing_times"].append(processing_time)
        if len(self.stats["processing_times"]) > 1000:  # Keep last 1000 times
            self.stats["processing_times"] = self.stats["processing_times"][-1000:]

    def get_system_statistics(self) -> Dict[str, Any]:
        """Get comprehensive system statistics

        Returns ``{"total_processed": 0, "status": "no_data"}`` until at
        least one document has been processed.
        """
        if self.stats["total_processed"] == 0:
            return {"total_processed": 0, "status": "no_data"}

        return {
            "total_processed": self.stats["total_processed"],
            "distribution": {
                "auto_approved": {
                    "count": self.stats["auto_approved"],
                    "percentage": (self.stats["auto_approved"] / self.stats["total_processed"]) * 100
                },
                "review_recommended": {
                    "count": self.stats["review_recommended"],
                    "percentage": (self.stats["review_recommended"] / self.stats["total_processed"]) * 100
                },
                "manual_required": {
                    "count": self.stats["manual_required"],
                    "percentage": (self.stats["manual_required"] / self.stats["total_processed"]) * 100
                },
                "blocked": {
                    "count": self.stats["blocked"],
                    "percentage": (self.stats["blocked"] / self.stats["total_processed"]) * 100
                }
            },
            "confidence_metrics": {
                "average_confidence": self.stats["average_confidence"],
                "success_rate": ((self.stats["auto_approved"] + self.stats["review_recommended"]) / self.stats["total_processed"]) * 100
            },
            "performance_metrics": {
                "average_processing_time": sum(self.stats["processing_times"]) / len(self.stats["processing_times"]) if self.stats["processing_times"] else 0,
                # Upper-middle element for an even number of samples.
                "median_processing_time": sorted(self.stats["processing_times"])[len(self.stats["processing_times"])//2] if self.stats["processing_times"] else 0
            },
            "system_health": "healthy" if self.stats["blocked"] / self.stats["total_processed"] < 0.1 else "degraded"
        }


# Export main classes
__all__ = [
    "ConfidenceGatingSystem",
    "ReviewQueueItem",
    "AuditLogEntry",
    "ValidationDecision",
    "ReviewPriority"
]