|
|
""" |
|
|
File Detection and Routing System - Phase 2 |
|
|
Multi-format medical file detection with confidence scoring and routing logic. |
|
|
|
|
|
This module provides robust file type detection for medical documents including |
|
|
PDFs, DICOM files, ECG signals, and archives with confidence-based routing. |
|
|
|
|
|
Author: MiniMax Agent |
|
|
Date: 2025-10-29 |
|
|
Version: 1.0.0 |
|
|
""" |
|
|
|
|
|
import os |
|
|
import mimetypes |
|
|
import hashlib |
|
|
from typing import Dict, List, Optional, Tuple, Any |
|
|
from pathlib import Path |
|
|
import magic |
|
|
from dataclasses import dataclass |
|
|
from enum import Enum |
|
|
import logging |
|
|
|
|
|
|
|
|
# Module-level logger; handlers and level are configured by the host application.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class MedicalFileType(Enum):
    """Enumerated medical file types for routing.

    The string values double as the pattern keys returned by
    MedicalFileDetector._init_detection_patterns(), so the two must stay
    in sync for MedicalFileType(best_pattern) lookups to succeed.
    """

    # PDF report sub-types (same container, distinguished by content keywords)
    PDF_CLINICAL = "pdf_clinical"
    PDF_RADIOLOGY = "pdf_radiology"
    PDF_LABORATORY = "pdf_laboratory"
    PDF_ECG_REPORT = "pdf_ecg_report"
    # DICOM imaging modalities
    DICOM_CT = "dicom_ct"
    DICOM_MRI = "dicom_mri"
    DICOM_XRAY = "dicom_xray"
    DICOM_ULTRASOUND = "dicom_ultrasound"
    # ECG signal containers
    ECG_XML = "ecg_xml"
    ECG_SCPE = "ecg_scpe"
    ECG_CSV = "ecg_csv"
    ECG_WFDB = "ecg_wfdb"  # NOTE(review): no detection pattern exists for WFDB yet
    # Archives and scanned images
    ARCHIVE_ZIP = "archive_zip"
    ARCHIVE_TAR = "archive_tar"
    IMAGE_TIFF = "image_tiff"
    IMAGE_JPEG = "image_jpeg"
    # Fallback when no pattern matches or detection fails
    UNKNOWN = "unknown"
|
|
|
|
|
|
|
|
@dataclass
class FileDetectionResult:
    """Result of file type detection with confidence scoring."""

    file_type: MedicalFileType      # best-matching category (UNKNOWN on failure)
    confidence: float               # heuristic confidence in [0.0, 1.0]
    detected_features: List[str]    # evidence tags, e.g. "extension_.pdf", "magic_bytes"
    mime_type: str                  # guessed MIME type; "application/octet-stream" fallback
    file_size: int                  # size in bytes (0 when detection errored)
    metadata: Dict[str, Any]        # extra info: extension, detection method, sample length, or error
    recommended_extractor: str      # identifier of the downstream extractor to route to
