|
|
from cvseg_utils import* |
|
|
import warnings |
|
|
import os |
|
|
import logging |
|
|
import random |
|
|
import cv2 |
|
|
import json |
|
|
import torch |
|
|
import logging |
|
|
import argparse |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from tqdm import tqdm |
|
|
import SimpleITK as sitk |
|
|
from monai.transforms import Compose, ScaleIntensityRanged |
|
|
# Fix both RNG seeds up front so repeated extraction runs are reproducible.
random.seed(200)


np.random.seed(200)
|
|
from datetime import datetime |
|
|
|
|
|
def create_folder_if_not_exists(folder_path):
    """Create *folder_path* (including missing parents) if it does not exist.

    Prints one line saying whether the folder was created or already present.

    Args:
        folder_path: Directory path to ensure exists.
    """
    if os.path.exists(folder_path):
        print(f"Folder already exists: {folder_path}")
    else:
        # exist_ok=True closes the check-then-create race: another process may
        # create the folder between the exists() test and makedirs().
        os.makedirs(folder_path, exist_ok=True)
        print(f"Folder created: {folder_path}")
|
|
|
|
|
def nifti_patche_extractor_for_worldCoord_main():
    """Extract fixed-size patches around world-coordinate annotations from CT NIfTI files.

    Workflow: read a dataset CSV listing NIfTI scans and per-annotation world
    coordinates; for each scan, normalize intensities with MONAI's
    ScaleIntensityRanged, convert each annotation's physical point to a voxel
    index, crop (and zero-pad at volume edges) a ``--patch_size`` patch, write
    it as ``<annotation_id>.nii.gz``, and append one row to an output CSV.
    Scans that fail for any reason are logged and listed in a separate error
    CSV so one bad scan never aborts the run.
    """
    parser = argparse.ArgumentParser(description='Nodule segmentation and feature extraction from CT images.')
    parser.add_argument('--raw_data_path', type=str, required=True, help='Path to raw CT images')
    parser.add_argument('--csv_save_path', type=str, required=True, help='Path to save the CSV files')
    parser.add_argument('--dataset_csv', type=str, required=True, help='Path to the dataset CSV')
    parser.add_argument('--dataset_name', type=str, default='DLCS24', help='Dataset to use')
    parser.add_argument('--nifti_clm_name', type=str, required=True, help='name to the nifti column name')
    parser.add_argument('--unique_Annotation_id', type=str, help='Column for unique annotation ID')
    parser.add_argument('--Malignant_lbl', type=str, help='Column name for malignancy labels')
    parser.add_argument('--coordX', type=str, required=True, help='Column name for X coordinate')
    parser.add_argument('--coordY', type=str, required=True, help='Column name for Y coordinate')
    parser.add_argument('--coordZ', type=str, required=True, help='Column name for Z coordinate')
    parser.add_argument('--patch_size', type=int, nargs=3, default=[64, 64, 64], help="Patch size as three integers, e.g., --patch_size 64 64 64")
    parser.add_argument('--normalization', type=float, nargs=4, default=[-1000, 500.0, 0.0, 1.0], help="Normalization values as four floats: A_min A_max B_min B_max")
    parser.add_argument('--clip', type=str, choices=["True", "False"], default="False", help="Enable or disable clipping (True/False). Default is False.")
    parser.add_argument('--save_nifti_path', type=str, help='Path to save the nifti files')

    args = parser.parse_args()

    # --save_nifti_path is consumed unconditionally below; fail with a clear
    # CLI error instead of a TypeError from os.path.join(None, ...).
    if not args.save_nifti_path:
        parser.error('--save_nifti_path is required')

    raw_data_path = args.raw_data_path
    csv_save_path = args.csv_save_path
    dataset_csv = args.dataset_csv

    create_folder_if_not_exists(csv_save_path)
    create_folder_if_not_exists(args.save_nifti_path)

    A_min, A_max, B_min, B_max = args.normalization
    CLIP = args.clip == "True"

    # os.path.join keeps the CSVs inside csv_save_path even when the argument
    # lacks a trailing separator (plain '+' concatenation silently produced a
    # mangled filename in the parent directory).
    patch_tag = f'patch{args.patch_size[0]}x{args.patch_size[1]}y{args.patch_size[2]}z'
    output_csv = os.path.join(csv_save_path, f'CandidateSeg_{args.dataset_name}_{patch_tag}.csv')
    Erroroutput_csv = os.path.join(csv_save_path, f'CandidateSeg_{args.dataset_name}_{patch_tag}_Error.csv')

    log_file = output_csv.replace('.csv', '.log')

    logging.basicConfig(
        filename=log_file,
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )

    logging.info(f"Output CSV File: {output_csv}")
    logging.info(f"Error CSV File: {Erroroutput_csv}")
    logging.info(f"Log File Created: {log_file}")
    logging.info("File names generated successfully.")

    df = pd.read_csv(dataset_csv)
    final_dect = df[args.nifti_clm_name].unique()
    output_df = pd.DataFrame()
    Error_ids = []

    w, h, d = args.patch_size  # patch extents along x, y, z

    for scan_idx in tqdm(range(len(final_dect)), desc='Processing CTs'):
        try:
            logging.info(f"---Loading---: {scan_idx + 1}")

            # All annotation rows that belong to this scan.
            desired_value = final_dect[scan_idx]
            example_dictionary = df[df[args.nifti_clm_name] == desired_value].reset_index()

            logging.info(f"Loading the Image:{example_dictionary[args.nifti_clm_name][0]}")
            logging.info(f"Number of Annotations:{len(example_dictionary)}")

            ct_nifti_path = raw_data_path + example_dictionary[args.nifti_clm_name][0]
            ct_image = sitk.ReadImage(ct_nifti_path)
            ct_array = sitk.GetArrayFromImage(ct_image)  # (z, y, x) axis order

            # MONAI intensity rescale: map [A_min, A_max] -> [B_min, B_max],
            # optionally clipping values outside the input window.
            intensity_transform = Compose([ScaleIntensityRanged(keys=["image"], a_min=A_min, a_max=A_max, b_min=B_min, b_max=B_max, clip=CLIP)])
            transformed_image = intensity_transform({"image": torch.from_numpy(ct_array)})
            numpyImage = transformed_image["image"].numpy()

            for box_idx in range(len(example_dictionary)):
                # Prefer the caller-supplied annotation-ID column; otherwise
                # synthesize "<image-stem>_candidate_<n>".
                if args.unique_Annotation_id in example_dictionary.columns:
                    annotation_id = example_dictionary[args.unique_Annotation_id][box_idx]
                else:
                    image_name = example_dictionary[args.nifti_clm_name][0].split('.nii')[0]
                    annotation_id = f"{image_name}_candidate_{box_idx + 1}"

                # Physical (world, mm) point -> voxel index via image geometry.
                worldCoord = (float(example_dictionary[args.coordX][box_idx]),
                              float(example_dictionary[args.coordY][box_idx]),
                              float(example_dictionary[args.coordZ][box_idx]))
                voxelCoord = ct_image.TransformPhysicalPointToIndex(worldCoord)
                X, Y, Z = int(voxelCoord[0]), int(voxelCoord[1]), int(voxelCoord[2])

                start_x, end_x = int(X - w / 2), int(X + w / 2)
                start_y, end_y = int(Y - h / 2), int(Y + h / 2)
                start_z, end_z = int(Z - d / 2), int(Z + d / 2)

                # Clamp only the low side; numpy slicing already clips ends
                # that run past the volume.
                numpy_to_save_np = numpyImage[max(start_z, 0):end_z, max(start_y, 0):end_y, max(start_x, 0):end_x]

                # Zero-pad patches that ran off a volume edge back to (d, h, w).
                if numpy_to_save_np.shape != (d, h, w):
                    dZ, dY, dX = numpyImage.shape
                    numpy_to_save_np = np.pad(numpy_to_save_np,
                                              ((max(d // 2 - Z, 0), d // 2 - min(dZ - Z, d // 2)),
                                               (max(h // 2 - Y, 0), h // 2 - min(dY - Y, h // 2)),
                                               (max(w // 2 - X, 0), w // 2 - min(dX - X, w // 2))),
                                              mode="constant", constant_values=0.)

                patch_image = sitk.GetImageFromArray(numpy_to_save_np)
                patch_image.SetSpacing(ct_image.GetSpacing())
                patch_image.SetDirection(ct_image.GetDirection())
                # NOTE(review): the origin is copied from the full scan, so the
                # patch keeps the parent volume's origin rather than its own
                # corner position — confirm this is intentional downstream.
                patch_image.SetOrigin(ct_image.GetOrigin())

                if args.Malignant_lbl in example_dictionary.columns:
                    feature_row = pd.DataFrame({args.nifti_clm_name: [example_dictionary[args.nifti_clm_name][0]], 'candidateID': [annotation_id], args.Malignant_lbl: [example_dictionary[args.Malignant_lbl][box_idx]]})
                else:
                    feature_row = pd.DataFrame({args.nifti_clm_name: [example_dictionary[args.nifti_clm_name][0]], 'candidateID': [annotation_id]})
                feature_row[args.coordX] = example_dictionary[args.coordX][box_idx]
                feature_row[args.coordY] = example_dictionary[args.coordY][box_idx]
                feature_row[args.coordZ] = example_dictionary[args.coordZ][box_idx]

                output_nifti_path = os.path.join(args.save_nifti_path, f"{annotation_id}.nii.gz")
                # annotation_id may contain path separators; make sure the
                # parent directory exists before writing.
                os.makedirs(os.path.dirname(output_nifti_path), exist_ok=True)
                sitk.WriteImage(patch_image, output_nifti_path)

                output_df = pd.concat([output_df, feature_row], ignore_index=True)
        except Exception as e:
            # Record the full traceback (logging.exception), remember the
            # failing scan for the error CSV, and continue with the next one.
            logging.exception(f"An error occurred: {str(e)}")
            print(f" Error occured: {e}")
            Error_ids.append(final_dect[scan_idx])

    output_df.to_csv(output_csv, index=False, encoding='utf-8')
    print("completed and saved to {}".format(output_csv))
    Erroroutput_df = pd.DataFrame(list(Error_ids), columns=[args.nifti_clm_name])
    Erroroutput_df.to_csv(Erroroutput_csv, index=False, encoding='utf-8')
    print("completed and saved Error to {}".format(Erroroutput_csv))
|
|
|
|
|
|
|
|
# Script entry point: parse CLI arguments and run the patch extractor.
if __name__ == "__main__":
    nifti_patche_extractor_for_worldCoord_main()
|
|
|