|
|
""" |
|
|
Medical Preprocessing Pipeline - Phase 2 |
|
|
Central orchestration layer for medical file processing and extraction. |
|
|
|
|
|
This module coordinates all preprocessing components including file detection, |
|
|
PHI de-identification, and modality-specific extraction to produce structured data |
|
|
for AI model processing. |
|
|
|
|
|
Author: MiniMax Agent |
|
|
Date: 2025-10-29 |
|
|
Version: 1.0.0 |
|
|
""" |
|
|
|
|
|
import os |
|
|
import json |
|
|
import logging |
|
|
import time |
|
|
from typing import Dict, List, Optional, Any, Tuple |
|
|
from dataclasses import dataclass, asdict |
|
|
from pathlib import Path |
|
|
import traceback |
|
|
|
|
|
from file_detector import MedicalFileDetector, FileDetectionResult, MedicalFileType |
|
|
from phi_deidentifier import MedicalPHIDeidentifier, DeidentificationResult, PHICategory |
|
|
from pdf_extractor import MedicalPDFProcessor, ExtractionResult |
|
|
from dicom_processor import DICOMProcessor, DICOMProcessingResult |
|
|
from ecg_processor import ECGSignalProcessor, ECGProcessingResult |
|
|
from medical_schemas import ( |
|
|
ValidationResult, validate_document_schema, route_to_specialized_model, |
|
|
MedicalDocumentMetadata, ConfidenceScore |
|
|
) |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
@dataclass
class ProcessingPipelineResult:
    """Result of complete preprocessing pipeline.

    One instance is produced per input document by
    MedicalPreprocessingPipeline.process_document(), for both successful
    and failed runs (failures carry error markers in structured_data).
    """

    # Stable 12-hex-char identifier derived from path/size/mtime
    # (see _generate_document_id).
    document_id: str
    # File-type detection outcome (modality, confidence, recommended extractor).
    file_detection: FileDetectionResult
    # PHI de-identification outcome; None when disabled, when the file had
    # no text content, or when de-identification failed.
    deidentification_result: Optional[DeidentificationResult]
    # Modality-specific extractor result (PDF/DICOM/ECG result object);
    # None on pipeline error.
    extraction_result: Any
    # Final compiled dictionary: extractor output merged with
    # de-identification summary and extraction metadata.
    structured_data: Dict[str, Any]
    # Schema validation outcome for the structured data.
    validation_result: ValidationResult
    # AI model routing decision (recommended model, review flags) or error info.
    model_routing: Dict[str, Any]
    # Wall-clock seconds spent processing this document.
    processing_time: float
    # Pipeline version, timestamp, file size and the config used.
    pipeline_metadata: Dict[str, Any]