|
|
|
|
|
|
|
|
class MedicalFileDetector:
    """Medical file type detection with multi-modal analysis.

    Scores every known pattern against a file's extension, leading bytes
    and textual content (plus light size-plausibility checks), then
    returns the best match with a confidence value and a recommended
    downstream extractor.
    """

    def __init__(self):
        # Pattern table that drives all heuristic scoring.
        self.known_patterns = self._init_detection_patterns()
        # libmagic-based MIME detector. NOTE(review): currently unused —
        # detect_file_type relies on `mimetypes` instead — but kept so any
        # external caller touching `self.magic` keeps working.
        self.magic = magic.Magic(mime=True)

    def _init_detection_patterns(self) -> Dict[str, Dict]:
        """Initialize detection patterns for various medical file types.

        Each entry (keyed by a MedicalFileType value) provides:
            extensions:  lowercase file extensions, worth +0.3 on match
            magic_bytes: byte signatures searched anywhere in the content
                         sample (not only at offset 0, so DICOM's "DICM"
                         marker at offset 128 is still found), worth +0.4
            keywords:    case-insensitive substrings, +0.1 each
            extractor:   identifier of the extractor to route to
        """
        return {
            # PDF report sub-types — same container, keyword-differentiated.
            "pdf_clinical": {
                "extensions": [".pdf"],
                "magic_bytes": [b"%PDF"],
                "keywords": ["clinical", "progress note", "consultation", "assessment", "plan"],
                "extractor": "pdf_text_extractor"
            },
            "pdf_radiology": {
                "extensions": [".pdf"],
                "magic_bytes": [b"%PDF"],
                "keywords": ["radiology", "ct scan", "mri", "x-ray", "imaging", "findings", "impression"],
                "extractor": "pdf_radiology_extractor"
            },
            "pdf_laboratory": {
                "extensions": [".pdf"],
                "magic_bytes": [b"%PDF"],
                "keywords": ["laboratory", "lab results", "blood work", "test results", "reference range"],
                "extractor": "pdf_laboratory_extractor"
            },
            "pdf_ecg_report": {
                "extensions": [".pdf"],
                "magic_bytes": [b"%PDF"],
                "keywords": ["ecg", "ekg", "electrocardiogram", "rhythm", "heart rate", "st segment"],
                "extractor": "pdf_ecg_extractor"
            },

            # DICOM modalities — identical "DICM" signature, keyword-differentiated.
            "dicom_ct": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [b"DICM"],
                "keywords": ["computed tomography", "ct", "slice"],
                "extractor": "dicom_processor"
            },
            "dicom_mri": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [b"DICM"],
                "keywords": ["magnetic resonance", "mri", "t1", "t2", "flair"],
                "extractor": "dicom_processor"
            },
            "dicom_xray": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [b"DICM"],
                "keywords": ["x-ray", "radiograph", "chest", "abdomen", "bone"],
                "extractor": "dicom_processor"
            },
            "dicom_ultrasound": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [b"DICM"],
                "keywords": ["ultrasound", "sonogram", "echocardiogram"],
                "extractor": "dicom_processor"
            },

            # ECG signal containers.
            "ecg_xml": {
                "extensions": [".xml", ".ecg"],
                "magic_bytes": [b"<?xml", b"<ECG", b"<electrocardiogram"],
                "keywords": ["ecg", "lead", "signal", "waveform"],
                "extractor": "ecg_xml_processor"
            },
            "ecg_scpe": {
                "extensions": [".scp", ".scpe"],
                "magic_bytes": [b"SCP-ECG"],
                "keywords": ["scp-ecg", "electrocardiogram"],
                "extractor": "ecg_scp_processor"
            },
            "ecg_csv": {
                "extensions": [".csv"],
                "magic_bytes": [],  # CSV has no signature; rely on extension + keywords
                "keywords": ["time", "lead", "voltage", "millivolts", "ecg"],
                "extractor": "ecg_csv_processor"
            },

            # Archives (may contain batches of the above).
            "archive_zip": {
                "extensions": [".zip"],
                "magic_bytes": [b"PK"],
                "keywords": [],
                "extractor": "archive_processor"
            },
            "archive_tar": {
                "extensions": [".tar", ".gz", ".tgz"],
                "magic_bytes": [b"ustar", b"\x1f\x8b"],  # tar header magic / gzip signature
                "keywords": [],
                "extractor": "archive_processor"
            },

            # Scanned-document images.
            "image_tiff": {
                "extensions": [".tiff", ".tif"],
                "magic_bytes": [b"II*\x00", b"MM\x00*"],  # little- / big-endian TIFF
                "keywords": [],
                "extractor": "image_processor"
            },
            "image_jpeg": {
                "extensions": [".jpg", ".jpeg"],
                "magic_bytes": [b"\xff\xd8\xff"],
                "keywords": [],
                "extractor": "image_processor"
            }
        }

    def detect_file_type(self, file_path: str, content_sample: Optional[bytes] = None) -> FileDetectionResult:
        """
        Detect medical file type with confidence scoring

        Args:
            file_path: Path to the file
            content_sample: Optional sample of file content for detection;
                when omitted, up to 8 KiB are read from the file.

        Returns:
            FileDetectionResult with detected type and confidence.
            Never raises: any failure yields an UNKNOWN result with
            confidence 0.0 and the error recorded in metadata.
        """
        try:
            file_size = os.path.getsize(file_path)
            file_ext = Path(file_path).suffix.lower()
            detected_features: List[str] = []

            # MIME guess from the filename only (content-independent).
            mime_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream"

            if content_sample is None:
                with open(file_path, 'rb') as f:
                    content_sample = f.read(min(8192, file_size))

            # Decode once for keyword matching (hoisted: loop-invariant).
            # errors='ignore' makes this safe for arbitrary binary content,
            # so the old bare try/except around it was unnecessary.
            content_text = content_sample.decode('utf-8', errors='ignore').lower()

            pattern_scores: List[Tuple[str, float, List[str]]] = []

            for pattern_name, pattern_config in self.known_patterns.items():
                score = 0.0
                features: List[str] = []

                # Extension match: +0.3
                if file_ext in pattern_config.get("extensions", []):
                    score += 0.3
                    features.append(f"extension_{file_ext}")

                # Magic-byte signature: +0.4, first hit wins.
                # BUG FIX: signatures are now plain bytes objects. The old
                # nested-list form ([[b"%PDF"]]) made `sig in content_sample`
                # raise TypeError (list vs. bytes), sending EVERY file down
                # the error path and returning UNKNOWN with confidence 0.0.
                for signature in pattern_config.get("magic_bytes", []):
                    if signature in content_sample:
                        score += 0.4
                        features.append("magic_bytes")
                        break

                # Content keywords: +0.1 each.
                for keyword in pattern_config.get("keywords", []):
                    if keyword.lower() in content_text:
                        score += 0.1
                        features.append(f"keyword_{keyword}")

                # Size-plausibility bonuses: DICOM series files tend to be
                # large; medical PDFs fall in a moderate range.
                if pattern_name.startswith("dicom") and file_size > 1024 * 1024:
                    score += 0.1
                    features.append("size_dicom")

                if pattern_name.startswith("pdf") and 1024 < file_size < 50 * 1024 * 1024:
                    score += 0.1
                    features.append("size_pdf")

                if score > 0:
                    pattern_scores.append((pattern_name, score, features))

            if pattern_scores:
                # Highest-scoring pattern wins; enum values match pattern keys.
                best_pattern, best_score, best_features = max(pattern_scores, key=lambda x: x[1])
                file_type = MedicalFileType(best_pattern)
                confidence = min(best_score, 1.0)
                detected_features = best_features
                recommended_extractor = self.known_patterns[best_pattern]["extractor"]
            else:
                file_type = MedicalFileType.UNKNOWN
                confidence = 0.1
                detected_features = ["no_pattern_match"]
                recommended_extractor = "generic_extractor"

            # Penalize implausible sizes regardless of the matched pattern.
            if file_size < 100:
                confidence *= 0.5
                detected_features.append("very_small_file")
            elif file_size > 100 * 1024 * 1024:
                confidence *= 0.8
                detected_features.append("large_file")

            metadata = {
                "file_extension": file_ext,
                "detection_method": "multi_modal",
                "content_length": len(content_sample)
            }

            logger.info(f"File detection: {file_path} -> {file_type.value} (confidence: {confidence:.2f})")

            return FileDetectionResult(
                file_type=file_type,
                confidence=confidence,
                detected_features=detected_features,
                mime_type=mime_type,
                file_size=file_size,
                metadata=metadata,
                recommended_extractor=recommended_extractor
            )

        except Exception as e:
            logger.error(f"File detection error for {file_path}: {str(e)}")
            return FileDetectionResult(
                file_type=MedicalFileType.UNKNOWN,
                confidence=0.0,
                detected_features=["detection_error"],
                mime_type="application/octet-stream",
                file_size=0,
                metadata={"error": str(e)},
                recommended_extractor="error_handler"
            )

    def batch_detect(self, file_paths: List[str]) -> List[FileDetectionResult]:
        """Detect file types for multiple files.

        Missing paths are logged and skipped, so the result list may be
        shorter than the input list.
        """
        results = []
        for file_path in file_paths:
            if os.path.exists(file_path):
                results.append(self.detect_file_type(file_path))
            else:
                logger.warning(f"File not found: {file_path}")
        return results

    def get_routing_info(self, detection_result: FileDetectionResult) -> Dict[str, Any]:
        """Get routing information for detected file type.

        Returns a dict with the extractor name, a confidence-derived
        priority, whether OCR is required (PDF reports), whether batch
        processing is supported, and a coarse PHI-risk level.
        """
        return {
            "extractor": detection_result.recommended_extractor,
            "priority": "high" if detection_result.confidence > 0.8 else "medium" if detection_result.confidence > 0.5 else "low",
            "requires_ocr": detection_result.file_type in [MedicalFileType.PDF_CLINICAL, MedicalFileType.PDF_RADIOLOGY,
                                                           MedicalFileType.PDF_LABORATORY, MedicalFileType.PDF_ECG_REPORT],
            "supports_batch": detection_result.file_type in [MedicalFileType.DICOM_CT, MedicalFileType.DICOM_MRI,
                                                             MedicalFileType.ECG_CSV, MedicalFileType.ARCHIVE_ZIP],
            "phi_risk": "high" if detection_result.file_type in [MedicalFileType.PDF_CLINICAL, MedicalFileType.PDF_RADIOLOGY,
                                                                 MedicalFileType.PDF_LABORATORY] else "medium"
        }
|
|
|
|
|
|
|
|
def calculate_file_hash(file_path: str) -> str:
    """Calculate SHA256 hash for file deduplication.

    Reads the file in 4 KiB chunks so arbitrarily large files never need
    to fit in memory. Returns the hex digest, or an empty string if the
    file cannot be read (error is logged, never raised).
    """
    digest = hashlib.sha256()
    try:
        with open(file_path, "rb") as stream:
            while True:
                chunk = stream.read(4096)
                if not chunk:  # EOF
                    break
                digest.update(chunk)
    except Exception as e:
        logger.error(f"Hash calculation error for {file_path}: {str(e)}")
        return ""
    return digest.hexdigest()
|
|
|
|
|
|
|
|
|
|
|
# Explicit public API of this module.
__all__ = [
    "MedicalFileDetector",
    "MedicalFileType",
    "FileDetectionResult",
    "calculate_file_hash"
]