frankenstein / dataprocesser /step2_create_segmentation.py
zy7_oldserver
1
fd601de
from dataprocesser.dataset_anish import list_img_ad_from_anish_csv
from dataprocesser.dataset_synthrad import list_img_pID_from_synthrad_folder
from dataprocesser.dataset_anika import pair_list_from_anika_dataset
from dataprocesser.dataset_anika import all_list_from_anika_dataset
from dataprocesser.dataset_combined_csv import list_img_seg_ad_pIDs_from_new_simplified_csv
from dataprocesser.dataset_anika import all_list_from_anika_dataset_include_duplicate
from dataprocesser.dataset_dominik import all_list_from_dominik_dataset
from dataprocesser.dataset_xcat import list_img_pID_from_XCAT_folder
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
def run(target_file_list=None, task='total', dataset='synthrad', device="gpu"):
if target_file_list is None:
target_file_list=create_dataset_list(dataset)
multi_label_image=False if dataset == 'anika_newsynthetic' else True
multi_label_image=False if dataset == 'synthesized' else True
create_segmentation(target_file_list, task, device, multi_label_image)
def create_segmentation(dataset_list, task='total', device="gpu", multi_label_image=True):
# task:
# total
# total_mr
# tissue_types_mr
import nibabel as nib
import nrrd
import numpy as np
from totalsegmentator.python_api import totalsegmentator
for sample in dataset_list:
input_path=sample
print(f'create segmentation mask for {input_path}')
if input_path.endswith('.nii') or input_path.endswith('.nii.gz'):
if task == 'tissue_types_mr' or task == 'tissue_types':
output_path=input_path.replace('.nii','_seg_tissue.nii')
else:
output_path=input_path.replace('.nii','_seg.nii')
input_img = nib.load(input_path)
elif input_path.endswith('.nrrd'):
if task == 'tissue_types_mr' or task == 'tissue_types':
output_path=input_path.replace('.nrrd','_seg_tissue.nii.gz')
else:
output_path=input_path.replace('.nrrd','_seg.nii.gz')
np_img, header = nrrd.read(input_path)
# Extract metadata for affine transformation
spacing = header.get('space directions', None)
if spacing is None:
spacing = np.eye(3) # Default to identity matrix if not available
else:
spacing = np.array(spacing)
origin = header.get('space origin', [0, 0, 0])
origin = np.array(origin)
affine = np.zeros([4,4])
affine[:3, :3] = spacing # Set voxel dimensions
affine[:3, 3] = origin # Set the origin
print('space directions', spacing)
print('space origin', origin)
print('affine', affine)
input_img = nib.Nifti1Image(np_img, affine)
totalsegmentator(input=input_img, output=output_path, task=task, fast=False, ml=multi_label_image, device=device)
print(f'segmentation mask is saved as {output_path}')
'''try:
pass
except:
print("An exception occurred")'''
from dataprocesser.step1_init_data_list import appart_img_and_seg
from dataprocesser.dataset_anika import (
all_list_from_anika_dataset,
extract_patientID_from_Anika_dataset,
all_list_from_anika_dataset_include_duplicate)
from dataprocesser.dataset_synthrad import list_img_pID_from_synthrad_folder
from dataprocesser.dataset_anish import list_img_seg_ad_pIDs_from_anish_csv
from dataprocesser.dataset_dominik import all_list_from_dominik_dataset
from dataprocesser.dataset_combined_csv import list_img_seg_ad_pIDs_from_new_simplified_csv
from dataprocesser.dataset_xcat import list_img_pID_from_XCAT_folder
def create_dataset_list(dataset='anika_all'):
def get_synthrad_files(data_dir, modality, saved_name):
return list_img_pID_from_synthrad_folder(data_dir, accepted_modalities=modality, saved_name=saved_name)[0]
def get_anika_pairs(ct_dir, mri_dir, mri_mode='t1_vibe_in'):
matched_pairs = all_list_from_anika_dataset(ct_dir, mri_dir, mri_mode)
return [paths['CT'] for paths in matched_pairs.values()], [paths['MRI'] for paths in matched_pairs.values()]
def load_synthetic_folder(synthetic_folder, extract_id_func):
assert os.path.isdir(synthetic_folder), f'{synthetic_folder} is not a valid directory'
images, patient_IDs = [], []
for roots, _, files in sorted(os.walk(synthetic_folder)):
for file in files:
if "seg_volume" not in file:
path = os.path.join(roots, file)
patient_IDs.append(extract_id_func(path))
images.append(path)
return images
dataset_handlers = {
'anish': lambda: list_img_seg_ad_pIDs_from_new_simplified_csv(
r'E:\Projects\yang_proj\SynthRad_GAN\synthrad_conversion\datacsv\healthy_dissec_newserver_new.csv'
)[3],
'synthrad_ct': lambda: get_synthrad_files(r'E:\Projects\yang_proj\data\synthrad\Task1\pelvis', 'ct', 'target_filenames.txt'),
'synthrad_mr': lambda: get_synthrad_files(r'E:\Projects\yang_proj\data\synthrad\Task1_val\pelvis', 'mr', 'target_filenames.txt'),
'anika': lambda: get_anika_pairs(
r'E:\Projects\yang_proj\data\anika\CT',
r'E:\Projects\yang_proj\data\anika\MR_registrated'
)[0],
'anika_all_ct': lambda: appart_img_and_seg(
all_list_from_anika_dataset_include_duplicate(
r'E:\Projects\yang_proj\data\anika\CT',
r'E:\Projects\yang_proj\data\anika\MR_registrated'
)[0]
)[0],
'anika_all_mr': lambda: appart_img_and_seg(
all_list_from_anika_dataset_include_duplicate(
r'E:\Projects\yang_proj\data\anika\CT',
r'E:\Projects\yang_proj\data\anika\MR_registrated'
)[1]
)[0],
'dominik': lambda: all_list_from_dominik_dataset(r'E:\Projects\yang_proj\data\Dominik_MR_VIBE'),
'xcat_ct': lambda: list_img_pID_from_XCAT_folder(
r'E:\Projects\chen_proj\aorta_XCAT2CT\CT_XCAT_aorta', saved_name=None
)[0],
'anika_newsynthetic': lambda: appart_img_and_seg(load_synthetic_folder(
r'E:\Projects\yang_proj\data\anika\new_synthetic',
lambda path: os.path.basename(os.path.dirname(path))
))[0],
'synthesized': lambda: load_synthetic_folder(
r'E:\Projects\yang_proj\data\ddpm_anika_more_512_for_2_seg\volume_output',
lambda path: '_'.join(os.path.basename(path).split('_')[:2])
)
}
if dataset not in dataset_handlers:
raise ValueError(f"Unsupported dataset '{dataset}', please choose an available one!")
return dataset_handlers[dataset]()
if __name__=='__main__':
run()