|
|
|
|
|
|
|
|
class MedicalPreprocessingPipeline: |
|
|
"""Main preprocessing pipeline for medical documents""" |
|
|
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None): |
|
|
self.config = config or self._default_config() |
|
|
|
|
|
|
|
|
self.file_detector = MedicalFileDetector() |
|
|
self.phi_deidentifier = MedicalPHIDeidentifier(self.config.get('phi_config', {})) |
|
|
self.pdf_processor = MedicalPDFProcessor() |
|
|
self.dicom_processor = DICOMProcessor() |
|
|
self.ecg_processor = ECGSignalProcessor() |
|
|
|
|
|
|
|
|
self.stats = { |
|
|
"total_processed": 0, |
|
|
"successful_processing": 0, |
|
|
"phi_deidentified": 0, |
|
|
"validation_passed": 0, |
|
|
"processing_times": [], |
|
|
"error_counts": {} |
|
|
} |
|
|
|
|
|
logger.info("Medical Preprocessing Pipeline initialized") |
|
|
|
|
|
def _default_config(self) -> Dict[str, Any]: |
|
|
"""Default pipeline configuration""" |
|
|
return { |
|
|
"enable_phi_deidentification": True, |
|
|
"enable_validation": True, |
|
|
"enable_model_routing": True, |
|
|
"max_file_size_mb": 100, |
|
|
"supported_formats": [".pdf", ".dcm", ".dicom", ".xml", ".scp", ".csv"], |
|
|
"phi_config": { |
|
|
"compliance_level": "HIPAA", |
|
|
"use_hashing": True, |
|
|
"redaction_method": "placeholder" |
|
|
}, |
|
|
"validation_strict_mode": False, |
|
|
"output_format": "schema_compliant" |
|
|
} |
|
|
|
|
|
def process_document(self, file_path: str, document_type: str = "auto") -> ProcessingPipelineResult: |
|
|
""" |
|
|
Process a single medical document through the complete pipeline |
|
|
|
|
|
Args: |
|
|
file_path: Path to medical document |
|
|
document_type: Document type hint ("auto", "radiology", "laboratory", etc.) |
|
|
|
|
|
Returns: |
|
|
ProcessingPipelineResult with complete processing results |
|
|
""" |
|
|
start_time = time.time() |
|
|
document_id = self._generate_document_id(file_path) |
|
|
|
|
|
try: |
|
|
logger.info(f"Starting processing pipeline for document: {file_path}") |
|
|
|
|
|
|
|
|
file_detection = self._detect_and_analyze_file(file_path) |
|
|
|
|
|
|
|
|
deidentification_result = None |
|
|
if self.config["enable_phi_deidentification"]: |
|
|
deidentification_result = self._perform_phi_deidentification(file_path, file_detection) |
|
|
|
|
|
|
|
|
extraction_result = self._extract_structured_data(file_path, file_detection, document_type) |
|
|
|
|
|
|
|
|
validation_result = self._validate_extracted_data(extraction_result) |
|
|
|
|
|
|
|
|
model_routing = self._determine_model_routing(extraction_result, validation_result) |
|
|
|
|
|
|
|
|
processing_time = time.time() - start_time |
|
|
|
|
|
pipeline_metadata = { |
|
|
"pipeline_version": "1.0.0", |
|
|
"processing_timestamp": time.time(), |
|
|
"file_size": os.path.getsize(file_path) if os.path.exists(file_path) else 0, |
|
|
"config_used": self.config |
|
|
} |
|
|
|
|
|
result = ProcessingPipelineResult( |
|
|
document_id=document_id, |
|
|
file_detection=file_detection, |
|
|
deidentification_result=deidentification_result, |
|
|
extraction_result=extraction_result, |
|
|
structured_data=self._compile_structured_data(extraction_result, deidentification_result), |
|
|
validation_result=validation_result, |
|
|
model_routing=model_routing, |
|
|
processing_time=processing_time, |
|
|
pipeline_metadata=pipeline_metadata |
|
|
) |
|
|
|
|
|
|
|
|
self._update_statistics(result, True) |
|
|
|
|
|
logger.info(f"Pipeline processing completed successfully in {processing_time:.2f}s") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Pipeline processing failed: {str(e)}") |
|
|
|
|
|
|
|
|
error_result = ProcessingPipelineResult( |
|
|
document_id=document_id, |
|
|
file_detection=FileDetectionResult( |
|
|
file_type=MedicalFileType.UNKNOWN, |
|
|
confidence=0.0, |
|
|
detected_features=["processing_error"], |
|
|
mime_type="application/octet-stream", |
|
|
file_size=0, |
|
|
metadata={"error": str(e)}, |
|
|
recommended_extractor="error_handler" |
|
|
), |
|
|
deidentification_result=None, |
|
|
extraction_result=None, |
|
|
structured_data={"error": str(e), "traceback": traceback.format_exc()}, |
|
|
validation_result=ValidationResult(is_valid=False, validation_errors=[str(e)]), |
|
|
model_routing={"error": str(e)}, |
|
|
processing_time=time.time() - start_time, |
|
|
pipeline_metadata={"error": str(e), "processing_timestamp": time.time()} |
|
|
) |
|
|
|
|
|
|
|
|
self._update_statistics(error_result, False) |
|
|
|
|
|
return error_result |
|
|
|
|
|
def _detect_and_analyze_file(self, file_path: str) -> FileDetectionResult: |
|
|
"""Detect file type and characteristics""" |
|
|
try: |
|
|
result = self.file_detector.detect_file_type(file_path) |
|
|
logger.info(f"File detected: {result.file_type.value} (confidence: {result.confidence:.2f})") |
|
|
return result |
|
|
except Exception as e: |
|
|
logger.error(f"File detection error: {str(e)}") |
|
|
raise |
|
|
|
|
|
def _perform_phi_deidentification(self, file_path: str, |
|
|
file_detection: FileDetectionResult) -> Optional[DeidentificationResult]: |
|
|
"""Perform PHI de-identification if needed""" |
|
|
try: |
|
|
|
|
|
doc_type_mapping = { |
|
|
MedicalFileType.PDF_CLINICAL: "clinical_notes", |
|
|
MedicalFileType.PDF_RADIOLOGY: "radiology", |
|
|
MedicalFileType.PDF_LABORATORY: "laboratory", |
|
|
MedicalFileType.PDF_ECG_REPORT: "ecg", |
|
|
MedicalFileType.DICOM_CT: "radiology", |
|
|
MedicalFileType.DICOM_MRI: "radiology", |
|
|
MedicalFileType.DICOM_XRAY: "radiology", |
|
|
MedicalFileType.DICOM_ULTRASOUND: "radiology", |
|
|
MedicalFileType.ECG_XML: "ecg", |
|
|
MedicalFileType.ECG_SCPE: "ecg", |
|
|
MedicalFileType.ECG_CSV: "ecg" |
|
|
} |
|
|
|
|
|
doc_type = doc_type_mapping.get(file_detection.file_type, "general") |
|
|
|
|
|
|
|
|
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: |
|
|
content = f.read() |
|
|
|
|
|
if content: |
|
|
result = self.phi_deidentifier.deidentify_text(content, doc_type) |
|
|
logger.info(f"PHI de-identification completed: {len(result.phi_matches)} PHI entities found") |
|
|
return result |
|
|
else: |
|
|
logger.warning("No text content found for PHI de-identification") |
|
|
return None |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"PHI de-identification error: {str(e)}") |
|
|
return None |
|
|
|
|
|
def _extract_structured_data(self, file_path: str, file_detection: FileDetectionResult, |
|
|
document_type: str) -> Any: |
|
|
"""Extract structured data based on file type""" |
|
|
try: |
|
|
|
|
|
if file_detection.file_type in [MedicalFileType.PDF_CLINICAL, MedicalFileType.PDF_RADIOLOGY, |
|
|
MedicalFileType.PDF_LABORATORY, MedicalFileType.PDF_ECG_REPORT]: |
|
|
|
|
|
doc_type = "unknown" |
|
|
if file_detection.file_type == MedicalFileType.PDF_RADIOLOGY: |
|
|
doc_type = "radiology" |
|
|
elif file_detection.file_type == MedicalFileType.PDF_LABORATORY: |
|
|
doc_type = "laboratory" |
|
|
elif file_detection.file_type == MedicalFileType.PDF_ECG_REPORT: |
|
|
doc_type = "ecg_report" |
|
|
elif file_detection.file_type == MedicalFileType.PDF_CLINICAL: |
|
|
doc_type = "clinical_notes" |
|
|
|
|
|
result = self.pdf_processor.process_pdf(file_path, doc_type) |
|
|
logger.info(f"PDF processing completed: {result.extraction_method}") |
|
|
return result |
|
|
|
|
|
elif file_detection.file_type in [MedicalFileType.DICOM_CT, MedicalFileType.DICOM_MRI, |
|
|
MedicalFileType.DICOM_XRAY, MedicalFileType.DICOM_ULTRASOUND]: |
|
|
|
|
|
result = self.dicom_processor.process_dicom_file(file_path) |
|
|
logger.info(f"DICOM processing completed: {result.modality}") |
|
|
return result |
|
|
|
|
|
elif file_detection.file_type in [MedicalFileType.ECG_XML, MedicalFileType.ECG_SCPE, |
|
|
MedicalFileType.ECG_CSV]: |
|
|
|
|
|
format_mapping = { |
|
|
MedicalFileType.ECG_XML: "xml", |
|
|
MedicalFileType.ECG_SCPE: "scp", |
|
|
MedicalFileType.ECG_CSV: "csv" |
|
|
} |
|
|
ecg_format = format_mapping.get(file_detection.file_type, "auto") |
|
|
|
|
|
result = self.ecg_processor.process_ecg_file(file_path, ecg_format) |
|
|
logger.info(f"ECG processing completed: {len(result.lead_names)} leads") |
|
|
return result |
|
|
|
|
|
else: |
|
|
raise ValueError(f"No appropriate extractor for file type: {file_detection.file_type}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Data extraction error: {str(e)}") |
|
|
raise |
|
|
|
|
|
    def _validate_extracted_data(self, extraction_result: Any) -> ValidationResult:
        """Validate extracted data against medical schemas.

        Normalizes the extractor's output into a dict, infers a document
        type from its keys, injects minimal metadata if absent, and runs
        validate_document_schema(). Returns a failed ValidationResult (never
        raises) when validation itself errors out.

        NOTE(review): when the extractor exposes `structured_data`, this
        method mutates that dict IN PLACE (the "metadata" injection below) —
        the injected keys later surface in _compile_structured_data()'s
        output, so this side effect appears to be load-bearing; confirm
        before "fixing" it with a copy.
        """
        if not self.config["enable_validation"]:
            # Validation disabled: report a fully-compliant pass-through.
            return ValidationResult(is_valid=True, compliance_score=1.0)

        try:
            # Normalize the heterogeneous extractor results into a dict.
            if hasattr(extraction_result, 'structured_data'):
                # PDF-style results: use their dict directly (aliased, see NOTE).
                structured_data = extraction_result.structured_data
            elif hasattr(extraction_result, 'metadata') and hasattr(extraction_result, 'confidence_score'):
                # Dataclass-style results (e.g. DICOM/ECG): expand all fields.
                structured_data = asdict(extraction_result)
            else:
                structured_data = {"raw_data": extraction_result}

            # Infer the document type from characteristic keys.
            doc_type = "unknown"
            if "document_type" in structured_data:
                doc_type = structured_data["document_type"]
            elif "modality" in structured_data:
                doc_type = "radiology"
            elif "signal_data" in structured_data:
                # NOTE(review): uppercase "ECG" is inconsistent with the
                # lowercase type strings used elsewhere — verify against
                # validate_document_schema's expectations.
                doc_type = "ECG"

            # Inject minimal metadata so schema validation has something to
            # check; this mutates the aliased dict (see NOTE above).
            if "metadata" not in structured_data:
                structured_data["metadata"] = {
                    "source_type": doc_type,
                    "extraction_timestamp": time.time()
                }

            validation_result = validate_document_schema(structured_data)

            if validation_result.is_valid:
                logger.info(f"Schema validation passed: {doc_type}")
            else:
                logger.warning(f"Schema validation failed: {validation_result.validation_errors}")

            return validation_result

        except Exception as e:
            logger.error(f"Validation error: {str(e)}")
            return ValidationResult(
                is_valid=False,
                validation_errors=[str(e)],
                compliance_score=0.0
            )
