Spaces:
Runtime error
Runtime error
| import os | |
| import pandas as pd | |
| import numpy as np | |
| import nrrd | |
| import SimpleITK as sitk | |
| import cv2 | |
| from dataprocesser.preprocess_MR import step3_vibe_resetsignal | |
| """ | |
| 该代码用于处理一组 MR 图像和对应的分割图,应用掩膜、进行归一化,并根据 CSV 文件中的仿真 MR 灰度值对分割图进行替换。最后将处理后的 MR 图像和分割图保存。 | |
| 主要步骤: | |
| 1. **读取数据**:从指定的文件夹中读取 MR 图像和对应的分割图。 | |
| 2. **归一化处理**:对 MR 图像进行归一化,将其值范围映射到 0 到 255 之间。 | |
| 3. **轮廓提取**:从归一化后的 MR 图像中提取出主体区域的轮廓(根据给定的阈值分割),创建掩膜。 | |
| 4. **掩膜应用**:将提取出的掩膜应用到归一化后的 MR 图像上,保留主体区域,抑制背景。 | |
| 5. **分割图处理**:读取对应的分割图,并与提取出的轮廓进行叠加,之后根据 CSV 文件中的仿真 CT 值替换分割图中的灰度值。 | |
| 6. **图像保存**:将处理后的 MR 图像和修改后的分割图保存到指定的输出文件夹中,保证其空间属性和几何信息与输入图像一致。 | |
| 7. **输出**:在 ITK-SNAP 等医学图像工具中打开时, MR 图像和分割图能够保持同步和正确的比例显示。 | |
| 函数简介: | |
| - `normalize`: 对 MR 图像进行归一化处理,将像素值范围映射到 [0, 255]。 | |
| - `create_body_mask`: 从图像中提取出身体的轮廓,生成二值掩膜。 | |
| - `apply_mask`: 将提取的掩膜应用到 MR 图像上,保留轮廓内部的区域。 | |
| - `process_segmentation`: 读取分割图,并根据 CSV 文件中的仿真 CT 值对其灰度值进行替换。 | |
| - `process_image`: 处理单个 MR 图像及其对应的分割图,包括归一化、轮廓提取、掩膜应用、分割图处理等。 | |
| - `process_folder`: 处理整个文件夹中的 MR 图像和分割图,逐一处理所有图像并保存结果。 | |
| """ | |
| # 归一化函数 | |
| def normalize(img, vmin_out=0, vmax_out=1, norm_min_v=None, norm_max_v=None, epsilon=1e-5): | |
| if norm_min_v is None and norm_max_v is None: | |
| norm_min_v = np.min(img) | |
| norm_max_v = np.max(img) | |
| img = np.clip(img, norm_min_v, norm_max_v) | |
| img = (img - norm_min_v) / (norm_max_v - norm_min_v + epsilon) | |
| img = img * (vmax_out - vmin_out) + vmin_out | |
| return img | |
| # 创建轮廓掩膜 | |
| def create_body_mask_simple(numpy_img, body_threshold=50): | |
| numpy_img = numpy_img.astype(np.int16) | |
| body_mask = np.where(numpy_img > body_threshold, 1, 0).astype(np.uint8) | |
| contours, _ = cv2.findContours(body_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| mask = np.zeros_like(body_mask, dtype=np.uint8) | |
| if contours: | |
| largest_contour = max(contours, key=cv2.contourArea) | |
| mask = np.ascontiguousarray(mask) | |
| largest_contour = np.ascontiguousarray(largest_contour) | |
| cv2.drawContours(mask, [largest_contour], -1, 1, thickness=cv2.FILLED) | |
| return mask | |
| def create_body_mask(numpy_img, body_threshold=-500, min_contour_area=10000): | |
| """ | |
| Create a binary body mask from a CT image tensor, using a specific threshold for the body parts. | |
| Args: | |
| tensor_img (torch.Tensor): A tensor representation of a grayscale CT image, with intensity values from -1024 to 1500. | |
| Returns: | |
| torch.Tensor: A binary mask tensor where the entire body region is 1 and the background is 0. | |
| """ | |
| # Convert tensor to numpy array | |
| numpy_img = np.ascontiguousarray(numpy_img.astype(np.int16)) # Ensure we can handle negative values correctly | |
| #numpy_img = numpy_img.astype(np.int16) | |
| # Threshold the image at -500 to separate potential body from the background | |
| binary_img = np.where(numpy_img > body_threshold, 1, 0).astype(np.uint8) | |
| # Find contours from the binary image | |
| contours, _ = cv2.findContours(binary_img, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE) | |
| # Create an empty mask | |
| mask = np.zeros_like(binary_img) | |
| VERBOSE = False | |
| # Fill all detected body contours | |
| if contours: | |
| for contour in contours: | |
| if cv2.contourArea(contour) >= min_contour_area: | |
| if VERBOSE: | |
| print('current contour area: ', cv2.contourArea(contour), 'threshold: ', min_contour_area) | |
| cv2.drawContours(mask, [contour], -1, 1, thickness=cv2.FILLED) | |
| return mask | |
| def apply_mask(normalized_image_array, mask_array): | |
| return normalized_image_array * mask_array | |
| def print_all_info(data, title): | |
| print(f'min, max of {title}:', np.min(data), np.max(data)) | |
| # process the segmentation, replace the classes with simulated MR values | |
| def process_segmentation(combined_array, csv_simulation_values, mr_signal_formula=step3_vibe_resetsignal.calculate_signal_vibe): | |
| combined_array = combined_array.astype(np.int16) | |
| print_all_info(combined_array, 'combine') | |
| # two columns of unique value 和 simulation value | |
| # the first element will not be included | |
| organ_indexs = csv_simulation_values[1:, 0] # first column: organ index | |
| T1_values = csv_simulation_values[1:, 1] # second column: simulate MRI value | |
| T2_values = csv_simulation_values[1:, 2] | |
| Rho_values = csv_simulation_values[1:, 3] | |
| order_begin_from_0 = True if organ_indexs.astype(int).min()==0 else False | |
| #print('organ order number begin from 0:', order_begin_from_0) | |
| #print(organ_indexs) | |
| assign_value_mask = np.zeros_like(combined_array) | |
| step=0 | |
| for step in range(len(organ_indexs)): | |
| organ_index = organ_indexs[step] # in csv file, organs begin with 1 | |
| t1 = float(T1_values[step]) | |
| t2 = float(T2_values[step]) | |
| rho = float(Rho_values[step]) | |
| simulation_value = mr_signal_formula(t1, t2, rho) | |
| organ_index = int(organ_index) | |
| if order_begin_from_0: | |
| #print("order in csv begin from 0") | |
| assign_value_mask[combined_array == organ_index+1] = simulation_value # organ_index+ 1 | |
| else: | |
| #print("order in csv begin from 1") | |
| assign_value_mask[combined_array == organ_index] = simulation_value | |
| step+=1 | |
| print_all_info(assign_value_mask, 'assignment') | |
| return assign_value_mask | |
| # 处理单个图像和分割图 | |
| def process_image(input_path, contour_path, seg_path, csv_simulation_values, output_path1, output_path2, body_threshold): | |
| # 读取原始 MR 图像和分割图 | |
| if input_path.endswith('.nrrd'): | |
| img, header = nrrd.read(input_path) | |
| segmentation_img, header_seg = nrrd.read(seg_path) | |
| elif input_path.endswith('.nii.gz') or input_path.endswith('.nii'): | |
| import nibabel as nib | |
| img_metadata = nib.load(input_path) | |
| img = img_metadata.get_fdata() | |
| affine = img_metadata.affine | |
| seg_metadata = nib.load(seg_path) | |
| segmentation_img = seg_metadata.get_fdata() | |
| # 归一化处理 | |
| norm_max=255 #255 | |
| low_percentile = 5 | |
| high_percentile = 90 | |
| img_normalized = normalize(img, 0, norm_max, np.percentile(img, low_percentile), np.percentile(img, high_percentile), epsilon=0) | |
| # 提取轮廓图 | |
| body_contour = np.zeros_like(img, dtype=np.int16) | |
| for i in range(img.shape[2]): | |
| slice_data = img[:, :, i] | |
| body_contour[:, :, i] = create_body_mask(slice_data, body_threshold=body_threshold) | |
| # 应用掩膜到归一化 MR 图像 | |
| masked_image = apply_mask(img_normalized, body_contour) | |
| # 处理分割图 | |
| # add contour background to the segmentation (all region inside body + 1) | |
| combined_array = segmentation_img + body_contour | |
| combined_array = np.clip(combined_array, 0, np.max(segmentation_img) + 1) | |
| print_all_info(segmentation_img, 'seg') | |
| processed_segmentation = process_segmentation(combined_array, csv_simulation_values) | |
| # normalize to 0-1 | |
| # masked_image = masked_image/norm_max | |
| # processed_segmentation = processed_segmentation/norm_max | |
| if input_path.endswith('.nrrd'): | |
| # 保存处理后的 MR 图像 | |
| nrrd.write(output_path1, masked_image, header) | |
| # 保存处理后的分割图 | |
| nrrd.write(output_path2, processed_segmentation, header_seg) | |
| # save the body contour mask | |
| elif input_path.endswith('.nii.gz') or input_path.endswith('.nii'): | |
| img_processed = nib.Nifti1Image(masked_image, affine) | |
| nib.save(img_processed, output_path1) | |
| seg_processed = nib.Nifti1Image(processed_segmentation, affine) | |
| nib.save(seg_processed, output_path2) | |
| contour_processed = nib.Nifti1Image(body_contour, affine) | |
| # Split the path into directory and filename | |
| directory, filename = os.path.split(output_path2) | |
| new_filename = filename.replace('seg', 'contour') | |
| contour_path = os.path.join(directory, new_filename) | |
| nib.save(contour_processed, contour_path) | |
| return processed_segmentation | |
| # 处理文件夹 | |
| def process_folder(input_folder1, input_folder2, output_folder1, output_folder2, csv_simulation_file, body_threshold=50): | |
| # 读取CSV文件获取仿真CT灰度值 (两列) | |
| csv_simulation_values = pd.read_csv(csv_simulation_file, header=None).to_numpy() | |
| # 检查 csv_simulation_values 是否是二维数组 | |
| if csv_simulation_values.ndim == 1: | |
| raise ValueError("CSV 文件格式不正确,应该包含两列:organ_index 和 simulation_value") | |
| # 确保输出文件夹存在 | |
| os.makedirs(output_folder1, exist_ok=True) | |
| os.makedirs(output_folder2, exist_ok=True) | |
| for filename in os.listdir(input_folder1): | |
| if filename.endswith('.nrrd'): | |
| input_file_path = os.path.join(input_folder1, filename) | |
| seg_file_path = os.path.join(input_folder2, filename) | |
| output_file_path1 = os.path.join(output_folder1, filename) | |
| output_file_path2 = os.path.join(output_folder2, filename) | |
| print(f"Processing {input_file_path} with segmentation {seg_file_path}") | |
| processed_seg = process_image(input_file_path, None, seg_file_path, csv_simulation_values, output_file_path1, output_file_path2, body_threshold) | |
| def analyse_hist(input_path): | |
| if input_path.endswith('.nrrd'): | |
| img, header = nrrd.read(input_path) | |
| elif input_path.endswith('.nii.gz'): | |
| import nibabel as nib | |
| img_metadata = nib.load(input_path) | |
| img = img_metadata.get_fdata() | |
| affine = img_metadata.affine | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| # Plot the histogram | |
| print('shape of img: ', img.shape) | |
| plt.hist(img[:, :, 50], bins=30, edgecolor='black', alpha=0.7) | |
| plt.xlabel('Value') | |
| plt.ylabel('Frequency') | |
| plt.title('Value Distribution') | |
| plt.show() | |
| def process_csv(csv_file, output_folder1, output_folder2, csv_simulation_file, body_threshold=50, output_mr_csv_file='processed_mr_csv_file.csv'): | |
| # 读取CSV文件获取仿真CT灰度值 (两列) | |
| csv_simulation_values = pd.read_csv(csv_simulation_file, header=None).to_numpy() | |
| #csv_simulation_values = pd.read_csv(csv_simulation_file) | |
| # 检查 csv_simulation_values 是否是二维数组 | |
| if csv_simulation_values.ndim == 1: | |
| raise ValueError("CSV 文件格式不正确,应该包含两列:organ_index 和 simulation_value") | |
| # 确保输出文件夹存在 | |
| os.makedirs(output_folder1, exist_ok=True) | |
| os.makedirs(output_folder2, exist_ok=True) | |
| from step1_init_data_list import list_img_seg_ad_pIDs_from_new_simplified_csv | |
| patient_IDs, Aorta_diss, segs, images = list_img_seg_ad_pIDs_from_new_simplified_csv(csv_file) | |
| from tqdm import tqdm | |
| dataset_list = [] | |
| for idx in tqdm(range(len(images))): | |
| if (images[idx].endswith('.nii.gz') and segs[idx].endswith('.nii.gz')) or \ | |
| (images[idx].endswith('.nii') and segs[idx].endswith('.nii')): | |
| input_file_path = images[idx] | |
| seg_file_path = segs[idx] | |
| patient_id = patient_IDs[idx] | |
| ad = Aorta_diss[idx] | |
| root_dir = os.path.dirname(input_file_path) | |
| output_file_path1 = os.path.join(output_folder1, os.path.relpath(input_file_path, start=root_dir)) | |
| synthrad_basic_mr_name = 'mr' | |
| synthrad_basic_seg_name = 'mr_merged_seg' | |
| if os.path.basename(output_file_path1) == f'{synthrad_basic_mr_name}.nii.gz' or \ | |
| os.path.basename(output_file_path1) == f'{synthrad_basic_mr_name}.nii': | |
| # Insert the patient ID in the filename | |
| output_file_path1 = output_file_path1.replace(f'{synthrad_basic_mr_name}', f'mr_{patient_id}') | |
| output_file_path2 = os.path.join(output_folder2, os.path.relpath(seg_file_path, start=root_dir)) | |
| if os.path.basename(output_file_path2) == f'{synthrad_basic_seg_name}.nii.gz' or \ | |
| os.path.basename(output_file_path2) == f'{synthrad_basic_seg_name}.nii': | |
| # Insert the patient ID in the filename | |
| output_file_path2 = output_file_path2.replace(f'{synthrad_basic_seg_name}', f'mr_seg_{patient_id}') | |
| print(f"Processing {input_file_path} with segmentation {seg_file_path}") | |
| print(f"Save results to {output_file_path1} and {output_file_path2}") | |
| processed_seg = process_image(input_file_path, None, seg_file_path, csv_simulation_values, output_file_path1, output_file_path2, body_threshold) | |
| # processed_mr_csv_file = ... | |
| csv_mr_line = [patient_id,ad,output_file_path2,output_file_path1] | |
| dataset_list.append(csv_mr_line) | |
| import csv | |
| with open(output_mr_csv_file, 'w', newline='') as f: | |
| csvwriter = csv.writer(f) | |
| csvwriter.writerow(['id', 'Aorta_diss', 'seg', 'img']) | |
| csvwriter.writerows(dataset_list) | |
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser(description="Process MR images and segmentation maps, apply masks and replace grayscale values.") | |
| parser.add_argument('--input_folder1', required=True, help="Path to the folder containing input MR .nrrd files.") | |
| parser.add_argument('--input_folder2', required=True, help="Path to the folder containing segmentation .nrrd files.") | |
| parser.add_argument('--output_folder1', required=True, help="Path to the folder to save the output MR files.") | |
| parser.add_argument('--output_folder2', required=True, help="Path to the folder to save the output segmentation files.") | |
| parser.add_argument('--csv_simulation_file', required=True, help="CSV file containing simulated CT grayscale values.") | |
| parser.add_argument('--body_threshold', type=int, default=50, help="Threshold to separate body from background.") | |
| args = parser.parse_args() | |
| process_folder(args.input_folder1, args.input_folder2, args.output_folder1, args.output_folder2, args.csv_simulation_file, args.body_threshold) |