#coding:utf-8 ''' OAI-ZIB Dataset Processing Script create on 2026-03-05 OAI-ZIB: Osteoarthritis Initiative dataset curated by ZIB (Zuse Institute Berlin). Contains RIGHT knee MRI scans and corresponding segmentation labelmaps for 507 subjects, split into train (253) and test (254) sets. All images are RIGHT knee (confirmed via OAIZIB-CM kneeSideInfo.csv). Label values: 0: background 1: femur 2: femoral cartilage 3: tibia 4: medial tibial cartilage 5: lateral tibial cartilage Nonimaging metadata extracted per subject (baseline visit V00, right knee): - enrollee01.txt: age, gender, race, ethnicity, cohort - oscf01.txt: BMI, height, weight - kxrsq01.txt: KL grade (right knee, Kellgren-Lawrence OA severity 0-4) - womac01.txt: WOMAC scores (right knee: pain, ADL, stiffness) ''' import os import glob import csv import argparse import json import SimpleITK as sitk from tqdm import tqdm from util import meta_data import util TASK_VALUE = "segmentation" TARGET_SPACING = [0.36, 0.36, 0.36] # isotropic resampling target (mm) def resample_to_isotropic(sitk_img, target_spacing=TARGET_SPACING, interpolator=sitk.sitkLinear): """Resample a SimpleITK image to isotropic spacing.""" original_spacing = sitk_img.GetSpacing() original_size = sitk_img.GetSize() new_size = [ int(round(osz * osp / tsp)) for osz, osp, tsp in zip(original_size, original_spacing, target_spacing) ] resampler = sitk.ResampleImageFilter() resampler.SetOutputSpacing(target_spacing) resampler.SetSize(new_size) resampler.SetOutputDirection(sitk_img.GetDirection()) resampler.SetOutputOrigin(sitk_img.GetOrigin()) resampler.SetInterpolator(interpolator) resampler.SetDefaultPixelValue(0) resampler.SetTransform(sitk.Transform()) return resampler.Execute(sitk_img) LABEL_DICT = { "0": "background", "1": "femur", "2": "femoral cartilage", "3": "tibia", "4": "medial tibial cartilage", "5": "lateral tibial cartilage" } def load_nonimaging_table(filepath): """Load a tab-delimited nonimaging .txt file, skipping the description row (row 2).""" rows = [] with open(filepath, 'r') as f: reader = csv.DictReader(f, delimiter='\t', quotechar='"') for i, row in enumerate(reader): if i == 0: # Row 0 after header is the description row — skip it continue rows.append(row) return rows def build_subject_lookup(rows, key='src_subject_id', visit_filter=None): """Build a dict keyed by subject ID. If visit_filter is set, only keep rows with that visit.""" lookup = {} for row in rows: sid = row.get(key, '').strip('"') visit = row.get('visit', '').strip('"') if visit_filter and visit != visit_filter: continue if sid not in lookup: lookup[sid] = row return lookup def load_all_nonimaging(nonimaging_dir): """Load and index all relevant nonimaging tables by subject ID (baseline V00).""" tables = {} # enrollee01: demographics (use V00 baseline) fp = os.path.join(nonimaging_dir, 'enrollee01.txt') if os.path.isfile(fp): tables['enrollee'] = build_subject_lookup(load_nonimaging_table(fp), visit_filter='V00') # oscf01: BMI, height, weight (prefer V00, fallback to any visit with BMI) fp = os.path.join(nonimaging_dir, 'oscf01.txt') if os.path.isfile(fp): rows = load_nonimaging_table(fp) oscf_lookup = {} for row in rows: sid = row.get('src_subject_id', '').strip('"') bmi = row.get('bmi', '').strip('"') visit = row.get('visit', '').strip('"') if not bmi: continue # Prefer V00, otherwise keep first available if sid not in oscf_lookup or visit == 'V00': oscf_lookup[sid] = row tables['oscf'] = oscf_lookup # kxrsq01: KL grade (use V00, RIGHT knee only: side=1) fp = os.path.join(nonimaging_dir, 'kxrsq01.txt') if os.path.isfile(fp): rows = load_nonimaging_table(fp) kl_lookup = {} for row in rows: sid = row.get('src_subject_id', '').strip('"') visit = row.get('visit', '').strip('"') side = row.get('side', '').strip('"') if visit != 'V00' or side != '1': # side=1 is RIGHT continue kl = row.get('xrkl', '').strip('"') if sid not in kl_lookup: kl_lookup[sid] = kl tables['kl_grade'] = kl_lookup # womac01: WOMAC scores (use V00) fp = os.path.join(nonimaging_dir, 'womac01.txt') if os.path.isfile(fp): tables['womac'] = build_subject_lookup(load_nonimaging_table(fp), visit_filter='V00') return tables def get_subject_metadata(subject_id, tables): """Extract relevant metadata for a subject from preloaded tables.""" info = {} info['Knee_Side'] = 'right' # Demographics from enrollee01 enrollee = tables.get('enrollee', {}).get(subject_id, {}) if enrollee: info['Age'] = enrollee.get('ageyears', '').strip('"') info['Gender'] = enrollee.get('gender', '').strip('"') info['Race'] = enrollee.get('race', '').strip('"') info['Ethnicity'] = enrollee.get('ethnicity', '').strip('"') info['Cohort'] = enrollee.get('e_cohort', '').strip('"') # BMI from oscf01 oscf = tables.get('oscf', {}).get(subject_id, {}) if oscf: info['BMI'] = oscf.get('bmi', '').strip('"') info['Height_mm'] = oscf.get('height_av', '').strip('"') info['Weight_kg'] = oscf.get('weight_met', '').strip('"') # KL grade from kxrsq01 (right knee only) kl = tables.get('kl_grade', {}).get(subject_id) if kl is not None: info['KL_Grade'] = kl # WOMAC scores from womac01 (right knee only) womac = tables.get('womac', {}).get(subject_id, {}) if womac: info['WOMAC_Pain'] = womac.get('womkpr', '').strip('"') info['WOMAC_ADL'] = womac.get('womadlr', '').strip('"') info['WOMAC_Stiffness'] = womac.get('womtsr', '').strip('"') return info def main(target_path, output_dir): if not os.path.isdir(output_dir): os.makedirs(output_dir) failed_files = [] # Load nonimaging metadata nonimaging_dir = os.path.join(target_path, 'nonimaging', 'NonImaging') print("Loading nonimaging metadata...") tables = load_all_nonimaging(nonimaging_dir) print(f" enrollee: {len(tables.get('enrollee', {}))} subjects") print(f" oscf (BMI): {len(tables.get('oscf', {}))} subjects") print(f" kl_grade (right): {len(tables.get('kl_grade', {}))} subjects") print(f" womac: {len(tables.get('womac', {}))} subjects") # Process train and test splits into separate folders for split in ['train', 'test']: image_dir = os.path.join(target_path, 'images', split) label_dir = os.path.join(target_path, 'labels', split) if not os.path.isdir(image_dir): print(f"Image directory not found: {image_dir}") continue split_output_dir = os.path.join(output_dir, split) os.makedirs(split_output_dir, exist_ok=True) json_output_path = os.path.join(split_output_dir, 'nifti_mappings.json') # Initialize the JSON file fresh with open(json_output_path, 'w') as json_file: json.dump({}, json_file) image_files = sorted(glob.glob(os.path.join(image_dir, '*.nii.gz'))) print(f"\nProcessing {split} split: {len(image_files)} subjects -> {split_output_dir}") for image_path in tqdm(image_files, desc=f"Processing {split}"): filename = os.path.basename(image_path) # e.g. 9002817.nii.gz subject_id = filename.replace('.nii.gz', '') try: # Read original image sitk_img = sitk.ReadImage(image_path) original_size = list(sitk_img.GetSize()) original_spacing = list(sitk_img.GetSpacing()) # Resample to isotropic sitk_img_iso = resample_to_isotropic(sitk_img, TARGET_SPACING, sitk.sitkLinear) resampled_size = list(sitk_img_iso.GetSize()) resampled_spacing = list(sitk_img_iso.GetSpacing()) # Build metadata (use resampled size/spacing) meta = meta_data() meta.add_keyvalue('Modality', 'MRI') meta.add_keyvalue('OriImg_path', image_path) meta.add_keyvalue('Spacing_mm', min(resampled_spacing)) meta.add_keyvalue('Size', resampled_size) meta.add_keyvalue('Dataset_name', 'OAI_ZIB') meta.add_keyvalue('ROI', 'leg') meta.add_keyvalue('Label_Dict', LABEL_DICT) # Output paths output_subject_dir = os.path.join(split_output_dir, subject_id) output_image_file = os.path.join(output_subject_dir, f"{subject_id}.nii.gz") # Save resampled image util.save_nifti(sitk_img_iso, output_image_file, image_path) # Process label (use nearest-neighbor interpolation to preserve discrete labels) label_path = os.path.join(label_dir, filename) if os.path.isfile(label_path): sitk_lbl = sitk.ReadImage(label_path) sitk_lbl_iso = resample_to_isotropic(sitk_lbl, TARGET_SPACING, sitk.sitkNearestNeighbor) process_label_dir = os.path.join(output_subject_dir, 'segmentation') processed_lbl_path = os.path.join(process_label_dir, f"{subject_id}.nii.gz") os.makedirs(process_label_dir, exist_ok=True) util.save_nifti(sitk_lbl_iso, processed_lbl_path, label_path) label_path_dict = {'knee': processed_lbl_path} meta.add_keyvalue('Task', TASK_VALUE) meta.add_keyvalue('Label_path', {TASK_VALUE: label_path_dict}) print(f" {subject_id}: {original_size} @ {[f'{s:.3f}' for s in original_spacing]} -> {resampled_size} @ {[f'{s:.3f}' for s in resampled_spacing]}") # Build extra metadata from nonimaging extra_info = { 'split': split, 'Image_id': subject_id, 'nonimaging_dir': nonimaging_dir, } subject_meta = get_subject_metadata(subject_id, tables) extra_info.update(subject_meta) meta.add_extra_keyvalue('Metadata', extra_info) # Write mapping with open(json_output_path, 'r+') as json_file: existing_mappings = json.load(json_file) existing_mappings[output_image_file] = meta.get_meta_data() json_file.seek(0) json.dump(existing_mappings, json_file, indent=4) json_file.truncate() except Exception as e: print(f" Failed {subject_id}: {e}") failed_files.append(subject_id) continue # Save failed files failed_files_path = os.path.join(output_dir, 'failed_files.json') with open(failed_files_path, "w") as json_file: json.dump(failed_files, json_file) print(f"\nDone. Failed files ({len(failed_files)}): {failed_files_path}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Process OAI-ZIB dataset and save as processed NIfTI with mappings.") parser.add_argument("--target_path", type=str, default="/home/dn-zhen2/rds/rds-airr-p51-TWhPgQVLKbA/Data/DATASETS/OAI_ZIB", help="Path to raw OAI-ZIB dataset directory.") parser.add_argument("--output_dir", type=str, default="/home/dn-zhen2/rds/rds-airr-p51-TWhPgQVLKbA/Data/Omini3D/DATASETS_processed/OAI_ZIB", help="Directory to save processed NIfTI files and mappings.") args = parser.parse_args() print(f"Input: {args.target_path}") print(f"Output: {args.output_dir}") main(args.target_path, args.output_dir)