PiNS / scr /candidate_worldCoord_patchExtarctor_pipeline.py
ft42's picture
Upload 21 files
7f24887 verified
from cvseg_utils import*
import warnings
import os
import logging
import random
import cv2
import json
import torch
import logging
import argparse
import numpy as np
import pandas as pd
from tqdm import tqdm
import SimpleITK as sitk
from monai.transforms import Compose, ScaleIntensityRanged
random.seed(200)
np.random.seed(200)
from datetime import datetime
def create_folder_if_not_exists(folder_path):
import os
# Check if the folder exists
if not os.path.exists(folder_path):
# If the folder doesn't exist, create it
os.makedirs(folder_path)
print(f"Folder created: {folder_path}")
else:
print(f"Folder already exists: {folder_path}")
def nifti_patche_extractor_for_worldCoord_main():
parser = argparse.ArgumentParser(description='Nodule segmentation and feature extraction from CT images.')
parser.add_argument('--raw_data_path', type=str, required=True, help='Path to raw CT images')
parser.add_argument('--csv_save_path', type=str, required=True, help='Path to save the CSV files')
parser.add_argument('--dataset_csv', type=str, required=True, help='Path to the dataset CSV')
parser.add_argument('--dataset_name', type=str, default='DLCS24', help='Dataset to use')
# Allow multiple column names as input arguments
parser.add_argument('--nifti_clm_name', type=str, required=True, help='name to the nifti column name')
parser.add_argument('--unique_Annotation_id', type=str, help='Column for unique annotation ID')
parser.add_argument('--Malignant_lbl', type=str, help='Column name for malignancy labels')
parser.add_argument('--coordX', type=str, required=True, help='Column name for X coordinate')
parser.add_argument('--coordY', type=str, required=True, help='Column name for Y coordinate')
parser.add_argument('--coordZ', type=str, required=True, help='Column name for Z coordinate')
parser.add_argument('--patch_size', type=int, nargs=3, default=[64, 64, 64], help="Patch size as three integers, e.g., --patch_size 64 64 64")
# Normalization (4 values)
parser.add_argument('--normalization', type=float, nargs=4, default=[-1000, 500.0, 0.0, 1.0],help="Normalization values as four floats: A_min A_max B_min B_max")
# Clip (Boolean from string input)
parser.add_argument('--clip', type=str, choices=["True", "False"], default="False",help="Enable or disable clipping (True/False). Default is False.")
parser.add_argument('--save_nifti_path', type=str, help='Path to save the nifti files')
args = parser.parse_args()
raw_data_path = args.raw_data_path
csv_save_path = args.csv_save_path
dataset_csv = args.dataset_csv
create_folder_if_not_exists(csv_save_path)
create_folder_if_not_exists(args.save_nifti_path)
# Extract normalization values
A_min, A_max, B_min, B_max = args.normalization
# Convert clip argument to boolean
CLIP = args.clip == "True"
output_csv = csv_save_path + f'CandidateSeg_{args.dataset_name}_patch{args.patch_size[0]}x{args.patch_size[1]}y{args.patch_size[2]}z.csv'
Erroroutput_csv = csv_save_path + f'CandidateSeg_{args.dataset_name}_patch{args.patch_size[0]}x{args.patch_size[1]}y{args.patch_size[2]}z_Error.csv'
# Derive the log file name from the output CSV file
log_file = output_csv.replace('.csv', '.log')
# Configure logging
logging.basicConfig(
filename=log_file,
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logging.info(f"Output CSV File: {output_csv}")
logging.info(f"Error CSV File: {Erroroutput_csv}")
logging.info(f"Log File Created: {log_file}")
logging.info("File names generated successfully.")
###----input CSV
df = pd.read_csv(dataset_csv)
final_dect = df[args.nifti_clm_name].unique()
output_df = pd.DataFrame()
Error_ids = []
for dictonary_list_i in tqdm(range(0,len(final_dect)), desc='Processing CTs'):
try:
logging.info(f"---Loading---: {dictonary_list_i+1}")
#print(make_bold('|' + '-'*30 + ' No={} '.format(dictonary_list_i+1) + '-'*30 + '|'))
#print('\n')
desired_value = final_dect[dictonary_list_i]
filtered_df = df[df[args.nifti_clm_name] == desired_value]
example_dictionary = filtered_df.reset_index()
logging.info(f"Loading the Image:{example_dictionary[args.nifti_clm_name][0]}")
logging.info(f"Number of Annotations:{len(example_dictionary)}")
#print('Loading the Image: {}'.format(example_dictionary[args.nifti_clm_name][0]))
#print('Number of Annotations: {}'.format(len(example_dictionary)))
ct_nifti_path = raw_data_path + example_dictionary[args.nifti_clm_name][0]
ct_image = sitk.ReadImage(ct_nifti_path)
ct_array = sitk.GetArrayFromImage(ct_image)
torch_image = torch.from_numpy(ct_array)
temp_torch_image = {"image": torch_image}
intensity_transform = Compose([ScaleIntensityRanged(keys=["image"], a_min=A_min, a_max=A_max, b_min=B_min, b_max=B_max, clip=CLIP),])
transformed_image = intensity_transform(temp_torch_image)
numpyImage = transformed_image["image"].numpy()
for Which_box_to_use in range(0,len(example_dictionary)):
#print('-----------------------------------------------------------------------------------------------')
if args.unique_Annotation_id in example_dictionary.columns:
annotation_id = example_dictionary[args.unique_Annotation_id][Which_box_to_use]
else:
# Generate an ID using the image name (without extension) and an index
image_name = example_dictionary[args.nifti_clm_name][0].split('.nii')[0]
annotation_id = f"{image_name}_candidate_{Which_box_to_use+1}"
worldCoord = np.asarray([float(example_dictionary[args.coordX][Which_box_to_use]), float(example_dictionary[args.coordY][Which_box_to_use]), float(example_dictionary[args.coordZ][Which_box_to_use])])
voxelCoord = ct_image.TransformPhysicalPointToIndex(worldCoord)
# Access individual values
w = args.patch_size[0]
h = args.patch_size[1]
d = args.patch_size[2]
start_x, end_x = int(voxelCoord[0] - w/2), int(voxelCoord[0] + w/2)
start_y, end_y = int(voxelCoord[1] - h/2), int(voxelCoord[1] + h/2)
start_z, end_z = int(voxelCoord[2] - d/2), int(voxelCoord[2] + d/2)
X, Y, Z = int(voxelCoord[0]), int(voxelCoord[1]), int(voxelCoord[2])
numpy_to_save_np = numpyImage[max(start_z,0):end_z, max(start_y,0):end_y, max(start_x,0):end_x]
# Pad if necessary
if np.any(numpy_to_save_np.shape != (d, h, w)):
dZ, dY, dX = numpyImage.shape
numpy_to_save_np = np.pad(numpy_to_save_np, ((max(d // 2 - Z, 0), d // 2 - min(dZ - Z, d // 2)),
(max(h // 2 - Y, 0), h // 2 - min(dY - Y, h // 2)),
(max(w // 2 - X, 0), w // 2 - min(dX - X, w // 2))), mode="constant", constant_values=0.)
#--- Segmentation---#
patch_image = sitk.GetImageFromArray(numpy_to_save_np)
patch_image.SetSpacing(ct_image.GetSpacing())
patch_image.SetDirection(ct_image.GetDirection())
patch_image.SetOrigin(ct_image.GetOrigin())
if args.Malignant_lbl in example_dictionary.columns:
feature_row = pd.DataFrame({args.nifti_clm_name: [example_dictionary[args.nifti_clm_name][0]],'candidateID': [annotation_id],args.Malignant_lbl: [example_dictionary[args.Malignant_lbl][Which_box_to_use]]})
else:
feature_row = pd.DataFrame({args.nifti_clm_name: [example_dictionary[args.nifti_clm_name][0]],'candidateID': [annotation_id]})
feature_row[args.coordX] = example_dictionary[args.coordX][Which_box_to_use]
feature_row[args.coordY] = example_dictionary[args.coordY][Which_box_to_use]
feature_row[args.coordZ] = example_dictionary[args.coordZ][Which_box_to_use]
# Save
output_nifti_path = os.path.join(args.save_nifti_path, f"{annotation_id}.nii.gz")
# Ensure the directory exists before writing the file
output_nifti_dir = os.path.dirname(output_nifti_path)
os.makedirs(output_nifti_dir, exist_ok=True) # Creates the directory if it doesn't exist
sitk.WriteImage(patch_image, output_nifti_path)
# Append the row to the output DataFrame
output_df = pd.concat([output_df, feature_row], ignore_index=True)
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
print(f" Error occured: {e}")
Error_ids.append(final_dect[dictonary_list_i])
pass
# Save the output DataFrame to a CSV file
output_df.to_csv(output_csv, index=False,encoding='utf-8')
print("completed and saved to {}".format(output_csv))
Erroroutput_df = pd.DataFrame(list(Error_ids),columns=[args.nifti_clm_name])
Erroroutput_df.to_csv(Erroroutput_csv, index=False,encoding='utf-8')
print("completed and saved Error to {}".format(Erroroutput_csv))
if __name__ == "__main__":
nifti_patche_extractor_for_worldCoord_main()