| import os, glob, shutil, json |
| from pathlib import Path |
| import SimpleITK as sitk |
| import numpy as np |
| from concurrent.futures import ThreadPoolExecutor |
| from tqdm import tqdm |
|
|
| def save_json(config, config_path=None): |
| |
| if not config_path: |
| config_path = Path(config['save_path']) |
| with open(config_path, 'w') as f: |
| json.dump(config, f, indent=4) |
|
|
|
|
| def set_nnunet_path(nnunet_root, config=None): |
| os.environ["nnUNet_raw"] = f"{nnunet_root}/raw" |
| os.environ["nnUNet_preprocessed"] = f"{nnunet_root}/preprocessed" |
| os.environ["nnUNet_results"] = f"{nnunet_root}/results" |
| if config: |
| config["nnUNet_raw"] = os.environ["nnUNet_raw"] |
| config["nnUNet_preprocessed"] = os.environ["nnUNet_preprocessed"] |
| config["nnUNet_results"] = os.environ["nnUNet_results"] |
| save_json(config) |
|
|
|
|
|
|
| def makedirs_raw_dataset(dataset_data_path): |
| |
| os.makedirs(dataset_data_path, exist_ok = True) |
| os.makedirs(os.path.join(dataset_data_path, 'imagesTr'), exist_ok=True) |
| os.makedirs(os.path.join(dataset_data_path, 'labelsTr'), exist_ok = True) |
|
|
| def get_classes_to_use(region:str): |
| classes_to_use = { |
| "AB": [ |
| 2, |
| 3, |
| 5, |
| 6, |
| *range(10, 14+1), |
| *range(26, 50+1), |
| 51, |
| 79, |
| *range(92, 115+1), |
| 116 |
| ], |
| "HN": [ |
| 15, |
| 16, |
| 17, |
| *range(26, 50+1), |
| 79, |
| 90, |
| 91, |
| ], |
| "TH": [ |
| 2, |
| 3, |
| 5, |
| 6, |
| *range(10, 14+1), |
| *range(26, 50+1), |
| 51, |
| 79, |
| *range(92, 115+1), |
| 116 |
| ] |
| } |
| return classes_to_use[region] |
|
|
| def process_segmentation_file(data_path, dataset_path,label_to_use: list): |
| filename = data_path.split(os.sep)[-2] |
| if not filename.endswith('.mha'): |
| filename = filename + '.mha' |
| shutil.copy(data_path, os.path.join(dataset_path, f'labelsTr/{filename}')) |
|
|
| |
| seg_image = sitk.ReadImage(os.path.join(dataset_path, f'labelsTr/{filename}')) |
| seg_array = sitk.GetArrayFromImage(seg_image) |
| new_seg_array = np.zeros_like(seg_array, dtype=seg_array.dtype) |
| new_seg_array[seg_array != 0] = -1 |
| for i, label in enumerate(label_to_use): |
| new_seg_array[seg_array == label] = i + 1 |
| |
| |
| |
| |
| filtered_seg_image = sitk.GetImageFromArray(new_seg_array) |
| filtered_seg_image.CopyInformation(seg_image) |
| sitk.WriteImage(filtered_seg_image, os.path.join(dataset_path, f'labelsTr/{filename}')) |
|
|
| def process_file(data_path, dataset_path, modality_suffix="_0000"): |
| curr_img = sitk.ReadImage(data_path) |
| filename = data_path.split(os.sep)[-2] |
| if not filename.endswith(f'{modality_suffix}.mha'): |
| filename = filename + f'{modality_suffix}.mha' |
| sitk.WriteImage(curr_img, os.path.join(dataset_path, f'imagesTr/{filename}')) |
|
|
| data = sitk.GetArrayFromImage(curr_img) |
| data = np.ones_like(data) |
|
|
| filename = filename.replace(modality_suffix, '') |
| label_path = os.path.join(dataset_path, f'labelsTr/{filename}') |
| if not os.path.exists(label_path): |
| label_img = sitk.GetImageFromArray(data) |
| label_img.SetDirection(curr_img.GetDirection()) |
| label_img.SetOrigin(curr_img.GetOrigin()) |
| label_img.SetSpacing(curr_img.GetSpacing()) |
| sitk.WriteImage(label_img, label_path) |
|
|
| |
|
|
| def create_dataset_json(config, preprocessing, dataset_data_path): |
| data_dataset_json = { |
| "labels": { |
| "label_001": "1", |
| "background": 0 |
| }, |
| "channel_names": { |
| "0": preprocessing, |
| |
| |
| }, |
| "numTraining": config['num_train'], |
| "file_ending": ".mha" |
| } |
| dump_data_datasets_path = os.path.join(dataset_data_path, 'dataset.json') |
| with open(dump_data_datasets_path, 'w') as f: |
| json.dump(data_dataset_json, f) |
|
|
| def move_preprocessed(nnunet_datas_preprocessed_dir, nnunet_targets_preprocessed_dir, folder_name): |
| list_preprocessed_datas_seg_path = sorted(glob.glob(os.path.join(nnunet_targets_preprocessed_dir, f'{folder_name}/*_seg.npy'))) |
| list_preprocessed_targets_path = sorted(glob.glob(os.path.join(nnunet_datas_preprocessed_dir, f'{folder_name}/*.npy'))) |
| list_preprocessed_targets_path = [name for name in list_preprocessed_targets_path if '_seg' not in name] |
|
|
| assert len(list_preprocessed_datas_seg_path) == len(list_preprocessed_targets_path) |
| assert len(list_preprocessed_datas_seg_path) > 0, "No preprocessed data found in the specified directory." |
|
|
| for (datas_path, targets_path) in zip(list_preprocessed_datas_seg_path, list_preprocessed_targets_path): |
| print(targets_path, "->", datas_path) |
| shutil.copy(src = targets_path, dst = datas_path) |
|
|
| def move_preprocessed_mask(nnunet_datas_preprocessed_dir, nnunet_targets_preprocessed_dir, folder_name): |
| list_preprocessed_targets_path = sorted(glob.glob(os.path.join(nnunet_datas_preprocessed_dir, f'{folder_name}/*_seg.npy'))) |
|
|
| assert len(list_preprocessed_targets_path) > 0, "No preprocessed data found in the specified directory." |
|
|
| for targets_path in list_preprocessed_targets_path: |
| datas_path = os.path.join(nnunet_targets_preprocessed_dir, folder_name, os.path.basename(targets_path).replace('_seg', '_mask')) |
| print(targets_path, "->", datas_path) |
| shutil.copy(src = targets_path, dst = datas_path) |
|
|
| def move_gt_segmentations(dataset_target_path, nnunet_targets_preprocessed_dir): |
| list_targets = glob.glob(os.path.join(f"{dataset_target_path}/imagesTr", '*')) |
| list_targets.sort() |
| list_gt_segmentations_datas = glob.glob(os.path.join(f"{nnunet_targets_preprocessed_dir}/gt_segmentations", '*')) |
| list_gt_segmentations_datas.sort() |
|
|
| print(nnunet_targets_preprocessed_dir) |
|
|
| for (preprocessed_path, gt_path) in zip(list_targets, list_gt_segmentations_datas): |
| |
| print(preprocessed_path, "->", gt_path) |
| shutil.copy(src = preprocessed_path, dst = gt_path) |
|
|
| def move_masks(list_data_mask, dataset_mask_path): |
|
|
| def _process_mask_file(data_path, dataset_mask_path): |
|
|
| filename = data_path.split(os.sep)[-2] |
| if not filename.endswith(f'.mha'): |
| filename = filename + f'.mha' |
| shutil.copy(data_path, os.path.join(dataset_mask_path, filename)) |
|
|
|
|
| |
| os.makedirs(dataset_mask_path, exist_ok=True) |
| with ThreadPoolExecutor() as executor: |
| list(tqdm(executor.map(lambda data_path: _process_mask_file(data_path, dataset_mask_path), list_data_mask), total=len(list_data_mask))) |
|
|
| def move_gt_target(list_data_ct, dataset_target_path2): |
|
|
| def _process_target_file(data_path, dataset_target_path2): |
|
|
| filename = data_path.split(os.sep)[-2] |
| if not filename.endswith(f'.mha'): |
| filename = filename + f'.mha' |
| shutil.copy(data_path, os.path.join(dataset_target_path2, filename)) |
|
|
|
|
| |
| os.makedirs(dataset_target_path2, exist_ok=True) |
| with ThreadPoolExecutor() as executor: |
| list(tqdm(executor.map(lambda data_path: _process_target_file(data_path, dataset_target_path2), list_data_ct), total=len(list_data_ct))) |
|
|
| def modify_plan(plan_file_path, old_config_name, new_config_name, attribute_to_change, new_value): |
| ''' |
| Making changes to preprocessing including patch sizes |
| ''' |
|
|
| new_config = { |
| "inherits_from": old_config_name, |
| "data_identifier": new_config_name, |
| attribute_to_change: new_value |
| } |
| with open(plan_file_path) as f: |
| plan = json.load(f) |
| plan['configurations'][new_config_name] = new_config |
| with open(plan_file_path, 'w') as f: |
| json.dump(plan, f, indent=4) |
| print(f'The plan {plan_file_path} has been updated. ') |
|
|