"""
DICOM Medical Imaging Processor - Phase 2
Specialized DICOM file processing with MONAI integration for medical imaging analysis.

This module provides DICOM processing capabilities including metadata extraction,
image preprocessing, and integration with MONAI models for segmentation.

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""
|
|
|
|
|
import json
import logging
import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple

import numpy as np
import pydicom
import SimpleITK as sitk
import torch
from PIL import Image

from medical_schemas import (
    MedicalDocumentMetadata, ConfidenceScore, RadiologyAnalysis,
    RadiologyImageReference, RadiologySegmentation, RadiologyFindings,
    RadiologyMetrics, ValidationResult
)

# Defined before the optional MONAI import so the fallback branch can log.
logger = logging.getLogger(__name__)

# MONAI is optional: when it cannot be imported the module falls back to
# basic DICOM processing and MONAI_AVAILABLE is False.
try:
    from monai.inferers import sliding_window_inference
    from monai.networks.nets import UNet
    from monai.transforms import (
        LoadImage, Compose, ToTensor, Resize, NormalizeIntensity,
        ScaleIntensityRange
    )

    # AddChannel is a legacy transform that recent MONAI releases no longer
    # ship; guard it separately so its absence does not disable MONAI.
    try:
        from monai.transforms import AddChannel
    except ImportError:
        AddChannel = None

    MONAI_AVAILABLE = True
except ImportError:
    MONAI_AVAILABLE = False
    logger.warning("MONAI not available - using basic DICOM processing only")
|
|
|
|
|
|
|
|
@dataclass
class DICOMProcessingResult:
    """Result of DICOM processing.

    Instances are mutable on purpose: series processing replaces
    ``image_data`` and ``image_dimensions`` on the first slice after stacking.

    Attributes:
        metadata: Extracted DICOM header fields, or ``{"error": ...}`` when
            processing failed.
        image_data: Pixel data; an empty array when processing failed.
        pixel_spacing: Two-element pixel spacing, or None when unavailable.
        slice_thickness: Slice thickness, or None when unavailable.
        modality: Modality label (``"unknown"`` on failure).
        body_part: Body-part label (``"unknown"`` on failure).
        image_dimensions: Shape of ``image_data``. May have two or more
            entries, depending on the source image.
        segmentation_results: Per-organ segmentation dictionaries, or None
            when segmentation was not performed.
        quantitative_metrics: Computed image metrics, or None.
        confidence_score: Processing confidence (0.0 on failure).
        processing_time: Elapsed processing time in seconds.
    """

    metadata: Dict[str, Any]
    image_data: np.ndarray
    pixel_spacing: Optional[Tuple[float, float]]
    slice_thickness: Optional[float]
    modality: str
    body_part: str
    # Filled from ``ndarray.shape``, so the rank is not fixed at three.
    image_dimensions: Tuple[int, ...]
    segmentation_results: Optional[List[Dict[str, Any]]]
    quantitative_metrics: Optional[Dict[str, float]]
    confidence_score: float
    processing_time: float
|
|
|
|
|
|
|
|
class DICOMProcessor:
    """DICOM medical imaging processor with optional MONAI integration.

    Reads DICOM files with pydicom, extracts header metadata and pixel data,
    normalises the image, and derives simple quantitative metrics and a
    heuristic confidence score. When MONAI is importable a transform pipeline
    and a UNet are instantiated; no weights are loaded for that network and
    the organ "segmentations" in this class are placeholders.
    """

    # Raw DICOM Modality code -> label used throughout this module.
    _MODALITY_LABELS = {
        'CT': 'CT',
        'MR': 'MRI',
        'US': 'ULTRASOUND',
        'XA': 'XRAY',
        'CR': 'XRAY',
        'DX': 'XRAY',
        'MG': 'MAMMOGRAPHY',
        'NM': 'NUCLEAR',
    }

    # Body part -> keywords searched as substrings of the protocol name and
    # series description. Order matters: the first matching entry wins.
    _BODY_PART_KEYWORDS = {
        'chest': ['chest', 'lung', 'pulmonary', 'thorax'],
        'abdomen': ['abdomen', 'abdominal', 'hepatic', 'hepato', 'renal'],
        'head': ['head', 'brain', 'cerebral', 'cranial'],
        'spine': ['spine', 'vertebral', 'lumbar', 'thoracic'],
        'pelvis': ['pelvis', 'pelvic', 'hip'],
        'extremity': ['arm', 'leg', 'knee', 'shoulder', 'ankle', 'wrist'],
        'cardiac': ['cardiac', 'heart', 'coronary', 'cardio'],
    }

    # Metadata fields whose presence feeds the confidence score.
    _REQUIRED_METADATA_FIELDS = ('modality', 'patient_id', 'study_date')

    def __init__(self):
        self.medical_transforms = None
        self.segmentation_model = None
        self._initialize_monai_components()

    def _initialize_monai_components(self):
        """Initialize MONAI components if available.

        On any failure both ``medical_transforms`` and ``segmentation_model``
        are reset to None so the processor degrades to basic operations.
        """
        if not MONAI_AVAILABLE:
            logger.warning("MONAI not available - DICOM processing limited to basic operations")
            return

        try:
            # AddChannel is gone from recent MONAI releases; its documented
            # replacement is EnsureChannelFirst(channel_dim="no_channel").
            if AddChannel is not None:
                add_channel = AddChannel()
            else:
                from monai.transforms import EnsureChannelFirst
                add_channel = EnsureChannelFirst(channel_dim="no_channel")

            self.medical_transforms = Compose([
                LoadImage(image_only=True),
                add_channel,
                ScaleIntensityRange(a_min=-1000, a_max=1000, b_min=0.0, b_max=1.0, clip=True),
                Resize(spatial_size=(512, 512, -1)),
                ToTensor(),
            ])

            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

            # ``spatial_dims`` replaces the deprecated ``dimensions`` keyword.
            # NOTE: no checkpoint is loaded here, so the weights are untrained.
            self.segmentation_model = UNet(
                spatial_dims=2,
                in_channels=1,
                out_channels=1,
                channels=(16, 32, 64, 128),
                strides=(2, 2, 2),
                num_res_units=2,
            ).to(device)

            logger.info("MONAI components initialized successfully")

        except Exception as e:
            logger.error(f"Failed to initialize MONAI components: {str(e)}")
            self.medical_transforms = None
            self.segmentation_model = None

    def process_dicom_file(self, dicom_path: str) -> DICOMProcessingResult:
        """
        Process a single DICOM file

        Never raises: any failure is logged and reported as a result whose
        metadata is ``{"error": <message>}``, whose image data is empty and
        whose confidence score is 0.0.

        Args:
            dicom_path: Path to DICOM file

        Returns:
            DICOMProcessingResult with processed data
        """
        start_time = time.perf_counter()

        try:
            ds = pydicom.dcmread(dicom_path)

            metadata = self._extract_metadata(ds)

            image_array = self._extract_image_data(ds)
            if image_array is None:
                raise ValueError("Failed to extract image data from DICOM")

            modality = self._determine_modality(ds)
            body_part = self._determine_body_part(ds, modality)

            pixel_spacing = self._extract_pixel_spacing(ds)
            slice_thickness = self._extract_slice_thickness(ds)

            processed_image = self._preprocess_image(image_array, modality)

            segmentation_results = None
            if self.segmentation_model is not None:
                segmentation_results = self._perform_segmentation(processed_image, modality)

            # Metrics are computed on the un-normalised pixel data.
            quantitative_metrics = self._calculate_quantitative_metrics(
                image_array, segmentation_results, modality
            )

            confidence_score = self._calculate_processing_confidence(
                ds, image_array, metadata
            )

            return DICOMProcessingResult(
                metadata=metadata,
                image_data=image_array,
                pixel_spacing=pixel_spacing,
                slice_thickness=slice_thickness,
                modality=modality,
                body_part=body_part,
                image_dimensions=image_array.shape,
                segmentation_results=segmentation_results,
                quantitative_metrics=quantitative_metrics,
                confidence_score=confidence_score,
                processing_time=time.perf_counter() - start_time,
            )

        except Exception as e:
            logger.error(f"DICOM processing error for {dicom_path}: {str(e)}")
            return DICOMProcessingResult(
                metadata={"error": str(e)},
                image_data=np.array([]),
                pixel_spacing=None,
                slice_thickness=None,
                modality="unknown",
                body_part="unknown",
                image_dimensions=(0, 0, 0),
                segmentation_results=None,
                quantitative_metrics=None,
                confidence_score=0.0,
                processing_time=time.perf_counter() - start_time,
            )

    def process_dicom_series(self, dicom_files: List[str]) -> List[DICOMProcessingResult]:
        """Process multiple DICOM files as a series.

        Files are grouped with ``_group_dicom_files``; a group of one file is
        processed on its own, larger groups are processed as a series.

        Args:
            dicom_files: Paths of the DICOM files to process.

        Returns:
            Flat list with the results of every group, in group order.
        """
        results = []

        for series_files in self._group_dicom_files(dicom_files):
            if len(series_files) == 1:
                results.append(self.process_dicom_file(series_files[0]))
            else:
                results.extend(self._process_dicom_series(series_files))

        return results

    def _extract_metadata(self, ds: pydicom.Dataset) -> Dict[str, Any]:
        """Extract relevant DICOM metadata.

        Missing attributes fall back to an empty string (text fields), 0
        (numeric fields) or 1 (``samples_per_pixel``).

        Args:
            ds: Dataset to read header attributes from.

        Returns:
            Dictionary of patient/study/series fields and pixel-format fields.
        """
        return {
            "patient_id": getattr(ds, 'PatientID', ''),
            # Stored as text so the metadata stays plain, serialisable data.
            "patient_name": str(getattr(ds, 'PatientName', '')),
            "study_date": str(getattr(ds, 'StudyDate', '')),
            "study_time": str(getattr(ds, 'StudyTime', '')),
            "modality": getattr(ds, 'Modality', ''),
            "manufacturer": getattr(ds, 'Manufacturer', ''),
            "model": getattr(ds, 'ManufacturerModelName', ''),
            "protocol_name": getattr(ds, 'ProtocolName', ''),
            "series_description": getattr(ds, 'SeriesDescription', ''),
            "study_description": getattr(ds, 'StudyDescription', ''),
            "instance_number": getattr(ds, 'InstanceNumber', 0),
            "series_number": getattr(ds, 'SeriesNumber', 0),
            "accession_number": getattr(ds, 'AccessionNumber', ''),
            "bits_allocated": getattr(ds, 'BitsAllocated', 0),
            "bits_stored": getattr(ds, 'BitsStored', 0),
            "high_bit": getattr(ds, 'HighBit', 0),
            "pixel_representation": getattr(ds, 'PixelRepresentation', 0),
            "rows": getattr(ds, 'Rows', 0),
            "columns": getattr(ds, 'Columns', 0),
            "samples_per_pixel": getattr(ds, 'SamplesPerPixel', 1),
        }

    def _extract_image_data(self, ds: pydicom.Dataset) -> Optional[np.ndarray]:
        """Extract image data from DICOM.

        For CT the rescale slope/intercept are applied when both are present.
        For ultrasound a three-dimensional array whose last axis has length 3
        is averaged over that axis.

        Args:
            ds: Dataset providing ``pixel_array``.

        Returns:
            The pixel array, or None when extraction fails (the error is
            logged).
        """
        try:
            pixel_data = ds.pixel_array
            modality = str(getattr(ds, 'Modality', '') or '').upper()

            if modality == 'CT':
                slope = getattr(ds, 'RescaleSlope', None)
                intercept = getattr(ds, 'RescaleIntercept', None)
                # Skip rescaling rather than fail when either value is absent.
                if slope is not None and intercept is not None:
                    pixel_data = pixel_data * float(slope) + float(intercept)

            elif modality == 'US':
                if pixel_data.ndim == 3 and pixel_data.shape[2] == 3:
                    pixel_data = np.mean(pixel_data, axis=2)

            return pixel_data

        except Exception as e:
            logger.error(f"Image data extraction error: {str(e)}")
            return None

    def _determine_modality(self, ds: pydicom.Dataset) -> str:
        """Determine imaging modality.

        Args:
            ds: Dataset whose ``Modality`` attribute is read.

        Returns:
            The mapped label for known codes, otherwise the upper-cased raw
            code (an empty string when the attribute is missing).
        """
        raw_code = str(getattr(ds, 'Modality', '') or '').upper()
        return self._MODALITY_LABELS.get(raw_code, raw_code)

    def _determine_body_part(self, ds: pydicom.Dataset, modality: str) -> str:
        """Determine anatomical region from DICOM metadata.

        Keywords are matched as substrings of the lower-cased protocol name
        and series description; the first body part with a hit is returned.

        Args:
            ds: Dataset whose ``ProtocolName`` and ``SeriesDescription`` are read.
            modality: Modality label (currently not used by the lookup).

        Returns:
            Upper-cased body-part name, or ``'UNKNOWN'`` when nothing matches.
        """
        protocol = str(getattr(ds, 'ProtocolName', '') or '').lower()
        series_desc = str(getattr(ds, 'SeriesDescription', '') or '').lower()
        combined_text = f"{protocol} {series_desc}"

        for body_part, keywords in self._BODY_PART_KEYWORDS.items():
            if any(keyword in combined_text for keyword in keywords):
                return body_part.upper()

        return 'UNKNOWN'

    def _extract_pixel_spacing(self, ds: pydicom.Dataset) -> Optional[Tuple[float, float]]:
        """Extract pixel spacing information.

        Args:
            ds: Dataset whose ``PixelSpacing`` attribute is read.

        Returns:
            The two spacing values as floats, or None when the attribute is
            missing, does not have exactly two entries, or is not numeric.
        """
        spacing = getattr(ds, 'PixelSpacing', None)
        try:
            if spacing is not None and len(spacing) == 2:
                return (float(spacing[0]), float(spacing[1]))
        except (TypeError, ValueError):
            # Best effort: malformed spacing is treated as unavailable.
            return None
        return None

    def _extract_slice_thickness(self, ds: pydicom.Dataset) -> Optional[float]:
        """Extract slice thickness.

        Args:
            ds: Dataset whose ``SliceThickness`` attribute is read.

        Returns:
            The thickness as a float, or None when missing or not numeric.
        """
        thickness = getattr(ds, 'SliceThickness', None)
        if thickness is None:
            return None
        try:
            return float(thickness)
        except (TypeError, ValueError):
            # Best effort: a malformed value is treated as unavailable.
            return None

    def _preprocess_image(self, image_array: np.ndarray, modality: str) -> np.ndarray:
        """Preprocess image for analysis.

        CT images are clipped to [-1000, 1000] and mapped linearly to [0, 1].
        Every other modality is min-max normalised to [0, 1]; an image with a
        single constant value becomes all zeros.

        Args:
            image_array: Pixel data to normalise.
            modality: Modality label; only ``'CT'`` is treated specially.

        Returns:
            Normalised floating-point array (an empty input is returned as is).
        """
        image_array = np.asarray(image_array)
        if image_array.size == 0:
            return image_array

        # Work in floating point: integer dtypes can overflow in ``max - min``
        # and unsigned dtypes cannot represent the negative CT clip bound.
        if not np.issubdtype(image_array.dtype, np.floating):
            image_array = image_array.astype(np.float64)

        if modality == 'CT':
            return (np.clip(image_array, -1000, 1000) + 1000) / 2000

        lowest = np.min(image_array)
        highest = np.max(image_array)
        if highest > lowest:
            return (image_array - lowest) / (highest - lowest)
        return np.zeros_like(image_array)

    def _perform_segmentation(self, image_array: np.ndarray, modality: str) -> Optional[List[Dict[str, Any]]]:
        """Perform organ segmentation using MONAI if available.

        Args:
            image_array: Preprocessed image.
            modality: ``'CT'`` selects lung segmentation, ``'MRI'`` brain
                segmentation; other modalities yield an empty list.

        Returns:
            List of segmentation dictionaries, or None when no model is
            available or an error occurs (the error is logged).
        """
        if self.segmentation_model is None or not MONAI_AVAILABLE:
            return None

        try:
            if modality == 'CT':
                return self._perform_lung_segmentation(image_array)
            if modality == 'MRI':
                return self._perform_brain_segmentation(image_array)
            return []

        except Exception as e:
            logger.error(f"Segmentation error: {str(e)}")
            return None

    @staticmethod
    def _placeholder_segmentation(organ: str, mean_volume_ml: float, std_volume_ml: float,
                                  method: str, confidence: float) -> List[Dict[str, Any]]:
        """Build a placeholder segmentation entry with a randomly drawn volume.

        WARNING: the volume is sampled from a normal distribution and is not
        derived from the image. The entry carries ``is_placeholder`` so that
        consumers can tell it apart from a real measurement.
        """
        logger.warning("%s segmentation is a placeholder; the reported volume is synthetic", organ)
        return [
            {
                "organ": organ,
                "volume_ml": float(np.random.normal(mean_volume_ml, std_volume_ml)),
                "segmentation_method": method,
                "confidence": confidence,
                "is_placeholder": True,
            }
        ]

    def _perform_lung_segmentation(self, image_array: np.ndarray) -> List[Dict[str, Any]]:
        """Perform lung segmentation (placeholder implementation).

        The image is not analysed; see ``_placeholder_segmentation``.
        """
        return self._placeholder_segmentation("Lung", 2500, 500, "threshold_based", 0.7)

    def _perform_brain_segmentation(self, image_array: np.ndarray) -> List[Dict[str, Any]]:
        """Perform brain segmentation (placeholder implementation).

        The image is not analysed; see ``_placeholder_segmentation``.
        """
        return self._placeholder_segmentation("Brain", 1400, 100, "atlas_based", 0.8)

    def _calculate_quantitative_metrics(self, image_array: np.ndarray,
                                        segmentation_results: Optional[List[Dict[str, Any]]],
                                        modality: str) -> Optional[Dict[str, float]]:
        """Calculate quantitative imaging metrics.

        Args:
            image_array: Pixel data the intensity statistics are computed on.
            segmentation_results: Optional segmentation entries; each adds a
                ``<organ>_volume_ml`` metric.
            modality: ``'CT'`` adds ``hu_mean``, ``hu_std`` and
                ``lung_collapse_area`` (always 0.0).

        Returns:
            Metric dictionary, or None when the computation fails (for
            example on an empty image); the error is logged.
        """
        try:
            mean_value = float(np.mean(image_array))
            std_value = float(np.std(image_array))

            metrics = {
                "mean_intensity": mean_value,
                "std_intensity": std_value,
                "min_intensity": float(np.min(image_array)),
                "max_intensity": float(np.max(image_array)),
                "image_volume_voxels": int(np.prod(image_array.shape)),
            }

            if modality == 'CT':
                metrics.update({
                    "hu_mean": mean_value,
                    "hu_std": std_value,
                    "lung_collapse_area": 0.0,
                })

            for seg_result in segmentation_results or []:
                organ = seg_result.get("organ", "Unknown")
                metrics[f"{organ.lower()}_volume_ml"] = float(seg_result.get("volume_ml", 0.0))

            return metrics

        except Exception as e:
            logger.error(f"Quantitative metrics calculation error: {str(e)}")
            return None

    def _calculate_processing_confidence(self, ds: pydicom.Dataset,
                                         image_array: np.ndarray,
                                         metadata: Dict[str, Any]) -> float:
        """Calculate confidence score for DICOM processing.

        The score is the sum of:
          * 0.2 when the image has more than 1000 elements,
          * 0.2 when both ``rows`` and ``columns`` exceed 256,
          * up to 0.3 for the share of required metadata fields present,
          * 0.2 when pixel spacing is available, otherwise 0.1.

        Args:
            ds: Dataset, consulted for pixel spacing.
            image_array: Extracted pixel data.
            metadata: Metadata as returned by ``_extract_metadata``.

        Returns:
            Confidence score (at most 0.9 with the weights above).
        """
        score = 0.0

        if image_array.size > 1000:
            score += 0.2

        if metadata.get('rows', 0) > 256 and metadata.get('columns', 0) > 256:
            score += 0.2

        required = self._REQUIRED_METADATA_FIELDS
        present = sum(1 for field_name in required if metadata.get(field_name))
        score += (present / len(required)) * 0.3

        # The metadata dictionary carries no pixel-spacing entry by default,
        # so the dataset itself is consulted as well.
        has_spacing = bool(metadata.get('pixel_spacing')) or self._extract_pixel_spacing(ds) is not None
        score += 0.2 if has_spacing else 0.1

        return score

    def _group_dicom_files(self, dicom_files: List[str]) -> List[List[str]]:
        """Group DICOM files by series.

        The grouping is a filename heuristic: the series key is the file stem
        with its last ``_``-separated token removed (the whole stem when it
        contains no underscore). DICOM headers are not read.

        Args:
            dicom_files: Paths to group.

        Returns:
            Groups in order of first appearance, each preserving input order.
        """
        groups: Dict[str, List[str]] = {}
        for file_path in dicom_files:
            stem = Path(file_path).stem
            series_key = stem.rsplit("_", 1)[0] if "_" in stem else stem
            groups.setdefault(series_key, []).append(file_path)

        return list(groups.values())

    def _process_dicom_series(self, series_files: List[str]) -> List[DICOMProcessingResult]:
        """Process a series of DICOM files.

        Every file is processed individually; results without image data
        (failed files) are dropped. The remaining slices are sorted by
        instance number and, when there is more than one, stacked along a new
        last axis. The stacked volume replaces the image data and dimensions
        of the first slice only.

        Args:
            series_files: Paths belonging to one series.

        Returns:
            The sorted per-slice results.
        """
        slices = []
        for file_path in series_files:
            result = self.process_dicom_file(file_path)
            if result.image_data.size > 0:
                slices.append(result)

        # ``or 0`` keeps the sort working when the instance number is None.
        slices.sort(key=lambda item: item.metadata.get('instance_number') or 0)

        if len(slices) > 1:
            try:
                volume_data = np.stack([item.image_data for item in slices], axis=-1)
            except ValueError as e:
                # Slices of differing shape cannot form a volume; keep them
                # as individual results instead of failing the whole series.
                logger.warning(f"Could not stack DICOM series into a volume: {str(e)}")
                return slices

            slices[0].image_data = volume_data
            slices[0].image_dimensions = volume_data.shape

        return slices

    def convert_to_radiology_schema(self, result: DICOMProcessingResult) -> Dict[str, Any]:
        """Convert DICOM processing result to radiology schema format.

        Args:
            result: Processing result to convert.

        Returns:
            Dictionary with the schema sections, or ``{"error": <message>}``
            when the conversion fails (the error is logged).
        """
        try:
            metadata = MedicalDocumentMetadata(
                source_type="radiology",
                data_completeness=result.confidence_score
            )

            confidence = ConfidenceScore(
                extraction_confidence=result.confidence_score,
                model_confidence=0.8 if result.segmentation_results else 0.6,
                data_quality=0.9
            )

            image_ref = RadiologyImageReference(
                image_id="dicom_series_001",
                modality=result.modality,
                body_part=result.body_part,
                slice_thickness_mm=result.slice_thickness
            )

            findings = RadiologyFindings(
                findings_text=f"{result.modality} study of {result.body_part}",
                impression_text=f"{result.modality} {result.body_part} imaging completed",
                technique_description=f"{result.modality} with {result.image_dimensions[0]}x{result.image_dimensions[1]} resolution"
            )

            seg_results = result.segmentation_results or []

            # The whole-image mean is shared by every entry; compute it once.
            mean_intensity = None
            if seg_results and result.image_data.size > 0:
                mean_intensity = float(np.mean(result.image_data))

            segmentations = [
                RadiologySegmentation(
                    organ_name=seg_result.get("organ", "Unknown"),
                    volume_ml=seg_result.get("volume_ml"),
                    surface_area_cm2=None,
                    mean_intensity=mean_intensity
                )
                for seg_result in seg_results
            ]

            metrics = RadiologyMetrics(
                organ_volumes={seg.get("organ", "Unknown"): seg.get("volume_ml", 0)
                               for seg in seg_results},
                lesion_measurements=[],
                enhancement_patterns=[],
                calcification_scores={},
                tissue_density=result.quantitative_metrics
            )

            return {
                "metadata": metadata.dict(),
                "image_references": [image_ref.dict()],
                "findings": findings.dict(),
                "segmentations": [s.dict() for s in segmentations],
                "metrics": metrics.dict(),
                "confidence": confidence.dict(),
                "criticality_level": "routine",
                "follow_up_recommendations": []
            }

        except Exception as e:
            logger.error(f"Schema conversion error: {str(e)}")
            return {"error": str(e)}
|
|
|
|
|
|
|
|
|
|
|
# Public API of this module.
__all__ = [
    "DICOMProcessor",
    "DICOMProcessingResult",
]