# NOTE: removed non-code scrape residue ("Spaces: / Sleeping / Sleeping") from this header.
"""
Confidence Gating and Validation System - Phase 4

Implements composite confidence scoring, thresholds, and human review queue
management. This module builds on the preprocessing pipeline and model routing
to provide intelligent confidence-based gating, validation workflows, and
review queue management for medical AI.

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""
# Standard library
import asyncio
import hashlib
import json
import logging
import os
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Union

# Project components: schemas, specialized model routing, preprocessing
from medical_schemas import ConfidenceScore, ValidationResult, MedicalDocumentMetadata
from specialized_model_router import SpecializedModelRouter, ModelInferenceResult
from preprocessing_pipeline import PreprocessingPipeline, ProcessingResult

# Module-level logger; handlers/levels are configured by the host application.
logger = logging.getLogger(__name__)
class ReviewPriority(Enum):
    """Priority levels for human review.

    NOTE(review): the confidence bands below describe intent; the bands
    actually applied are the ones in
    ConfidenceGatingSystem._determine_review_priority
    (<0.60 critical, <0.70 high, <0.80 medium, <0.90 low, else none) —
    confirm which set is authoritative.
    """
    CRITICAL = "critical"  # <0.60 confidence - immediate manual review required
    HIGH = "high"  # 0.60-0.75 confidence - review recommended within 1 hour
    MEDIUM = "medium"  # 0.75-0.85 confidence - review recommended within 4 hours
    LOW = "low"  # 0.85-0.95 confidence - optional review for quality assurance
    NONE = "none"  # ≥0.95 confidence - auto-approve, audit only
class ValidationDecision(Enum):
    """Final validation decisions produced by the confidence gate.

    Thresholds come from ConfidenceGatingSystem.confidence_thresholds
    (auto_approve=0.85, review_recommended=0.60, manual_required=0.0).
    """
    AUTO_APPROVE = "auto_approve"  # ≥0.85 confidence - automatically approved
    REVIEW_RECOMMENDED = "review_recommended"  # 0.60-0.85 confidence - human review recommended
    MANUAL_REQUIRED = "manual_required"  # <0.60 confidence - manual review required
    BLOCKED = "blocked"  # Critical errors - processing blocked
@dataclass
class ReviewQueueItem:
    """Item in the human review queue.

    Must be a dataclass: instances are built with keyword arguments in
    ConfidenceGatingSystem._handle_review_required and serialized via
    dataclasses.asdict in _save_review_queue — the @dataclass decorator
    was missing, which made both of those fail.

    String (forward-reference) annotations are used for types defined
    elsewhere in this module/project so the class can be created even
    before/without those names being resolvable.
    """
    item_id: str  # unique queue id, e.g. "queue_<ms-timestamp>"
    document_id: str
    priority: "ReviewPriority"
    confidence_score: "ConfidenceScore"
    processing_result: "ProcessingResult"
    model_inference: "ModelInferenceResult"
    review_decision: "ValidationDecision"
    created_timestamp: datetime
    review_deadline: datetime
    assigned_reviewer: Optional[str] = None
    review_notes: Optional[str] = None
    reviewer_decision: Optional[str] = None
    reviewed_timestamp: Optional[datetime] = None  # None while still pending
    escalated: bool = False
@dataclass
class AuditLogEntry:
    """Audit log entry for compliance tracking.

    Must be a dataclass: entries are built with keyword arguments and
    serialized via dataclasses.asdict in _log_audit_event — the
    @dataclass decorator was missing, which made that construction fail.
    """
    log_id: str
    document_id: str
    event_type: str  # "confidence_gating", "manual_review", "auto_approval", "escalation"
    timestamp: datetime
    user_id: Optional[str]
    confidence_scores: Dict[str, float]  # component name -> score in [0, 1]
    decision: str
    reasoning: str
    metadata: Dict[str, Any]
class ConfidenceGatingSystem:
    """Main confidence gating and validation system.

    Orchestrates preprocessing, model inference, composite confidence
    scoring, threshold-based gating, review-queue management, and audit
    logging.
    """

    def __init__(self,
                 preprocessing_pipeline: Optional[PreprocessingPipeline] = None,
                 model_router: Optional[SpecializedModelRouter] = None,
                 review_queue_path: str = "/tmp/review_queue",
                 audit_log_path: str = "/tmp/audit_logs"):
        """Initialize confidence gating system.

        Args:
            preprocessing_pipeline: pipeline instance; a default is built when omitted.
            model_router: router instance; a default is built when omitted.
            review_queue_path: directory for persisted review-queue state.
            audit_log_path: directory for daily JSONL audit logs.
        """
        self.preprocessing_pipeline = preprocessing_pipeline or PreprocessingPipeline()
        self.model_router = model_router or SpecializedModelRouter()

        # Queue and logging setup.  parents=True so a missing parent
        # directory does not raise FileNotFoundError on first run.
        self.review_queue_path = Path(review_queue_path)
        self.audit_log_path = Path(audit_log_path)
        self.review_queue_path.mkdir(parents=True, exist_ok=True)
        self.audit_log_path.mkdir(parents=True, exist_ok=True)

        # Review queue storage (item_id -> ReviewQueueItem), restored from disk.
        self.review_queue: Dict[str, ReviewQueueItem] = {}
        self.load_review_queue()

        # Confidence thresholds applied to the overall composite confidence.
        self.confidence_thresholds = {
            "auto_approve": 0.85,
            "review_recommended": 0.60,
            "manual_required": 0.0
        }

        # Review deadlines by priority.
        self.review_deadlines = {
            ReviewPriority.CRITICAL: timedelta(minutes=30),
            ReviewPriority.HIGH: timedelta(hours=1),
            ReviewPriority.MEDIUM: timedelta(hours=4),
            ReviewPriority.LOW: timedelta(hours=24),
            ReviewPriority.NONE: timedelta(days=7)  # Audit only
        }

        # Statistics tracking (see _update_statistics / get_system_statistics).
        self.stats = {
            "total_processed": 0,
            "auto_approved": 0,
            "review_recommended": 0,
            "manual_required": 0,
            "blocked": 0,
            "average_confidence": 0.0,
            "processing_times": [],
            "reviewer_performance": {}
        }

        logger.info("Confidence Gating System initialized")
    async def process_document(self, file_path: Path, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Main document processing with confidence gating.

        Pipeline: preprocessing -> model inference -> composite confidence
        -> gating decision -> decision-specific handling (auto-approve,
        review queue, or block).  Any exception is caught and converted
        into a standardized error response rather than propagated.

        Args:
            file_path: path of the document to process.
            user_id: optional requester id, recorded in the audit trail.

        Returns:
            A response dict; exact shape depends on the decision (see the
            _handle_* methods and _create_error_response).
        """
        start_time = time.time()
        document_id = self._generate_document_id(file_path)
        try:
            logger.info(f"Processing document {document_id}: {file_path.name}")
            # Stage 1: Preprocessing pipeline
            preprocessing_result = await self.preprocessing_pipeline.process_file(file_path)
            if not preprocessing_result:
                return self._create_error_response(document_id, "Preprocessing failed")
            # Stage 2: Model inference
            model_result = await self.model_router.route_and_infer(preprocessing_result)
            if not model_result:
                return self._create_error_response(document_id, "Model inference failed")
            # Stage 3: Composite confidence calculation
            composite_confidence = self._calculate_composite_confidence(
                preprocessing_result, model_result
            )
            # Stage 4: Confidence gating decision
            validation_decision = self._make_validation_decision(composite_confidence)
            # Stage 5: Handle based on decision
            if validation_decision == ValidationDecision.AUTO_APPROVE:
                response = await self._handle_auto_approval(
                    document_id, preprocessing_result, model_result, composite_confidence, user_id
                )
            elif validation_decision in [ValidationDecision.REVIEW_RECOMMENDED, ValidationDecision.MANUAL_REQUIRED]:
                response = await self._handle_review_required(
                    document_id, preprocessing_result, model_result, composite_confidence,
                    validation_decision, user_id
                )
            else:  # BLOCKED
                response = await self._handle_blocked(
                    document_id, preprocessing_result, model_result, composite_confidence, user_id
                )
            # Update statistics; early error returns above skip this on purpose.
            processing_time = time.time() - start_time
            self._update_statistics(validation_decision, composite_confidence, processing_time)
            return response
        except Exception as e:
            logger.error(f"Document processing error for {document_id}: {str(e)}")
            return self._create_error_response(document_id, f"Processing error: {str(e)}")
