File size: 9,874 Bytes
7f24887
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
from cvseg_utils import*
import warnings
import os
import logging
import random
import cv2
import json
import torch
import logging
import argparse
import numpy as np
import pandas as pd
from tqdm import tqdm
import SimpleITK as sitk
from monai.transforms import Compose, ScaleIntensityRanged
random.seed(200)
np.random.seed(200)
from datetime import datetime

def create_folder_if_not_exists(folder_path):
    """Create *folder_path* (including any missing parents) if absent.

    Prints whether the folder was created or already existed.

    Args:
        folder_path: Directory path to ensure exists.
    """
    # NOTE: relies on the module-level `import os` (the previous function-local
    # import was redundant).
    if not os.path.exists(folder_path):
        # exist_ok=True guards against the TOCTOU race where another process
        # creates the directory between the exists() check and makedirs().
        os.makedirs(folder_path, exist_ok=True)
        print(f"Folder created: {folder_path}")
    else:
        print(f"Folder already exists: {folder_path}")

def nifti_patche_extractor_for_worldCoord_main():
    """CLI entry point: crop fixed-size 3D patches around world-coordinate
    candidates in CT volumes and save each patch as a NIfTI file.

    Workflow per unique CT in the dataset CSV:
      1. Load the CT with SimpleITK and intensity-normalize it with MONAI's
         ScaleIntensityRanged (window [A_min, A_max] -> [B_min, B_max]).
      2. For every annotation row of that CT, convert the world (physical)
         coordinate to a voxel index, crop a patch of --patch_size voxels
         centred on it, zero-pad if the crop hits a volume border, and write
         the patch to --save_nifti_path/<annotation_id>.nii.gz.
      3. Append one row per patch to an output CSV; CTs that raise are
         collected into a separate error CSV.

    All inputs come from argparse; there is no return value.
    """
    parser = argparse.ArgumentParser(description='Nodule segmentation and feature extraction from CT images.')
    parser.add_argument('--raw_data_path', type=str, required=True, help='Path to raw CT images')
    parser.add_argument('--csv_save_path', type=str, required=True, help='Path to save the CSV files')
    parser.add_argument('--dataset_csv',   type=str, required=True, help='Path to the dataset CSV')
    parser.add_argument('--dataset_name',  type=str, default='DLCS24', help='Dataset to use')
    # Allow multiple column names as input arguments
    parser.add_argument('--nifti_clm_name',   type=str, required=True, help='name to the nifti column name')
    parser.add_argument('--unique_Annotation_id', type=str, help='Column for unique annotation ID')
    parser.add_argument('--Malignant_lbl', type=str, help='Column name for malignancy labels')
    parser.add_argument('--coordX', type=str, required=True, help='Column name for X coordinate')
    parser.add_argument('--coordY', type=str, required=True, help='Column name for Y coordinate')
    parser.add_argument('--coordZ', type=str, required=True, help='Column name for Z coordinate')
    parser.add_argument('--patch_size', type=int, nargs=3, default=[64, 64, 64], help="Patch size as three integers, e.g., --patch_size 64 64 64")
    # Normalization (4 values)
    parser.add_argument('--normalization', type=float, nargs=4, default=[-1000, 500.0, 0.0, 1.0],help="Normalization values as four floats: A_min A_max B_min B_max")
    # Clip (Boolean from string input)
    parser.add_argument('--clip', type=str, choices=["True", "False"], default="False",help="Enable or disable clipping (True/False). Default is False.")
    parser.add_argument('--save_nifti_path', type=str, help='Path to save the nifti files')

    args           = parser.parse_args()
    raw_data_path  = args.raw_data_path
    csv_save_path  = args.csv_save_path
    dataset_csv    = args.dataset_csv

    create_folder_if_not_exists(csv_save_path)
    create_folder_if_not_exists(args.save_nifti_path)
    # Extract normalization values
    A_min, A_max, B_min, B_max = args.normalization
    # Convert clip argument to boolean
    CLIP = args.clip == "True"
    # NOTE(review): plain string concatenation — assumes csv_save_path ends
    # with a path separator; os.path.join would be safer. Confirm callers.
    output_csv = csv_save_path + f'CandidateSeg_{args.dataset_name}_patch{args.patch_size[0]}x{args.patch_size[1]}y{args.patch_size[2]}z.csv'
    Erroroutput_csv = csv_save_path + f'CandidateSeg_{args.dataset_name}_patch{args.patch_size[0]}x{args.patch_size[1]}y{args.patch_size[2]}z_Error.csv'

    # Derive the log file name from the output CSV file
    log_file = output_csv.replace('.csv', '.log')

    # Configure logging
    logging.basicConfig(
        filename=log_file, 
        level=logging.INFO, 
        format="%(asctime)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )

    logging.info(f"Output CSV File: {output_csv}")
    logging.info(f"Error CSV File: {Erroroutput_csv}")
    logging.info(f"Log File Created: {log_file}")
    logging.info("File names generated successfully.")


    ###----input CSV
    df                      = pd.read_csv(dataset_csv)
    # One entry per unique CT volume; all annotation rows for a CT are
    # processed together so the image is loaded and normalized only once.
    final_dect              = df[args.nifti_clm_name].unique()
    output_df               = pd.DataFrame()
    Error_ids = []
    for dictonary_list_i in tqdm(range(0,len(final_dect)), desc='Processing CTs'):
        try:
            logging.info(f"---Loading---: {dictonary_list_i+1}")
            #print(make_bold('|' + '-'*30 + ' No={} '.format(dictonary_list_i+1) + '-'*30 + '|'))
            #print('\n')

            # All rows (annotations) belonging to the current CT volume.
            desired_value      = final_dect[dictonary_list_i]
            filtered_df        = df[df[args.nifti_clm_name] == desired_value]
            example_dictionary = filtered_df.reset_index()

            logging.info(f"Loading the Image:{example_dictionary[args.nifti_clm_name][0]}")
            logging.info(f"Number of Annotations:{len(example_dictionary)}")
            #print('Loading the Image: {}'.format(example_dictionary[args.nifti_clm_name][0]))
            #print('Number of Annotations: {}'.format(len(example_dictionary)))
            ct_nifti_path = raw_data_path + example_dictionary[args.nifti_clm_name][0]
            ct_image      = sitk.ReadImage(ct_nifti_path)
            # GetArrayFromImage returns the array in (z, y, x) order.
            ct_array      = sitk.GetArrayFromImage(ct_image)


            # Intensity-normalize the whole volume once before patch cropping.
            torch_image         = torch.from_numpy(ct_array)
            temp_torch_image    = {"image": torch_image}
            intensity_transform = Compose([ScaleIntensityRanged(keys=["image"], a_min=A_min, a_max=A_max, b_min=B_min, b_max=B_max, clip=CLIP),])
            transformed_image   = intensity_transform(temp_torch_image)
            numpyImage          = transformed_image["image"].numpy()

            for Which_box_to_use in range(0,len(example_dictionary)):

                #print('-----------------------------------------------------------------------------------------------')
                # Use the provided annotation-ID column when present (a missing
                # --unique_Annotation_id arg is None, which is never a column),
                # otherwise synthesize one from the image name + row index.
                if args.unique_Annotation_id in example_dictionary.columns:
                    annotation_id = example_dictionary[args.unique_Annotation_id][Which_box_to_use]
                else:
                    # Generate an ID using the image name (without extension) and an index
                    image_name = example_dictionary[args.nifti_clm_name][0].split('.nii')[0]
                    annotation_id = f"{image_name}_candidate_{Which_box_to_use+1}"

                
                
                # World -> voxel index conversion. NOTE(review): assumes the
                # CSV coordinates are in the image's physical space (SimpleITK
                # uses LPS) — confirm against how the CSV was produced.
                worldCoord = np.asarray([float(example_dictionary[args.coordX][Which_box_to_use]), float(example_dictionary[args.coordY][Which_box_to_use]), float(example_dictionary[args.coordZ][Which_box_to_use])])
                voxelCoord = ct_image.TransformPhysicalPointToIndex(worldCoord)
                # Access individual values
                w = args.patch_size[0]
                h = args.patch_size[1]
                d = args.patch_size[2]
                start_x, end_x = int(voxelCoord[0] - w/2), int(voxelCoord[0] + w/2)
                start_y, end_y = int(voxelCoord[1] - h/2), int(voxelCoord[1] + h/2)
                start_z, end_z = int(voxelCoord[2] - d/2), int(voxelCoord[2] + d/2)
                X, Y, Z = int(voxelCoord[0]), int(voxelCoord[1]), int(voxelCoord[2])
                # Crop in (z, y, x) array order; max(...,0) clamps the lower
                # bound (slicing already tolerates end indices past the edge).
                numpy_to_save_np = numpyImage[max(start_z,0):end_z, max(start_y,0):end_y, max(start_x,0):end_x]

                # Pad if necessary
                # NOTE(review): `shape != (d, h, w)` is a plain tuple
                # comparison yielding a single bool, so np.any() is redundant
                # (though harmless) here.
                if np.any(numpy_to_save_np.shape != (d, h, w)):
                    dZ, dY, dX = numpyImage.shape
                    # Pad amounts are derived from how far the centre voxel is
                    # from each border so the final patch is exactly (d, h, w).
                    numpy_to_save_np = np.pad(numpy_to_save_np, ((max(d // 2 - Z, 0), d // 2 - min(dZ - Z, d // 2)),
                                                                 (max(h // 2 - Y, 0), h // 2 - min(dY - Y, h // 2)),
                                                                 (max(w // 2 - X, 0), w // 2 - min(dX - X, w // 2))), mode="constant", constant_values=0.)                

                #--- Segmentation---#
                # NOTE(review): spacing/direction are copied from the full CT,
                # but so is the origin — the patch's origin is NOT shifted to
                # the crop start, so patch world coordinates will not align
                # with the source volume. Confirm whether downstream code
                # relies on patch geometry.
                patch_image       = sitk.GetImageFromArray(numpy_to_save_np)
                patch_image.SetSpacing(ct_image.GetSpacing())
                patch_image.SetDirection(ct_image.GetDirection())
                patch_image.SetOrigin(ct_image.GetOrigin())
                # Include the malignancy label column only when it exists.
                if args.Malignant_lbl in example_dictionary.columns:
                    feature_row = pd.DataFrame({args.nifti_clm_name: [example_dictionary[args.nifti_clm_name][0]],'candidateID': [annotation_id],args.Malignant_lbl: [example_dictionary[args.Malignant_lbl][Which_box_to_use]]})
                else:
                    feature_row = pd.DataFrame({args.nifti_clm_name: [example_dictionary[args.nifti_clm_name][0]],'candidateID': [annotation_id]})
                feature_row[args.coordX] = example_dictionary[args.coordX][Which_box_to_use]
                feature_row[args.coordY] = example_dictionary[args.coordY][Which_box_to_use]
                feature_row[args.coordZ] = example_dictionary[args.coordZ][Which_box_to_use]
                # Save 
                output_nifti_path = os.path.join(args.save_nifti_path, f"{annotation_id}.nii.gz")
                # Ensure the directory exists before writing the file
                output_nifti_dir = os.path.dirname(output_nifti_path)
                os.makedirs(output_nifti_dir, exist_ok=True)         # Creates the directory if it doesn't exist
                sitk.WriteImage(patch_image, output_nifti_path)
                # Append the row to the output DataFrame
                output_df = pd.concat([output_df, feature_row], ignore_index=True)
        except Exception as e:
            # Best-effort: log the failing CT and continue with the next one;
            # failed image names are written to the error CSV at the end.
            logging.error(f"An error occurred: {str(e)}")
            print(f" Error occured: {e}")
            Error_ids.append(final_dect[dictonary_list_i])
            pass
    
    # Save the output DataFrame to a CSV file
    output_df.to_csv(output_csv, index=False,encoding='utf-8')
    print("completed and saved to {}".format(output_csv))
    Erroroutput_df = pd.DataFrame(list(Error_ids),columns=[args.nifti_clm_name])
    Erroroutput_df.to_csv(Erroroutput_csv, index=False,encoding='utf-8')
    print("completed and saved Error to {}".format(Erroroutput_csv))


if __name__ == "__main__":
    # Script entry point: run the world-coordinate patch-extraction CLI.
    nifti_patche_extractor_for_worldCoord_main()