|
|
|
|
|
def _determine_model_routing(self, extraction_result: Any, |
|
|
validation_result: ValidationResult) -> Dict[str, Any]: |
|
|
"""Determine appropriate AI model routing""" |
|
|
if not self.config["enable_model_routing"]: |
|
|
return {"routing_disabled": True} |
|
|
|
|
|
try: |
|
|
|
|
|
if hasattr(extraction_result, 'structured_data'): |
|
|
structured_data = extraction_result.structured_data |
|
|
else: |
|
|
structured_data = asdict(extraction_result) |
|
|
|
|
|
|
|
|
recommended_model = route_to_specialized_model(structured_data) |
|
|
|
|
|
routing_info = { |
|
|
"recommended_model": recommended_model, |
|
|
"validation_passed": validation_result.is_valid, |
|
|
"confidence_threshold_met": validation_result.compliance_score > 0.6, |
|
|
"requires_human_review": validation_result.compliance_score < 0.85, |
|
|
"routing_confidence": validation_result.compliance_score |
|
|
} |
|
|
|
|
|
logger.info(f"Model routing: {recommended_model} (confidence: {validation_result.compliance_score:.2f})") |
|
|
return routing_info |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Model routing error: {str(e)}") |
|
|
return {"error": str(e), "fallback_model": "generic_processor"} |
|
|
|
|
|
def _compile_structured_data(self, extraction_result: Any, |
|
|
deidentification_result: Optional[DeidentificationResult]) -> Dict[str, Any]: |
|
|
"""Compile final structured data output""" |
|
|
try: |
|
|
|
|
|
if hasattr(extraction_result, 'structured_data'): |
|
|
structured_data = extraction_result.structured_data.copy() |
|
|
else: |
|
|
structured_data = asdict(extraction_result) |
|
|
|
|
|
|
|
|
if deidentification_result: |
|
|
structured_data["phi_deidentification"] = { |
|
|
"phi_entities_removed": len(deidentification_result.phi_matches), |
|
|
"deidentification_method": deidentification_result.anonymization_method, |
|
|
"original_hash": deidentification_result.hash_original, |
|
|
"compliance_level": deidentification_result.compliance_level |
|
|
} |
|
|
|
|
|
|
|
|
if hasattr(extraction_result, 'metadata'): |
|
|
structured_data["extraction_metadata"] = extraction_result.metadata |
|
|
|
|
|
|
|
|
if hasattr(extraction_result, 'confidence_scores'): |
|
|
structured_data["extraction_confidence"] = extraction_result.confidence_scores |
|
|
|
|
|
return structured_data |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Data compilation error: {str(e)}") |
|
|
return {"error": str(e)} |
|
|
|
|
|
def _generate_document_id(self, file_path: str) -> str: |
|
|
"""Generate unique document ID""" |
|
|
import hashlib |
|
|
file_stat = os.stat(file_path) |
|
|
identifier = f"{file_path}_{file_stat.st_size}_{file_stat.st_mtime}" |
|
|
return hashlib.md5(identifier.encode()).hexdigest()[:12] |
|
|
|
|
|
def _update_statistics(self, result: ProcessingPipelineResult, success: bool): |
|
|
"""Update pipeline statistics""" |
|
|
self.stats["total_processed"] += 1 |
|
|
|
|
|
if success: |
|
|
self.stats["successful_processing"] += 1 |
|
|
|
|
|
if result.deidentification_result: |
|
|
self.stats["phi_deidentified"] += 1 |
|
|
|
|
|
if result.validation_result.is_valid: |
|
|
self.stats["validation_passed"] += 1 |
|
|
|
|
|
self.stats["processing_times"].append(result.processing_time) |
|
|
|
|
|
|
|
|
if not success: |
|
|
error_type = type(result.structured_data.get("error", Exception())).__name__ |
|
|
self.stats["error_counts"][error_type] = self.stats["error_counts"].get(error_type, 0) + 1 |
|
|
|
|
|
def get_pipeline_statistics(self) -> Dict[str, Any]: |
|
|
"""Get comprehensive pipeline statistics""" |
|
|
processing_times = self.stats["processing_times"] |
|
|
|
|
|
return { |
|
|
"total_documents_processed": self.stats["total_processed"], |
|
|
"successful_processing_rate": self.stats["successful_processing"] / max(self.stats["total_processed"], 1), |
|
|
"phi_deidentification_rate": self.stats["phi_deidentified"] / max(self.stats["total_processed"], 1), |
|
|
"validation_pass_rate": self.stats["validation_passed"] / max(self.stats["total_processed"], 1), |
|
|
"average_processing_time": sum(processing_times) / len(processing_times) if processing_times else 0, |
|
|
"error_breakdown": self.stats["error_counts"], |
|
|
"pipeline_health": "healthy" if self.stats["successful_processing"] > self.stats["total_processed"] * 0.9 else "degraded" |
|
|
} |
|
|
|
|
|
def batch_process(self, file_paths: List[str], document_types: Optional[List[str]] = None) -> List[ProcessingPipelineResult]: |
|
|
"""Process multiple documents in batch""" |
|
|
if document_types is None: |
|
|
document_types = ["auto"] * len(file_paths) |
|
|
|
|
|
results = [] |
|
|
|
|
|
for i, (file_path, doc_type) in enumerate(zip(file_paths, document_types)): |
|
|
logger.info(f"Processing batch document {i+1}/{len(file_paths)}: {file_path}") |
|
|
|
|
|
try: |
|
|
result = self.process_document(file_path, doc_type) |
|
|
results.append(result) |
|
|
except Exception as e: |
|
|
logger.error(f"Batch processing error for {file_path}: {str(e)}") |
|
|
|
|
|
error_result = ProcessingPipelineResult( |
|
|
document_id=self._generate_document_id(file_path), |
|
|
file_detection=FileDetectionResult( |
|
|
file_type=MedicalFileType.UNKNOWN, |
|
|
confidence=0.0, |
|
|
detected_features=["batch_error"], |
|
|
mime_type="application/octet-stream", |
|
|
file_size=0, |
|
|
metadata={"error": str(e)}, |
|
|
recommended_extractor="error_handler" |
|
|
), |
|
|
deidentification_result=None, |
|
|
extraction_result=None, |
|
|
structured_data={"error": str(e), "batch_processing_failed": True}, |
|
|
validation_result=ValidationResult(is_valid=False, validation_errors=[str(e)]), |
|
|
model_routing={"error": str(e)}, |
|
|
processing_time=0.0, |
|
|
pipeline_metadata={"batch_position": i, "error": str(e)} |
|
|
) |
|
|
results.append(error_result) |
|
|
|
|
|
logger.info(f"Batch processing completed: {len(results)} documents processed") |
|
|
return results |
|
|
|
|
|
def export_pipeline_result(self, result: ProcessingPipelineResult, output_path: str): |
|
|
"""Export pipeline result to JSON file""" |
|
|
try: |
|
|
export_data = { |
|
|
"document_id": result.document_id, |
|
|
"file_detection": asdict(result.file_detection), |
|
|
"deidentification_result": asdict(result.deidentification_result) if result.deidentification_result else None, |
|
|
"extraction_result": self._serialize_extraction_result(result.extraction_result), |
|
|
"structured_data": result.structured_data, |
|
|
"validation_result": asdict(result.validation_result), |
|
|
"model_routing": result.model_routing, |
|
|
"processing_time": result.processing_time, |
|
|
"pipeline_metadata": result.pipeline_metadata, |
|
|
"export_timestamp": time.time() |
|
|
} |
|
|
|
|
|
with open(output_path, 'w') as f: |
|
|
json.dump(export_data, f, indent=2, default=str) |
|
|
|
|
|
logger.info(f"Pipeline result exported to: {output_path}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Export error: {str(e)}") |
|
|
|
|
|
def _serialize_extraction_result(self, extraction_result: Any) -> Dict[str, Any]: |
|
|
"""Serialize extraction result for JSON export""" |
|
|
try: |
|
|
if hasattr(extraction_result, '__dict__'): |
|
|
return asdict(extraction_result) |
|
|
else: |
|
|
return {"data": extraction_result} |
|
|
except: |
|
|
return {"error": "Could not serialize extraction result"} |
|
|
|
|
|
|
|
|
|
|
|
# Public API of this module: the pipeline orchestrator and its result type.
__all__ = [
    "MedicalPreprocessingPipeline",
    "ProcessingPipelineResult"
]