# medical-report-analyzer / backend / dicom_processor.py
"""
DICOM Medical Imaging Processor - Phase 2
Specialized DICOM file processing with MONAI integration for medical imaging analysis.
This module provides DICOM processing capabilities including metadata extraction,
image preprocessing, and integration with MONAI models for segmentation.
Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""
import os
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple

import numpy as np
import pydicom
import torch
import SimpleITK as sitk
from PIL import Image

# Module logger, created once so the optional-import warning below can use it.
logger = logging.getLogger(__name__)

# Optional MONAI imports: segmentation features degrade gracefully when the
# package is missing (MONAI_AVAILABLE gates all MONAI code paths).
try:
    from monai.transforms import (
        LoadImage, Compose, ToTensor, Resize, NormalizeIntensity,
        ScaleIntensityRange, AddChannel
    )
    from monai.networks.nets import UNet
    from monai.inferers import sliding_window_inference
    MONAI_AVAILABLE = True
except ImportError:
    MONAI_AVAILABLE = False
    logger.warning("MONAI not available - using basic DICOM processing only")

from medical_schemas import (
    MedicalDocumentMetadata, ConfidenceScore, RadiologyAnalysis,
    RadiologyImageReference, RadiologySegmentation, RadiologyFindings,
    RadiologyMetrics, ValidationResult
)
@dataclass
class DICOMProcessingResult:
    """Result of processing a single DICOM file (or combined series)."""
    metadata: Dict[str, Any]                      # extracted DICOM tags, or {"error": ...} on failure
    image_data: np.ndarray                        # pixel array (CT values rescaled to Hounsfield units)
    pixel_spacing: Optional[Tuple[float, float]]  # mm per pixel (row, col) when the tag is present
    slice_thickness: Optional[float]              # slice thickness in mm when the tag is present
    modality: str                                 # normalized modality, e.g. "CT", "MRI", "XRAY"
    body_part: str                                # inferred anatomical region, e.g. "CHEST", "UNKNOWN"
    image_dimensions: Tuple[int, int, int]        # (width, height, slices)
    segmentation_results: Optional[List[Dict[str, Any]]]  # per-organ results, None without MONAI
    quantitative_metrics: Optional[Dict[str, float]]      # image statistics and derived metrics
    confidence_score: float                       # heuristic 0-1 processing confidence
    processing_time: float                        # wall-clock seconds spent processing
class DICOMProcessor:
    """DICOM medical imaging processor with optional MONAI integration.

    Reads DICOM files with pydicom, extracts metadata and pixel data,
    applies modality-aware preprocessing, and — when MONAI is installed —
    runs (currently placeholder) segmentation models.
    """

    def __init__(self):
        # Both stay None when MONAI is unavailable or fails to initialize;
        # all segmentation paths check for that.
        self.medical_transforms = None
        self.segmentation_model = None
        self._initialize_monai_components()

    def _initialize_monai_components(self):
        """Initialize MONAI transforms and a UNet if MONAI is importable."""
        if not MONAI_AVAILABLE:
            logger.warning("MONAI not available - DICOM processing limited to basic operations")
            return
        try:
            # Standard medical-image preprocessing chain: load, add a channel
            # axis, window CT-range intensities to [0, 1], resize in-plane.
            self.medical_transforms = Compose([
                LoadImage(image_only=True),
                AddChannel(),
                ScaleIntensityRange(a_min=-1000, a_max=1000, b_min=0.0, b_max=1.0, clip=True),
                Resize(spatial_size=(512, 512, -1)),  # resize to standard size, keep slice count
                ToTensor()
            ])
            # Untrained 2D UNet; pretrained weights can be loaded later.
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            self.segmentation_model = UNet(
                dimensions=2,
                in_channels=1,
                out_channels=1,
                channels=(16, 32, 64, 128),
                strides=(2, 2, 2),
                num_res_units=2
            ).to(device)
            logger.info("MONAI components initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize MONAI components: {str(e)}")
            self.medical_transforms = None
            self.segmentation_model = None

    def process_dicom_file(self, dicom_path: str) -> DICOMProcessingResult:
        """Process a single DICOM file.

        Args:
            dicom_path: Path to the DICOM file.

        Returns:
            DICOMProcessingResult with processed data. On any failure a
            sentinel result is returned whose metadata holds the error and
            whose confidence_score is 0.0 (this method does not raise).
        """
        import time
        start_time = time.time()
        try:
            # Read DICOM file and pull out the pieces we analyze.
            ds = pydicom.dcmread(dicom_path)
            metadata = self._extract_metadata(ds)
            image_array = self._extract_image_data(ds)
            if image_array is None:
                raise ValueError("Failed to extract image data from DICOM")

            modality = self._determine_modality(ds)
            body_part = self._determine_body_part(ds, modality)
            pixel_spacing = self._extract_pixel_spacing(ds)
            slice_thickness = self._extract_slice_thickness(ds)

            # Normalize intensities for downstream analysis.
            processed_image = self._preprocess_image(image_array, modality)

            # Segmentation only runs when MONAI initialized successfully.
            segmentation_results = None
            if self.segmentation_model is not None:
                segmentation_results = self._perform_segmentation(processed_image, modality)

            quantitative_metrics = self._calculate_quantitative_metrics(
                image_array, segmentation_results, modality
            )
            confidence_score = self._calculate_processing_confidence(
                ds, image_array, metadata
            )
            processing_time = time.time() - start_time
            return DICOMProcessingResult(
                metadata=metadata,
                image_data=image_array,
                pixel_spacing=pixel_spacing,
                slice_thickness=slice_thickness,
                modality=modality,
                body_part=body_part,
                image_dimensions=image_array.shape,
                segmentation_results=segmentation_results,
                quantitative_metrics=quantitative_metrics,
                confidence_score=confidence_score,
                processing_time=processing_time
            )
        except Exception as e:
            logger.error(f"DICOM processing error for {dicom_path}: {str(e)}")
            # Sentinel result: callers can detect failure via confidence 0.0
            # or the "error" key in metadata.
            return DICOMProcessingResult(
                metadata={"error": str(e)},
                image_data=np.array([]),
                pixel_spacing=None,
                slice_thickness=None,
                modality="unknown",
                body_part="unknown",
                image_dimensions=(0, 0, 0),
                segmentation_results=None,
                quantitative_metrics=None,
                confidence_score=0.0,
                processing_time=time.time() - start_time
            )

    def process_dicom_series(self, dicom_files: List[str]) -> List[DICOMProcessingResult]:
        """Process multiple DICOM files, grouping them into series first."""
        results = []
        series_groups = self._group_dicom_files(dicom_files)
        for series_files in series_groups:
            if len(series_files) == 1:
                # Single-file series.
                results.append(self.process_dicom_file(series_files[0]))
            else:
                # Multi-slice series: combined into a volume.
                results.extend(self._process_dicom_series(series_files))
        return results

    def _extract_metadata(self, ds: pydicom.Dataset) -> Dict[str, Any]:
        """Extract relevant DICOM metadata as a plain, JSON-safe dict."""
        metadata = {
            "patient_id": getattr(ds, 'PatientID', ''),
            # PatientName is a pydicom PersonName object, not a str; convert
            # so the metadata dict stays JSON-serializable.
            "patient_name": str(getattr(ds, 'PatientName', '')),
            "study_date": str(getattr(ds, 'StudyDate', '')),
            "study_time": str(getattr(ds, 'StudyTime', '')),
            "modality": getattr(ds, 'Modality', ''),
            "manufacturer": getattr(ds, 'Manufacturer', ''),
            "model": getattr(ds, 'ManufacturerModelName', ''),
            "protocol_name": getattr(ds, 'ProtocolName', ''),
            "series_description": getattr(ds, 'SeriesDescription', ''),
            "study_description": getattr(ds, 'StudyDescription', ''),
            "instance_number": getattr(ds, 'InstanceNumber', 0),
            "series_number": getattr(ds, 'SeriesNumber', 0),
            "accession_number": getattr(ds, 'AccessionNumber', ''),
        }
        # Technical parameters are best-effort; skip on any access error.
        try:
            metadata.update({
                "bits_allocated": getattr(ds, 'BitsAllocated', 0),
                "bits_stored": getattr(ds, 'BitsStored', 0),
                "high_bit": getattr(ds, 'HighBit', 0),
                "pixel_representation": getattr(ds, 'PixelRepresentation', 0),
                "rows": getattr(ds, 'Rows', 0),
                "columns": getattr(ds, 'Columns', 0),
                "samples_per_pixel": getattr(ds, 'SamplesPerPixel', 1),
            })
        except Exception:
            pass
        return metadata

    def _extract_image_data(self, ds: pydicom.Dataset) -> Optional[np.ndarray]:
        """Extract pixel data from the DICOM dataset, or None on failure."""
        try:
            pixel_data = ds.pixel_array
            modality = getattr(ds, 'Modality', '').upper()
            if modality == 'CT':
                # Convert raw values to Hounsfield Units when the rescale
                # tags are present: HU = pixel * slope + intercept.
                if hasattr(ds, 'RescaleIntercept') and hasattr(ds, 'RescaleSlope'):
                    intercept = ds.RescaleIntercept
                    slope = ds.RescaleSlope
                    pixel_data = pixel_data * slope + intercept
            elif modality == 'US':
                # Ultrasound frames may be RGB; collapse to grayscale.
                if len(pixel_data.shape) == 3 and pixel_data.shape[2] == 3:
                    pixel_data = np.mean(pixel_data, axis=2)
            return pixel_data
        except Exception as e:
            logger.error(f"Image data extraction error: {str(e)}")
            return None

    def _determine_modality(self, ds: pydicom.Dataset) -> str:
        """Map the raw DICOM Modality tag to a normalized name."""
        modality = getattr(ds, 'Modality', '').upper()
        modality_mapping = {
            'CT': 'CT',
            'MR': 'MRI',
            'US': 'ULTRASOUND',
            'XA': 'XRAY',
            'CR': 'XRAY',
            'DX': 'XRAY',
            'MG': 'MAMMOGRAPHY',
            'NM': 'NUCLEAR'
        }
        # Unknown modalities pass through unchanged.
        return modality_mapping.get(modality, modality)

    def _determine_body_part(self, ds: pydicom.Dataset, modality: str) -> str:
        """Infer the anatomical region from protocol/series description text."""
        protocol = getattr(ds, 'ProtocolName', '').lower()
        series_desc = getattr(ds, 'SeriesDescription', '').lower()
        # Keyword lists per region; first match wins (dict insertion order).
        body_part_keywords = {
            'chest': ['chest', 'lung', 'pulmonary', 'thorax'],
            'abdomen': ['abdomen', 'abdominal', 'hepatic', 'hepato', 'renal'],
            'head': ['head', 'brain', 'cerebral', 'cranial'],
            'spine': ['spine', 'vertebral', 'lumbar', 'thoracic'],
            'pelvis': ['pelvis', 'pelvic', 'hip'],
            'extremity': ['arm', 'leg', 'knee', 'shoulder', 'ankle', 'wrist'],
            'cardiac': ['cardiac', 'heart', 'coronary', 'cardio']
        }
        combined_text = f"{protocol} {series_desc}"
        for body_part, keywords in body_part_keywords.items():
            if any(keyword in combined_text for keyword in keywords):
                return body_part.upper()
        return 'UNKNOWN'

    def _extract_pixel_spacing(self, ds: pydicom.Dataset) -> Optional[Tuple[float, float]]:
        """Return (row, col) pixel spacing in mm, or None if unavailable."""
        try:
            if hasattr(ds, 'PixelSpacing'):
                spacing = ds.PixelSpacing
                if len(spacing) == 2:
                    return (float(spacing[0]), float(spacing[1]))
        except Exception:
            pass
        return None

    def _extract_slice_thickness(self, ds: pydicom.Dataset) -> Optional[float]:
        """Return slice thickness in mm, or None if unavailable."""
        try:
            if hasattr(ds, 'SliceThickness'):
                return float(ds.SliceThickness)
        except Exception:
            pass
        return None

    def _preprocess_image(self, image_array: np.ndarray, modality: str) -> np.ndarray:
        """Normalize image intensities to [0, 1] based on modality."""
        if modality == 'CT':
            # CT: clip to the [-1000, 1000] HU window, then scale to [0, 1].
            image_array = np.clip(image_array, -1000, 1000)
            image_array = (image_array + 1000) / 2000
        elif modality == 'MRI':
            # MRI: min-max normalize (guard against constant images).
            if np.max(image_array) > np.min(image_array):
                image_array = (image_array - np.min(image_array)) / (np.max(image_array) - np.min(image_array))
        else:
            # General case: min-max normalize (guard against constant images).
            if np.max(image_array) > np.min(image_array):
                image_array = (image_array - np.min(image_array)) / (np.max(image_array) - np.min(image_array))
        return image_array

    def _perform_segmentation(self, image_array: np.ndarray, modality: str) -> Optional[List[Dict[str, Any]]]:
        """Perform organ segmentation using MONAI if available.

        Returns a list of per-organ result dicts, an empty list for
        unsupported modalities, or None when segmentation is unavailable
        or fails.
        """
        if self.segmentation_model is None or not MONAI_AVAILABLE:
            return None
        try:
            if modality == 'CT':
                segmentation_results = self._perform_lung_segmentation(image_array)
            elif modality == 'MRI':
                segmentation_results = self._perform_brain_segmentation(image_array)
            else:
                segmentation_results = []
            return segmentation_results
        except Exception as e:
            logger.error(f"Segmentation error: {str(e)}")
            return None

    def _perform_lung_segmentation(self, image_array: np.ndarray) -> List[Dict[str, Any]]:
        """Perform lung segmentation (placeholder implementation).

        NOTE(review): volume is a random draw, not a real measurement —
        replace with a trained lung-segmentation model before clinical use.
        """
        return [
            {
                "organ": "Lung",
                "volume_ml": np.random.normal(2500, 500),  # Placeholder
                "segmentation_method": "threshold_based",
                "confidence": 0.7
            }
        ]

    def _perform_brain_segmentation(self, image_array: np.ndarray) -> List[Dict[str, Any]]:
        """Perform brain segmentation (placeholder implementation).

        NOTE(review): volume is a random draw, not a real measurement —
        replace with a trained brain-segmentation model before clinical use.
        """
        return [
            {
                "organ": "Brain",
                "volume_ml": np.random.normal(1400, 100),  # Placeholder
                "segmentation_method": "atlas_based",
                "confidence": 0.8
            }
        ]

    def _calculate_quantitative_metrics(self, image_array: np.ndarray,
                                        segmentation_results: Optional[List[Dict[str, Any]]],
                                        modality: str) -> Optional[Dict[str, float]]:
        """Calculate quantitative imaging metrics, or None on failure."""
        try:
            metrics = {}
            # Basic image statistics.
            metrics.update({
                "mean_intensity": float(np.mean(image_array)),
                "std_intensity": float(np.std(image_array)),
                "min_intensity": float(np.min(image_array)),
                "max_intensity": float(np.max(image_array)),
                "image_volume_voxels": int(np.prod(image_array.shape)),
            })
            # Modality-specific metrics.
            if modality == 'CT':
                # Hounsfield Unit statistics (image_array is HU for CT).
                metrics.update({
                    "hu_mean": float(np.mean(image_array)),
                    "hu_std": float(np.std(image_array)),
                    "lung_collapse_area": 0.0,  # Would be calculated from segmentation
                })
            # Segmentation-derived volumes, keyed by organ name.
            if segmentation_results:
                for seg_result in segmentation_results:
                    organ = seg_result.get("organ", "Unknown")
                    metrics[f"{organ.lower()}_volume_ml"] = seg_result.get("volume_ml", 0.0)
            return metrics
        except Exception as e:
            logger.error(f"Quantitative metrics calculation error: {str(e)}")
            return None

    def _calculate_processing_confidence(self, ds: pydicom.Dataset,
                                         image_array: np.ndarray,
                                         metadata: Dict[str, Any]) -> float:
        """Calculate a heuristic 0-1 confidence score for DICOM processing."""
        confidence_factors = []
        # Image quality factors.
        if image_array.size > 1000:  # minimum usable image size
            confidence_factors.append(0.2)
        if metadata.get('rows', 0) > 256 and metadata.get('columns', 0) > 256:
            confidence_factors.append(0.2)
        # Metadata completeness (fraction of key fields present).
        required_fields = ['modality', 'patient_id', 'study_date']
        completeness = sum(1 for field in required_fields if metadata.get(field)) / len(required_fields)
        confidence_factors.append(completeness * 0.3)
        # Technical parameters.
        # BUG FIX: the metadata dict never contains a 'pixel_spacing' key, so
        # the original metadata.get('pixel_spacing') check could never pass;
        # read the PixelSpacing tag directly from the dataset instead.
        if getattr(ds, 'PixelSpacing', None):
            confidence_factors.append(0.2)
        else:
            confidence_factors.append(0.1)
        return sum(confidence_factors)

    def _group_dicom_files(self, dicom_files: List[str]) -> List[List[str]]:
        """Group DICOM files by series.

        Simplified grouping by filename pattern (prefix before the last
        underscore) — a production version should group by
        SeriesInstanceUID from the DICOM headers instead.
        """
        groups = {}
        for file_path in dicom_files:
            filename = Path(file_path).stem
            series_key = "_".join(filename.split("_")[:-1]) if "_" in filename else filename
            groups.setdefault(series_key, []).append(file_path)
        return list(groups.values())

    def _process_dicom_series(self, series_files: List[str]) -> List[DICOMProcessingResult]:
        """Process a series of DICOM files, stacking slices into a volume."""
        # Load all slices, dropping any that failed to yield image data.
        slices = []
        for file_path in series_files:
            result = self.process_dicom_file(file_path)
            if result.image_data.size > 0:
                slices.append(result)
        # Sort by instance number so the volume is in anatomical order.
        slices.sort(key=lambda x: x.metadata.get('instance_number', 0))
        # Combine into a volume (simplified): the first result carries the
        # stacked array; the per-slice results are still returned.
        if len(slices) > 1:
            volume_data = np.stack([s.image_data for s in slices], axis=-1)
            slices[0].image_data = volume_data
            slices[0].image_dimensions = volume_data.shape
        return slices

    def convert_to_radiology_schema(self, result: DICOMProcessingResult) -> Dict[str, Any]:
        """Convert a DICOM processing result to the radiology schema format.

        Returns a plain dict built from the medical_schemas models, or
        {"error": ...} if conversion fails.
        """
        try:
            metadata = MedicalDocumentMetadata(
                source_type="radiology",
                data_completeness=result.confidence_score
            )
            confidence = ConfidenceScore(
                extraction_confidence=result.confidence_score,
                model_confidence=0.8 if result.segmentation_results else 0.6,
                data_quality=0.9
            )
            image_ref = RadiologyImageReference(
                image_id="dicom_series_001",
                modality=result.modality,
                body_part=result.body_part,
                slice_thickness_mm=result.slice_thickness
            )
            # Basic findings text derived from modality/body part only.
            findings = RadiologyFindings(
                findings_text=f"{result.modality} study of {result.body_part}",
                impression_text=f"{result.modality} {result.body_part} imaging completed",
                technique_description=f"{result.modality} with {result.image_dimensions[0]}x{result.image_dimensions[1]} resolution"
            )
            # Convert segmentation dicts into schema objects.
            segmentations = []
            if result.segmentation_results:
                for seg_result in result.segmentation_results:
                    segmentation = RadiologySegmentation(
                        organ_name=seg_result.get("organ", "Unknown"),
                        volume_ml=seg_result.get("volume_ml"),
                        surface_area_cm2=None,
                        mean_intensity=np.mean(result.image_data) if result.image_data.size > 0 else None
                    )
                    segmentations.append(segmentation)
            metrics = RadiologyMetrics(
                organ_volumes={seg.get("organ", "Unknown"): seg.get("volume_ml", 0)
                               for seg in (result.segmentation_results or [])},
                lesion_measurements=[],
                enhancement_patterns=[],
                calcification_scores={},
                tissue_density=result.quantitative_metrics
            )
            # .dict() serialization (pydantic v1 style) for a JSON-safe payload.
            return {
                "metadata": metadata.dict(),
                "image_references": [image_ref.dict()],
                "findings": findings.dict(),
                "segmentations": [s.dict() for s in segmentations],
                "metrics": metrics.dict(),
                "confidence": confidence.dict(),
                "criticality_level": "routine",
                "follow_up_recommendations": []
            }
        except Exception as e:
            logger.error(f"Schema conversion error: {str(e)}")
            return {"error": str(e)}
# Public API of this module: the processor and its result container.
__all__ = [
"DICOMProcessor",
"DICOMProcessingResult"
]