import os
import shutil
from pathlib import Path
from typing import List

import numpy as np

from batchgenerators.utilities.file_and_folder_operations import nifti_files, join, maybe_mkdir_p, save_json
from nnunetv2.dataset_conversion.generate_dataset_json import generate_dataset_json
from nnunetv2.paths import nnUNet_raw, nnUNet_preprocessed


def make_out_dirs(dataset_id: int, task_name: str = "ACDC"):
    """Create the nnU-Net raw folder layout for the given dataset id.

    Args:
        dataset_id: numeric nnU-Net dataset id (zero-padded to 3 digits in the folder name).
        task_name: human-readable task suffix for the dataset folder name.

    Returns:
        Tuple of (dataset dir, imagesTr dir, labelsTr dir, imagesTs dir) as Paths.
        All directories are created if they do not exist.
    """
    dataset_name = f"Dataset{dataset_id:03d}_{task_name}"

    # nnUNet_raw may come from an env var wrapped in quotes; strip them defensively.
    out_dir = Path(nnUNet_raw.replace('"', "")) / dataset_name
    out_train_dir = out_dir / "imagesTr"
    out_labels_dir = out_dir / "labelsTr"
    out_test_dir = out_dir / "imagesTs"

    os.makedirs(out_dir, exist_ok=True)
    os.makedirs(out_train_dir, exist_ok=True)
    os.makedirs(out_labels_dir, exist_ok=True)
    os.makedirs(out_test_dir, exist_ok=True)

    return out_dir, out_train_dir, out_labels_dir, out_test_dir


def create_ACDC_split(labelsTr_folder: str, seed: int = 1234) -> List[dict]:
    """Build a deterministic, patient-level 5-fold cross-validation split.

    Splitting on patient IDs rather than individual files guarantees that the
    ED and ES frames of one patient never end up in both train and val of the
    same fold.

    Args:
        labelsTr_folder: path to the labelsTr folder of the converted dataset.
        seed: seed for the patient shuffle, making the split reproducible.

    Returns:
        List of 5 dicts, each with 'train' and 'val' lists of case identifiers
        (file names without the '.nii.gz' ending).
    """
    nii_files = nifti_files(labelsTr_folder, join=False)
    # File names look like 'patientXXX_frameYY.nii.gz'; the patient id is the
    # fixed-length prefix 'patientXXX' (len('patient000') == 10 characters).
    prefix_len = len('patient000')
    suffix_len = len('.nii.gz')
    patients = np.unique([i[:prefix_len] for i in nii_files])

    rs = np.random.RandomState(seed)
    rs.shuffle(patients)

    splits = []
    for fold in range(5):
        # Every 5th patient (offset by fold) goes to validation -> 5 disjoint folds.
        val_patients = set(patients[fold::5])
        # Set membership on the fixed-length prefix replaces the original
        # O(files * patients) startswith scan; patient ids are all the same
        # length, so the prefix test is exactly equivalent.
        val_cases = [i[:-suffix_len] for i in nii_files if i[:prefix_len] in val_patients]
        train_cases = [i[:-suffix_len] for i in nii_files if i[:prefix_len] not in val_patients]
        splits.append({'train': train_cases, 'val': val_cases})
    return splits


def copy_files(src_data_folder: Path, train_dir: Path, labels_dir: Path, test_dir: Path) -> int:
    """Copy files from the ACDC dataset to the nnU-Net dataset folder.

    Expects `src_data_folder` to contain extracted 'training' and 'testing'
    subfolders, each holding one directory per patient. 4D cine volumes
    ('_4d') are skipped; ground-truth files ('_gt') become labels.

    Returns:
        The number of training cases (images copied to imagesTr).
    """
    patients_train = sorted([f for f in (src_data_folder / "training").iterdir() if f.is_dir()])
    patients_test = sorted([f for f in (src_data_folder / "testing").iterdir() if f.is_dir()])

    num_training_cases = 0
    # Copy training files and corresponding labels.
    for patient_dir in patients_train:
        for file in patient_dir.iterdir():
            if file.suffix == ".gz" and "_gt" not in file.name and "_4d" not in file.name:
                # The stem is 'patient.nii', and the suffix is '.gz'.
                # We split the stem and append _0000 (channel index) to the patient part.
                shutil.copy(file, train_dir / f"{file.stem.split('.')[0]}_0000.nii.gz")
                num_training_cases += 1
            elif file.suffix == ".gz" and "_gt" in file.name:
                # Labels keep the case name but drop the '_gt' marker.
                shutil.copy(file, labels_dir / file.name.replace("_gt", ""))

    # Copy test files (no labels available for these).
    for patient_dir in patients_test:
        for file in patient_dir.iterdir():
            if file.suffix == ".gz" and "_gt" not in file.name and "_4d" not in file.name:
                shutil.copy(file, test_dir / f"{file.stem.split('.')[0]}_0000.nii.gz")

    return num_training_cases


def convert_acdc(src_data_folder: str, dataset_id: int = 27):
    """Convert the downloaded ACDC dataset into nnU-Net raw format.

    Creates the folder structure, copies images/labels, and writes the
    dataset.json describing channels and segmentation labels.
    """
    out_dir, train_dir, labels_dir, test_dir = make_out_dirs(dataset_id=dataset_id)
    num_training_cases = copy_files(Path(src_data_folder), train_dir, labels_dir, test_dir)

    generate_dataset_json(
        str(out_dir),
        channel_names={
            0: "cineMRI",
        },
        labels={
            "background": 0,
            "RV": 1,   # right ventricle
            "MLV": 2,  # myocardium of the left ventricle
            "LVC": 3,  # left ventricular cavity
        },
        file_ending=".nii.gz",
        num_training_cases=num_training_cases,
    )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-i",
        "--input_folder",
        type=str,
        required=True,  # fail fast with a clear argparse error instead of a TypeError later
        help="The downloaded ACDC dataset dir. Should contain extracted 'training' and 'testing' folders.",
    )
    parser.add_argument(
        "-d", "--dataset_id", required=False, type=int, default=27, help="nnU-Net Dataset ID, default: 27"
    )
    args = parser.parse_args()
    print("Converting...")
    convert_acdc(args.input_folder, args.dataset_id)

    # Write a fixed patient-level split so nnU-Net does not generate its own
    # (file-level) split, which could leak patients across train/val.
    dataset_name = f"Dataset{args.dataset_id:03d}_ACDC"
    labelsTr = join(nnUNet_raw, dataset_name, 'labelsTr')
    preprocessed_folder = join(nnUNet_preprocessed, dataset_name)
    maybe_mkdir_p(preprocessed_folder)
    split = create_ACDC_split(labelsTr)
    save_json(split, join(preprocessed_folder, 'splits_final.json'), sort_keys=False)
    print("Done!")