Spaces:
Sleeping
Sleeping
"""
File Detection and Routing System - Phase 2

Multi-format medical file detection with confidence scoring and routing logic.

This module provides robust file type detection for medical documents including
PDFs, DICOM files, ECG signals, and archives with confidence-based routing.

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""
| import os | |
| import mimetypes | |
| import hashlib | |
| from typing import Dict, List, Optional, Tuple, Any | |
| from pathlib import Path | |
| import magic | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| import logging | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class MedicalFileType(Enum):
    """Closed set of file categories used when routing a detected file.

    Member values are lowercase string identifiers, so a member can be
    looked up from its identifier with ``MedicalFileType("<identifier>")``.
    """

    # PDF reports
    PDF_CLINICAL = "pdf_clinical"
    PDF_RADIOLOGY = "pdf_radiology"
    PDF_LABORATORY = "pdf_laboratory"
    PDF_ECG_REPORT = "pdf_ecg_report"

    # DICOM imaging
    DICOM_CT = "dicom_ct"
    DICOM_MRI = "dicom_mri"
    DICOM_XRAY = "dicom_xray"
    DICOM_ULTRASOUND = "dicom_ultrasound"

    # ECG signal files
    ECG_XML = "ecg_xml"
    ECG_SCPE = "ecg_scpe"
    ECG_CSV = "ecg_csv"
    ECG_WFDB = "ecg_wfdb"

    # Archives
    ARCHIVE_ZIP = "archive_zip"
    ARCHIVE_TAR = "archive_tar"

    # Plain images
    IMAGE_TIFF = "image_tiff"
    IMAGE_JPEG = "image_jpeg"

    # Fallback when nothing matched
    UNKNOWN = "unknown"
@dataclass
class FileDetectionResult:
    """Result of file type detection with confidence scoring.

    Declared as a dataclass so that instances can be built with keyword
    arguments (``FileDetectionResult(file_type=..., confidence=..., ...)``);
    without the decorator the class has only bare annotations and no
    matching ``__init__``, so keyword construction raises ``TypeError``.
    """

    file_type: MedicalFileType  # detected category
    confidence: float  # detection score assigned by the detector
    detected_features: List[str]  # labels of the signals that contributed
    mime_type: str  # MIME type reported for the file
    file_size: int  # size of the file in bytes
    metadata: Dict[str, Any]  # free-form extra details about the detection
    recommended_extractor: str  # name of the extractor to route the file to
class MedicalFileDetector:
    """Medical file type detection with multi-modal analysis.

    A file is scored against every entry of ``known_patterns`` using its
    extension, byte signatures found in a content sample, keywords found in
    the decoded sample, and its size. The highest-scoring pattern decides the
    reported ``MedicalFileType`` and the recommended extractor.
    """

    def __init__(self):
        """Build the pattern table and create the ``magic`` handle.

        ``self.magic`` is created in MIME mode but is not consulted by the
        methods of this class; the reported MIME type comes from
        ``mimetypes.guess_type``.
        """
        self.known_patterns = self._init_detection_patterns()
        self.magic = magic.Magic(mime=True)

    def _init_detection_patterns(self) -> Dict[str, Dict]:
        """Initialize detection patterns for various medical file types.

        Returns:
            Mapping of pattern name to its configuration. Each configuration
            holds ``extensions`` (lowercase suffixes including the dot),
            ``magic_bytes`` (a list of signature groups, each group being a
            list of byte strings), ``keywords`` (text fragments searched in
            the decoded sample) and ``extractor`` (extractor name to route
            to). Pattern names equal ``MedicalFileType`` values.
        """
        return {
            # PDF Patterns
            "pdf_clinical": {
                "extensions": [".pdf"],
                "magic_bytes": [[b"%PDF"]],
                "keywords": ["clinical", "progress note", "consultation", "assessment", "plan"],
                "extractor": "pdf_text_extractor"
            },
            "pdf_radiology": {
                "extensions": [".pdf"],
                "magic_bytes": [[b"%PDF"]],
                "keywords": ["radiology", "ct scan", "mri", "x-ray", "imaging", "findings", "impression"],
                "extractor": "pdf_radiology_extractor"
            },
            "pdf_laboratory": {
                "extensions": [".pdf"],
                "magic_bytes": [[b"%PDF"]],
                "keywords": ["laboratory", "lab results", "blood work", "test results", "reference range"],
                "extractor": "pdf_laboratory_extractor"
            },
            "pdf_ecg_report": {
                "extensions": [".pdf"],
                "magic_bytes": [[b"%PDF"]],
                "keywords": ["ecg", "ekg", "electrocardiogram", "rhythm", "heart rate", "st segment"],
                "extractor": "pdf_ecg_extractor"
            },
            # DICOM Patterns
            "dicom_ct": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [[b"DICM"]],
                "keywords": ["computed tomography", "ct", "slice"],
                "extractor": "dicom_processor"
            },
            "dicom_mri": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [[b"DICM"]],
                "keywords": ["magnetic resonance", "mri", "t1", "t2", "flair"],
                "extractor": "dicom_processor"
            },
            "dicom_xray": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [[b"DICM"]],
                "keywords": ["x-ray", "radiograph", "chest", "abdomen", "bone"],
                "extractor": "dicom_processor"
            },
            "dicom_ultrasound": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [[b"DICM"]],
                "keywords": ["ultrasound", "sonogram", "echocardiogram"],
                "extractor": "dicom_processor"
            },
            # ECG File Patterns
            "ecg_xml": {
                "extensions": [".xml", ".ecg"],
                "magic_bytes": [[b"<?xml"], [b"<ECG"], [b"<electrocardiogram"]],
                "keywords": ["ecg", "lead", "signal", "waveform"],
                "extractor": "ecg_xml_processor"
            },
            "ecg_scpe": {
                "extensions": [".scp", ".scpe"],
                "magic_bytes": [[b"SCP-ECG"]],
                "keywords": ["scp-ecg", "electrocardiogram"],
                "extractor": "ecg_scp_processor"
            },
            "ecg_csv": {
                "extensions": [".csv"],
                "magic_bytes": [],
                "keywords": ["time", "lead", "voltage", "millivolts", "ecg"],
                "extractor": "ecg_csv_processor"
            },
            # Archive Patterns
            "archive_zip": {
                "extensions": [".zip"],
                "magic_bytes": [[b"PK"]],
                "keywords": [],
                "extractor": "archive_processor"
            },
            "archive_tar": {
                "extensions": [".tar", ".gz", ".tgz"],
                "magic_bytes": [[b"ustar"], [b"\x1f\x8b"]],
                "keywords": [],
                "extractor": "archive_processor"
            },
            # Image Patterns
            "image_tiff": {
                "extensions": [".tiff", ".tif"],
                "magic_bytes": [[b"II*\x00"], [b"MM\x00*"]],
                "keywords": [],
                "extractor": "image_processor"
            },
            "image_jpeg": {
                "extensions": [".jpg", ".jpeg"],
                "magic_bytes": [[b"\xff\xd8\xff"]],
                "keywords": [],
                "extractor": "image_processor"
            }
        }

    @staticmethod
    def _matches_magic_bytes(content_sample: bytes, magic_entries: List[Any]) -> bool:
        """Return True if any signature group occurs in ``content_sample``.

        Each entry is either a single byte string or a group (list/tuple) of
        byte strings; a group matches when every signature in it is found
        somewhere in the sample. Testing the group object itself with ``in``
        against ``bytes`` raises ``TypeError``, which is why the entries are
        unpacked here.
        """
        for entry in magic_entries:
            signatures = [entry] if isinstance(entry, (bytes, bytearray)) else entry
            # An empty group must not count as a match (all([]) is True).
            if signatures and all(signature in content_sample for signature in signatures):
                return True
        return False

    def _score_pattern(self, pattern_name: str, pattern_config: Dict[str, Any], file_ext: str,
                       file_size: int, content_sample: bytes, content_text: str) -> Tuple[float, List[str]]:
        """Score one pattern against a file and list the signals that matched.

        Weights: +0.3 extension, +0.4 byte signature, +0.1 per keyword found,
        +0.1 for a size that fits the DICOM or PDF heuristics.
        """
        score = 0.0
        features = []

        # Check file extension
        if file_ext in pattern_config.get("extensions", []):
            score += 0.3
            features.append(f"extension_{file_ext}")

        # Check magic bytes (counted at most once per pattern)
        if self._matches_magic_bytes(content_sample, pattern_config.get("magic_bytes", [])):
            score += 0.4
            features.append("magic_bytes")

        # Check content keywords
        for keyword in pattern_config.get("keywords", []):
            if keyword.lower() in content_text:
                score += 0.1
                features.append(f"keyword_{keyword}")

        # Additional scoring based on file characteristics
        if pattern_name.startswith("dicom") and file_size > 1024 * 1024:  # DICOM files are typically >1MB
            score += 0.1
            features.append("size_dicom")
        if pattern_name.startswith("pdf") and 1024 < file_size < 50 * 1024 * 1024:  # Reasonable PDF size
            score += 0.1
            features.append("size_pdf")

        return score, features

    def detect_file_type(self, file_path: str, content_sample: Optional[bytes] = None) -> FileDetectionResult:
        """
        Detect medical file type with confidence scoring.

        Args:
            file_path: Path to the file. The file must exist even when
                ``content_sample`` is given, because its size is read from disk.
            content_sample: Optional sample of file content for detection.
                When omitted, up to the first 8 KB of the file are read.

        Returns:
            FileDetectionResult with detected type and confidence. If nothing
            scores above zero the type is ``UNKNOWN`` with confidence 0.1.
            Any exception raised during detection is logged and reported as
            an ``UNKNOWN`` result with confidence 0.0 and the
            ``"error_handler"`` extractor; it is not propagated.
        """
        try:
            # Get basic file info
            file_size = os.path.getsize(file_path)
            file_ext = Path(file_path).suffix.lower()

            # Try mime type detection
            mime_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream"

            # Get file content sample if not provided
            if content_sample is None:
                with open(file_path, 'rb') as f:
                    content_sample = f.read(min(8192, file_size))  # Read first 8KB

            # Decode once for all patterns; undecodable bytes are dropped, so
            # binary content simply yields fewer keyword hits.
            content_text = content_sample.decode('utf-8', errors='ignore').lower()

            # Analyze against known patterns
            pattern_scores = []
            for pattern_name, pattern_config in self.known_patterns.items():
                score, features = self._score_pattern(
                    pattern_name, pattern_config, file_ext, file_size, content_sample, content_text
                )
                if score > 0:
                    pattern_scores.append((pattern_name, score, features))

            # Select best match; on a tie the earliest pattern in the table wins
            if pattern_scores:
                best_pattern, best_score, best_features = max(pattern_scores, key=lambda x: x[1])
                file_type = MedicalFileType(best_pattern)
                confidence = min(best_score, 1.0)  # Cap at 1.0
                detected_features = best_features
                recommended_extractor = self.known_patterns[best_pattern]["extractor"]
            else:
                # Fallback to unknown
                file_type = MedicalFileType.UNKNOWN
                confidence = 0.1
                detected_features = ["no_pattern_match"]
                recommended_extractor = "generic_extractor"

            # Adjust confidence based on file size
            if file_size < 100:  # Very small files
                confidence *= 0.5
                detected_features.append("very_small_file")
            elif file_size > 100 * 1024 * 1024:  # Very large files
                confidence *= 0.8
                detected_features.append("large_file")

            metadata = {
                "file_extension": file_ext,
                "detection_method": "multi_modal",
                "content_length": len(content_sample)
            }

            logger.info(f"File detection: {file_path} -> {file_type.value} (confidence: {confidence:.2f})")

            return FileDetectionResult(
                file_type=file_type,
                confidence=confidence,
                detected_features=detected_features,
                mime_type=mime_type,
                file_size=file_size,
                metadata=metadata,
                recommended_extractor=recommended_extractor
            )
        except Exception as e:
            # Detection is best-effort: report the failure as a result object
            # instead of raising.
            logger.error(f"File detection error for {file_path}: {str(e)}")
            return FileDetectionResult(
                file_type=MedicalFileType.UNKNOWN,
                confidence=0.0,
                detected_features=["detection_error"],
                mime_type="application/octet-stream",
                file_size=0,
                metadata={"error": str(e)},
                recommended_extractor="error_handler"
            )

    def batch_detect(self, file_paths: List[str]) -> List[FileDetectionResult]:
        """Detect file types for multiple files.

        Paths that do not exist are logged with a warning and skipped, so the
        returned list can be shorter than ``file_paths``.
        """
        results = []
        for file_path in file_paths:
            if not os.path.exists(file_path):
                logger.warning(f"File not found: {file_path}")
                continue
            results.append(self.detect_file_type(file_path))
        return results

    def get_routing_info(self, detection_result: FileDetectionResult) -> Dict[str, Any]:
        """Get routing information for detected file type.

        Returns:
            Dict with ``extractor`` (taken from the result), ``priority``
            ("high" above 0.8 confidence, "medium" above 0.5, else "low"),
            ``requires_ocr`` (True for the four PDF types),
            ``supports_batch`` (True for DICOM CT/MRI, ECG CSV and ZIP
            archives) and ``phi_risk`` ("high" for clinical, radiology and
            laboratory PDFs, otherwise "medium").
        """
        file_type = detection_result.file_type
        confidence = detection_result.confidence

        if confidence > 0.8:
            priority = "high"
        elif confidence > 0.5:
            priority = "medium"
        else:
            priority = "low"

        ocr_types = (MedicalFileType.PDF_CLINICAL, MedicalFileType.PDF_RADIOLOGY,
                     MedicalFileType.PDF_LABORATORY, MedicalFileType.PDF_ECG_REPORT)
        batch_types = (MedicalFileType.DICOM_CT, MedicalFileType.DICOM_MRI,
                       MedicalFileType.ECG_CSV, MedicalFileType.ARCHIVE_ZIP)
        high_phi_types = (MedicalFileType.PDF_CLINICAL, MedicalFileType.PDF_RADIOLOGY,
                          MedicalFileType.PDF_LABORATORY)

        return {
            "extractor": detection_result.recommended_extractor,
            "priority": priority,
            "requires_ocr": file_type in ocr_types,
            "supports_batch": file_type in batch_types,
            "phi_risk": "high" if file_type in high_phi_types else "medium"
        }
def calculate_file_hash(file_path: str) -> str:
    """Return the SHA-256 hex digest of a file's contents, for deduplication.

    The file is read in 4096-byte blocks. If anything goes wrong while
    opening or reading it, the error is logged and an empty string is
    returned instead of raising.
    """
    digest = hashlib.sha256()
    try:
        with open(file_path, "rb") as stream:
            while True:
                block = stream.read(4096)
                if not block:  # empty read means end of file
                    break
                digest.update(block)
        return digest.hexdigest()
    except Exception as e:
        logger.error(f"Hash calculation error for {file_path}: {str(e)}")
        return ""
# Public API: the names exported by a star import of this module.
__all__ = [
    "MedicalFileDetector",
    "MedicalFileType",
    "FileDetectionResult",
    "calculate_file_hash",
]