| def _calculate_composite_confidence(self, | |
| preprocessing_result: ProcessingResult, | |
| model_result: ModelInferenceResult) -> ConfidenceScore: | |
| """Calculate composite confidence from all pipeline stages""" | |
| # Extract individual confidence components | |
| extraction_confidence = preprocessing_result.validation_result.compliance_score | |
| model_confidence = model_result.confidence_score | |
| # Calculate data quality based on multiple factors | |
| data_quality_factors = [] | |
| # Factor 1: File detection confidence | |
| if hasattr(preprocessing_result, 'file_detection'): | |
| data_quality_factors.append(preprocessing_result.file_detection.confidence) | |
| # Factor 2: PHI removal completeness (higher score = better quality) | |
| if hasattr(preprocessing_result, 'phi_result'): | |
| phi_completeness = 1.0 - (len(preprocessing_result.phi_result.redactions) / 100) # Normalize | |
| data_quality_factors.append(max(0.0, min(1.0, phi_completeness))) | |
| # Factor 3: Processing errors (fewer errors = higher quality) | |
| processing_errors = len(model_result.errors) if model_result.errors else 0 | |
| error_factor = max(0.0, 1.0 - (processing_errors * 0.1)) # Each error reduces quality by 10% | |
| data_quality_factors.append(error_factor) | |
| # Factor 4: Model processing time (reasonable time = higher quality) | |
| time_factor = 1.0 | |
| if model_result.processing_time > 0: | |
| # Optimal processing time is 1-10 seconds | |
| if 1.0 <= model_result.processing_time <= 10.0: | |
| time_factor = 1.0 | |
| elif model_result.processing_time < 1.0: | |
| time_factor = 0.8 # Too fast might indicate incomplete processing | |
| else: | |
| time_factor = max(0.5, 1.0 - ((model_result.processing_time - 10.0) / 50.0)) | |
| data_quality_factors.append(time_factor) | |
| # Calculate average data quality | |
| data_quality = sum(data_quality_factors) / len(data_quality_factors) if data_quality_factors else 0.5 | |
| data_quality = max(0.0, min(1.0, data_quality)) # Ensure 0-1 range | |
| # Create composite confidence score | |
| composite_confidence = ConfidenceScore( | |
| extraction_confidence=extraction_confidence, | |
| model_confidence=model_confidence, | |
| data_quality=data_quality | |
| ) | |
| logger.info(f"Composite confidence calculated: {composite_confidence.overall_confidence:.3f}") | |
| logger.info(f" - Extraction: {extraction_confidence:.3f}") | |
| logger.info(f" - Model: {model_confidence:.3f}") | |
| logger.info(f" - Data Quality: {data_quality:.3f}") | |
| return composite_confidence | |
| def _make_validation_decision(self, confidence: ConfidenceScore) -> ValidationDecision: | |
| """Make validation decision based on confidence thresholds""" | |
| overall_confidence = confidence.overall_confidence | |
| if overall_confidence >= self.confidence_thresholds["auto_approve"]: | |
| return ValidationDecision.AUTO_APPROVE | |
| elif overall_confidence >= self.confidence_thresholds["review_recommended"]: | |
| return ValidationDecision.REVIEW_RECOMMENDED | |
| elif overall_confidence >= self.confidence_thresholds["manual_required"]: | |
| return ValidationDecision.MANUAL_REQUIRED | |
| else: | |
| return ValidationDecision.BLOCKED | |
| def _determine_review_priority(self, confidence: ConfidenceScore) -> ReviewPriority: | |
| """Determine review priority based on confidence score""" | |
| overall = confidence.overall_confidence | |
| if overall < 0.60: | |
| return ReviewPriority.CRITICAL | |
| elif overall < 0.70: | |
| return ReviewPriority.HIGH | |
| elif overall < 0.80: | |
| return ReviewPriority.MEDIUM | |
| elif overall < 0.90: | |
| return ReviewPriority.LOW | |
| else: | |
| return ReviewPriority.NONE | |
| async def _handle_auto_approval(self, document_id: str, preprocessing_result: ProcessingResult, | |
| model_result: ModelInferenceResult, confidence: ConfidenceScore, | |
| user_id: Optional[str]) -> Dict[str, Any]: | |
| """Handle auto-approved documents""" | |
| # Log the auto-approval | |
| await self._log_audit_event( | |
| document_id=document_id, | |
| event_type="auto_approval", | |
| user_id=user_id, | |
| confidence_scores={ | |
| "extraction": confidence.extraction_confidence, | |
| "model": confidence.model_confidence, | |
| "data_quality": confidence.data_quality, | |
| "overall": confidence.overall_confidence | |
| }, | |
| decision="auto_approved", | |
| reasoning=f"Confidence score {confidence.overall_confidence:.3f} meets auto-approval threshold (≥{self.confidence_thresholds['auto_approve']})" | |
| ) | |
| return { | |
| "document_id": document_id, | |
| "status": "auto_approved", | |
| "confidence": confidence.overall_confidence, | |
| "decision": "auto_approve", | |
| "reasoning": "High confidence - automatically approved", | |
| "processing_result": { | |
| "extraction_data": preprocessing_result.extraction_result, | |
| "model_output": model_result.output_data, | |
| "confidence_breakdown": { | |
| "extraction": confidence.extraction_confidence, | |
| "model": confidence.model_confidence, | |
| "data_quality": confidence.data_quality | |
| } | |
| }, | |
| "requires_review": False, | |
| "review_queue_id": None | |
| } | |
| async def _handle_review_required(self, document_id: str, preprocessing_result: ProcessingResult, | |
| model_result: ModelInferenceResult, confidence: ConfidenceScore, | |
| decision: ValidationDecision, user_id: Optional[str]) -> Dict[str, Any]: | |
| """Handle documents requiring review""" | |
| # Determine review priority | |
| priority = self._determine_review_priority(confidence) | |
| # Calculate review deadline | |
| deadline = datetime.now() + self.review_deadlines[priority] | |
| # Create review queue item | |
| queue_item = ReviewQueueItem( | |
| item_id=self._generate_queue_id(), | |
| document_id=document_id, | |
| priority=priority, | |
| confidence_score=confidence, | |
| processing_result=preprocessing_result, | |
| model_inference=model_result, | |
| review_decision=decision, | |
| created_timestamp=datetime.now(), | |
| review_deadline=deadline | |
| ) | |
| # Add to review queue | |
| self.review_queue[queue_item.item_id] = queue_item | |
| await self._save_review_queue() | |
| # Log the review requirement | |
| await self._log_audit_event( | |
| document_id=document_id, | |
| event_type="review_required", | |
| user_id=user_id, | |
| confidence_scores={ | |
| "extraction": confidence.extraction_confidence, | |
| "model": confidence.model_confidence, | |
| "data_quality": confidence.data_quality, | |
| "overall": confidence.overall_confidence | |
| }, | |
| decision=decision.value, | |
| reasoning=f"Confidence score {confidence.overall_confidence:.3f} requires review (threshold: {self.confidence_thresholds['review_recommended']}-{self.confidence_thresholds['auto_approve']})" | |
| ) | |
| return { | |
| "document_id": document_id, | |
| "status": "review_required", | |
| "confidence": confidence.overall_confidence, | |
| "decision": decision.value, | |
| "reasoning": self._get_review_reasoning(confidence, decision), | |
| "review_queue_id": queue_item.item_id, | |
| "priority": priority.value, | |
| "review_deadline": deadline.isoformat(), | |
| "processing_result": { | |
| "extraction_data": preprocessing_result.extraction_result, | |
| "model_output": model_result.output_data, | |
| "confidence_breakdown": { | |
| "extraction": confidence.extraction_confidence, | |
| "model": confidence.model_confidence, | |
| "data_quality": confidence.data_quality | |
| }, | |
| "warnings": model_result.warnings | |
| }, | |
| "requires_review": True | |
| } | |
| async def _handle_blocked(self, document_id: str, preprocessing_result: ProcessingResult, | |
| model_result: ModelInferenceResult, confidence: ConfidenceScore, | |
| user_id: Optional[str]) -> Dict[str, Any]: | |
| """Handle blocked documents""" | |
| # Log the blocking | |
| await self._log_audit_event( | |
| document_id=document_id, | |
| event_type="blocked", | |
| user_id=user_id, | |
| confidence_scores={ | |
| "extraction": confidence.extraction_confidence, | |
| "model": confidence.model_confidence, | |
| "data_quality": confidence.data_quality, | |
| "overall": confidence.overall_confidence | |
| }, | |
| decision="blocked", | |
| reasoning=f"Confidence score {confidence.overall_confidence:.3f} below acceptable threshold ({self.confidence_thresholds['manual_required']})" | |
| ) | |
| return { | |
| "document_id": document_id, | |
| "status": "blocked", | |
| "confidence": confidence.overall_confidence, | |
| "decision": "blocked", | |
| "reasoning": "Confidence too low for processing - manual intervention required", | |
| "errors": model_result.errors, | |
| "warnings": model_result.warnings, | |
| "requires_review": True, | |
| "escalate_immediately": True | |
| } | |
| def _get_review_reasoning(self, confidence: ConfidenceScore, decision: ValidationDecision) -> str: | |
| """Generate human-readable reasoning for review requirement""" | |
| overall = confidence.overall_confidence | |
| reasons = [] | |
| if confidence.extraction_confidence < 0.80: | |
| reasons.append(f"Low extraction confidence ({confidence.extraction_confidence:.3f})") | |
| if confidence.model_confidence < 0.80: | |
| reasons.append(f"Low model confidence ({confidence.model_confidence:.3f})") | |
| if confidence.data_quality < 0.80: | |
| reasons.append(f"Poor data quality ({confidence.data_quality:.3f})") | |
| if decision == ValidationDecision.REVIEW_RECOMMENDED: | |
| base_reason = f"Medium confidence ({overall:.3f}) - review recommended for quality assurance" | |
| else: | |
| base_reason = f"Low confidence ({overall:.3f}) - manual review required" | |
| if reasons: | |
| return f"{base_reason}. Issues: {', '.join(reasons)}" | |
| else: | |
| return base_reason | |
| def get_review_queue_status(self) -> Dict[str, Any]: | |
| """Get current review queue status""" | |
| now = datetime.now() | |
| # Categorize queue items | |
| by_priority = {priority: [] for priority in ReviewPriority} | |
| overdue = [] | |
| pending_count = 0 | |
| for item in self.review_queue.values(): | |
| if not item.reviewed_timestamp: # Still pending | |
| pending_count += 1 | |
| by_priority[item.priority].append(item) | |
| if now > item.review_deadline: | |
| overdue.append(item) | |
| return { | |
| "total_pending": pending_count, | |
| "by_priority": { | |
| priority.value: len(items) for priority, items in by_priority.items() | |
| }, | |
| "overdue_count": len(overdue), | |
| "overdue_items": [ | |
| { | |
| "item_id": item.item_id, | |
| "document_id": item.document_id, | |
| "priority": item.priority.value, | |
| "overdue_hours": (now - item.review_deadline).total_seconds() / 3600 | |
| } | |
| for item in overdue | |
| ], | |
| "queue_health": "healthy" if len(overdue) == 0 else "degraded" if len(overdue) < 5 else "critical" | |
| } | |
| async def _log_audit_event(self, document_id: str, event_type: str, user_id: Optional[str], | |
| confidence_scores: Dict[str, float], decision: str, reasoning: str): | |
| """Log audit event for compliance""" | |
| log_entry = AuditLogEntry( | |
| log_id=self._generate_log_id(), | |
| document_id=document_id, | |
| event_type=event_type, | |
| timestamp=datetime.now(), | |
| user_id=user_id, | |
| confidence_scores=confidence_scores, | |
| decision=decision, | |
| reasoning=reasoning, | |
| metadata={} | |
| ) | |
| # Save to audit log file | |
| log_file = self.audit_log_path / f"audit_{datetime.now().strftime('%Y%m%d')}.jsonl" | |
| with open(log_file, 'a') as f: | |
| f.write(json.dumps(asdict(log_entry), default=str) + '\n') | |
| def _generate_document_id(self, file_path: Path) -> str: | |
| """Generate unique document ID""" | |
| content_hash = hashlib.sha256(str(file_path).encode()).hexdigest()[:8] | |
| timestamp = int(time.time()) | |
| return f"doc_{timestamp}_{content_hash}" | |
| def _generate_queue_id(self) -> str: | |
| """Generate unique review queue ID""" | |
| timestamp = int(time.time() * 1000) # Milliseconds for uniqueness | |
| return f"queue_{timestamp}" | |
| def _generate_log_id(self) -> str: | |
| """Generate unique log ID""" | |
| timestamp = int(time.time() * 1000) | |
| return f"log_{timestamp}" | |
| def _create_error_response(self, document_id: str, error_message: str) -> Dict[str, Any]: | |
| """Create standardized error response""" | |
| return { | |
| "document_id": document_id, | |
| "status": "error", | |
| "confidence": 0.0, | |
| "decision": "blocked", | |
| "reasoning": error_message, | |
| "requires_review": True, | |
| "escalate_immediately": True, | |
| "error": error_message | |
| } | |
| def load_review_queue(self): | |
| """Load review queue from persistent storage""" | |
| queue_file = self.review_queue_path / "review_queue.json" | |
| if queue_file.exists(): | |
| try: | |
| with open(queue_file, 'r') as f: | |
| queue_data = json.load(f) | |
| # Convert back to ReviewQueueItem objects | |
| for item_id, item_data in queue_data.items(): | |
| # Handle datetime conversion | |
| item_data['created_timestamp'] = datetime.fromisoformat(item_data['created_timestamp']) | |
| item_data['review_deadline'] = datetime.fromisoformat(item_data['review_deadline']) | |
| if item_data.get('reviewed_timestamp'): | |
| item_data['reviewed_timestamp'] = datetime.fromisoformat(item_data['reviewed_timestamp']) | |
| # Recreate objects (simplified for now) | |
| self.review_queue[item_id] = item_data | |
| logger.info(f"Loaded {len(self.review_queue)} items from review queue") | |
| except Exception as e: | |
| logger.error(f"Failed to load review queue: {e}") | |
| async def _save_review_queue(self): | |
| """Save review queue to persistent storage""" | |
| queue_file = self.review_queue_path / "review_queue.json" | |
| try: | |
| # Convert to JSON-serializable format | |
| queue_data = {} | |
| for item_id, item in self.review_queue.items(): | |
| if isinstance(item, ReviewQueueItem): | |
| queue_data[item_id] = asdict(item) | |
| else: | |
| queue_data[item_id] = item | |
| with open(queue_file, 'w') as f: | |
| json.dump(queue_data, f, indent=2, default=str) | |
| except Exception as e: | |
| logger.error(f"Failed to save review queue: {e}") | |
| def _update_statistics(self, decision: ValidationDecision, confidence: ConfidenceScore, processing_time: float): | |
| """Update system statistics""" | |
| self.stats["total_processed"] += 1 | |
| if decision == ValidationDecision.AUTO_APPROVE: | |
| self.stats["auto_approved"] += 1 | |
| elif decision == ValidationDecision.REVIEW_RECOMMENDED: | |
| self.stats["review_recommended"] += 1 | |
| elif decision == ValidationDecision.MANUAL_REQUIRED: | |
| self.stats["manual_required"] += 1 | |
| elif decision == ValidationDecision.BLOCKED: | |
| self.stats["blocked"] += 1 | |
| # Update average confidence | |
| total_confidence = self.stats["average_confidence"] * (self.stats["total_processed"] - 1) | |
| self.stats["average_confidence"] = (total_confidence + confidence.overall_confidence) / self.stats["total_processed"] | |
| # Track processing times | |
| self.stats["processing_times"].append(processing_time) | |
| if len(self.stats["processing_times"]) > 1000: # Keep last 1000 times | |
| self.stats["processing_times"] = self.stats["processing_times"][-1000:] | |
| def get_system_statistics(self) -> Dict[str, Any]: | |
| """Get comprehensive system statistics""" | |
| if self.stats["total_processed"] == 0: | |
| return {"total_processed": 0, "status": "no_data"} | |
| return { | |
| "total_processed": self.stats["total_processed"], | |
| "distribution": { | |
| "auto_approved": { | |
| "count": self.stats["auto_approved"], | |
| "percentage": (self.stats["auto_approved"] / self.stats["total_processed"]) * 100 | |
| }, | |
| "review_recommended": { | |
| "count": self.stats["review_recommended"], | |
| "percentage": (self.stats["review_recommended"] / self.stats["total_processed"]) * 100 | |
| }, | |
| "manual_required": { | |
| "count": self.stats["manual_required"], | |
| "percentage": (self.stats["manual_required"] / self.stats["total_processed"]) * 100 | |
| }, | |
| "blocked": { | |
| "count": self.stats["blocked"], | |
| "percentage": (self.stats["blocked"] / self.stats["total_processed"]) * 100 | |
| } | |
| }, | |
| "confidence_metrics": { | |
| "average_confidence": self.stats["average_confidence"], | |
| "success_rate": ((self.stats["auto_approved"] + self.stats["review_recommended"]) / self.stats["total_processed"]) * 100 | |
| }, | |
| "performance_metrics": { | |
| "average_processing_time": sum(self.stats["processing_times"]) / len(self.stats["processing_times"]) if self.stats["processing_times"] else 0, | |
| "median_processing_time": sorted(self.stats["processing_times"])[len(self.stats["processing_times"])//2] if self.stats["processing_times"] else 0 | |
| }, | |
| "system_health": "healthy" if self.stats["blocked"] / self.stats["total_processed"] < 0.1 else "degraded" | |
| } | |
# Export main classes — the public API of this module.
__all__ = [
    "ConfidenceGatingSystem",
    "ReviewQueueItem",
    "AuditLogEntry",
    "ValidationDecision",
    "ReviewPriority"
]