Data_Engineering / PSMA_clean /dataclean_PSMA_petct.py
maxmo2009's picture
Initial upload: data cleanup pipeline for 12 medical imaging datasets
da9fb1e verified
#coding:utf-8
'''
writebyygq
createon2025-08-30
PSMAPET/CT本质上也是一种PET/CT,只是它的示踪剂和传统的18F-FDG不同,目前国际上应用较多的PSMAPET/CT的示踪剂是68GA-PSMA、18F-PSMA,其中68GA及18F是一种放射性核素,具有成像功能,PSMA是前列腺特异性膜抗原,具有引导功能,引导PSMA更准确地向前列腺癌细胞聚拢,这样就大大增加了PSMAPET/CT用于发现前列腺癌的敏感性。
PSMA,全称前列腺特异性膜抗原(Prostate-SpecificMembraneAntigen),是一种与前列腺癌密切相关的蛋白质。存在于前列腺上皮细胞的固有膜蛋白,在前列腺癌细胞表面强表达,在前列腺正常组织和非前列腺组织中表达量相对较低,表达量是正常前列腺细胞的100-1000倍,且与前列腺癌分级和分期呈正相关。这种强表达、高度特异性使得PSMA成为前列腺癌诊断和治疗的重要靶点。
而PSMAPET/CT实际上是一种靶向显像,用放射性核素(常用68Ga、18F)标记PSMA配体作为示踪剂,通过静脉注入体内,经过分布代谢于病灶,然后用PET/CT进行扫描,即完成显像。借助PSMA的引导功能,将放射性核素更精准地聚集在前列腺癌细胞,结合正电子发射断层扫描(PET)和计算机断层扫描(CT),实现对前列腺癌的精准检测。
fdgpet/ct和psmapet/ct检查就像"肿瘤侦探"使用不同的破案工具,各有所长又互为补充。fdg和psma是pet检查使用的两种不同显像剂,二者显像原理不同,因此追踪的“目标分子”不同。
fdgpet/ct
追踪目标:恶性肿瘤细胞消耗的葡萄糖(类似给恶性肿瘤细胞“测饭量”)
原理:恶性肿瘤细胞生长、代谢旺盛,会大量摄取显像剂fdg(葡萄糖类似物),通过检测“高耗能区”定位肿瘤
优势:广谱肿瘤示踪剂,发展成熟、应用广泛,可反应肿瘤恶性程度,同时发现其他部位恶性肿瘤
局限性:肿瘤细胞数量少或处于低度恶性时,常常降低对葡萄糖摄取的需求,pet影像表现为低代谢,此时容易漏诊
psmapet/ct
追踪目标:前列腺特异性膜抗原(前列腺癌细胞戴着的特殊“徽章”)
原理:90%前列腺癌细胞表面戴着这种“徽章”,psma靠着追踪并粘住这种“徽章”精准锁定前列腺癌病灶,哪里亮起来,哪里就有肿瘤
优势:针对性强,能早期发现微小病灶,甚至在其他检查还正常时就预警
局限性:体内存在部分正常或病变细胞,同样具有psma蛋白高度表达的情况,如神经节、神经组织、肉芽肿性病变、肾癌、肺癌等,可能导致假阳性表现。此外,约10%的前列腺癌细胞没有佩戴这种“徽章”,导致漏诊
PSMA-FDG-PET-CT-Lesion 数据集指的是同时包含 PSMA-PET 和 FDG-PET(以及对应CT)两种扫描模态,并且带有病灶标注的医学影像数据集。
这种数据集在前列腺癌研究中具有极高的价值,因为它允许研究者直接比较和分析同一患者体内不同病灶的分子表达特性。
前列腺癌病灶在分子水平上具有异质性。并非所有病灶都表达相同的生物标志物。
PSMA(前列腺特异性膜抗原):在大多数前列腺癌细胞表面过度表达,是前列腺癌相对特异的靶点。PSMA-PET用于检测前列腺癌特异性病灶。
FDG(氟代脱氧葡萄糖):反映细胞的葡萄糖代谢活性。高度侵袭性、低分化的肿瘤通常具有很高的FDG摄取。
PSMA-FDG-PET/CT:
https://autopet-iii.grand-challenge.org/
"channel_names": {
"0": "CT",
"1": "CT"--PET
},
"labels": {
"background": 0,
"tumor": 1
},
同一个病例同在000,001两个影像,分别表示CT,PET,合并到第四个维度作为SUB_MODALITY
label:
0:backgroud 1: tumor
FDG-元数据信息
'Series UID', 'Collection', '3rd Party Analysis',
'Data Description URI', 'Subject ID', 'Study UID', 'Study Description',
'Study Date', 'Series Description', 'Manufacturer', 'Modality',
'SOP Class Name', 'SOP Class UID', 'Number of Images', 'File Size',
'File Location', 'Download Timestamp', 'diagnosis', 'age', 'sex'
通过Subject ID,以及Modality共同确定唯一的描述信息,获取相应的,Study Description,Study Date,Series Description, Manufacturer,diagnosis, age, sex信息;【只获取CT模态的一行描述信息即可】
FDG文件名组成:fdg_b2f82ed4b9_04-17-2003-NA-PET-CT Ganzkoerper primaer mit KM-26753_[0000].nii.gz
Subject ID[PETCT_b2f82ed4b9] && Modality[CT]
PSMA-元数据信息
'Subject ID', 'Study Date', 'age', 'manufacturer_model_name',
'pet_radionuclide', 'ct_contrast_agent'
需要依靠'Subject ID', 'Study Date'共同确定唯一,存在相同的subject_id不同时间的样例--作为单独数据处理,
PSMA文件名组成:psma_d5b636ea4da7638b_2019-03-15_[0000].nii.gz
Subject ID[psma_d5b636ea4da7638b]&&Study Date[2019-03-15]
综上:将id定义为subject_id+study_date 共同标识唯一的ID
处理流程:
1.查找所有的ID;
2.根据ID查找对应的两个channel的影像以及对应的label;
3.对两个channel的影像进行合并转4D;
4.按照4D图像处理的惯例(第四个维度不参与计算,取前3个的spaceing最小值)重采样插值;--label
5.保存
'''
import os
import glob
import pandas as pd
import SimpleITK as sitk
import argparse
import json
from tqdm import tqdm
from util import meta_data
import util
import numpy as np
# from bert_helper import *
import shutil
##dataset_meta
meta_id_name='BraTS_2019_subject_ID'
meta_grade_name='Grade'
##HGG_survival_info
survival_id_name='BraTS19ID'
meta_age_name='Age'
meta_survival_name='Survival'
meta_status_name='ResectionStatus'
TASK_VALUE="segmentation"
CLAMP_RANGE_CT = [-300,300]
CLAMP_RANGE_MRI = None # MRI images threshold placeholder TBC...
TARGET_VOXEL_SPACING=None
##参考MSD的sub_modality描述信息
SUB_MODALITY=["CT","PET"]
##文件名对应的排序顺序
SERIES_ORDER=["0000","0001"]
LABEL_DICT={
"0":"backgroud",
"1":"tumor",
}
PSMA_META_COLUMN=['Subject ID', 'Study Date', 'age', 'manufacturer_model_name','pet_radionuclide', 'ct_contrast_agent']
FDG_META_COLUMN=['Subject ID', 'Study Description','Study Date', 'Series Description', 'Manufacturer', 'Modality','diagnosis', 'age', 'sex']
# def find_metadata_files(path):
# # for Cancer Image Archive (TCIA) dataset
# search_pattern = os.path.join(path, '**', 'metadata.csv')
# return glob.glob(search_pattern, recursive=True)
def find_metadata_files(path):
# for Cancer Image Archive (TCIA) dataset
search_pattern = os.path.join(path, '*.csv')
return glob.glob(search_pattern, recursive=True)
##added by yanguoqing on 20250527
def find_image_dirs(path):
return os.listdir(path)
##modify by yanguoqing on 20250527
def load_dicom_images(folder_path):
reader = sitk.ImageSeriesReader()
dicom_names = reader.GetGDCMSeriesFileNames(folder_path)
reader.SetFileNames(dicom_names)
image = reader.Execute()
return dicom_names,image
##added by yanguoqing on 20250527
def load_dicom_tag(imgs):
reader = sitk.ImageFileReader()
# dicom_names = reader.GetGDCMSeriesFileNames(folder_path)
reader.SetFileName(imgs)
reader.ReadImageInformation() # 仅读取元信息,不加载像素数据
# metadata_keys = reader.GetMetaDataKeys()
tag=reader.Execute()
return tag
def load_nrrd(fp):
return sitk.ReadImage(fp)
##modify by yanguoqing on 20250830
def merge_images(series_files):
'''
每个病例包含两种不同序列的 CT:CT/PET--0000/0001
将多个分开的模态合并,构建第四个维度的数组,分别按照CT,PET顺序存放
'''
reader = sitk.ImageSeriesReader()
reader.SetFileNames(series_files)
image = reader.Execute()
return image
def save_nifti(image, output_path, folder_path):
# Set metadata in the NIfTI file's header
output_dirpath = os.path.dirname(output_path)
if not os.path.exists(output_dirpath):
print(f"Creating directory {output_dirpath}")
os.makedirs(output_dirpath)
# Set metadata in the NIfTI file's header
image.SetMetaData("FolderPath", folder_path)
sitk.WriteImage(image, output_path)
##modify by yanguoqing on 20250527
def convert_windows_to_linux_path(windows_path):
# Replace backslashes with forward slashes and remove the drive letter
# Some meta files have windows paths, but the data is stored on a linux server
linux_path = windows_path.replace('\\', '/')
if ':' in linux_path:
linux_path = linux_path.split(':', 1)[1]
return linux_path
##added by yanguoqing on 2025-08-30
##获取PSMA-PET-CT的1614个数据名称
def get_filename_list(fp):
with open(fp,'r') as fi:
fls=json.load(fi)
filename_list=fls[0]['train']+fls[0]['val']
return filename_list
##获取study_id以及study_date
def check_fname(fname):
if fname.startswith("fdg"):
sid=fname[:14]
sdate=fname[15:25]
else:
sid=fname[:21]
sdate=fname[22:]
return sid,sdate
def main(target_path, output_dir):
# metadata_files = find_metadata_files(target_path)
# pid_dirs=find_image_dirs(target_path)
fdg_meta="fdg_metadata.csv"
psma_meta="psma_metadata.csv"
filename_meta="splits_final.json" ##包含所有1614个数据的名称列表信息
# pid_dirs=["imagesTr","labelsTr"]
pid_dirs=["imagesTr"]
failed_files = []
if not os.path.isdir(output_dir):
os.makedirs(output_dir)
json_output_path = os.path.join(output_dir, 'nifti_mappings.json')
failed_files_path = os.path.join(output_dir, 'failed_files.json')
meta = meta_data()
# Initialize the JSON file
if not os.path.exists(json_output_path):
with open(json_output_path, 'w') as json_file:
json.dump({}, json_file)
psma_meta_file=os.path.join(target_path,psma_meta)
fdg_meta_file=os.path.join(target_path,fdg_meta)
filename_file=os.path.join(target_path,filename_meta)
pdf_meta=pd.read_csv(psma_meta_file)
fdf_meta=pd.read_csv(fdg_meta_file)
fp_names=get_filename_list(filename_file)
##从辅助文件信息中获取所有1614个病例名称,每个病例名称存在0000,0001两个三维影像数据,按照顺序合并;
if pid_dirs:
for pid_dir in tqdm(pid_dirs, desc="Processing all dataset"):
for fp_name in tqdm(fp_names, desc="Processing all dataset"):
ct_fp=os.path.join(target_path,pid_dir,fp_name+"_0000.nii.gz")
pet_fp=os.path.join(target_path,pid_dir,fp_name+"_0001.nii.gz")
label_fp=os.path.join(target_path,'labelsTr',fp_name+".nii.gz")
modality="CT"
study='PSMA-FDG-PET-CT-LESION'##Dataset_name
CIA_other_info = {'metadata_file':''}
CIA_other_info['split'] = "train"
if fp_name.startswith("fdg"):
CIA_other_info['metadata_file']=fdg_meta_file
df_meta=fdf_meta
sid,sdate=check_fname(fp_name)
study_id=sid.replace("fdg","PETCT")
data_info_row=df_meta[np.logical_and(df_meta['Subject ID']==study_id,df_meta['Modality']=='CT')]
data_info_row=data_info_row.reset_index()
for keyname in FDG_META_COLUMN:
CIA_other_info[keyname]=str(data_info_row[keyname][0])
CIA_other_info['Image_id']=fp_name
else:
CIA_other_info['metadata_file']=psma_meta_file
df_meta=pdf_meta
sid,sdate=check_fname(fp_name)
study_id=sid.replace("psma","PSMA")
# print('>>',study_id,sdate)
data_info_row=df_meta[np.logical_and(df_meta['Subject ID']==study_id,df_meta['Study Date']==sdate)]
data_info_row=data_info_row.reset_index()
# print(data_info_row.columns)
for keyname in PSMA_META_COLUMN:
print(keyname)
print(data_info_row[keyname][0])
CIA_other_info[keyname]=str(data_info_row[keyname][0])
CIA_other_info['Image_id']=fp_name
try:
##读取MRI四组文件,按照flair,t1,t1ce,t2的顺序叠加,对于seg先剔除不参与
series_files=[ct_fp,pet_fp]
sub_modality=['CT','PET']
if len(series_files)>0:
##存在有效的MRI影像数据进行后续处理
sitk_img_original=merge_images(series_files)
original_spacing = list(sitk_img_original.GetSpacing())
original_size = list(sitk_img_original.GetSize())
is_4d_image = sitk_img_original.GetDimension() == 4
frame_flag=False
# --- Resampling Logic (Revised for 4D) ---
if is_4d_image:
# Always process 4D images channel-wise for resampling
# logging.info(f" Processing 4D image channel-wise: {original_img_full_path}") # Keep log for errors only
channels = []
num_channels = original_size[3] if len(original_size) == 4 and sitk_img_original.GetDimension() == 4 else 1
channel_target_spacing = TARGET_VOXEL_SPACING if TARGET_VOXEL_SPACING else original_spacing[:3] # Use 3D spacing
for i in range(num_channels):
extractor = sitk.ExtractImageFilter()
current_3d_channel_size = original_size[:3]
if sitk_img_original.GetDimension() == 4:
extractor.SetSize([current_3d_channel_size[0], current_3d_channel_size[1], current_3d_channel_size[2], 0])
extractor.SetIndex([0,0,0,i])
channel_3d_img = extractor.Execute(sitk_img_original)
else:
channel_3d_img = sitk_img_original
if i > 0: break
channel_resampler = util.get_unisize_resampler(
channel_3d_img, 'linear',
spacing=channel_target_spacing, size=current_3d_channel_size
)
if channel_resampler:
channels.append(channel_resampler.Execute(channel_3d_img))
else:
channels.append(channel_3d_img)
if channels:
if len(channels) > 1: # Only join if there are multiple channels
sitk_img_processed = sitk.JoinSeriesImageFilter().Execute(channels)
##aded by yanguoqing on 2025-08-11
frame_flag=True
# imgDict={}
# for kf_idx in range(num_channels):
# imgDict[str(kf_idx)]='none'
# if str(meta_ed):imgDict[str(meta_ed)]='ed'
# if str(meta_es):imgDict[str(meta_es)]='es'
# meta.add_keyvalue('ImgDict',imgDict)
elif len(channels) == 1: # If only one channel resulted (e.g. original was 3D misidentified as 4D by tensorImageSize)
sitk_img_processed = channels[0]
elif TARGET_VOXEL_SPACING: # 3D image with target spacing
img_resampler_obj = util.get_unisize_resampler(sitk_img_original, 'linear',
spacing=TARGET_VOXEL_SPACING, size=original_size)
if img_resampler_obj: sitk_img_processed = img_resampler_obj.Execute(sitk_img_original)
else: # 3D image, no TARGET_VOXEL_SPACING
img_resampler_obj = util.get_unisize_resampler(sitk_img_original, 'linear',
spacing=original_spacing, size=original_size)
if img_resampler_obj: sitk_img_processed = img_resampler_obj.Execute(sitk_img_original)
output_path = os.path.join(output_dir,fp_name,fp_name+".nii.gz")
# output_path=convert_windows_to_linux_path(output_path)
save_nifti(sitk_img_processed, output_path, os.path.dirname(ct_fp))
print(f"Saved NIfTI file to {output_path}")
size_processed = list(sitk_img_processed.GetSize())
print('size_processed',size_processed,original_size)
# meta.add_keyvalue('Image_id',meta_image_id)
meta.add_keyvalue('Spacing_mm',min(original_spacing[:3]))##保留前三个x,y,z的最小spacing
meta.add_keyvalue('OriImg_path',",".join(series_files))
meta.add_keyvalue('Size',size_processed) # 这里用处理后的size -- YH Jachin
meta.add_keyvalue('Modality',modality)
meta.add_keyvalue('Dataset_name',study)
meta.add_keyvalue('ROI','whole-body')
sub_modality_dict={}
for idx,value in enumerate(sub_modality):
if value:
sub_modality_dict[str(idx)]=SUB_MODALITY[idx]
meta.add_keyvalue('Sub_modality',sub_modality_dict)
meta.add_keyvalue('Label_Dict',LABEL_DICT)
##Label processing
label_path_dict={}
full_label_file=label_fp
full_path_label=os.path.dirname(full_label_file)
process_label_path=os.path.join(output_dir,fp_name,'segmentation')
processed_lbl_full_path=os.path.join(process_label_path, f"{fp_name}.nii.gz")
if not os.path.isdir(process_label_path):
os.makedirs(process_label_path,exist_ok=True)
if not os.path.isfile(full_label_file):
pass
label_flag=False
else:
sitk_lbl_original = util.load_nifti(full_label_file)
if sitk_lbl_original:
label_resampler = sitk.ResampleImageFilter()
reference_for_label = sitk_img_processed # Default to processed image
if sitk_img_processed.GetDimension() == 4:
num_comp_proc = sitk_img_processed.GetSize()[3] if len(sitk_img_processed.GetSize()) == 4 else 1
if num_comp_proc > 0:
extractor = sitk.ExtractImageFilter()
proc_img_size_for_lbl_ref = sitk_img_processed.GetSize()
extractor.SetSize([proc_img_size_for_lbl_ref[0], proc_img_size_for_lbl_ref[1], proc_img_size_for_lbl_ref[2], 0])
extractor.SetIndex([0,0,0,0])
try:
reference_for_label = extractor.Execute(sitk_img_processed)
except Exception as ref_err:
print(f" Failed to extract 3D reference from 4D image: {output_path} for label alignment.")
# print(traceback.format_exc())
reference_for_label = None
else: # Fallback if extraction fails
print(f" Could not extract 3D reference for label from 4D image {output_path}. Label may not be correctly resampled.")
reference_for_label = None # This will cause an issue below if not handled
sitk_lbl_processed = None
if reference_for_label and reference_for_label.GetDimension() > 0:
label_resampler.SetInterpolator(sitk.sitkNearestNeighbor)
label_resampler.SetOutputPixelType(sitk_lbl_original.GetPixelID())
if sitk_lbl_original.GetDimension() == 4:
lbl_channels = []
lbl_size = list(sitk_lbl_original.GetSize())
for i in range(lbl_size[3]):
extractor = sitk.ExtractImageFilter()
extractor.SetSize([lbl_size[0], lbl_size[1], lbl_size[2], 0])
extractor.SetIndex([0, 0, 0, i])
single_channel = extractor.Execute(sitk_lbl_original)
label_resampler.SetReferenceImage(reference_for_label)
resampled_channel = label_resampler.Execute(single_channel)
lbl_channels.append(resampled_channel)
if len(lbl_channels) > 1:
sitk_lbl_processed = sitk.JoinSeriesImageFilter().Execute(lbl_channels)
elif len(lbl_channels) == 1:
sitk_lbl_processed = lbl_channels[0]
else:
label_resampler.SetReferenceImage(reference_for_label)
sitk_lbl_processed = label_resampler.Execute(sitk_lbl_original)
if processed_lbl_full_path:
if sitk_img_processed.GetSize()[:3] != sitk_lbl_processed.GetSize()[:3]:
print(f" Mismatch between image and label size (ignoring channels):")
print(f" Image size: {sitk_img_processed.GetSize()}")
print(f" Label size: {sitk_lbl_processed.GetSize()}")
util.save_nifti(sitk_lbl_processed, processed_lbl_full_path, full_path_label)
else:
print(f" Failed to set reference image for label resampling for {full_path_label}. Saving original label.")
util.save_nifti(sitk_lbl_original, processed_lbl_full_path, full_path_label) # Save original
# processed_lbl_full_path should still point to this saved original label
sitk_lbl_processed=sitk_lbl_original
else:
processed_lbl_full_path = None
util.save_nifti(sitk_lbl_original, processed_lbl_full_path, full_label_file) # Save original
print(f"Saved Segemention NIfTI file to {processed_lbl_full_path}")
if processed_lbl_full_path:
label_path_dict['tumor'] = processed_lbl_full_path
print(label_path_dict.keys())
meta.add_keyvalue('Task',TASK_VALUE)
# meta.add_keyvalue('Label_tissue',list(label_path_dict.keys()))
meta.add_keyvalue('Label_path',{TASK_VALUE:label_path_dict})
meta.add_keyvalue('Label_Dict',LABEL_DICT)
meta.add_extra_keyvalue('Metadata',CIA_other_info)
# try:
# assert sitk_img_processed.GetSize() == sitk_lbl_processed.GetSize()
# except Exception as e:
# failed_files.append(full_path_label)
# continue
print(sitk_img_original.GetSize(),sitk_lbl_original.GetSize())
except Exception as e:
print(e)
failed_files.append(ct_fp)
print(f"Failed to load PSMA images from {ct_fp}")
continue
meta.add_extra_keyvalue('Metadata',CIA_other_info)
# Write the mapping to the JSON file on the fly
with open(json_output_path, 'r+') as json_file:
existing_mappings = json.load(json_file)
existing_mappings[output_path] = meta.get_meta_data()
json_file.seek(0)
# print(existing_mappings)
json.dump(existing_mappings, json_file, indent=4)
json_file.truncate()
# else:
# print("No metadata.csv files found.")
with open(failed_files_path, "w") as json_file:
json.dump(failed_files, json_file)
print(f"The list has been written to {failed_files_path}")
print(f"Saved NIfTI mappings to {json_output_path}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Process DICOM files and save as NIfTI.")
parser.add_argument("--target_path", type=str, help="Path to the target directory containing metadata files.", default="/home/data/Github/data/data_gen_def/DATASETS/PSMA/psma-fdg-pet-ct-lesion/")
parser.add_argument("--output_dir", type=str, help="Directory to save the NIfTI files.", default="/home/data/Github/data/data_gen_def/DATASETS_processed/PSMA/PSMA-FDG-PET-CT-LESION/")
args = parser.parse_args()
print(args.target_path, args.output_dir)
main(args.target_path, args.output_dir)