"""
DICOM Medical Imaging Processor - Phase 2

Specialized DICOM file processing with MONAI integration for medical imaging
analysis. This module provides DICOM processing capabilities including
metadata extraction, image preprocessing, and integration with MONAI models
for segmentation.

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""

import os
import json
import logging
import time
import numpy as np
from collections import defaultdict
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass
from pathlib import Path
import pydicom
from PIL import Image
import torch
import SimpleITK as sitk

logger = logging.getLogger(__name__)

# Optional MONAI imports
try:
    from monai.transforms import (
        LoadImage,
        Compose,
        ToTensor,
        Resize,
        NormalizeIntensity,
        ScaleIntensityRange,
    )
    from monai.networks.nets import UNet
    from monai.inferers import sliding_window_inference

    try:
        from monai.transforms import AddChannel
    except ImportError:
        # Newer MONAI releases no longer ship AddChannel; fall back to
        # EnsureChannelFirst instead of disabling MONAI support entirely.
        from monai.transforms import EnsureChannelFirst
        AddChannel = None

    MONAI_AVAILABLE = True
except ImportError:
    MONAI_AVAILABLE = False
    logger.warning("MONAI not available - using basic DICOM processing only")

from medical_schemas import (
    MedicalDocumentMetadata,
    ConfidenceScore,
    RadiologyAnalysis,
    RadiologyImageReference,
    RadiologySegmentation,
    RadiologyFindings,
    RadiologyMetrics,
    ValidationResult,
)


@dataclass
class DICOMProcessingResult:
    """Result of DICOM processing"""
    metadata: Dict[str, Any]
    image_data: np.ndarray
    pixel_spacing: Optional[Tuple[float, float]]
    slice_thickness: Optional[float]
    modality: str
    body_part: str
    # Holds ``image_data.shape`` as produced by numpy, or (0, 0, 0) on failure.
    image_dimensions: Tuple[int, ...]
    segmentation_results: Optional[List[Dict[str, Any]]]
    quantitative_metrics: Optional[Dict[str, float]]
    confidence_score: float
    processing_time: float


class DICOMProcessor:
    """DICOM medical imaging processor with MONAI integration"""

    # DICOM Modality code -> label used throughout this module.
    _MODALITY_MAP = {
        'CT': 'CT',
        'MR': 'MRI',
        'US': 'ULTRASOUND',
        'XA': 'XRAY',
        'CR': 'XRAY',
        'DX': 'XRAY',
        'MG': 'MAMMOGRAPHY',
        'NM': 'NUCLEAR',
    }

    # Body part -> substrings searched for in free-text descriptions.
    # Insertion order matters: the first body part with a hit wins.
    _BODY_PART_KEYWORDS = {
        'chest': ('chest', 'lung', 'pulmonary', 'thorax'),
        'abdomen': ('abdomen', 'abdominal', 'hepatic', 'hepato', 'renal'),
        'head': ('head', 'brain', 'cerebral', 'cranial'),
        'spine': ('spine', 'vertebral', 'lumbar', 'thoracic'),
        'pelvis': ('pelvis', 'pelvic', 'hip'),
        'extremity': ('arm', 'leg', 'knee', 'shoulder', 'ankle', 'wrist'),
        'cardiac': ('cardiac', 'heart', 'coronary', 'cardio'),
    }

    def __init__(self):
        self.medical_transforms = None
        self.segmentation_model = None
        self._initialize_monai_components()

    def _initialize_monai_components(self):
        """Initialize MONAI components if available.

        On any failure both ``medical_transforms`` and ``segmentation_model``
        are reset to ``None`` so the processor degrades to basic operations.
        """
        if not MONAI_AVAILABLE:
            logger.warning("MONAI not available - DICOM processing limited to basic operations")
            return

        try:
            # Use whichever channel-adding transform this MONAI version has.
            if AddChannel is not None:
                add_channel = AddChannel()
            else:
                add_channel = EnsureChannelFirst(channel_dim="no_channel")

            # Define medical image transforms
            self.medical_transforms = Compose([
                LoadImage(image_only=True),
                add_channel,
                ScaleIntensityRange(a_min=-1000, a_max=1000, b_min=0.0, b_max=1.0, clip=True),
                Resize(spatial_size=(512, 512, -1)),  # Resize to standard size
                ToTensor(),
            ])

            # Initialize UNet for segmentation (can be loaded with pretrained weights)
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            unet_kwargs = dict(
                in_channels=1,
                out_channels=1,
                channels=(16, 32, 64, 128),
                strides=(2, 2, 2),
                num_res_units=2,
            )
            try:
                model = UNet(spatial_dims=2, **unet_kwargs)
            except TypeError:
                # Older MONAI releases only know the argument as ``dimensions``.
                model = UNet(dimensions=2, **unet_kwargs)
            self.segmentation_model = model.to(device)

            logger.info("MONAI components initialized successfully")

        except Exception as e:
            logger.error("Failed to initialize MONAI components: %s", e)
            self.medical_transforms = None
            self.segmentation_model = None

    def process_dicom_file(self, dicom_path: str) -> DICOMProcessingResult:
        """
        Process a single DICOM file

        Args:
            dicom_path: Path to DICOM file

        Returns:
            DICOMProcessingResult with processed data. On any error the
            result has an empty image, ``metadata == {"error": <message>}``
            and a confidence score of 0.0; no exception is raised.
        """
        start_time = time.perf_counter()

        try:
            # Read DICOM file
            ds = pydicom.dcmread(dicom_path)

            # Extract metadata
            metadata = self._extract_metadata(ds)

            # Extract image data
            image_array = self._extract_image_data(ds)
            if image_array is None:
                raise ValueError("Failed to extract image data from DICOM")

            # Determine modality and body part
            modality = self._determine_modality(ds)
            body_part = self._determine_body_part(ds, modality)

            # Extract imaging parameters
            pixel_spacing = self._extract_pixel_spacing(ds)
            slice_thickness = self._extract_slice_thickness(ds)

            # Process image for analysis
            processed_image = self._preprocess_image(image_array, modality)

            # Perform segmentation if MONAI is available
            segmentation_results = None
            if self.segmentation_model is not None:
                segmentation_results = self._perform_segmentation(processed_image, modality)

            # Calculate quantitative metrics (on the un-normalised image)
            quantitative_metrics = self._calculate_quantitative_metrics(
                image_array, segmentation_results, modality
            )

            # Calculate confidence score
            confidence_score = self._calculate_processing_confidence(
                ds, image_array, metadata
            )

            return DICOMProcessingResult(
                metadata=metadata,
                image_data=image_array,
                pixel_spacing=pixel_spacing,
                slice_thickness=slice_thickness,
                modality=modality,
                body_part=body_part,
                image_dimensions=image_array.shape,
                segmentation_results=segmentation_results,
                quantitative_metrics=quantitative_metrics,
                confidence_score=confidence_score,
                processing_time=time.perf_counter() - start_time,
            )

        except Exception as e:
            logger.error("DICOM processing error for %s: %s", dicom_path, e)
            return DICOMProcessingResult(
                metadata={"error": str(e)},
                image_data=np.array([]),
                pixel_spacing=None,
                slice_thickness=None,
                modality="unknown",
                body_part="unknown",
                image_dimensions=(0, 0, 0),
                segmentation_results=None,
                quantitative_metrics=None,
                confidence_score=0.0,
                processing_time=time.perf_counter() - start_time,
            )

    def process_dicom_series(self, dicom_files: List[str]) -> List[DICOMProcessingResult]:
        """Process multiple DICOM files as a series"""
        results = []

        # Group files by series if possible
        for series_files in self._group_dicom_files(dicom_files):
            if len(series_files) == 1:
                # Single file series
                results.append(self.process_dicom_file(series_files[0]))
            else:
                # Multi-slice series
                results.extend(self._process_dicom_series(series_files))

        return results

    def _extract_metadata(self, ds: pydicom.Dataset) -> Dict[str, Any]:
        """Extract relevant DICOM metadata.

        Missing attributes fall back to ``''`` (text) or ``0`` (numeric).
        The technical parameters are best-effort: if reading them fails they
        are left out and the failure is logged.
        """
        metadata = {
            "patient_id": getattr(ds, 'PatientID', ''),
            # PatientName is a pydicom PersonName object; store plain text.
            "patient_name": str(getattr(ds, 'PatientName', '') or ''),
            "study_date": str(getattr(ds, 'StudyDate', '')),
            "study_time": str(getattr(ds, 'StudyTime', '')),
            "modality": getattr(ds, 'Modality', ''),
            "manufacturer": getattr(ds, 'Manufacturer', ''),
            "model": getattr(ds, 'ManufacturerModelName', ''),
            "protocol_name": getattr(ds, 'ProtocolName', ''),
            "series_description": getattr(ds, 'SeriesDescription', ''),
            "study_description": getattr(ds, 'StudyDescription', ''),
            "instance_number": getattr(ds, 'InstanceNumber', 0),
            "series_number": getattr(ds, 'SeriesNumber', 0),
            "accession_number": getattr(ds, 'AccessionNumber', ''),
        }

        # Extract additional technical parameters
        try:
            metadata.update({
                "bits_allocated": getattr(ds, 'BitsAllocated', 0),
                "bits_stored": getattr(ds, 'BitsStored', 0),
                "high_bit": getattr(ds, 'HighBit', 0),
                "pixel_representation": getattr(ds, 'PixelRepresentation', 0),
                "rows": getattr(ds, 'Rows', 0),
                "columns": getattr(ds, 'Columns', 0),
                "samples_per_pixel": getattr(ds, 'SamplesPerPixel', 1),
            })
        except Exception as e:
            logger.warning("Could not read technical DICOM parameters: %s", e)

        return metadata

    def _extract_image_data(self, ds: pydicom.Dataset) -> Optional[np.ndarray]:
        """Extract image data from DICOM.

        CT data is rescaled with RescaleSlope/RescaleIntercept when both are
        present; 3-channel ultrasound data is averaged to a single channel.

        Returns:
            The pixel array, or ``None`` if it could not be read.
        """
        try:
            # Get pixel data
            pixel_data = ds.pixel_array

            # Handle different modalities
            modality = str(getattr(ds, 'Modality', '') or '').upper()

            if modality == 'CT':
                # Convert to Hounsfield Units for CT
                if hasattr(ds, 'RescaleIntercept') and hasattr(ds, 'RescaleSlope'):
                    slope = float(ds.RescaleSlope)
                    intercept = float(ds.RescaleIntercept)
                    pixel_data = pixel_data * slope + intercept

            elif modality == 'US':
                # Convert RGB to grayscale by averaging the trailing channel
                # axis. NOTE(review): the 4-D case assumes a
                # (frames, rows, columns, 3) layout for multi-frame colour
                # data - confirm against pydicom's pixel_array documentation.
                if pixel_data.ndim in (3, 4) and pixel_data.shape[-1] == 3:
                    pixel_data = np.mean(pixel_data, axis=-1)

            return pixel_data

        except Exception as e:
            logger.error("Image data extraction error: %s", e)
            return None

    def _determine_modality(self, ds: pydicom.Dataset) -> str:
        """Determine imaging modality.

        Known DICOM codes are mapped to this module's labels; unknown codes
        are returned upper-cased and otherwise unchanged.
        """
        modality = str(getattr(ds, 'Modality', '') or '').upper()
        return self._MODALITY_MAP.get(modality, modality)

    def _match_body_part(self, text: str) -> Optional[str]:
        """Return the upper-cased body part whose keyword occurs in *text*, if any."""
        for body_part, keywords in self._BODY_PART_KEYWORDS.items():
            if any(keyword in text for keyword in keywords):
                return body_part.upper()
        return None

    def _determine_body_part(self, ds: pydicom.Dataset, modality: str) -> str:
        """Determine anatomical region from DICOM metadata.

        ProtocolName and SeriesDescription are searched first. Only when they
        give no match is the BodyPartExamined attribute searched with the
        same keyword table.

        Args:
            ds: Dataset to inspect.
            modality: Currently unused; kept for interface compatibility.

        Returns:
            Upper-cased body part name, or ``'UNKNOWN'``.
        """
        # ``or ''`` guards against attributes that are present but empty/None.
        protocol = str(getattr(ds, 'ProtocolName', '') or '').lower()
        series_desc = str(getattr(ds, 'SeriesDescription', '') or '').lower()

        match = self._match_body_part(f"{protocol} {series_desc}")
        if match is None:
            examined = str(getattr(ds, 'BodyPartExamined', '') or '').lower()
            if examined:
                match = self._match_body_part(examined)

        return match if match is not None else 'UNKNOWN'

    def _extract_pixel_spacing(self, ds: pydicom.Dataset) -> Optional[Tuple[float, float]]:
        """Extract pixel spacing information.

        Returns:
            The two PixelSpacing values as floats, or ``None`` when the
            attribute is absent, malformed or not a pair.
        """
        try:
            if hasattr(ds, 'PixelSpacing'):
                spacing = ds.PixelSpacing
                if len(spacing) == 2:
                    return (float(spacing[0]), float(spacing[1]))
        except (TypeError, ValueError) as e:
            logger.debug("Unusable PixelSpacing value: %s", e)

        return None

    def _extract_slice_thickness(self, ds: pydicom.Dataset) -> Optional[float]:
        """Extract slice thickness, or ``None`` when absent or not numeric."""
        try:
            if hasattr(ds, 'SliceThickness'):
                return float(ds.SliceThickness)
        except (TypeError, ValueError) as e:
            logger.debug("Unusable SliceThickness value: %s", e)

        return None

    def _preprocess_image(self, image_array: np.ndarray, modality: str) -> np.ndarray:
        """Preprocess image for analysis.

        CT images are clipped to [-1000, 1000] and mapped to [0, 1]. Every
        other modality is min-max normalised to [0, 1]. An empty array, or an
        image with no intensity range, is returned unchanged.
        """
        if image_array.size == 0:
            return image_array

        # Work in float64 so integer inputs cannot wrap around during the
        # subtraction below (e.g. int16 data spanning its full range).
        data = np.asarray(image_array, dtype=np.float64)

        if modality == 'CT':
            # CT: fixed window of -1000..1000
            return (np.clip(data, -1000, 1000) + 1000) / 2000

        # MRI and general case: normalize to 0-1
        low = np.min(data)
        high = np.max(data)
        if high > low:
            return (data - low) / (high - low)

        return image_array

    def _perform_segmentation(self, image_array: np.ndarray, modality: str) -> Optional[List[Dict[str, Any]]]:
        """Perform organ segmentation using MONAI if available.

        Returns:
            A list of segmentation dicts (empty for modalities other than CT
            and MRI), or ``None`` when no model is loaded or an error occurs.
        """
        # Explicit None check: truthiness of a torch module is not a reliable
        # "is a model loaded" test.
        if self.segmentation_model is None or not MONAI_AVAILABLE:
            return None

        try:
            # Select appropriate segmentation based on modality
            if modality == 'CT':
                # Example: lung segmentation or abdominal organ segmentation
                return self._perform_lung_segmentation(image_array)
            if modality == 'MRI':
                # Example: brain or cardiac segmentation
                return self._perform_brain_segmentation(image_array)
            return []

        except Exception as e:
            logger.error("Segmentation error: %s", e)
            return None

    def _perform_lung_segmentation(self, image_array: np.ndarray) -> List[Dict[str, Any]]:
        """Perform lung segmentation (placeholder implementation).

        WARNING: the volume is a random number and is NOT derived from
        *image_array*. ``is_placeholder`` marks the entry so consumers can
        exclude it from real measurements.
        """
        # This would use a trained lung segmentation model
        return [
            {
                "organ": "Lung",
                "volume_ml": np.random.normal(2500, 500),  # Placeholder
                "segmentation_method": "threshold_based",
                "confidence": 0.7,
                "is_placeholder": True,
            }
        ]

    def _perform_brain_segmentation(self, image_array: np.ndarray) -> List[Dict[str, Any]]:
        """Perform brain segmentation (placeholder implementation).

        WARNING: the volume is a random number and is NOT derived from
        *image_array*. ``is_placeholder`` marks the entry so consumers can
        exclude it from real measurements.
        """
        # This would use a trained brain segmentation model
        return [
            {
                "organ": "Brain",
                "volume_ml": np.random.normal(1400, 100),  # Placeholder
                "segmentation_method": "atlas_based",
                "confidence": 0.8,
                "is_placeholder": True,
            }
        ]

    def _calculate_quantitative_metrics(self, image_array: np.ndarray,
                                        segmentation_results: Optional[List[Dict[str, Any]]],
                                        modality: str) -> Optional[Dict[str, float]]:
        """Calculate quantitative imaging metrics.

        Returns:
            Dict with basic intensity statistics, CT-specific entries when
            *modality* is ``'CT'`` and one ``<organ>_volume_ml`` entry per
            segmentation result; ``None`` if the calculation fails.
        """
        try:
            mean_value = float(np.mean(image_array))
            std_value = float(np.std(image_array))

            # Basic image statistics
            metrics = {
                "mean_intensity": mean_value,
                "std_intensity": std_value,
                "min_intensity": float(np.min(image_array)),
                "max_intensity": float(np.max(image_array)),
                "image_volume_voxels": int(np.prod(image_array.shape)),
            }

            # Modality-specific metrics
            if modality == 'CT':
                # Hounsfield Unit statistics
                metrics.update({
                    "hu_mean": mean_value,
                    "hu_std": std_value,
                    "lung_collapse_area": 0.0,  # Would be calculated from segmentation
                })

            # Add segmentation-based metrics
            for seg_result in segmentation_results or []:
                organ = str(seg_result.get("organ", "Unknown"))
                metrics[f"{organ.lower()}_volume_ml"] = seg_result.get("volume_ml", 0.0)

            return metrics

        except Exception as e:
            logger.error("Quantitative metrics calculation error: %s", e)
            return None

    def _calculate_processing_confidence(self, ds: pydicom.Dataset,
                                         image_array: np.ndarray,
                                         metadata: Dict[str, Any]) -> float:
        """Calculate confidence score for DICOM processing.

        The score is the sum of: 0.2 for an image of more than 1000 elements,
        0.2 when rows and columns both exceed 256, up to 0.3 for metadata
        completeness, and 0.2 (else 0.1) when pixel spacing is known.
        """
        confidence_factors = []

        # Image quality factors
        if image_array.size > 1000:  # Minimum image size
            confidence_factors.append(0.2)

        if (metadata.get('rows') or 0) > 256 and (metadata.get('columns') or 0) > 256:
            confidence_factors.append(0.2)

        # Metadata completeness
        required_fields = ['modality', 'patient_id', 'study_date']
        completeness = sum(1 for field in required_fields if metadata.get(field)) / len(required_fields)
        confidence_factors.append(completeness * 0.3)

        # Technical parameters. The metadata dict built by _extract_metadata
        # has no 'pixel_spacing' entry, so consult the dataset as well.
        has_spacing = bool(metadata.get('pixel_spacing')) or self._extract_pixel_spacing(ds) is not None
        confidence_factors.append(0.2 if has_spacing else 0.1)

        return sum(confidence_factors)

    def _group_dicom_files(self, dicom_files: List[str]) -> List[List[str]]:
        """Group DICOM files by series.

        Grouping uses the file stem with its last ``_``-separated token
        removed (the whole stem when it has no underscore). Groups are
        returned in order of first appearance.
        """
        # Simple grouping by file name pattern - would use actual DICOM UID in production
        groups = defaultdict(list)

        for file_path in dicom_files:
            # Extract series identifier (simplified)
            filename = Path(file_path).stem
            series_key = "_".join(filename.split("_")[:-1]) if "_" in filename else filename
            groups[series_key].append(file_path)

        return list(groups.values())

    @staticmethod
    def _instance_sort_key(result: DICOMProcessingResult) -> int:
        """Return the result's instance number as an int, or 0 when missing or invalid."""
        try:
            return int(result.metadata.get('instance_number') or 0)
        except (TypeError, ValueError):
            return 0

    def _process_dicom_series(self, series_files: List[str]) -> List[DICOMProcessingResult]:
        """Process a series of DICOM files.

        Each file is processed on its own; files that yield no image data are
        dropped. The remaining results are sorted by instance number. When
        more than one slice remains, the slices are stacked along a new last
        axis and stored on the first result. That result's other fields
        (metrics, confidence) still describe its original single slice.
        If the slices cannot be stacked the per-slice results are returned
        unchanged.
        """
        # Load all slices
        slices = []
        for file_path in series_files:
            result = self.process_dicom_file(file_path)
            if result.image_data.size > 0:
                slices.append(result)

        # Sort by instance number
        slices.sort(key=self._instance_sort_key)

        # Combine into volume (simplified)
        if len(slices) > 1:
            try:
                volume_data = np.stack([s.image_data for s in slices], axis=-1)
            except ValueError as e:
                # Slices with differing shapes cannot form a volume.
                logger.warning("Could not combine %d slices into a volume: %s", len(slices), e)
            else:
                # Update first result with volume data
                slices[0].image_data = volume_data
                slices[0].image_dimensions = volume_data.shape

        return slices

    def convert_to_radiology_schema(self, result: DICOMProcessingResult) -> Dict[str, Any]:
        """Convert DICOM processing result to radiology schema format.

        Returns:
            Dict of serialised schema objects, or ``{"error": <message>}``
            when the conversion fails.
        """
        try:
            seg_results = result.segmentation_results or []

            # Create metadata
            metadata = MedicalDocumentMetadata(
                source_type="radiology",
                data_completeness=result.confidence_score
            )

            # Create confidence score
            confidence = ConfidenceScore(
                extraction_confidence=result.confidence_score,
                model_confidence=0.8 if result.segmentation_results else 0.6,
                data_quality=0.9
            )

            # Create image reference
            image_ref = RadiologyImageReference(
                image_id="dicom_series_001",
                modality=result.modality,
                body_part=result.body_part,
                slice_thickness_mm=result.slice_thickness
            )

            # Create findings (basic for now)
            findings = RadiologyFindings(
                findings_text=f"{result.modality} study of {result.body_part}",
                impression_text=f"{result.modality} {result.body_part} imaging completed",
                technique_description=f"{result.modality} with {result.image_dimensions[0]}x{result.image_dimensions[1]} resolution"
            )

            # The same whole-image mean is attached to every segmentation,
            # so compute it once.
            mean_intensity = None
            if seg_results and result.image_data.size > 0:
                mean_intensity = float(np.mean(result.image_data))

            # Convert segmentations
            segmentations = [
                RadiologySegmentation(
                    organ_name=seg_result.get("organ", "Unknown"),
                    volume_ml=seg_result.get("volume_ml"),
                    surface_area_cm2=None,
                    mean_intensity=mean_intensity
                )
                for seg_result in seg_results
            ]

            # Create metrics
            metrics = RadiologyMetrics(
                organ_volumes={seg.get("organ", "Unknown"): seg.get("volume_ml", 0)
                               for seg in seg_results},
                lesion_measurements=[],
                enhancement_patterns=[],
                calcification_scores={},
                tissue_density=result.quantitative_metrics
            )

            return {
                "metadata": metadata.dict(),
                "image_references": [image_ref.dict()],
                "findings": findings.dict(),
                "segmentations": [s.dict() for s in segmentations],
                "metrics": metrics.dict(),
                "confidence": confidence.dict(),
                "criticality_level": "routine",
                "follow_up_recommendations": []
            }

        except Exception as e:
            logger.error("Schema conversion error: %s", e)
            return {"error": str(e)}


# Export main classes
__all__ = [
    "DICOMProcessor",
    "DICOMProcessingResult"
]