# Web-viewer residue kept as a comment (not part of the module):
# Spaces: Sleeping | File size: 4,930 Bytes | revision a29fdb5
import pydicom
import logging
import hashlib
from typing import Tuple, Dict, Any, Optional
from pathlib import Path
import os
import io
logger = logging.getLogger(__name__)

# Tags a file must carry before it is accepted (checked by validate_dicom).
REQUIRED_TAGS = [
    "PatientID",
    "StudyInstanceUID",
    "SeriesInstanceUID",
    "Modality",
    "PixelSpacing",  # needed for physical measurements
    # "ImageOrientationPatient" is intentionally not required: it is often
    # absent in simple CR/DX images, although it is critical for CT/MRI.
]

# Tags holding protected health information (PHI); overwritten by
# anonymize_dicom.
PHI_TAGS = [
    "PatientName",
    "PatientBirthDate",
    "PatientAddress",
    "InstitutionName",
    "ReferringPhysicianName",
]
def validate_dicom(file_bytes: bytes) -> pydicom.dataset.FileDataset:
    """
    Strictly validate an in-memory DICOM file and return the parsed dataset.

    Args:
        file_bytes: Raw contents of the file to validate.

    Returns:
        The dataset produced by ``pydicom.dcmread``, pixel data included.

    Raises:
        ValueError: If the bytes cannot be parsed as DICOM, if any tag in
            ``REQUIRED_TAGS`` is absent, or if ``PixelData`` is absent.
    """
    try:
        # 1. Parse the complete file. Pixel data is deliberately NOT skipped
        #    (stop_before_pixels=False) because step 3 checks for PixelData.
        ds = pydicom.dcmread(io.BytesIO(file_bytes), stop_before_pixels=False)
    except Exception as e:
        # Chain the cause so the parser's traceback stays available.
        raise ValueError(f"Invalid DICOM format: {str(e)}") from e

    # 2. Check mandatory tags (presence only; values are not inspected).
    missing_tags = [tag for tag in REQUIRED_TAGS if tag not in ds]
    if missing_tags:
        # Modality-specific relaxation could go here, but strict for now.
        raise ValueError(f"Missing critical DICOM tags: {missing_tags}")

    # 3. Check pixel data presence.
    if 'PixelData' not in ds:
        raise ValueError("DICOM file has no image data (PixelData missing).")

    return ds
def anonymize_dicom(ds: pydicom.dataset.FileDataset) -> pydicom.dataset.FileDataset:
    """
    Remove PHI from a dataset, modifying it in place.

    ``PatientID`` is replaced by ``ANON-`` followed by the first 16 upper-case
    hex digits of its SHA-256 digest, so records of the same patient stay
    linkable. Every tag in ``PHI_TAGS`` that is present is overwritten.

    Args:
        ds: Dataset to anonymize.

    Returns:
        The same dataset object, after modification.
    """
    # Date/time VRs only accept formatted values (or empty); a free-text
    # placeholder there is an invalid value that pydicom may warn about or
    # reject, so those elements are blanked instead.
    temporal_vrs = ("DA", "DT", "TM")

    # Hash PatientID to keep a linkable anonymous ID.
    # NOTE(review): the hash is unsalted, so anyone who knows a PatientID can
    # recompute its pseudonym - confirm this is acceptable for the deployment.
    original_id = str(ds.get('PatientID', 'Unknown'))
    hashed_id = hashlib.sha256(original_id.encode()).hexdigest()[:16].upper()
    ds.PatientID = f"ANON-{hashed_id}"

    # Wipe the other PHI fields that are present.
    for tag in PHI_TAGS:
        if tag not in ds:
            continue
        element = ds.data_element(tag)
        element.value = "" if element.VR in temporal_vrs else "ANONYMIZED"

    return ds
