Spaces:
Running
Running
| from lungmask import mask | |
| import lungmask | |
| import SimpleITK as sitk | |
| import numpy as np | |
| import nibabel | |
| import platform | |
| import glob | |
| import torch | |
| import skimage | |
| from tqdm import tqdm | |
| from monai.transforms import Compose, LoadImaged, ToTensord, Spacingd, DivisiblePadd, SpatialCropd, ToNumpyd, AddChanneld, SqueezeDimd, Resized, Flipd, Rotate90d, NormalizeIntensityd, ThresholdIntensityd | |
| from Logger.loggingservice import Logger | |
# Remote logging client shared by the whole script.
# NOTE(review): the endpoint and the second argument (which looks like a
# credential) are hard-coded; consider loading them from configuration.
logger = Logger("http://82.194.207.154:5000/api/log", "XqnJHdalUd")

# Separator used when file paths are built and split by hand below.
directory_split = "/" if platform.system() != "Windows" else "\\"
def find_probable_air_value(image):
    """Return the second-smallest distinct value of ``image`` and its minimum.

    Args:
        image: Array-like of numeric intensities. It is not modified.

    Returns:
        A ``(second_min, min_val)`` tuple. ``second_min`` is the smallest
        element that differs from ``min_val``; when every element equals the
        minimum it is ``float('inf')``.

    Raises:
        ValueError: If ``image`` is empty (raised by ``np.amin``).
    """
    values = np.asarray(image)
    min_val = np.amin(values)
    # Select the non-minimum elements instead of overwriting the minimum with
    # infinity: storing ``inf`` in an integer array raises OverflowError.
    remaining = values[values != min_val]
    if remaining.size == 0:
        return float('inf'), min_val
    return np.amin(remaining), min_val
def mask_lung(scan_dict, batch_size=20):
    """Predict a lung label mask for one scan with lungmask's R231 U-Net.

    Args:
        scan_dict: Dictionary whose ``'image'`` entry is loaded by MONAI's
            ``LoadImaged``.
        batch_size: Number of slices passed to the model per forward pass.

    Returns:
        A ``(mask, affine)`` tuple. ``mask`` is a ``uint8`` array of predicted
        labels; axes 0 and 2 are swapped before inference and swapped back
        afterwards. ``affine`` is read from the loaded image's
        ``image_meta_dict``.
    """
    model = lungmask.mask.get_model('unet', 'R231')
    # Use the GPU when one is present, otherwise fall back to the CPU so the
    # function does not fail on machines without CUDA.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    transformer = Compose(
        [
            LoadImaged(keys=['image']),
            ToNumpyd(keys=['image']),
        ]
    )
    scan_read = transformer(scan_dict)
    inimg_raw = scan_read['image'].swapaxes(0, 2)

    tvolslices, xnew_box = lungmask.utils.preprocess(inimg_raw, resolution=[256, 256])
    # Cap intensities at 600, then rescale with (value + 1024) / 1624.
    tvolslices[tvolslices > 600] = 600
    tvolslices = np.divide((tvolslices + 1024), 1624)

    torch_ds_val = lungmask.utils.LungLabelsDS_inf(tvolslices)
    dataloader_val = torch.utils.data.DataLoader(
        torch_ds_val,
        batch_size=batch_size,
        shuffle=False,
        num_workers=1,
        pin_memory=False,
    )

    # Collect per-batch predictions and stack them once at the end; the empty
    # seed array keeps the (0, H, W) uint8 result when there are no batches.
    batches = [np.empty((0, *tvolslices[0].shape), dtype=np.uint8)]
    with torch.no_grad():
        for X in tqdm(dataloader_val):
            prediction = model(X.float().to(device))
            # Index of the highest-scoring channel is the predicted label.
            pls = torch.max(prediction, 1)[1].detach().cpu().numpy().astype(np.uint8)
            batches.append(pls)
    timage_res = np.vstack(batches)

    # "postrocessing" is the spelling this script calls in lungmask.utils.
    outmask = lungmask.utils.postrocessing(timage_res)
    outmask = np.asarray(
        [
            lungmask.utils.reshape_mask(outmask[i], xnew_box[i], inimg_raw.shape[1:])
            for i in range(outmask.shape[0])
        ],
        dtype=np.uint8,
    )
    outmask = np.swapaxes(outmask, 0, 2)
    return outmask.astype(np.uint8), scan_read['image_meta_dict']['affine']
