Spaces:
Sleeping
Sleeping
| import nibabel as nib | |
| import numpy as np | |
| from constants import Constants | |
| from werkzeug.datastructures import MultiDict | |
| import scipy.ndimage as ndimage | |
| import os, sys | |
| import tempfile | |
| from scipy.ndimage import label | |
| import pathlib | |
| from openpyxl import load_workbook | |
| import json | |
| def get_panTS_id(index): | |
| cur_case_id = str(index) | |
| iter = max(0, 8 - len(str(index))) | |
| for _ in range(iter): | |
| cur_case_id = "0" + cur_case_id | |
| cur_case_id = "PanTS_" + cur_case_id | |
| return cur_case_id | |
| def has_large_connected_component(slice_mask, threshold=8): | |
| """ | |
| Check if there is a connected component larger than a threshold in a 2D mask. | |
| """ | |
| labeled, num_features = label(slice_mask) | |
| sizes = np.bincount(labeled.ravel()) | |
| sizes[0] = 0 # ignore background | |
| return np.any(sizes > threshold) | |
| class NpzProcessor: | |
| def __init__(self, main_npz_path=None, clabel_path=None, organ_intensities=None): | |
| self._main_nifti_path = main_npz_path | |
| self._clabel_path = clabel_path | |
| self.number_max = 999999 | |
| self._organ_intensities = organ_intensities | |
| def set_organ_intensities(self, organ_intensities): | |
| self._organ_intensities = organ_intensities | |
| def from_clabel_path(cls, clabel_path): | |
| return cls(None, clabel_path) | |
| # not used | |
| def calculate_mean_hu_with_erosion(self, binary_mask, ct_array): | |
| """ | |
| Calculate mean HU using erosion to avoid edge noise. | |
| """ | |
| erosion_array = ndimage.binary_erosion(binary_mask, structure=Constants.STRUCTURING_ELEMENT) | |
| hu_values = ct_array[erosion_array > 0] | |
| if hu_values.size == 0: | |
| hu_values = ct_array[binary_mask > 0] | |
| if hu_values.size == 0: | |
| return 0 | |
| return round(float(np.mean(hu_values)), Constants.DECIMAL_PRECISION_HU) | |
| def npz_to_nifti(self, id: int, combined_label=True, save=True, path=None): | |
| subfolder = "LabelTr" if id < 9000 else "LabelTe" | |
| image_subfolder = "ImageTe" if id >= 9000 else "ImageTr" | |
| if combined_label and path is None: | |
| dir_path = pathlib.Path(f"{Constants.PANTS_PATH}/data/{subfolder}/{get_panTS_id(id)}/{Constants.COMBINED_LABELS_FILENAME}") | |
| else: | |
| dir_path = pathlib.Path(path) | |
| nifti_path = pathlib.Path(f"{Constants.PANTS_PATH}/data/{image_subfolder}/{get_panTS_id(id)}/{Constants.MAIN_NIFTI_FILENAME}") | |
| nifti_dat = nib.load(nifti_path) | |
| arr = np.load(dir_path)["data"].astype(np.float32) | |
| img = nib.nifti1.Nifti1Image(arr, affine=nifti_dat.affine, header=nifti_dat.header) | |
| nib.save(img, dir_path.with_suffix(".nii.gz")) | |
| def combine_labels(self, id: int, keywords={"pancrea": "pancreas"}, save=True): | |
| """ | |
| Merge multiple label masks into one combined segmentation and re-index the labels. | |
| """ | |
| organ_intensities = {} | |
| segment_subfolder = "LabelTr" | |
| if id >= 9000: | |
| segment_subfolder = "LabelTe" | |
| image_subfolder = "ImageTe" if id >= 9000 else "ImageTr" | |
| nifti_path = pathlib.Path(f"{Constants.PANTS_PATH}/data/{image_subfolder}/{get_panTS_id(id)}/{Constants.MAIN_NIFTI_FILENAME}") | |
| nifti_dat = nib.load(nifti_path) | |
| dir_path = pathlib.Path(f"{Constants.PANTS_PATH}/data/{segment_subfolder}/{get_panTS_id(id)}/segmentations") | |
| npz_files = list(dir_path.glob("*.npz")) | |
| combined_labels_img_data = None | |
| keyword_dict = {organ: None for organ in keywords.values()} | |
| for i, file in enumerate(npz_files): | |
| filename = file.name | |
| data = np.load(file)["data"] | |
| if combined_labels_img_data is None: | |
| combined_labels_img_data = np.ndarray(shape=data.shape, dtype=np.float64) | |
| matched = False | |
| for substring, organ in keywords.items(): | |
| if substring in filename: | |
| if keyword_dict[organ] is None: | |
| keyword_dict[organ] = np.ndarray(shape=data.shape, dtype=np.float64) | |
| scaled = data * np.float64(i + 1) | |
| keyword_dict[organ] = np.maximum(keyword_dict[organ], scaled) | |
| combined_labels_img_data = np.maximum(combined_labels_img_data, scaled) | |
| organ_intensities[organ] = i + 1 | |
| matched = True | |
| break | |
| if not matched: # no keyword match, still add to combined | |
| scaled = data * np.float64(i + 1) | |
| combined_labels_img_data = np.maximum(combined_labels_img_data, scaled) | |
| organ_intensities[filename] = i + 1 | |
| if save: | |
| # save each organ-specific file | |
| for organ, data in keyword_dict.items(): | |
| if data is not None: | |
| save_path = ( | |
| f"{Constants.PANTS_PATH}/data/{segment_subfolder}/{get_panTS_id(id)}/segmentations/{organ}.nii.gz" | |
| ) | |
| img = nib.Nifti1Image(data, affine=nifti_dat.affine, header=nifti_dat.header) | |
| nib.save(img, save_path) | |
| # save combined labels | |
| if combined_labels_img_data is not None: | |
| save_path = ( | |
| f"{Constants.PANTS_PATH}/data/{segment_subfolder}/{get_panTS_id(id)}/{Constants.COMBINED_LABELS_FILENAME}" | |
| ) | |
| np.savez_compressed(save_path, data=combined_labels_img_data) | |
| # save organ intensities | |
| organ_save_path = ( | |
| f"{Constants.PANTS_PATH}/data/{segment_subfolder}/{get_panTS_id(id)}/{Constants.ORGAN_INTENSITIES_FILENAME}" | |
| ) | |
| with open(organ_save_path, "w") as f: | |
| json.dump(organ_intensities, f) | |
| return combined_labels_img_data, organ_intensities | |
| def nifti_combine_labels(self, id: int, keywords: dict[str, str] = {"pancrea": "pancreas"}, save=True): | |
| """ | |
| Merge multiple NIfTI label masks into one combined segmentation and re-index the labels. | |
| """ | |
| organ_intensities = {} | |
| segment_subfolder = "LabelTr" if id < 9000 else "LabelTe" | |
| image_subfolder = "ImageTr" if id < 9000 else "ImageTe" | |
| # load main reference image (for affine/header) | |
| nifti_path = pathlib.Path( | |
| f"{Constants.PANTS_PATH}/data/{image_subfolder}/{get_panTS_id(id)}/{Constants.MAIN_NIFTI_FILENAME}" | |
| ) | |
| base_nifti = nib.load(nifti_path) | |
| # folder containing NIfTI segmentations | |
| dir_path = pathlib.Path( | |
| f"{Constants.PANTS_PATH}/data/{segment_subfolder}/{get_panTS_id(id)}/segmentations" | |
| ) | |
| nii_files = list(dir_path.glob("*.nii*")) | |
| if not nii_files: | |
| raise FileNotFoundError(f"No NIfTI label files found in {dir_path}") | |
| combined_labels = None | |
| keyword_dict = {organ: None for organ in keywords.values()} | |
| for i, file in enumerate(sorted(nii_files)): | |
| filename = file.name | |
| nii = nib.load(file) | |
| data = nii.get_fdata() | |
| if combined_labels is None: | |
| combined_labels = np.zeros_like(data, dtype=np.float64) | |
| matched = False | |
| for substring, organ in keywords.items(): | |
| if substring.lower() in filename.lower(): | |
| if keyword_dict[organ] is None: | |
| keyword_dict[organ] = np.zeros_like(data, dtype=np.float64) | |
| scaled = data * float(i + 1) | |
| keyword_dict[organ] = np.maximum(keyword_dict[organ], scaled) | |
| combined_labels = np.maximum(combined_labels, scaled) | |
| organ_intensities[organ] = i + 1 | |
| matched = True | |
| break | |
| if not matched: | |
| scaled = data * float(i + 1) | |
| combined_labels = np.maximum(combined_labels, scaled) | |
| organ_intensities[filename] = i + 1 | |
| if save: | |
| # save each organ-specific mask | |
| for organ, data in keyword_dict.items(): | |
| if data is not None: | |
| save_path = ( | |
| f"{Constants.PANTS_PATH}/data/{segment_subfolder}/{get_panTS_id(id)}/segmentations/{organ}.nii.gz" | |
| ) | |
| img = nib.Nifti1Image(data, affine=base_nifti.affine, header=base_nifti.header) | |
| nib.save(img, save_path) | |
| # save combined mask as NIfTI | |
| if combined_labels is not None: | |
| save_path = ( | |
| f"{Constants.PANTS_PATH}/data/{segment_subfolder}/{get_panTS_id(id)}/{Constants.COMBINED_LABELS_NIFTI_FILENAME}" | |
| ) | |
| img = nib.Nifti1Image(combined_labels, affine=base_nifti.affine, header=base_nifti.header) | |
| nib.save(img, save_path) | |
| # save organ intensity mapping | |
| organ_save_path = ( | |
| f"{Constants.PANTS_PATH}/data/{segment_subfolder}/{get_panTS_id(id)}/{Constants.ORGAN_INTENSITIES_FILENAME}" | |
| ) | |
| with open(organ_save_path, "w") as f: | |
| json.dump(organ_intensities, f, indent=2) | |
| return combined_labels, organ_intensities |