# NOTE(review): the following banner is web-scrape residue from the hosting
# page ("Spaces: Sleeping"); kept as a comment so the file remains valid Python.
"""
Medical Preprocessing Pipeline - Phase 2

Central orchestration layer for medical file processing and extraction.
This module coordinates all preprocessing components including file detection,
PHI de-identification, and modality-specific extraction to produce structured
data for AI model processing.

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""
# Standard library
import json
import logging
import os
import time
import traceback
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Project components
from dicom_processor import DICOMProcessingResult, DICOMProcessor
from ecg_processor import ECGProcessingResult, ECGSignalProcessor
from file_detector import FileDetectionResult, MedicalFileDetector, MedicalFileType
from medical_schemas import (
    ConfidenceScore,
    MedicalDocumentMetadata,
    ValidationResult,
    route_to_specialized_model,
    validate_document_schema,
)
from pdf_extractor import ExtractionResult, MedicalPDFProcessor
from phi_deidentifier import DeidentificationResult, MedicalPHIDeidentifier, PHICategory

# Module-level logger, configured by the embedding application.
logger = logging.getLogger(__name__)
# FIX: this was a plain class with bare annotations, so the keyword-argument
# construction used throughout the pipeline raised TypeError at runtime.
# @dataclass generates the constructor (and asdict support) the callers need.
@dataclass
class ProcessingPipelineResult:
    """Result of the complete preprocessing pipeline for one document."""

    # Stable short hash identifying the source file.
    document_id: str
    # Output of the file-type detection step.
    file_detection: FileDetectionResult
    # PHI de-identification output; None when disabled or not applicable.
    deidentification_result: Optional[DeidentificationResult]
    # Can be ExtractionResult, DICOMProcessingResult, or ECGProcessingResult.
    extraction_result: Any
    # Final compiled, schema-oriented payload (or error details on failure).
    structured_data: Dict[str, Any]
    # Schema validation outcome for the extracted data.
    validation_result: ValidationResult
    # AI-model routing decision and associated confidence info.
    model_routing: Dict[str, Any]
    # Wall-clock seconds spent in the pipeline for this document.
    processing_time: float
    # Pipeline version, timestamps, config snapshot, etc.
    pipeline_metadata: Dict[str, Any]
class MedicalPreprocessingPipeline:
    """Main preprocessing pipeline for medical documents.

    Orchestrates the full preprocessing flow: file-type detection, optional
    PHI de-identification, modality-specific extraction (PDF / DICOM / ECG),
    schema validation, AI-model routing, and final result compilation.
    Running statistics are accumulated in ``self.stats`` and exposed via
    :meth:`get_pipeline_statistics`.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize the pipeline and its processing components.

        Args:
            config: Optional configuration dict; defaults to
                ``_default_config()`` when omitted or falsy.
        """
        self.config = config or self._default_config()

        # Initialize components
        self.file_detector = MedicalFileDetector()
        self.phi_deidentifier = MedicalPHIDeidentifier(self.config.get('phi_config', {}))
        self.pdf_processor = MedicalPDFProcessor()
        self.dicom_processor = DICOMProcessor()
        self.ecg_processor = ECGSignalProcessor()

        # Running pipeline statistics (see get_pipeline_statistics / _update_statistics)
        self.stats = {
            "total_processed": 0,
            "successful_processing": 0,
            "phi_deidentified": 0,
            "validation_passed": 0,
            "processing_times": [],
            "error_counts": {}
        }
        logger.info("Medical Preprocessing Pipeline initialized")

    def _default_config(self) -> Dict[str, Any]:
        """Return the default pipeline configuration."""
        return {
            "enable_phi_deidentification": True,
            "enable_validation": True,
            "enable_model_routing": True,
            "max_file_size_mb": 100,
            "supported_formats": [".pdf", ".dcm", ".dicom", ".xml", ".scp", ".csv"],
            "phi_config": {
                "compliance_level": "HIPAA",
                "use_hashing": True,
                "redaction_method": "placeholder"
            },
            "validation_strict_mode": False,
            "output_format": "schema_compliant"
        }

    def process_document(self, file_path: str, document_type: str = "auto") -> ProcessingPipelineResult:
        """Process a single medical document through the complete pipeline.

        Args:
            file_path: Path to the medical document.
            document_type: Document type hint ("auto", "radiology",
                "laboratory", etc.). NOTE(review): currently only forwarded
                to ``_extract_structured_data``, which derives the type from
                file detection instead — confirm whether the hint should win.

        Returns:
            ProcessingPipelineResult with complete processing results. This
            method does not raise: on failure it returns an error result
            whose ``structured_data`` carries the error details.
        """
        start_time = time.time()
        document_id = self._generate_document_id(file_path)
        try:
            logger.info(f"Starting processing pipeline for document: {file_path}")

            # Step 1: File detection and analysis
            file_detection = self._detect_and_analyze_file(file_path)

            # Step 2: PHI de-identification (if enabled and needed)
            deidentification_result = None
            if self.config["enable_phi_deidentification"]:
                deidentification_result = self._perform_phi_deidentification(file_path, file_detection)

            # Step 3: Extract structured data
            extraction_result = self._extract_structured_data(file_path, file_detection, document_type)

            # Step 4: Validate against schema
            validation_result = self._validate_extracted_data(extraction_result)

            # Step 5: Model routing
            model_routing = self._determine_model_routing(extraction_result, validation_result)

            # Step 6: Compile final results
            processing_time = time.time() - start_time
            pipeline_metadata = {
                "pipeline_version": "1.0.0",
                "processing_timestamp": time.time(),
                "file_size": os.path.getsize(file_path) if os.path.exists(file_path) else 0,
                "config_used": self.config
            }

            result = ProcessingPipelineResult(
                document_id=document_id,
                file_detection=file_detection,
                deidentification_result=deidentification_result,
                extraction_result=extraction_result,
                structured_data=self._compile_structured_data(extraction_result, deidentification_result),
                validation_result=validation_result,
                model_routing=model_routing,
                processing_time=processing_time,
                pipeline_metadata=pipeline_metadata
            )

            self._update_statistics(result, True)
            logger.info(f"Pipeline processing completed successfully in {processing_time:.2f}s")
            return result

        except Exception as e:
            # logger.exception records the traceback alongside the message.
            logger.exception(f"Pipeline processing failed: {str(e)}")
            error_result = ProcessingPipelineResult(
                document_id=document_id,
                file_detection=FileDetectionResult(
                    file_type=MedicalFileType.UNKNOWN,
                    confidence=0.0,
                    detected_features=["processing_error"],
                    mime_type="application/octet-stream",
                    file_size=0,
                    metadata={"error": str(e)},
                    recommended_extractor="error_handler"
                ),
                deidentification_result=None,
                extraction_result=None,
                # FIX: record the exception class name ("error_type") so that
                # _update_statistics can keep a meaningful error breakdown
                # (previously it took type() of the error *string*, which was
                # always "str").
                structured_data={
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "traceback": traceback.format_exc()
                },
                validation_result=ValidationResult(is_valid=False, validation_errors=[str(e)]),
                model_routing={"error": str(e)},
                processing_time=time.time() - start_time,
                pipeline_metadata={"error": str(e), "processing_timestamp": time.time()}
            )
            self._update_statistics(error_result, False)
            return error_result

    def _detect_and_analyze_file(self, file_path: str) -> FileDetectionResult:
        """Detect file type and characteristics; re-raises on failure."""
        try:
            result = self.file_detector.detect_file_type(file_path)
            logger.info(f"File detected: {result.file_type.value} (confidence: {result.confidence:.2f})")
            return result
        except Exception as e:
            logger.error(f"File detection error: {str(e)}")
            raise

    def _perform_phi_deidentification(self, file_path: str,
                                      file_detection: FileDetectionResult) -> Optional[DeidentificationResult]:
        """Perform PHI de-identification on the file's text content.

        Returns:
            DeidentificationResult, or None if the file has no readable text
            or de-identification failed (best-effort: errors are logged, not
            raised, so the pipeline can continue without PHI scrubbing).
        """
        try:
            # Map the detected file type onto the PHI engine's document types.
            doc_type_mapping = {
                MedicalFileType.PDF_CLINICAL: "clinical_notes",
                MedicalFileType.PDF_RADIOLOGY: "radiology",
                MedicalFileType.PDF_LABORATORY: "laboratory",
                MedicalFileType.PDF_ECG_REPORT: "ecg",
                MedicalFileType.DICOM_CT: "radiology",
                MedicalFileType.DICOM_MRI: "radiology",
                MedicalFileType.DICOM_XRAY: "radiology",
                MedicalFileType.DICOM_ULTRASOUND: "radiology",
                MedicalFileType.ECG_XML: "ecg",
                MedicalFileType.ECG_SCPE: "ecg",
                MedicalFileType.ECG_CSV: "ecg"
            }
            doc_type = doc_type_mapping.get(file_detection.file_type, "general")

            # NOTE(review): binary formats (DICOM, SCP) are read as text with
            # errors='ignore', which yields mostly garbage for PHI scanning —
            # confirm whether binary types should be skipped here.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()

            if content:
                result = self.phi_deidentifier.deidentify_text(content, doc_type)
                logger.info(f"PHI de-identification completed: {len(result.phi_matches)} PHI entities found")
                return result
            else:
                logger.warning("No text content found for PHI de-identification")
                return None
        except Exception as e:
            logger.error(f"PHI de-identification error: {str(e)}")
            return None

    def _extract_structured_data(self, file_path: str, file_detection: FileDetectionResult,
                                 document_type: str) -> Any:
        """Extract structured data using the extractor for the detected type.

        Raises:
            ValueError: If no extractor supports the detected file type.
            Exception: Extraction errors are logged and re-raised.
        """
        # Lookup tables double as the type → extractor routing sets.
        pdf_doc_types = {
            MedicalFileType.PDF_CLINICAL: "clinical_notes",
            MedicalFileType.PDF_RADIOLOGY: "radiology",
            MedicalFileType.PDF_LABORATORY: "laboratory",
            MedicalFileType.PDF_ECG_REPORT: "ecg_report"
        }
        ecg_formats = {
            MedicalFileType.ECG_XML: "xml",
            MedicalFileType.ECG_SCPE: "scp",
            MedicalFileType.ECG_CSV: "csv"
        }
        dicom_types = (MedicalFileType.DICOM_CT, MedicalFileType.DICOM_MRI,
                       MedicalFileType.DICOM_XRAY, MedicalFileType.DICOM_ULTRASOUND)
        try:
            if file_detection.file_type in pdf_doc_types:
                result = self.pdf_processor.process_pdf(file_path, pdf_doc_types[file_detection.file_type])
                logger.info(f"PDF processing completed: {result.extraction_method}")
                return result
            elif file_detection.file_type in dicom_types:
                result = self.dicom_processor.process_dicom_file(file_path)
                logger.info(f"DICOM processing completed: {result.modality}")
                return result
            elif file_detection.file_type in ecg_formats:
                result = self.ecg_processor.process_ecg_file(file_path, ecg_formats[file_detection.file_type])
                logger.info(f"ECG processing completed: {len(result.lead_names)} leads")
                return result
            else:
                raise ValueError(f"No appropriate extractor for file type: {file_detection.file_type}")
        except Exception as e:
            logger.error(f"Data extraction error: {str(e)}")
            raise

    def _validate_extracted_data(self, extraction_result: Any) -> ValidationResult:
        """Validate extracted data against the medical document schemas.

        Returns a passing ValidationResult when validation is disabled; on
        internal error returns a failing result rather than raising.
        """
        if not self.config["enable_validation"]:
            return ValidationResult(is_valid=True, compliance_score=1.0)

        try:
            # Normalize the extraction result into a plain dict for validation.
            if hasattr(extraction_result, 'structured_data'):
                # PDF extraction result
                structured_data = extraction_result.structured_data
            elif hasattr(extraction_result, 'metadata') and hasattr(extraction_result, 'confidence_score'):
                # DICOM or ECG processing result (dataclass)
                structured_data = asdict(extraction_result)
            else:
                structured_data = {"raw_data": extraction_result}

            # Infer a document type from the data's shape.
            doc_type = "unknown"
            if "document_type" in structured_data:
                doc_type = structured_data["document_type"]
            elif "modality" in structured_data:
                doc_type = "radiology"
            elif "signal_data" in structured_data:
                doc_type = "ECG"

            # Ensure a metadata section exists for the schema check.
            if "metadata" not in structured_data:
                structured_data["metadata"] = {
                    "source_type": doc_type,
                    "extraction_timestamp": time.time()
                }

            validation_result = validate_document_schema(structured_data)
            if validation_result.is_valid:
                logger.info(f"Schema validation passed: {doc_type}")
            else:
                logger.warning(f"Schema validation failed: {validation_result.validation_errors}")
            return validation_result

        except Exception as e:
            logger.error(f"Validation error: {str(e)}")
            return ValidationResult(
                is_valid=False,
                validation_errors=[str(e)],
                compliance_score=0.0
            )

    def _determine_model_routing(self, extraction_result: Any,
                                 validation_result: ValidationResult) -> Dict[str, Any]:
        """Determine appropriate AI-model routing for the extracted data.

        Returns a routing dict; on error returns a fallback dict instead of
        raising so the pipeline can complete.
        """
        if not self.config["enable_model_routing"]:
            return {"routing_disabled": True}

        try:
            if hasattr(extraction_result, 'structured_data'):
                structured_data = extraction_result.structured_data
            else:
                structured_data = asdict(extraction_result)

            recommended_model = route_to_specialized_model(structured_data)
            routing_info = {
                "recommended_model": recommended_model,
                "validation_passed": validation_result.is_valid,
                # Thresholds: >0.6 to route automatically, <0.85 flags review.
                "confidence_threshold_met": validation_result.compliance_score > 0.6,
                "requires_human_review": validation_result.compliance_score < 0.85,
                "routing_confidence": validation_result.compliance_score
            }
            logger.info(f"Model routing: {recommended_model} (confidence: {validation_result.compliance_score:.2f})")
            return routing_info

        except Exception as e:
            logger.error(f"Model routing error: {str(e)}")
            return {"error": str(e), "fallback_model": "generic_processor"}

    def _compile_structured_data(self, extraction_result: Any,
                                 deidentification_result: Optional[DeidentificationResult]) -> Dict[str, Any]:
        """Compile the final structured-data payload for the result object."""
        try:
            # Start from the extractor's own structured view (copied so we do
            # not mutate the extraction result).
            if hasattr(extraction_result, 'structured_data'):
                structured_data = extraction_result.structured_data.copy()
            else:
                structured_data = asdict(extraction_result)

            # Attach a PHI de-identification summary, if one was produced.
            if deidentification_result:
                structured_data["phi_deidentification"] = {
                    "phi_entities_removed": len(deidentification_result.phi_matches),
                    "deidentification_method": deidentification_result.anonymization_method,
                    "original_hash": deidentification_result.hash_original,
                    "compliance_level": deidentification_result.compliance_level
                }

            # Attach extraction metadata and confidence scores, when present.
            if hasattr(extraction_result, 'metadata'):
                structured_data["extraction_metadata"] = extraction_result.metadata
            if hasattr(extraction_result, 'confidence_scores'):
                structured_data["extraction_confidence"] = extraction_result.confidence_scores

            return structured_data

        except Exception as e:
            logger.error(f"Data compilation error: {str(e)}")
            return {"error": str(e)}

    def _generate_document_id(self, file_path: str) -> str:
        """Generate a stable 12-character document ID from path/size/mtime.

        MD5 is used purely as a non-cryptographic fingerprint here.
        """
        import hashlib
        try:
            file_stat = os.stat(file_path)
            identifier = f"{file_path}_{file_stat.st_size}_{file_stat.st_mtime}"
        except OSError:
            # FIX: the file may not exist (e.g. when building an error result
            # in batch_process); fall back to path + current time so ID
            # generation itself cannot crash the error path.
            identifier = f"{file_path}_{time.time()}"
        return hashlib.md5(identifier.encode()).hexdigest()[:12]

    def _update_statistics(self, result: ProcessingPipelineResult, success: bool):
        """Fold one pipeline result into the running statistics."""
        self.stats["total_processed"] += 1
        if success:
            self.stats["successful_processing"] += 1
            if result.deidentification_result:
                self.stats["phi_deidentified"] += 1
            if result.validation_result.is_valid:
                self.stats["validation_passed"] += 1
        self.stats["processing_times"].append(result.processing_time)

        if not success:
            # FIX: key the error counter by the recorded exception class name.
            # The previous code took type() of the stored error *string*,
            # which collapsed every error into "str".
            error_type = result.structured_data.get("error_type", "UnknownError")
            self.stats["error_counts"][error_type] = self.stats["error_counts"].get(error_type, 0) + 1

    def get_pipeline_statistics(self) -> Dict[str, Any]:
        """Get comprehensive pipeline statistics."""
        processing_times = self.stats["processing_times"]
        total = self.stats["total_processed"]
        # FIX: an idle pipeline (0 documents processed) is "healthy", not
        # "degraded"; otherwise require at least a 90% success rate.
        healthy = total == 0 or self.stats["successful_processing"] >= total * 0.9
        return {
            "total_documents_processed": total,
            "successful_processing_rate": self.stats["successful_processing"] / max(total, 1),
            "phi_deidentification_rate": self.stats["phi_deidentified"] / max(total, 1),
            "validation_pass_rate": self.stats["validation_passed"] / max(total, 1),
            "average_processing_time": sum(processing_times) / len(processing_times) if processing_times else 0,
            "error_breakdown": self.stats["error_counts"],
            "pipeline_health": "healthy" if healthy else "degraded"
        }

    def batch_process(self, file_paths: List[str], document_types: Optional[List[str]] = None) -> List[ProcessingPipelineResult]:
        """Process multiple documents; one result per input path.

        Args:
            file_paths: Paths to process.
            document_types: Optional per-file type hints; defaults to "auto".
        """
        if document_types is None:
            document_types = ["auto"] * len(file_paths)

        results = []
        for i, (file_path, doc_type) in enumerate(zip(file_paths, document_types)):
            logger.info(f"Processing batch document {i+1}/{len(file_paths)}: {file_path}")
            try:
                results.append(self.process_document(file_path, doc_type))
            except Exception as e:
                # process_document is designed not to raise; this is a
                # last-resort guard so one document cannot abort the batch.
                logger.error(f"Batch processing error for {file_path}: {str(e)}")
                error_result = ProcessingPipelineResult(
                    document_id=self._generate_document_id(file_path),
                    file_detection=FileDetectionResult(
                        file_type=MedicalFileType.UNKNOWN,
                        confidence=0.0,
                        detected_features=["batch_error"],
                        mime_type="application/octet-stream",
                        file_size=0,
                        metadata={"error": str(e)},
                        recommended_extractor="error_handler"
                    ),
                    deidentification_result=None,
                    extraction_result=None,
                    structured_data={
                        "error": str(e),
                        "error_type": type(e).__name__,
                        "batch_processing_failed": True
                    },
                    validation_result=ValidationResult(is_valid=False, validation_errors=[str(e)]),
                    model_routing={"error": str(e)},
                    processing_time=0.0,
                    pipeline_metadata={"batch_position": i, "error": str(e)}
                )
                results.append(error_result)

        logger.info(f"Batch processing completed: {len(results)} documents processed")
        return results

    def export_pipeline_result(self, result: ProcessingPipelineResult, output_path: str):
        """Export a pipeline result to a JSON file (errors logged, not raised)."""
        try:
            export_data = {
                "document_id": result.document_id,
                "file_detection": asdict(result.file_detection),
                "deidentification_result": asdict(result.deidentification_result) if result.deidentification_result else None,
                "extraction_result": self._serialize_extraction_result(result.extraction_result),
                "structured_data": result.structured_data,
                "validation_result": asdict(result.validation_result),
                "model_routing": result.model_routing,
                "processing_time": result.processing_time,
                "pipeline_metadata": result.pipeline_metadata,
                "export_timestamp": time.time()
            }
            # default=str stringifies anything json can't handle natively
            # (enums, numpy scalars, etc.) — lossy but never fails.
            with open(output_path, 'w') as f:
                json.dump(export_data, f, indent=2, default=str)
            logger.info(f"Pipeline result exported to: {output_path}")
        except Exception as e:
            logger.error(f"Export error: {str(e)}")

    def _serialize_extraction_result(self, extraction_result: Any) -> Dict[str, Any]:
        """Serialize an extraction result for JSON export (best effort)."""
        try:
            if hasattr(extraction_result, '__dict__'):
                return asdict(extraction_result)
            else:
                return {"data": extraction_result}
        # FIX: narrow the bare `except:` (which also swallowed SystemExit /
        # KeyboardInterrupt) to Exception.
        except Exception:
            return {"error": "Could not serialize extraction result"}
# Public API of this module.
__all__ = ["MedicalPreprocessingPipeline", "ProcessingPipelineResult"]