def segment_lung(image_path):
    """Read ``image_path`` with SimpleITK and return lungmask's R231 result.

    Args:
        image_path: Path handed to ``sitk.ReadImage``.

    Returns:
        Whatever ``mask.apply`` returns for the image (batch size 5).
    """
    scan = sitk.ReadImage(image_path)
    unet_r231 = mask.get_model('unet', 'R231')
    return mask.apply(scan, batch_size=5, model=unet_r231)
def calculate_extremes(image, annotation_value):
    """Compute bounds of the voxels equal to ``annotation_value``.

    The input is copied, every voxel that differs from ``annotation_value``
    is zeroed, and the volume is scanned slice by slice along input axis 2.

    Args:
        image: 3-D array-like of labels. It is not modified.
        annotation_value: Label whose extent is measured.

    Returns:
        ``((x_min, x_max), (y_min, y_max), (z_min, z_max))`` where x refers to
        input axis 0, y to input axis 1 and z to input axis 2. When no slice
        qualifies the result is ``((inf, 0), (inf, 0), (-1, 0))``.

    NOTE(review): the x/y lower bounds are the first occupied index plus one,
    the x/y upper bounds are the last occupied index plus one, and ``z_max``
    is the last occupied index itself. Confirm this is the convention the
    caller expects.
    """
    volume = np.array(image, copy=True)
    volume[volume != annotation_value] = 0

    x_min = y_min = float('inf')
    x_max = y_max = z_max = 0
    z_min = -1

    for index, plane in enumerate(volume.swapaxes(0, 2)):
        # Slices whose largest value is below 1 are treated as empty.
        if np.amax(plane) < 1:
            continue
        if z_min == -1:
            z_min = index
        z_max = index

        occupied_rows = np.flatnonzero(np.any(plane, axis=1))
        occupied_cols = np.flatnonzero(np.any(plane, axis=0))

        y_min = min(y_min, occupied_rows[0] + 1)
        x_min = min(x_min, occupied_cols[0] + 1)
        y_max = max(y_max, occupied_rows[-1] + 1)
        x_max = max(x_max, occupied_cols[-1] + 1)

    return ((x_min, x_max), (y_min, y_max), (z_min, z_max))
def process_lung_scan(scan_dict, save_directory, extremes, lung):
    """Crop one scan to the given bounds, resample it and save the results.

    Args:
        scan_dict: Dictionary with ``'image'``, ``'label'`` and ``'boxes'``
            entries loaded by ``LoadImaged``; ``scan_dict['image']`` must be
            a path string because the output file name is derived from it.
        save_directory: Directory containing the ``Images``, ``Boxes`` and
            ``Labels`` output folders.
        extremes: ``((x_min, x_max), (y_min, y_max), (z_min, z_max))`` used
            as the crop start and end.
        lung: Suffix appended to the output file name.

    Returns:
        None. Nothing is written when the cropped label is entirely zero.
    """
    every_key = ["image", "label", "boxes"]
    crop_start = (extremes[0][0], extremes[1][0], extremes[2][0])
    crop_end = (extremes[0][1], extremes[1][1], extremes[2][1])

    # Stage 1: load, apply the 1000 / -1024 intensity thresholds, normalize,
    # crop, and resample the image to the target voxel spacing.
    stage_one = Compose(
        [
            LoadImaged(keys=["image", "label", "boxes"]),
            ThresholdIntensityd(keys=['image'], above=False, threshold=1000, cval=1000),
            ThresholdIntensityd(keys=['image'], above=True, threshold=-1024, cval=-1024),
            AddChanneld(keys=every_key),
            NormalizeIntensityd(keys=["image"]),
            SpatialCropd(keys=every_key, roi_start=crop_start, roi_end=crop_end),
            Spacingd(keys=["image"], pixdim=(1, 1, 1.5)),
        ]
    )
    processed_1 = stage_one(scan_dict)

    # Skip crops whose label holds no annotation at all.
    if np.amax(processed_1['label'][0]) == 0:
        return

    # Stage 2: resize label/boxes to the resampled image, apply the two 0.5
    # thresholds (presumably to re-binarize after resizing), pad every
    # volume to a multiple of 16 and drop the channel axis.
    target_size = processed_1['image'].shape[1:]
    stage_two = Compose(
        [
            Resized(keys=["label", "boxes"], spatial_size=target_size),
            ThresholdIntensityd(keys=['boxes', 'label'], above=False, threshold=0.5, cval=1),
            ThresholdIntensityd(keys=['boxes', 'label'], above=True, threshold=0.5, cval=0),
            DivisiblePadd(keys=every_key, k=16, mode='symmetric'),
            SqueezeDimd(keys=every_key, dim=0),
            ToNumpyd(keys=every_key),
        ]
    )
    processed_2 = stage_two(processed_1)
    affine = processed_1['image_meta_dict']['affine']

    # Split "name.nii.gz" into "name" and ".nii.gz".
    base_name = scan_dict['image'].split(directory_split)[-1]
    name_parts = base_name.split('.')
    filename = name_parts[0]
    filename_extension = '.' + '.'.join(name_parts[1:])

    # Build all three volumes before writing any of them.
    image_save = nibabel.Nifti1Image(processed_2['image'], affine)
    boxes_save = nibabel.Nifti1Image(processed_2['boxes'], affine)
    label_save = nibabel.Nifti1Image(processed_2['label'], affine)

    nibabel.save(image_save, f"{save_directory}{directory_split}Images{directory_split}{filename}_{lung}{filename_extension}")
    nibabel.save(boxes_save, f"{save_directory}{directory_split}Boxes{directory_split}{filename}_{lung}{filename_extension}")
    nibabel.save(label_save, f"{save_directory}{directory_split}Labels{directory_split}{filename}_{lung}{filename_extension}")
