# frankenstein / dataprocesser / Preprocess_MR_Mask_generation.py
# zy7_oldserver
# 1
# fd601de
import os
import pandas as pd
import numpy as np
import nrrd
import SimpleITK as sitk
import cv2
from dataprocesser.preprocess_MR import step3_vibe_resetsignal
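"""
This script processes a set of MR images and their segmentation maps: it applies a body mask,
normalizes the MR intensities, and replaces the labels of the segmentation map with simulated
MR signal values derived from a CSV file. The processed MR images and segmentation maps are then saved.

Main steps:
1. **Read data**: load the MR image and the matching segmentation map.
2. **Normalization**: rescale the MR image to the range 0 to 255.
3. **Contour extraction**: threshold the MR image slice by slice and fill the detected contours to create a body mask.
4. **Mask application**: multiply the normalized MR image by the mask, keeping the body region and suppressing the background.
5. **Segmentation processing**: add the body mask to the segmentation map, then replace each label with the simulated MR signal computed from the CSV values.
6. **Saving**: write the processed MR image and the modified segmentation map to the output folders, reusing the header / affine of the input image.
7. **Output**: the saved MR image and segmentation map are meant to stay aligned when opened in tools such as ITK-SNAP.

Function overview:
- `normalize`: rescale an MR image to a target range (0 to 255 in this pipeline).
- `create_body_mask`: extract the body contours from an image slice and return a binary mask.
- `apply_mask`: apply the mask to the MR image, keeping only the region inside the contours.
- `process_segmentation`: replace the labels of the combined segmentation map with simulated MR signal values.
- `process_image`: process one MR image and its segmentation map (normalization, contour extraction, masking, segmentation processing, saving).
- `process_folder`: process every MR image / segmentation pair found in a folder and save the results.
"""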
# Normalization helper
def normalize(img, vmin_out=0, vmax_out=1, norm_min_v=None, norm_max_v=None, epsilon=1e-5):
    """Linearly rescale ``img`` from an intensity window to ``[vmin_out, vmax_out]``.

    Values are clipped to ``[norm_min_v, norm_max_v]`` and then mapped linearly
    onto the output range.

    Args:
        img: Array-like of intensities.
        vmin_out: Output value assigned to the lower window bound.
        vmax_out: Output value reached (up to ``epsilon``) at the upper window bound.
        norm_min_v: Lower window bound; defaults to ``np.min(img)`` when ``None``.
        norm_max_v: Upper window bound; defaults to ``np.max(img)`` when ``None``.
        epsilon: Constant added to the window width to avoid division by zero.

    Returns:
        numpy.ndarray: The rescaled array. If the effective window width
        (including ``epsilon``) is exactly zero, an array filled with
        ``vmin_out`` is returned instead of NaNs.
    """
    # Fill each missing bound on its own: previously a single explicit bound
    # left the other one as None and crashed in the subtraction below.
    if norm_min_v is None:
        norm_min_v = np.min(img)
    if norm_max_v is None:
        norm_max_v = np.max(img)

    img = np.clip(img, norm_min_v, norm_max_v)
    span = norm_max_v - norm_min_v + epsilon
    # A constant image (or identical percentiles) with epsilon=0 would divide
    # 0 by 0; every voxel sits on the lower bound, so map it to vmin_out.
    if np.ndim(span) == 0 and span == 0:
        return np.full(np.shape(img), vmin_out, dtype=np.float64)

    img = (img - norm_min_v) / span
    img = img * (vmax_out - vmin_out) + vmin_out
    return img
# Contour-mask helper (largest external contour only)
def create_body_mask_simple(numpy_img, body_threshold=50):
    """Return a 0/1 ``uint8`` mask covering the largest external contour.

    The image is cast to ``int16``, thresholded at ``body_threshold`` and the
    external contour with the greatest area is filled with ones. If no contour
    is found, an all-zero mask is returned.

    Args:
        numpy_img: 2-D image array (it is passed to ``cv2.findContours`` after thresholding).
        body_threshold: Values strictly greater than this count as foreground.

    Returns:
        numpy.ndarray: ``uint8`` mask with the same shape as the thresholded image.
    """
    thresholded = (numpy_img.astype(np.int16) > body_threshold).astype(np.uint8)
    found_contours, _hierarchy = cv2.findContours(
        thresholded, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )

    filled = np.zeros_like(thresholded, dtype=np.uint8)
    if not found_contours:
        return filled

    biggest = max(found_contours, key=cv2.contourArea)
    # OpenCV draws in place, so hand it contiguous buffers.
    filled = np.ascontiguousarray(filled)
    biggest = np.ascontiguousarray(biggest)
    cv2.drawContours(filled, [biggest], -1, 1, thickness=cv2.FILLED)
    return filled
def create_body_mask(numpy_img, body_threshold=-500, min_contour_area=10000):
    """Build a 0/1 ``uint8`` mask by filling every sufficiently large contour.

    The input is cast to ``int16`` (so negative intensities survive), thresholded
    at ``body_threshold``, and every contour returned by OpenCV whose area is at
    least ``min_contour_area`` is drawn filled into the mask.

    Args:
        numpy_img: 2-D NumPy image array (one slice).
        body_threshold: Values strictly greater than this count as foreground.
        min_contour_area: Contours with a smaller area are ignored.

    Returns:
        numpy.ndarray: ``uint8`` mask with ones inside the kept contours and
        zeros elsewhere.
    """
    VERBOSE = False  # set to True to log the area of every contour that is kept

    # int16 keeps negative values intact before thresholding.
    pixels = np.ascontiguousarray(numpy_img.astype(np.int16))
    foreground = (pixels > body_threshold).astype(np.uint8)

    # RETR_LIST returns every contour without hierarchy filtering.
    all_contours, _hierarchy = cv2.findContours(foreground, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

    filled = np.zeros_like(foreground)
    for candidate in all_contours:
        if cv2.contourArea(candidate) < min_contour_area:
            continue
        if VERBOSE:
            print('current contour area: ', cv2.contourArea(candidate), 'threshold: ', min_contour_area)
        cv2.drawContours(filled, [candidate], -1, 1, thickness=cv2.FILLED)
    return filled
def apply_mask(normalized_image_array, mask_array):
    """Return the element-wise product of the image and the mask."""
    masked = normalized_image_array * mask_array
    return masked
def print_all_info(data, title):
    """Print the minimum and maximum of ``data``, labelled with ``title``."""
    lowest = np.min(data)
    highest = np.max(data)
    print(f'min, max of {title}:', lowest, highest)
# Process the segmentation: replace the class labels with simulated MR values
def process_segmentation(combined_array, csv_simulation_values, mr_signal_formula=step3_vibe_resetsignal.calculate_signal_vibe):
    """Replace every label of ``combined_array`` with a simulated MR signal value.

    Args:
        combined_array: Label volume; it is cast to ``int16`` before matching.
        csv_simulation_values: 2-D array whose first row is skipped. In the
            remaining rows, column 0 is the organ index and columns 1-3 are
            passed to ``mr_signal_formula`` as T1, T2 and Rho.
        mr_signal_formula: Callable ``(t1, t2, rho) -> signal``.

    Returns:
        numpy.ndarray: ``int16`` array shaped like ``combined_array``. Voxels
        whose label matches no row stay 0.

    Notes:
        When the smallest organ index in the table is 0, the table is treated
        as 0-based and every index is shifted by +1 before matching labels.
        NOTE(review): the result array is ``int16``, so the fractional part of
        each simulated signal is dropped on assignment. Confirm this is intended.
    """
    labels = combined_array.astype(np.int16)
    print_all_info(labels, 'combine')

    # Row 0 of the table is excluded.
    table = csv_simulation_values[1:]
    organ_ids = table[:, 0]
    t1_column = table[:, 1]
    t2_column = table[:, 2]
    rho_column = table[:, 3]

    # 0-based tables are shifted by one so they line up with the label values.
    label_offset = 1 if organ_ids.astype(int).min() == 0 else 0

    simulated = np.zeros_like(labels)
    for organ_id, t1_raw, t2_raw, rho_raw in zip(organ_ids, t1_column, t2_column, rho_column):
        signal = mr_signal_formula(float(t1_raw), float(t2_raw), float(rho_raw))
        simulated[labels == int(organ_id) + label_offset] = signal

    print_all_info(simulated, 'assignment')
    return simulated
# Process a single image together with its segmentation map
def process_image(input_path, contour_path, seg_path, csv_simulation_values, output_path1, output_path2, body_threshold):
    """Mask and normalize one MR volume and convert its segmentation to simulated MR values.

    Steps: load image and segmentation, normalize the image to 0-255 using its
    5th and 90th percentiles, build a per-slice body mask from the raw image,
    apply the mask, add the mask to the segmentation (so every label inside the
    body is shifted by one), map the labels to simulated values, and save.

    Args:
        input_path: Path of the MR image (``.nrrd``, ``.nii`` or ``.nii.gz``).
        contour_path: Unused; kept so existing callers keep working. For NIfTI
            inputs the contour path is derived from ``output_path2``.
        seg_path: Path of the segmentation, read with the same reader as the image.
        csv_simulation_values: Table forwarded to ``process_segmentation``.
        output_path1: Where the masked, normalized MR image is written.
        output_path2: Where the processed segmentation is written.
        body_threshold: Threshold forwarded to ``create_body_mask``.

    Returns:
        The processed segmentation array returned by ``process_segmentation``.

    Raises:
        ValueError: If ``input_path`` has an unsupported extension.
    """
    is_nrrd = input_path.endswith('.nrrd')
    is_nifti = input_path.endswith(('.nii.gz', '.nii'))
    if not (is_nrrd or is_nifti):
        # Fail early with a clear message instead of an UnboundLocalError later on.
        raise ValueError(
            f"Unsupported image format for '{input_path}': expected .nrrd, .nii or .nii.gz"
        )

    # Read the original MR image and the segmentation map
    if is_nrrd:
        img, header = nrrd.read(input_path)
        segmentation_img, header_seg = nrrd.read(seg_path)
    else:
        import nibabel as nib
        img_metadata = nib.load(input_path)
        img = img_metadata.get_fdata()
        affine = img_metadata.affine
        segmentation_img = nib.load(seg_path).get_fdata()

    # Normalization to [0, norm_max] over a percentile window
    norm_max = 255
    low_percentile = 5
    high_percentile = 90
    img_normalized = normalize(
        img, 0, norm_max,
        np.percentile(img, low_percentile),
        np.percentile(img, high_percentile),
        epsilon=0,
    )

    # Body mask, built slice by slice along the third axis of the raw image
    body_contour = np.zeros_like(img, dtype=np.int16)
    for i in range(img.shape[2]):
        body_contour[:, :, i] = create_body_mask(img[:, :, i], body_threshold=body_threshold)

    # Apply the mask to the normalized MR image
    masked_image = apply_mask(img_normalized, body_contour)

    # Add the body mask to the segmentation (every voxel inside the body gets +1)
    combined_array = segmentation_img + body_contour
    combined_array = np.clip(combined_array, 0, np.max(segmentation_img) + 1)
    print_all_info(segmentation_img, 'seg')
    processed_segmentation = process_segmentation(combined_array, csv_simulation_values)

    if is_nrrd:
        # Save the processed MR image and segmentation with their original headers
        nrrd.write(output_path1, masked_image, header)
        nrrd.write(output_path2, processed_segmentation, header_seg)
    else:
        nib.save(nib.Nifti1Image(masked_image, affine), output_path1)
        nib.save(nib.Nifti1Image(processed_segmentation, affine), output_path2)

        # Save the body contour mask next to the segmentation output
        directory, filename = os.path.split(output_path2)
        new_filename = filename.replace('seg', 'contour')
        if new_filename == filename:
            # No 'seg' in the name: without this the contour would overwrite
            # the segmentation that was just written.
            new_filename = f'contour_{filename}'
        contour_output_path = os.path.join(directory, new_filename)
        nib.save(nib.Nifti1Image(body_contour, affine), contour_output_path)

    return processed_segmentation
# Process a whole folder
def process_folder(input_folder1, input_folder2, output_folder1, output_folder2, csv_simulation_file, body_threshold=50):
    """Run ``process_image`` on every ``.nrrd`` file of ``input_folder1``.

    The segmentation for each image is expected under the same filename in
    ``input_folder2``; outputs keep that filename in the two output folders.

    Args:
        input_folder1: Folder with the input MR ``.nrrd`` files.
        input_folder2: Folder with the segmentation ``.nrrd`` files.
        output_folder1: Folder receiving the processed MR images (created if missing).
        output_folder2: Folder receiving the processed segmentations (created if missing).
        csv_simulation_file: CSV read without header; ``process_segmentation``
            reads columns 0-3 (organ index, T1, T2, Rho).
        body_threshold: Threshold forwarded to ``process_image``.

    Raises:
        ValueError: If the CSV has fewer than four columns.
    """
    csv_simulation_values = pd.read_csv(csv_simulation_file, header=None).to_numpy()

    # DataFrame.to_numpy() is always 2-D, so the former ``ndim == 1`` check could
    # never fire; validate the column count that is actually required instead.
    if csv_simulation_values.ndim != 2 or csv_simulation_values.shape[1] < 4:
        raise ValueError("CSV 文件格式不正确,应该至少包含四列:organ_index, T1, T2, Rho")

    # Make sure the output folders exist
    os.makedirs(output_folder1, exist_ok=True)
    os.makedirs(output_folder2, exist_ok=True)

    # Sorted so the processing order is deterministic across platforms.
    for filename in sorted(os.listdir(input_folder1)):
        if not filename.endswith('.nrrd'):
            continue
        input_file_path = os.path.join(input_folder1, filename)
        seg_file_path = os.path.join(input_folder2, filename)
        output_file_path1 = os.path.join(output_folder1, filename)
        output_file_path2 = os.path.join(output_folder2, filename)
        print(f"Processing {input_file_path} with segmentation {seg_file_path}")
        process_image(input_file_path, None, seg_file_path, csv_simulation_values, output_file_path1, output_file_path2, body_threshold)
def analyse_hist(input_path, slice_index=50):
    """Plot the intensity histogram of one slice of an image volume.

    Args:
        input_path: Path of the volume (``.nrrd``, ``.nii`` or ``.nii.gz``).
        slice_index: Index along the third axis of the slice to analyse
            (defaults to 50, the previously hard-coded value).

    Raises:
        ValueError: If ``input_path`` has an unsupported extension.
        IndexError: If ``slice_index`` is outside the third axis of the volume.
    """
    if input_path.endswith('.nrrd'):
        img, _header = nrrd.read(input_path)
    elif input_path.endswith(('.nii.gz', '.nii')):
        import nibabel as nib
        img = nib.load(input_path).get_fdata()
    else:
        # Previously this fell through to an UnboundLocalError on ``img``.
        raise ValueError(
            f"Unsupported image format for '{input_path}': expected .nrrd, .nii or .nii.gz"
        )

    import matplotlib.pyplot as plt

    print('shape of img: ', img.shape)
    # Flatten the slice: a 2-D input makes plt.hist treat every column as a
    # separate dataset instead of plotting one distribution of all values.
    slice_values = img[:, :, slice_index].ravel()

    # Plot the histogram
    plt.hist(slice_values, bins=30, edgecolor='black', alpha=0.7)
    plt.xlabel('Value')
    plt.ylabel('Frequency')
    plt.title('Value Distribution')
    plt.show()
def _patient_output_path(output_folder, relative_path, basic_name, patient_name):
    """Join ``relative_path`` under ``output_folder``, renaming a generic NIfTI basename to ``patient_name``."""
    directory, filename = os.path.split(relative_path)
    if filename in (f'{basic_name}.nii.gz', f'{basic_name}.nii'):
        # Rename the basename only: replacing on the whole path would also
        # rewrite folder names that happen to contain ``basic_name``.
        filename = patient_name + filename[len(basic_name):]
    return os.path.join(output_folder, directory, filename)


def process_csv(csv_file, output_folder1, output_folder2, csv_simulation_file, body_threshold=50, output_mr_csv_file='processed_mr_csv_file.csv'):
    """Run ``process_image`` on every NIfTI image/segmentation pair listed in ``csv_file``.

    Generic file names (``mr.nii[.gz]`` / ``mr_merged_seg.nii[.gz]``) get the
    patient ID inserted so outputs of different patients do not collide. A CSV
    listing the processed outputs is written to ``output_mr_csv_file``.

    Args:
        csv_file: Dataset list parsed by ``list_img_seg_ad_pIDs_from_new_simplified_csv``.
        output_folder1: Folder receiving the processed MR images (created if missing).
        output_folder2: Folder receiving the processed segmentations (created if missing).
        csv_simulation_file: CSV read without header; ``process_segmentation``
            reads columns 0-3 (organ index, T1, T2, Rho).
        body_threshold: Threshold forwarded to ``process_image``.
        output_mr_csv_file: Path of the CSV listing the generated files.

    Raises:
        ValueError: If the simulation CSV has fewer than four columns.
    """
    csv_simulation_values = pd.read_csv(csv_simulation_file, header=None).to_numpy()

    # DataFrame.to_numpy() is always 2-D, so the former ``ndim == 1`` check could
    # never fire; validate the column count that is actually required instead.
    if csv_simulation_values.ndim != 2 or csv_simulation_values.shape[1] < 4:
        raise ValueError("CSV 文件格式不正确,应该至少包含四列:organ_index, T1, T2, Rho")

    # Make sure the output folders exist
    os.makedirs(output_folder1, exist_ok=True)
    os.makedirs(output_folder2, exist_ok=True)

    import csv
    from step1_init_data_list import list_img_seg_ad_pIDs_from_new_simplified_csv
    from tqdm import tqdm

    patient_IDs, Aorta_diss, segs, images = list_img_seg_ad_pIDs_from_new_simplified_csv(csv_file)

    synthrad_basic_mr_name = 'mr'
    synthrad_basic_seg_name = 'mr_merged_seg'

    dataset_list = []
    entries = zip(patient_IDs, Aorta_diss, segs, images)
    for patient_id, ad, seg_file_path, input_file_path in tqdm(entries, total=len(images)):
        # Only pairs sharing the same NIfTI flavour are processed.
        both_gz = input_file_path.endswith('.nii.gz') and seg_file_path.endswith('.nii.gz')
        both_nii = input_file_path.endswith('.nii') and seg_file_path.endswith('.nii')
        if not (both_gz or both_nii):
            continue

        # Output paths mirror the location of each file relative to the image's folder.
        root_dir = os.path.dirname(input_file_path)
        output_file_path1 = _patient_output_path(
            output_folder1,
            os.path.relpath(input_file_path, start=root_dir),
            synthrad_basic_mr_name,
            f'mr_{patient_id}',
        )
        output_file_path2 = _patient_output_path(
            output_folder2,
            os.path.relpath(seg_file_path, start=root_dir),
            synthrad_basic_seg_name,
            f'mr_seg_{patient_id}',
        )

        print(f"Processing {input_file_path} with segmentation {seg_file_path}")
        print(f"Save results to {output_file_path1} and {output_file_path2}")
        process_image(input_file_path, None, seg_file_path, csv_simulation_values, output_file_path1, output_file_path2, body_threshold)

        dataset_list.append([patient_id, ad, output_file_path2, output_file_path1])

    with open(output_mr_csv_file, 'w', newline='') as f:
        csvwriter = csv.writer(f)
        csvwriter.writerow(['id', 'Aorta_diss', 'seg', 'img'])
        csvwriter.writerows(dataset_list)
if __name__ == "__main__":
    # Command-line entry point: process a folder of .nrrd images.
    import argparse

    cli = argparse.ArgumentParser(description="Process MR images and segmentation maps, apply masks and replace grayscale values.")

    # All path-like options are mandatory; register them from one table.
    required_options = (
        ('--input_folder1', "Path to the folder containing input MR .nrrd files."),
        ('--input_folder2', "Path to the folder containing segmentation .nrrd files."),
        ('--output_folder1', "Path to the folder to save the output MR files."),
        ('--output_folder2', "Path to the folder to save the output segmentation files."),
        ('--csv_simulation_file', "CSV file containing simulated CT grayscale values."),
    )
    for flag, help_text in required_options:
        cli.add_argument(flag, required=True, help=help_text)
    cli.add_argument('--body_threshold', type=int, default=50, help="Threshold to separate body from background.")

    opts = cli.parse_args()
    process_folder(
        opts.input_folder1,
        opts.input_folder2,
        opts.output_folder1,
        opts.output_folder2,
        opts.csv_simulation_file,
        opts.body_threshold,
    )