|
|
import zipfile |
|
|
|
|
|
from nnunetv2.utilities.file_path_utilities import * |
|
|
|
|
|
|
|
|
def export_pretrained_model(dataset_name_or_id: Union[int, str], output_file: str, |
|
|
configurations: Tuple[str] = ("2d", "3d_lowres", "3d_fullres", "3d_cascade_fullres"), |
|
|
trainer: str = 'nnUNetTrainer', |
|
|
plans_identifier: str = 'nnUNetPlans', |
|
|
folds: Tuple[int, ...] = (0, 1, 2, 3, 4), |
|
|
strict: bool = True, |
|
|
save_checkpoints: Tuple[str, ...] = ('checkpoint_final.pth',), |
|
|
export_crossval_predictions: bool = False) -> None: |
|
|
dataset_name = maybe_convert_to_dataset_name(dataset_name_or_id) |
|
|
with(zipfile.ZipFile(output_file, 'w', zipfile.ZIP_DEFLATED)) as zipf: |
|
|
for c in configurations: |
|
|
print(f"Configuration {c}") |
|
|
trainer_output_dir = get_output_folder(dataset_name, trainer, plans_identifier, c) |
|
|
|
|
|
if not isdir(trainer_output_dir): |
|
|
if strict: |
|
|
raise RuntimeError(f"{dataset_name} is missing the trained model of configuration {c}") |
|
|
else: |
|
|
continue |
|
|
|
|
|
expected_fold_folder = [f"fold_{i}" if i != 'all' else 'fold_all' for i in folds] |
|
|
assert all([isdir(join(trainer_output_dir, i)) for i in expected_fold_folder]), \ |
|
|
f"not all requested folds are present; {dataset_name} {c}; requested folds: {folds}" |
|
|
|
|
|
assert isfile(join(trainer_output_dir, "plans.json")), f"plans.json missing, {dataset_name} {c}" |
|
|
|
|
|
for fold_folder in expected_fold_folder: |
|
|
print(f"Exporting {fold_folder}") |
|
|
|
|
|
source_file = join(trainer_output_dir, fold_folder, "debug.json") |
|
|
if isfile(source_file): |
|
|
zipf.write(source_file, os.path.relpath(source_file, nnUNet_results)) |
|
|
|
|
|
|
|
|
for chk in save_checkpoints: |
|
|
source_file = join(trainer_output_dir, fold_folder, chk) |
|
|
zipf.write(source_file, os.path.relpath(source_file, nnUNet_results)) |
|
|
|
|
|
|
|
|
source_file = join(trainer_output_dir, fold_folder, "progress.png") |
|
|
zipf.write(source_file, os.path.relpath(source_file, nnUNet_results)) |
|
|
|
|
|
|
|
|
source_file = join(trainer_output_dir, fold_folder, "network_architecture.pdf") |
|
|
if isfile(source_file): |
|
|
zipf.write(source_file, os.path.relpath(source_file, nnUNet_results)) |
|
|
|
|
|
|
|
|
if export_crossval_predictions: |
|
|
source_folder = join(trainer_output_dir, fold_folder, "validation") |
|
|
files = [i for i in subfiles(source_folder, join=False) if not i.endswith('.npz') and not i.endswith('.pkl')] |
|
|
for f in files: |
|
|
zipf.write(join(source_folder, f), os.path.relpath(join(source_folder, f), nnUNet_results)) |
|
|
|
|
|
else: |
|
|
source_file = join(trainer_output_dir, fold_folder, "validation", "summary.json") |
|
|
zipf.write(source_file, os.path.relpath(source_file, nnUNet_results)) |
|
|
|
|
|
source_folder = join(trainer_output_dir, f'crossval_results_folds_{folds_tuple_to_string(folds)}') |
|
|
if isdir(source_folder): |
|
|
if export_crossval_predictions: |
|
|
source_files = subfiles(source_folder, join=True) |
|
|
else: |
|
|
source_files = [ |
|
|
join(trainer_output_dir, f'crossval_results_folds_{folds_tuple_to_string(folds)}', i) for i in |
|
|
['summary.json', 'postprocessing.pkl', 'postprocessing.json'] |
|
|
] |
|
|
for s in source_files: |
|
|
if isfile(s): |
|
|
zipf.write(s, os.path.relpath(s, nnUNet_results)) |
|
|
|
|
|
source_file = join(trainer_output_dir, "plans.json") |
|
|
zipf.write(source_file, os.path.relpath(source_file, nnUNet_results)) |
|
|
|
|
|
source_file = join(trainer_output_dir, "dataset_fingerprint.json") |
|
|
zipf.write(source_file, os.path.relpath(source_file, nnUNet_results)) |
|
|
|
|
|
source_file = join(trainer_output_dir, "dataset.json") |
|
|
zipf.write(source_file, os.path.relpath(source_file, nnUNet_results)) |
|
|
|
|
|
ensemble_dir = join(nnUNet_results, dataset_name, 'ensembles') |
|
|
|
|
|
if not isdir(ensemble_dir): |
|
|
print("No ensemble directory found for task", dataset_name_or_id) |
|
|
return |
|
|
subd = subdirs(ensemble_dir, join=False) |
|
|
|
|
|
for ens in subd: |
|
|
identifiers, folds = convert_ensemble_folder_to_model_identifiers_and_folds(ens) |
|
|
ok = True |
|
|
for i in identifiers: |
|
|
tr, pl, c = convert_identifier_to_trainer_plans_config(i) |
|
|
if tr == trainer and pl == plans_identifier and c in configurations: |
|
|
pass |
|
|
else: |
|
|
ok = False |
|
|
if ok: |
|
|
print(f'found matching ensemble: {ens}') |
|
|
source_folder = join(ensemble_dir, ens) |
|
|
if export_crossval_predictions: |
|
|
source_files = subfiles(source_folder, join=True) |
|
|
else: |
|
|
source_files = [ |
|
|
join(source_folder, i) for i in |
|
|
['summary.json', 'postprocessing.pkl', 'postprocessing.json'] if isfile(join(source_folder, i)) |
|
|
] |
|
|
for s in source_files: |
|
|
zipf.write(s, os.path.relpath(s, nnUNet_results)) |
|
|
inference_information_file = join(nnUNet_results, dataset_name, 'inference_information.json') |
|
|
if isfile(inference_information_file): |
|
|
zipf.write(inference_information_file, os.path.relpath(inference_information_file, nnUNet_results)) |
|
|
inference_information_txt_file = join(nnUNet_results, dataset_name, 'inference_information.txt') |
|
|
if isfile(inference_information_txt_file): |
|
|
zipf.write(inference_information_txt_file, os.path.relpath(inference_information_txt_file, nnUNet_results)) |
|
|
print('Done') |
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
export_pretrained_model(2, '/home/fabian/temp/dataset2.zip', strict=False, export_crossval_predictions=True, folds=(0, )) |
|
|
|