# Number of scans skipped so far because processing raised an error.
passed = 0


def process_scan(scan_dict, save_directory):
    """Segment one scan and save a cropped volume for each lung label.

    Mask label 1 is saved with the ``"right"`` suffix and label 2 with the
    ``"left"`` suffix. Any ordinary error skips the scan: the module-level
    ``passed`` counter is incremented, a warning is sent to ``logger`` and
    the cause is printed.

    Args:
        scan_dict: Dictionary with ``'image'``, ``'label'`` and ``'boxes'``
            entries describing the scan.
        save_directory: Root directory the cropped volumes are written to.
    """
    global passed
    try:
        masked, _ = mask_lung(scan_dict, batch_size=5)
        # calculate_extremes works on its own copy, so the mask can be
        # passed to both calls directly.
        for label_value, lung in ((1, "right"), (2, "left")):
            extremes = calculate_extremes(masked, label_value)
            process_lung_scan(scan_dict, save_directory, extremes, lung)
    except Exception as exc:
        # Only ordinary errors are swallowed; KeyboardInterrupt and
        # SystemExit still stop the run.
        passed += 1
        logger.LogWarning("Skipped scan", [str(scan_dict)])
        print(f"Skipped scan because of {exc!r}")
        print(f"passed {passed}")
def process_directory(directory, save_directory):
    """Process every ``*.nii.gz`` file found in ``directory``'s Images folder.

    For each image, the label and boxes paths are built from the same file
    name under the ``Labels`` and ``Boxes`` folders, and the three paths are
    passed to ``process_scan``.

    Args:
        directory: Root folder containing ``Images``, ``Labels`` and
            ``Boxes``.
        save_directory: Output root forwarded to ``process_scan``.
    """
    pattern = f"{directory}{directory_split}Images{directory_split}*.nii.gz"
    for image_file in glob.glob(pattern):
        filename = image_file.split(directory_split)[-1]
        label_file = f"{directory}{directory_split}Labels{directory_split}{filename}"
        boxes_file = f"{directory}{directory_split}Boxes{directory_split}{filename}"
        print(f"Processing {filename}")
        process_scan(
            {'image': image_file, 'label': label_file, 'boxes': boxes_file},
            save_directory,
        )
if __name__ == "__main__":
    # Script entry point: crop every scan under the input root and report the
    # start and end of the run to the remote logger.
    # Alternative Windows locations used previously:
    #   "D:\\Datasets\\Temp\\" (input) and "D:\\Datasets\\Temp\\Save\\" (output)
    source_root, target_root = "/home/tumor/data/MSD/", "/home/tumor/data/MSD-Lung/"

    logger.LogInfo("Started cropping lungs", [])
    process_directory(source_root, target_root)
    logger.LogMilestone("Finished cropping lungs!", [])