""" Medical Preprocessing Pipeline - Phase 2 Central orchestration layer for medical file processing and extraction. This module coordinates all preprocessing components including file detection, PHI de-identification, and modality-specific extraction to produce structured data for AI model processing. Author: MiniMax Agent Date: 2025-10-29 Version: 1.0.0 """ import os import json import logging import time from typing import Dict, List, Optional, Any, Tuple from dataclasses import dataclass, asdict from pathlib import Path import traceback from file_detector import MedicalFileDetector, FileDetectionResult, MedicalFileType from phi_deidentifier import MedicalPHIDeidentifier, DeidentificationResult, PHICategory from pdf_extractor import MedicalPDFProcessor, ExtractionResult from dicom_processor import DICOMProcessor, DICOMProcessingResult from ecg_processor import ECGSignalProcessor, ECGProcessingResult from medical_schemas import ( ValidationResult, validate_document_schema, route_to_specialized_model, MedicalDocumentMetadata, ConfidenceScore ) logger = logging.getLogger(__name__) @dataclass class ProcessingPipelineResult: """Result of complete preprocessing pipeline""" document_id: str file_detection: FileDetectionResult deidentification_result: Optional[DeidentificationResult] extraction_result: Any # Can be ExtractionResult, DICOMProcessingResult, or ECGProcessingResult structured_data: Dict[str, Any] validation_result: ValidationResult model_routing: Dict[str, Any] processing_time: float pipeline_metadata: Dict[str, Any] class MedicalPreprocessingPipeline: """Main preprocessing pipeline for medical documents""" def __init__(self, config: Optional[Dict[str, Any]] = None): self.config = config or self._default_config() # Initialize components self.file_detector = MedicalFileDetector() self.phi_deidentifier = MedicalPHIDeidentifier(self.config.get('phi_config', {})) self.pdf_processor = MedicalPDFProcessor() self.dicom_processor = DICOMProcessor() self.ecg_processor = ECGSignalProcessor() # Pipeline statistics self.stats = { "total_processed": 0, "successful_processing": 0, "phi_deidentified": 0, "validation_passed": 0, "processing_times": [], "error_counts": {} } logger.info("Medical Preprocessing Pipeline initialized") def _default_config(self) -> Dict[str, Any]: """Default pipeline configuration""" return { "enable_phi_deidentification": True, "enable_validation": True, "enable_model_routing": True, "max_file_size_mb": 100, "supported_formats": [".pdf", ".dcm", ".dicom", ".xml", ".scp", ".csv"], "phi_config": { "compliance_level": "HIPAA", "use_hashing": True, "redaction_method": "placeholder" }, "validation_strict_mode": False, "output_format": "schema_compliant" } def process_document(self, file_path: str, document_type: str = "auto") -> ProcessingPipelineResult: """ Process a single medical document through the complete pipeline Args: file_path: Path to medical document document_type: Document type hint ("auto", "radiology", "laboratory", etc.) 

    def process_document(self, file_path: str, document_type: str = "auto") -> ProcessingPipelineResult:
        """
        Process a single medical document through the complete pipeline.

        Args:
            file_path: Path to the medical document
            document_type: Document type hint ("auto", "radiology", "laboratory", etc.)

        Returns:
            ProcessingPipelineResult with complete processing results
        """
        start_time = time.time()
        document_id = self._generate_document_id(file_path)

        try:
            logger.info(f"Starting processing pipeline for document: {file_path}")

            # Step 1: File detection and analysis
            file_detection = self._detect_and_analyze_file(file_path)

            # Step 2: PHI de-identification (if enabled and needed)
            deidentification_result = None
            if self.config["enable_phi_deidentification"]:
                deidentification_result = self._perform_phi_deidentification(file_path, file_detection)

            # Step 3: Extract structured data
            extraction_result = self._extract_structured_data(file_path, file_detection, document_type)

            # Step 4: Validate against schema
            validation_result = self._validate_extracted_data(extraction_result)

            # Step 5: Model routing
            model_routing = self._determine_model_routing(extraction_result, validation_result)

            # Step 6: Compile final results
            processing_time = time.time() - start_time
            pipeline_metadata = {
                "pipeline_version": "1.0.0",
                "processing_timestamp": time.time(),
                "file_size": os.path.getsize(file_path) if os.path.exists(file_path) else 0,
                "config_used": self.config
            }

            result = ProcessingPipelineResult(
                document_id=document_id,
                file_detection=file_detection,
                deidentification_result=deidentification_result,
                extraction_result=extraction_result,
                structured_data=self._compile_structured_data(extraction_result, deidentification_result),
                validation_result=validation_result,
                model_routing=model_routing,
                processing_time=processing_time,
                pipeline_metadata=pipeline_metadata
            )

            # Update statistics
            self._update_statistics(result, True)

            logger.info(f"Pipeline processing completed successfully in {processing_time:.2f}s")
            return result

        except Exception as e:
            logger.error(f"Pipeline processing failed: {str(e)}")

            # Create an error result; record the exception class name so that
            # statistics can aggregate failures by error type.
            error_result = ProcessingPipelineResult(
                document_id=document_id,
                file_detection=FileDetectionResult(
                    file_type=MedicalFileType.UNKNOWN,
                    confidence=0.0,
                    detected_features=["processing_error"],
                    mime_type="application/octet-stream",
                    file_size=0,
                    metadata={"error": str(e)},
                    recommended_extractor="error_handler"
                ),
                deidentification_result=None,
                extraction_result=None,
                structured_data={
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "traceback": traceback.format_exc()
                },
                validation_result=ValidationResult(is_valid=False, validation_errors=[str(e)]),
                model_routing={"error": str(e)},
                processing_time=time.time() - start_time,
                pipeline_metadata={"error": str(e), "processing_timestamp": time.time()}
            )

            # Update statistics
            self._update_statistics(error_result, False)
            return error_result
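
    # Example (sketch): a single-document run. "scans/chest_ct.dcm" is a
    # hypothetical path; the document_type hint is optional and defaults to
    # "auto".
    #
    #     result = pipeline.process_document("scans/chest_ct.dcm", document_type="radiology")
    #     if result.validation_result.is_valid:
    #         print(result.model_routing.get("recommended_model"))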

    def _detect_and_analyze_file(self, file_path: str) -> FileDetectionResult:
        """Detect file type and characteristics."""
        try:
            result = self.file_detector.detect_file_type(file_path)
            logger.info(f"File detected: {result.file_type.value} (confidence: {result.confidence:.2f})")
            return result
        except Exception as e:
            logger.error(f"File detection error: {str(e)}")
            raise

    def _perform_phi_deidentification(self, file_path: str,
                                      file_detection: FileDetectionResult) -> Optional[DeidentificationResult]:
        """Perform PHI de-identification if needed."""
        try:
            # Determine document type for PHI processing
            doc_type_mapping = {
                MedicalFileType.PDF_CLINICAL: "clinical_notes",
                MedicalFileType.PDF_RADIOLOGY: "radiology",
                MedicalFileType.PDF_LABORATORY: "laboratory",
                MedicalFileType.PDF_ECG_REPORT: "ecg",
                MedicalFileType.DICOM_CT: "radiology",
                MedicalFileType.DICOM_MRI: "radiology",
                MedicalFileType.DICOM_XRAY: "radiology",
                MedicalFileType.DICOM_ULTRASOUND: "radiology",
                MedicalFileType.ECG_XML: "ecg",
                MedicalFileType.ECG_SCPE: "ecg",
                MedicalFileType.ECG_CSV: "ecg"
            }
            doc_type = doc_type_mapping.get(file_detection.file_type, "general")

            # Read file content for PHI detection
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()

            if content:
                result = self.phi_deidentifier.deidentify_text(content, doc_type)
                logger.info(f"PHI de-identification completed: {len(result.phi_matches)} PHI entities found")
                return result
            else:
                logger.warning("No text content found for PHI de-identification")
                return None

        except Exception as e:
            logger.error(f"PHI de-identification error: {str(e)}")
            return None

    def _extract_structured_data(self, file_path: str, file_detection: FileDetectionResult,
                                 document_type: str) -> Any:
        """Extract structured data based on file type."""
        try:
            # Route to the appropriate extractor based on file type
            if file_detection.file_type in [MedicalFileType.PDF_CLINICAL,
                                            MedicalFileType.PDF_RADIOLOGY,
                                            MedicalFileType.PDF_LABORATORY,
                                            MedicalFileType.PDF_ECG_REPORT]:
                # PDF processing
                doc_type = "unknown"
                if file_detection.file_type == MedicalFileType.PDF_RADIOLOGY:
                    doc_type = "radiology"
                elif file_detection.file_type == MedicalFileType.PDF_LABORATORY:
                    doc_type = "laboratory"
                elif file_detection.file_type == MedicalFileType.PDF_ECG_REPORT:
                    doc_type = "ecg_report"
                elif file_detection.file_type == MedicalFileType.PDF_CLINICAL:
                    doc_type = "clinical_notes"

                result = self.pdf_processor.process_pdf(file_path, doc_type)
                logger.info(f"PDF processing completed: {result.extraction_method}")
                return result

            elif file_detection.file_type in [MedicalFileType.DICOM_CT,
                                              MedicalFileType.DICOM_MRI,
                                              MedicalFileType.DICOM_XRAY,
                                              MedicalFileType.DICOM_ULTRASOUND]:
                # DICOM processing
                result = self.dicom_processor.process_dicom_file(file_path)
                logger.info(f"DICOM processing completed: {result.modality}")
                return result

            elif file_detection.file_type in [MedicalFileType.ECG_XML,
                                              MedicalFileType.ECG_SCPE,
                                              MedicalFileType.ECG_CSV]:
                # ECG processing
                format_mapping = {
                    MedicalFileType.ECG_XML: "xml",
                    MedicalFileType.ECG_SCPE: "scp",
                    MedicalFileType.ECG_CSV: "csv"
                }
                ecg_format = format_mapping.get(file_detection.file_type, "auto")

                result = self.ecg_processor.process_ecg_file(file_path, ecg_format)
                logger.info(f"ECG processing completed: {len(result.lead_names)} leads")
                return result

            else:
                raise ValueError(f"No appropriate extractor for file type: {file_detection.file_type}")

        except Exception as e:
            logger.error(f"Data extraction error: {str(e)}")
            raise
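
    # The validator below expects a plain dict. As a rough sketch (keys are
    # illustrative, derived from the checks in _validate_extracted_data):
    #
    #     {
    #         "document_type": "radiology",   # or inferred from "modality"/"signal_data"
    #         "metadata": {
    #             "source_type": "radiology",
    #             "extraction_timestamp": 1730160000.0,
    #         },
    #         ...                             # extractor-specific fields
    #     }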

    def _validate_extracted_data(self, extraction_result: Any) -> ValidationResult:
        """Validate extracted data against medical schemas."""
        if not self.config["enable_validation"]:
            return ValidationResult(is_valid=True, compliance_score=1.0)

        try:
            # Convert the extraction result to dictionary form
            if hasattr(extraction_result, 'structured_data'):
                # PDF extraction result
                structured_data = extraction_result.structured_data
            elif hasattr(extraction_result, 'metadata') and hasattr(extraction_result, 'confidence_score'):
                # DICOM or ECG processing result
                structured_data = asdict(extraction_result)
            else:
                structured_data = {"raw_data": extraction_result}

            # Determine document type from the extraction result
            doc_type = "unknown"
            if "document_type" in structured_data:
                doc_type = structured_data["document_type"]
            elif "modality" in structured_data:
                doc_type = "radiology"
            elif "signal_data" in structured_data:
                doc_type = "ecg"

            # Add metadata for validation
            if "metadata" not in structured_data:
                structured_data["metadata"] = {
                    "source_type": doc_type,
                    "extraction_timestamp": time.time()
                }

            # Validate against schema
            validation_result = validate_document_schema(structured_data)

            if validation_result.is_valid:
                logger.info(f"Schema validation passed: {doc_type}")
            else:
                logger.warning(f"Schema validation failed: {validation_result.validation_errors}")

            return validation_result

        except Exception as e:
            logger.error(f"Validation error: {str(e)}")
            return ValidationResult(
                is_valid=False,
                validation_errors=[str(e)],
                compliance_score=0.0
            )

    def _determine_model_routing(self, extraction_result: Any,
                                 validation_result: ValidationResult) -> Dict[str, Any]:
        """Determine appropriate AI model routing."""
        if not self.config["enable_model_routing"]:
            return {"routing_disabled": True}

        try:
            # Extract document data for the routing decision
            if hasattr(extraction_result, 'structured_data'):
                structured_data = extraction_result.structured_data
            else:
                structured_data = asdict(extraction_result)

            # Use the schema routing function
            recommended_model = route_to_specialized_model(structured_data)

            routing_info = {
                "recommended_model": recommended_model,
                "validation_passed": validation_result.is_valid,
                "confidence_threshold_met": validation_result.compliance_score > 0.6,
                "requires_human_review": validation_result.compliance_score < 0.85,
                "routing_confidence": validation_result.compliance_score
            }

            logger.info(f"Model routing: {recommended_model} (confidence: {validation_result.compliance_score:.2f})")
            return routing_info

        except Exception as e:
            logger.error(f"Model routing error: {str(e)}")
            return {"error": str(e), "fallback_model": "generic_processor"}

    def _compile_structured_data(self, extraction_result: Any,
                                 deidentification_result: Optional[DeidentificationResult]) -> Dict[str, Any]:
        """Compile the final structured data output."""
        try:
            # Start with the extraction result
            if hasattr(extraction_result, 'structured_data'):
                structured_data = extraction_result.structured_data.copy()
            else:
                structured_data = asdict(extraction_result)

            # Add de-identification information
            if deidentification_result:
                structured_data["phi_deidentification"] = {
                    "phi_entities_removed": len(deidentification_result.phi_matches),
                    "deidentification_method": deidentification_result.anonymization_method,
                    "original_hash": deidentification_result.hash_original,
                    "compliance_level": deidentification_result.compliance_level
                }

            # Add extraction metadata
            if hasattr(extraction_result, 'metadata'):
                structured_data["extraction_metadata"] = extraction_result.metadata

            # Add confidence scores
            if hasattr(extraction_result, 'confidence_scores'):
                structured_data["extraction_confidence"] = extraction_result.confidence_scores

            return structured_data

        except Exception as e:
            logger.error(f"Data compilation error: {str(e)}")
            return {"error": str(e)}

    def _generate_document_id(self, file_path: str) -> str:
        """Generate a unique document ID from the path, size, and mtime."""
        try:
            file_stat = os.stat(file_path)
            identifier = f"{file_path}_{file_stat.st_size}_{file_stat.st_mtime}"
        except OSError:
            # Fall back to a time-based identifier for missing or unreadable files
            identifier = f"{file_path}_{time.time()}"
        return hashlib.md5(identifier.encode()).hexdigest()[:12]

    def _update_statistics(self, result: ProcessingPipelineResult, success: bool):
        """Update pipeline statistics."""
        self.stats["total_processed"] += 1

        if success:
            self.stats["successful_processing"] += 1
            if result.deidentification_result:
                self.stats["phi_deidentified"] += 1
            if result.validation_result.is_valid:
                self.stats["validation_passed"] += 1

        self.stats["processing_times"].append(result.processing_time)

        # Track errors by the exception class name recorded in the error result
        if not success:
            error_type = result.structured_data.get("error_type", "UnknownError")
            self.stats["error_counts"][error_type] = self.stats["error_counts"].get(error_type, 0) + 1
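
    # Example (sketch) of the dict returned by get_pipeline_statistics below;
    # the numbers are illustrative only (12 documents, 11 successful):
    #
    #     {
    #         "total_documents_processed": 12,
    #         "successful_processing_rate": 0.92,
    #         "phi_deidentification_rate": 0.75,
    #         "validation_pass_rate": 0.83,
    #         "average_processing_time": 1.4,
    #         "error_breakdown": {"ValueError": 1},
    #         "pipeline_health": "healthy",
    #     }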
self.stats["error_counts"].get(error_type, 0) + 1 def get_pipeline_statistics(self) -> Dict[str, Any]: """Get comprehensive pipeline statistics""" processing_times = self.stats["processing_times"] return { "total_documents_processed": self.stats["total_processed"], "successful_processing_rate": self.stats["successful_processing"] / max(self.stats["total_processed"], 1), "phi_deidentification_rate": self.stats["phi_deidentified"] / max(self.stats["total_processed"], 1), "validation_pass_rate": self.stats["validation_passed"] / max(self.stats["total_processed"], 1), "average_processing_time": sum(processing_times) / len(processing_times) if processing_times else 0, "error_breakdown": self.stats["error_counts"], "pipeline_health": "healthy" if self.stats["successful_processing"] > self.stats["total_processed"] * 0.9 else "degraded" } def batch_process(self, file_paths: List[str], document_types: Optional[List[str]] = None) -> List[ProcessingPipelineResult]: """Process multiple documents in batch""" if document_types is None: document_types = ["auto"] * len(file_paths) results = [] for i, (file_path, doc_type) in enumerate(zip(file_paths, document_types)): logger.info(f"Processing batch document {i+1}/{len(file_paths)}: {file_path}") try: result = self.process_document(file_path, doc_type) results.append(result) except Exception as e: logger.error(f"Batch processing error for {file_path}: {str(e)}") # Create error result error_result = ProcessingPipelineResult( document_id=self._generate_document_id(file_path), file_detection=FileDetectionResult( file_type=MedicalFileType.UNKNOWN, confidence=0.0, detected_features=["batch_error"], mime_type="application/octet-stream", file_size=0, metadata={"error": str(e)}, recommended_extractor="error_handler" ), deidentification_result=None, extraction_result=None, structured_data={"error": str(e), "batch_processing_failed": True}, validation_result=ValidationResult(is_valid=False, validation_errors=[str(e)]), model_routing={"error": str(e)}, processing_time=0.0, pipeline_metadata={"batch_position": i, "error": str(e)} ) results.append(error_result) logger.info(f"Batch processing completed: {len(results)} documents processed") return results def export_pipeline_result(self, result: ProcessingPipelineResult, output_path: str): """Export pipeline result to JSON file""" try: export_data = { "document_id": result.document_id, "file_detection": asdict(result.file_detection), "deidentification_result": asdict(result.deidentification_result) if result.deidentification_result else None, "extraction_result": self._serialize_extraction_result(result.extraction_result), "structured_data": result.structured_data, "validation_result": asdict(result.validation_result), "model_routing": result.model_routing, "processing_time": result.processing_time, "pipeline_metadata": result.pipeline_metadata, "export_timestamp": time.time() } with open(output_path, 'w') as f: json.dump(export_data, f, indent=2, default=str) logger.info(f"Pipeline result exported to: {output_path}") except Exception as e: logger.error(f"Export error: {str(e)}") def _serialize_extraction_result(self, extraction_result: Any) -> Dict[str, Any]: """Serialize extraction result for JSON export""" try: if hasattr(extraction_result, '__dict__'): return asdict(extraction_result) else: return {"data": extraction_result} except: return {"error": "Could not serialize extraction result"} # Export main classes __all__ = [ "MedicalPreprocessingPipeline", "ProcessingPipelineResult" ]