Spaces:
Running
Running
| """ | |
| DICOM Handler for medical image processing | |
| Optimized for memory-constrained environments | |
| """ | |
| import os | |
| import logging | |
| import numpy as np | |
| from typing import Dict, Any, Optional, Tuple, List | |
| from pathlib import Path | |
| import torch | |
| from PIL import Image | |
| import cv2 | |
logger = logging.getLogger(__name__)

# Optional medical-imaging backends. Each import is attempted once at module
# load; the matching *_AVAILABLE flag records whether it succeeded so the rest
# of the module can degrade gracefully instead of failing at import time.
try:
    import pydicom
except ImportError:
    PYDICOM_AVAILABLE = False
    logger.warning("pydicom not available - DICOM support limited")
else:
    PYDICOM_AVAILABLE = True

try:
    import SimpleITK as sitk
except ImportError:
    SIMPLEITK_AVAILABLE = False
    logger.warning("SimpleITK not available - advanced medical image processing limited")
else:
    SIMPLEITK_AVAILABLE = True
class DicomHandler:
    """
    DICOM file handler with memory optimization.

    Reads DICOM files with pydicom (and SimpleITK, when installed, for files
    larger than the configured limit), extracts a small metadata dictionary
    and converts the pixel data to a windowed, normalized ``torch.Tensor``
    with a leading channel dimension, resized to ``default_output_size``.
    """

    def __init__(self, memory_limit_mb: float = 1000.0):
        """
        Initialize DICOM handler.

        Args:
            memory_limit_mb: Memory limit for DICOM processing in MB. Files
                larger than this are routed through the large-file reader.
        """
        self.memory_limit_mb = memory_limit_mb
        self.memory_limit_bytes = memory_limit_mb * 1024**2

        # Default DICOM processing settings
        self.default_window_center = 40
        self.default_window_width = 400
        self.default_output_size = (512, 512)  # interpreted as (height, width)

        logger.info(f"DICOM Handler initialized with {memory_limit_mb}MB limit")
        logger.info(f"pydicom available: {PYDICOM_AVAILABLE}")
        logger.info(f"SimpleITK available: {SIMPLEITK_AVAILABLE}")

    def read_dicom_file(self, file_path: str) -> Optional[Dict[str, Any]]:
        """
        Read DICOM file and extract image data and metadata.

        Args:
            file_path: Path to DICOM file.

        Returns:
            Dictionary with keys ``image`` (processed tensor), ``metadata``,
            ``original_shape``, ``file_path`` and ``file_size_mb``; or ``None``
            when pydicom is missing, the file does not exist, or reading
            fails (the error is logged, never raised).
        """
        if not PYDICOM_AVAILABLE:
            logger.error("pydicom not available - cannot read DICOM files")
            return None

        try:
            file_path = Path(file_path)
            if not file_path.exists():
                logger.error(f"DICOM file not found: {file_path}")
                return None

            # Files above the limit go through the memory-conscious reader.
            file_size_mb = file_path.stat().st_size / (1024**2)
            if file_size_mb > self.memory_limit_mb:
                logger.warning(f"DICOM file too large: {file_size_mb:.1f}MB > {self.memory_limit_mb}MB")
                return self._read_large_dicom_file(file_path)

            dicom_data = pydicom.dcmread(str(file_path))
            image_array = dicom_data.pixel_array
            metadata = self._extract_dicom_metadata(dicom_data)
            processed_image = self._process_dicom_image(image_array, metadata)

            return {
                'image': processed_image,
                'metadata': metadata,
                'original_shape': image_array.shape,
                'file_path': str(file_path),
                'file_size_mb': file_size_mb
            }
        except Exception as e:
            logger.error(f"Error reading DICOM file {file_path}: {e}")
            return None

    def _read_large_dicom_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """
        Read large DICOM file with memory optimization.

        Metadata is read first without pixel data. SimpleITK is tried when
        available; if it is missing or fails, the file is read with pydicom
        and the pixel array is reduced to fit ``memory_limit_bytes``.

        Args:
            file_path: Path to the DICOM file.

        Returns:
            Result dictionary (same core keys as ``read_dicom_file`` plus
            either ``reader`` or ``downsampled``), or ``None`` on failure.
        """
        try:
            file_path = Path(file_path)
            # Recorded here so statistics do not count large files as 0 MB.
            file_size_mb = file_path.stat().st_size / (1024**2)

            # Read only metadata first
            dicom_data = pydicom.dcmread(str(file_path), stop_before_pixels=True)
            metadata = self._extract_dicom_metadata(dicom_data)

            if SIMPLEITK_AVAILABLE:
                sitk_result = self._read_dicom_with_sitk(file_path, metadata)
                if sitk_result is not None:
                    return sitk_result
                # Do not give up on the file just because one backend failed.
                logger.warning("SimpleITK read failed - falling back to pydicom")

            # Fallback: pydicom has to decode the full pixel data before it
            # can be reduced, so peak memory is still the full array.
            dicom_data = pydicom.dcmread(str(file_path))
            image_array = dicom_data.pixel_array
            original_shape = image_array.shape
            image_array, downsampled = self._shrink_to_memory_limit(image_array)
            # Drop the dataset so its pixel data can be garbage-collected.
            del dicom_data

            processed_image = self._process_dicom_image(image_array, metadata)
            return {
                'image': processed_image,
                'metadata': metadata,
                'original_shape': original_shape,
                'file_path': str(file_path),
                'file_size_mb': file_size_mb,
                'downsampled': downsampled
            }
        except Exception as e:
            logger.error(f"Error reading large DICOM file: {e}")
            return None

    def _shrink_to_memory_limit(self, image_array: np.ndarray) -> Tuple[np.ndarray, bool]:
        """
        Reduce a pixel array so it fits within ``memory_limit_bytes``.

        A 3D array is first reduced to its middle slice, which is the only
        slice ``_process_dicom_image`` uses. A 2D array that is still too
        large is then scaled down uniformly.

        Returns:
            ``(array, reduced)`` where ``reduced`` tells whether anything
            was changed.
        """
        if image_array.nbytes <= self.memory_limit_bytes:
            return image_array, False

        if image_array.ndim == 3:
            # Copy so the slice does not keep the whole volume alive.
            image_array = image_array[image_array.shape[0] // 2].copy()
            logger.info("Reduced DICOM volume to its middle slice")

        if image_array.ndim == 2 and image_array.nbytes > self.memory_limit_bytes:
            scale_factor = np.sqrt(self.memory_limit_bytes / image_array.nbytes)
            new_height = max(1, int(image_array.shape[0] * scale_factor))
            new_width = max(1, int(image_array.shape[1] * scale_factor))
            # cv2.resize expects dsize as (width, height), not (rows, cols).
            image_array = cv2.resize(image_array, (new_width, new_height))
            new_shape = (new_height, new_width)
            logger.info(f"Downsampled DICOM image to {new_shape}")

        return image_array, True

    def _read_dicom_with_sitk(self, file_path: Path, metadata: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Read DICOM pixel data using SimpleITK.

        Args:
            file_path: Path to the DICOM file.
            metadata: Metadata already extracted with pydicom.

        Returns:
            Result dictionary tagged with ``'reader': 'SimpleITK'``, or
            ``None`` when reading fails.
        """
        try:
            file_path = Path(file_path)
            image = sitk.ReadImage(str(file_path))
            image_array = sitk.GetArrayFromImage(image)

            # NOTE(review): SimpleITK's DICOM reader is expected to return
            # values with RescaleSlope/Intercept already applied, so the
            # rescale step is skipped here to avoid applying it twice -
            # confirm against the SimpleITK/GDCM version in use.
            processed_image = self._process_dicom_image(
                image_array, metadata, apply_rescale=False
            )
            return {
                'image': processed_image,
                'metadata': metadata,
                'original_shape': image_array.shape,
                'file_path': str(file_path),
                'file_size_mb': file_path.stat().st_size / (1024**2),
                'reader': 'SimpleITK'
            }
        except Exception as e:
            logger.error(f"Error reading DICOM with SimpleITK: {e}")
            return None

    @staticmethod
    def _to_scalar(value: Any, default: float) -> float:
        """
        Coerce a DICOM attribute value to a plain float.

        Multi-valued attributes contribute their first item. This covers any
        indexable container, not only ``list``/``tuple`` (pydicom's
        multi-value container is neither). ``None``, empty containers and
        unparsable values fall back to ``default``.
        """
        if value is None:
            return float(default)
        if not isinstance(value, (str, bytes)):
            try:
                value = value[0]
            except (TypeError, IndexError, KeyError):
                pass  # already a scalar, or an empty container
        try:
            return float(value)
        except (TypeError, ValueError):
            return float(default)

    def _extract_dicom_metadata(self, dicom_data) -> Dict[str, Any]:
        """
        Extract relevant metadata from DICOM data.

        Missing attributes fall back to ``'Unknown'`` or a numeric default.
        Window and rescale values are always returned as plain floats.

        Args:
            dicom_data: Object exposing DICOM attributes by keyword
                (e.g. a pydicom dataset).

        Returns:
            Metadata dictionary; may be partially filled if extraction
            raised (a warning is logged).
        """
        metadata = {}
        try:
            # Patient information
            metadata['patient_id'] = getattr(dicom_data, 'PatientID', 'Unknown')
            metadata['patient_age'] = getattr(dicom_data, 'PatientAge', 'Unknown')
            metadata['patient_sex'] = getattr(dicom_data, 'PatientSex', 'Unknown')

            # Study information
            metadata['study_date'] = getattr(dicom_data, 'StudyDate', 'Unknown')
            metadata['study_description'] = getattr(dicom_data, 'StudyDescription', 'Unknown')
            metadata['modality'] = getattr(dicom_data, 'Modality', 'Unknown')

            # Image information
            metadata['rows'] = getattr(dicom_data, 'Rows', 0)
            metadata['columns'] = getattr(dicom_data, 'Columns', 0)
            metadata['pixel_spacing'] = getattr(dicom_data, 'PixelSpacing', [1.0, 1.0])
            metadata['slice_thickness'] = getattr(dicom_data, 'SliceThickness', 1.0)

            # Window/Level information for display (first value when several
            # windows are stored).
            metadata['window_center'] = self._to_scalar(
                getattr(dicom_data, 'WindowCenter', self.default_window_center),
                self.default_window_center,
            )
            metadata['window_width'] = self._to_scalar(
                getattr(dicom_data, 'WindowWidth', self.default_window_width),
                self.default_window_width,
            )

            # Linear stored-value -> output-unit mapping; identity if absent.
            metadata['rescale_slope'] = self._to_scalar(
                getattr(dicom_data, 'RescaleSlope', 1.0), 1.0
            )
            metadata['rescale_intercept'] = self._to_scalar(
                getattr(dicom_data, 'RescaleIntercept', 0.0), 0.0
            )
        except Exception as e:
            logger.warning(f"Error extracting DICOM metadata: {e}")
        return metadata

    def _apply_rescale(self, image_array: np.ndarray,
                       metadata: Dict[str, Any]) -> np.ndarray:
        """
        Apply ``rescale_slope`` / ``rescale_intercept`` from ``metadata``.

        Returns the input unchanged when the mapping is the identity;
        otherwise returns a float32 array ``stored * slope + intercept``.
        """
        slope = self._to_scalar(metadata.get('rescale_slope', 1.0), 1.0)
        intercept = self._to_scalar(metadata.get('rescale_intercept', 0.0), 0.0)
        if slope == 0.0:
            # A zero slope would flatten the image; treat it as "not set".
            slope = 1.0
        if slope == 1.0 and intercept == 0.0:
            return image_array
        return image_array.astype(np.float32) * slope + intercept

    def _process_dicom_image(self, image_array: np.ndarray,
                             metadata: Dict[str, Any],
                             apply_rescale: bool = True) -> torch.Tensor:
        """
        Process DICOM image array to tensor.

        Steps: pick the middle slice of a 3D array, optionally apply the
        rescale mapping, apply windowing, normalize to 0-1, resize to
        ``default_output_size`` and add a channel dimension.

        NOTE(review): any 3D input is treated as a stack of slices; a color
        image shaped (rows, cols, samples) is not distinguished here.

        Args:
            image_array: Pixel data, 2D or 3D.
            metadata: Metadata from ``_extract_dicom_metadata``.
            apply_rescale: Whether to apply rescale slope/intercept before
                windowing. Pass ``False`` when the values are already
                rescaled.

        Returns:
            Float tensor; on any processing error a zero tensor of shape
            ``(1, *default_output_size)`` is returned and the error logged.
        """
        try:
            # 3D volume - take middle slice for 2D processing
            if image_array.ndim == 3:
                image_array = image_array[image_array.shape[0] // 2]

            # Window values are expressed in rescaled units, so the rescale
            # has to happen before windowing.
            if apply_rescale:
                image_array = self._apply_rescale(image_array, metadata)

            window_center = metadata.get('window_center', self.default_window_center)
            window_width = metadata.get('window_width', self.default_window_width)
            image_array = self._apply_windowing(image_array, window_center, window_width)

            image_array = self._normalize_image(image_array)

            out_height, out_width = self.default_output_size
            if tuple(image_array.shape) != (out_height, out_width):
                # cv2.resize expects dsize as (width, height).
                image_array = cv2.resize(image_array, (out_width, out_height))

            image_tensor = torch.from_numpy(np.ascontiguousarray(image_array)).float()
            if image_tensor.dim() == 2:
                image_tensor = image_tensor.unsqueeze(0)  # Add channel dimension
            return image_tensor
        except Exception as e:
            logger.error(f"Error processing DICOM image: {e}")
            # Return dummy tensor on error
            return torch.zeros(1, *self.default_output_size)

    def _apply_windowing(self, image_array: np.ndarray,
                         window_center: float, window_width: float) -> np.ndarray:
        """
        Clip the image to ``[center - width/2, center + width/2]``.

        The input is returned unchanged (with a warning) when the window is
        missing, has a non-positive width, or clipping fails.
        """
        try:
            if window_center is None or window_width is None or window_width <= 0:
                logger.warning(
                    f"Error applying windowing: invalid window "
                    f"(center={window_center}, width={window_width})"
                )
                return image_array
            half_width = window_width / 2
            return np.clip(image_array, window_center - half_width,
                           window_center + half_width)
        except Exception as e:
            logger.warning(f"Error applying windowing: {e}")
            return image_array

    def _normalize_image(self, image_array: np.ndarray) -> np.ndarray:
        """
        Normalize image to 0-1 range as float32.

        ``uint8`` and ``uint16`` inputs are divided by their dtype maximum;
        every other dtype is min-max scaled. A constant image yields zeros.
        """
        try:
            if image_array.dtype == np.uint8:
                return image_array.astype(np.float32) / 255.0
            if image_array.dtype == np.uint16:
                return image_array.astype(np.float32) / 65535.0

            # Do the arithmetic in float64: subtracting in the original
            # integer dtype (e.g. int16) can overflow and wrap around.
            as_float = image_array.astype(np.float64)
            img_min = as_float.min()
            img_max = as_float.max()
            if img_max > img_min:
                return ((as_float - img_min) / (img_max - img_min)).astype(np.float32)
            return np.zeros_like(image_array, dtype=np.float32)
        except Exception as e:
            logger.warning(f"Error normalizing image: {e}")
            return image_array.astype(np.float32)

    def batch_process_dicom_files(self, file_paths: List[str]) -> List[Dict[str, Any]]:
        """
        Process multiple DICOM files with memory management.

        Files that fail to read are skipped. Garbage collection is forced
        after every 10 files.

        Args:
            file_paths: Paths of the DICOM files to read.

        Returns:
            Results of ``read_dicom_file`` for the files that succeeded, in
            input order.
        """
        import gc

        results = []
        total = len(file_paths)
        for index, file_path in enumerate(file_paths, start=1):
            logger.info(f"Processing DICOM file {index}/{total}: {file_path}")
            result = self.read_dicom_file(file_path)
            if result:
                results.append(result)

            # Memory cleanup every 10 files
            if index % 10 == 0:
                gc.collect()
                logger.debug(f"Memory cleanup after {index} files")
        return results

    def convert_dicom_to_standard_format(self, dicom_result: Dict[str, Any],
                                         output_format: str = 'png') -> Optional[str]:
        """
        Convert processed DICOM to standard image format.

        The image is written next to the source file, with the extension
        replaced by ``output_format``.

        Args:
            dicom_result: Result dictionary with ``image`` and ``file_path``.
            output_format: Target extension, with or without a leading dot.

        Returns:
            Path of the written file, or ``None`` on failure (logged).
        """
        try:
            image_tensor = dicom_result['image']

            if isinstance(image_tensor, torch.Tensor):
                # detach/cpu so tensors tracking gradients or living on an
                # accelerator can still be converted.
                image_array = image_tensor.detach().cpu().squeeze().numpy()
            else:
                image_array = np.asarray(image_tensor)

            # Clip first: values outside 0-1 would wrap around in uint8.
            image_8bit = (np.clip(image_array, 0.0, 1.0) * 255).astype(np.uint8)
            pil_image = Image.fromarray(image_8bit, mode='L')  # Grayscale

            suffix = '.' + output_format.lstrip('.')
            output_path = Path(dicom_result['file_path']).with_suffix(suffix)

            pil_image.save(output_path)
            logger.info(f"Converted DICOM to {output_format}: {output_path}")
            return str(output_path)
        except Exception as e:
            logger.error(f"Error converting DICOM to {output_format}: {e}")
            return None

    def get_dicom_statistics(self, dicom_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Get statistics from processed DICOM files.

        Args:
            dicom_results: Result dictionaries as returned by the readers.
                A missing ``metadata`` or ``modality`` counts as 'Unknown';
                a missing ``file_size_mb`` counts as 0.

        Returns:
            Dictionary with ``total_files``, ``modalities``,
            ``modality_counts``, ``total_size_mb``, ``average_size_mb`` and
            ``size_range_mb``; empty for empty input or on error.
        """
        if not dicom_results:
            return {}

        from collections import Counter

        try:
            modality_counts = Counter(
                (r.get('metadata') or {}).get('modality', 'Unknown')
                for r in dicom_results
            )
            file_sizes = [r.get('file_size_mb', 0) for r in dicom_results]
            total_size = sum(file_sizes)

            return {
                'total_files': len(dicom_results),
                'modalities': list(modality_counts),
                'modality_counts': dict(modality_counts),
                'total_size_mb': total_size,
                'average_size_mb': total_size / len(file_sizes),
                'size_range_mb': (min(file_sizes), max(file_sizes))
            }
        except Exception as e:
            logger.error(f"Error calculating DICOM statistics: {e}")
            return {}