def process_dicom_upload(file_bytes: bytes, username: str) -> Tuple[bytes, Dict[str, Any]]:
    """
    Main gateway function: validate -> anonymize -> return bytes and metadata.

    Args:
        file_bytes: Raw contents of the uploaded file.
        username: Uploading user. Currently unused by this function; kept
            for interface compatibility.

    Returns:
        A ``(safe_bytes, metadata)`` tuple: the serialized anonymized dataset
        and a dict of indexing metadata holding plain Python values.

    Raises:
        ValueError: If validation fails (message prefixed ``DICOM Rejected:``).
    """
    # 1. Validate
    try:
        ds = validate_dicom(file_bytes)
    except Exception as e:
        logger.error("DICOM Validation Failed: %s", e)
        raise ValueError(f"DICOM Rejected: {e}") from e

    # 2. Anonymize
    ds = anonymize_dicom(ds)

    # 3. Extract safe metadata for indexing.
    # pydicom returns its own value classes (e.g. a multi-value container for
    # PixelSpacing); convert to built-in types so the dict can be serialized
    # without pydicom-specific handling.
    raw_spacing = ds.get("PixelSpacing", [1.0, 1.0])
    try:
        pixel_spacing = [float(value) for value in raw_spacing]
    except (TypeError, ValueError):
        # Unexpected shape (e.g. empty or scalar value): pass it through
        # unchanged rather than guessing.
        pixel_spacing = raw_spacing

    metadata = {
        "modality": str(ds.get("Modality", "Unknown")),
        "body_part": str(ds.get("BodyPartExamined", "Unknown")),
        "study_uid": str(ds.get("StudyInstanceUID", "")),
        "series_uid": str(ds.get("SeriesInstanceUID", "")),
        "pixel_spacing": pixel_spacing,
        # The original filename is generally lost in the API.
        "original_filename_hint": "dicom_file.dcm",
    }

    # 4. Convert back to bytes for storage; this is the ANONYMIZED version.
    with io.BytesIO() as buffer:
        ds.save_as(buffer)
        safe_bytes = buffer.getvalue()

    return safe_bytes, metadata
def convert_dicom_to_image(ds: pydicom.dataset.FileDataset) -> Any:
    """
    Convert a DICOM dataset to an 8-bit PIL image for inference.

    The pixel array is rescaled with ``RescaleSlope`` / ``RescaleIntercept``
    (defaults 1 and 0), min-max normalized to 0-255, and inverted when the
    photometric interpretation is ``MONOCHROME1``. 2-D arrays are converted
    to RGB; arrays of any other rank are handed to ``Image.fromarray`` as-is.

    Args:
        ds: Dataset exposing ``pixel_array``.

    Returns:
        A ``PIL.Image.Image``.

    Raises:
        ValueError: If any step of the conversion fails.
    """
    import numpy as np
    from PIL import Image

    try:
        # Start with the raw pixel array.
        pixel_array = ds.pixel_array.astype(float)

        # Rescale slope/intercept (Hounsfield Units for CT). Coerce to plain
        # floats so the arithmetic does not depend on pydicom's DS value class.
        slope = float(getattr(ds, 'RescaleSlope', 1))
        intercept = float(getattr(ds, 'RescaleIntercept', 0))
        pixel_array = (pixel_array * slope) + intercept

        # TODO: use WindowCenter / WindowWidth from the tags when available
        # instead of plain min-max scaling.

        # MONOCHROME1 stores the minimum sample value as white; invert so the
        # output follows the usual "higher value = brighter" convention.
        photometric = str(ds.get("PhotometricInterpretation", "")).strip().upper()
        invert = photometric == "MONOCHROME1"

        # Normalize to 0-255 for standard vision models (unless a model
        # expects HU directly).
        pixel_min = np.min(pixel_array)
        pixel_max = np.max(pixel_array)
        if pixel_max - pixel_min != 0:
            pixel_array = ((pixel_array - pixel_min) / (pixel_max - pixel_min)) * 255.0
            if invert:
                pixel_array = 255.0 - pixel_array
        else:
            # Constant image: avoid division by zero.
            pixel_array = np.zeros_like(pixel_array)
        pixel_array = pixel_array.astype(np.uint8)

        # Monochrome (2-D) data is expanded to RGB; anything else is assumed
        # to already be a color array - TODO confirm for multi-frame data.
        if len(pixel_array.shape) == 2:
            image = Image.fromarray(pixel_array).convert("RGB")
        else:
            image = Image.fromarray(pixel_array)
        return image
    except Exception as e:
        logger.error("DICOM Conversion Error: %s", e)
        raise ValueError(f"Could not convert DICOM to image: {e}") from e
|