| |
'''
OAI-ZIB Dataset Processing Script
Created on 2026-03-05

OAI-ZIB: Osteoarthritis Initiative dataset curated by ZIB (Zuse Institute Berlin).
Contains RIGHT knee MRI scans and corresponding segmentation labelmaps for 507
subjects, split into train (253) and test (254) sets.

All images are RIGHT knee (confirmed via OAIZIB-CM kneeSideInfo.csv).

Label values:
    0: background
    1: femur
    2: femoral cartilage
    3: tibia
    4: medial tibial cartilage
    5: lateral tibial cartilage

Nonimaging metadata extracted per subject (baseline visit V00, right knee):
    - enrollee01.txt: age, gender, race, ethnicity, cohort
    - oscf01.txt: BMI, height, weight
    - kxrsq01.txt: KL grade (right knee, Kellgren-Lawrence OA severity 0-4)
    - womac01.txt: WOMAC scores (right knee: pain, ADL, stiffness)
'''
| import os |
| import glob |
| import csv |
| import argparse |
| import json |
| import SimpleITK as sitk |
| from tqdm import tqdm |
| from util import meta_data |
| import util |
|
|
|
|
# Task name written into each subject's 'Task' entry and used as the key of
# its 'Label_path' entry.
TASK_VALUE = "segmentation"

# Voxel spacing (same value on all three axes) that every image and labelmap
# is resampled to; the result is recorded per subject under 'Spacing_mm'.
TARGET_SPACING = [0.36] * 3
|
|
|
|
def resample_to_isotropic(sitk_img, target_spacing=TARGET_SPACING, interpolator=sitk.sitkLinear):
    """Resample a SimpleITK image onto a grid with the requested voxel spacing.

    The output grid keeps the input's origin and direction; only the spacing
    and the number of voxels per axis change. Voxels that fall outside the
    input are filled with 0.

    Args:
        sitk_img: SimpleITK image to resample.
        target_spacing: Output spacing, one value per image axis. Defaults to
            TARGET_SPACING.
        interpolator: SimpleITK interpolator constant. Pass
            ``sitk.sitkNearestNeighbor`` for labelmaps so that label values
            are never blended.

    Returns:
        The resampled SimpleITK image.
    """
    original_spacing = sitk_img.GetSpacing()
    original_size = sitk_img.GetSize()

    # Preserve the physical extent of each axis:
    #   new_size * target_spacing ~= original_size * original_spacing.
    # Clamp to one voxel so that a thin axis can never round down to an
    # empty (size 0) output grid.
    new_size = [
        max(1, int(round(osz * osp / tsp)))
        for osz, osp, tsp in zip(original_size, original_spacing, target_spacing)
    ]

    resampler = sitk.ResampleImageFilter()
    resampler.SetOutputSpacing(target_spacing)
    resampler.SetSize(new_size)
    resampler.SetOutputDirection(sitk_img.GetDirection())
    resampler.SetOutputOrigin(sitk_img.GetOrigin())
    resampler.SetInterpolator(interpolator)
    resampler.SetDefaultPixelValue(0)
    # A default-constructed transform: no geometric change is requested, the
    # image is only re-gridded.
    resampler.SetTransform(sitk.Transform())

    return resampler.Execute(sitk_img)
|
|
# Maps each labelmap voxel value (as a string key) to the structure it marks.
# The position of a name in the tuple is its label value.
LABEL_DICT = {
    str(label_value): structure
    for label_value, structure in enumerate((
        "background",
        "femur",
        "femoral cartilage",
        "tibia",
        "medial tibial cartilage",
        "lateral tibial cartilage",
    ))
}
|
|
|
|
def load_nonimaging_table(filepath):
    """Load a tab-delimited nonimaging .txt file as a list of row dicts.

    The first line of the file supplies the column names. The row after it
    (row 2 of the file) is a description row and is skipped; every following
    row is returned as a dict keyed by column name.

    Args:
        filepath: Path to the tab-delimited text file.

    Returns:
        A list of dicts, one per data row, in file order. Empty if the file
        holds nothing beyond the header and description rows.
    """
    # newline='' hands raw line endings to the csv module, as its
    # documentation requires; otherwise line breaks embedded in quoted
    # fields are not interpreted correctly.
    with open(filepath, 'r', newline='') as f:
        reader = csv.DictReader(f, delimiter='\t', quotechar='"')
        # Discard the description row that follows the header.
        next(reader, None)
        return list(reader)
|
|
|
|
def build_subject_lookup(rows, key='src_subject_id', visit_filter=None):
    """Index table rows by subject ID, keeping the first row seen per subject.

    Args:
        rows: Iterable of row dicts (as returned by load_nonimaging_table).
        key: Column holding the subject ID.
        visit_filter: If set, only rows whose 'visit' column equals this
            value are kept.

    Returns:
        A dict mapping subject ID (surrounding double quotes stripped) to the
        first matching row dict.
    """
    lookup = {}
    for row in rows:
        # csv.DictReader fills columns missing from a short row with None,
        # so treat None like an empty string instead of crashing on .strip().
        visit = (row.get('visit') or '').strip('"')
        if visit_filter and visit != visit_filter:
            continue
        sid = (row.get(key) or '').strip('"')
        # First occurrence wins; later rows for the same subject are ignored.
        lookup.setdefault(sid, row)
    return lookup
|
|
|
|
def load_all_nonimaging(nonimaging_dir):
    """Load and index the relevant nonimaging tables by subject ID.

    Tables whose file is missing are skipped (with a warning) and are simply
    absent from the result.

    Args:
        nonimaging_dir: Directory containing enrollee01.txt, oscf01.txt,
            kxrsq01.txt and womac01.txt.

    Returns:
        A dict with up to four entries, each keyed by subject ID:
            'enrollee': first V00 row of enrollee01.txt per subject.
            'oscf':     a row of oscf01.txt with a non-empty 'bmi'; a V00 row
                        is preferred over rows from other visits.
            'kl_grade': the 'xrkl' value of the first kxrsq01.txt row with
                        visit V00 and side '1' (the side this script treats
                        as the right knee).
            'womac':    first V00 row of womac01.txt per subject.
    """
    def field(row, column):
        # csv.DictReader yields None for columns missing from a short row;
        # treat that like an empty value instead of crashing on .strip().
        return (row.get(column) or '').strip('"')

    def read_table(filename):
        # Return the table's rows, or None (after a warning) if the file is absent.
        path = os.path.join(nonimaging_dir, filename)
        if not os.path.isfile(path):
            print(f"Warning: nonimaging table not found, skipping: {path}")
            return None
        return load_nonimaging_table(path)

    tables = {}

    # Demographics: baseline row per subject.
    rows = read_table('enrollee01.txt')
    if rows is not None:
        tables['enrollee'] = build_subject_lookup(rows, visit_filter='V00')

    # BMI / height / weight: only rows that actually carry a BMI value.
    rows = read_table('oscf01.txt')
    if rows is not None:
        oscf_lookup = {}
        for row in rows:
            if not field(row, 'bmi'):
                continue
            sid = field(row, 'src_subject_id')
            # Take the first usable row, but let a V00 row replace it.
            if sid not in oscf_lookup or field(row, 'visit') == 'V00':
                oscf_lookup[sid] = row
        tables['oscf'] = oscf_lookup

    # KL grade: baseline rows for side '1' only; first row per subject wins.
    rows = read_table('kxrsq01.txt')
    if rows is not None:
        kl_lookup = {}
        for row in rows:
            if field(row, 'visit') != 'V00' or field(row, 'side') != '1':
                continue
            kl_lookup.setdefault(field(row, 'src_subject_id'), field(row, 'xrkl'))
        tables['kl_grade'] = kl_lookup

    # WOMAC scores: baseline row per subject.
    rows = read_table('womac01.txt')
    if rows is not None:
        tables['womac'] = build_subject_lookup(rows, visit_filter='V00')

    return tables
|
|
|
|
def get_subject_metadata(subject_id, tables):
    """Collect one subject's nonimaging metadata from the preloaded tables.

    Args:
        subject_id: Subject ID used as the key into each table.
        tables: Dict as returned by load_all_nonimaging; any of the
            'enrollee', 'oscf', 'kl_grade' and 'womac' entries may be absent.

    Returns:
        A dict that always contains 'Knee_Side' ('right'). For each table in
        which the subject is found, the corresponding keys are added with
        surrounding double quotes stripped; a column that is missing or None
        yields an empty string.
    """
    info = {'Knee_Side': 'right'}

    def copy_fields(table_name, columns):
        # Copy (output key, source column) pairs from the subject's row, if any.
        row = tables.get(table_name, {}).get(subject_id, {})
        if not row:
            return
        for out_key, column in columns:
            # csv.DictReader fills short rows with None; treat it as empty.
            info[out_key] = (row.get(column) or '').strip('"')

    copy_fields('enrollee', (
        ('Age', 'ageyears'),
        ('Gender', 'gender'),
        ('Race', 'race'),
        ('Ethnicity', 'ethnicity'),
        ('Cohort', 'e_cohort'),
    ))

    copy_fields('oscf', (
        ('BMI', 'bmi'),
        ('Height_mm', 'height_av'),
        ('Weight_kg', 'weight_met'),
    ))

    # The KL table stores the grade value directly rather than a row dict.
    kl = tables.get('kl_grade', {}).get(subject_id)
    if kl is not None:
        info['KL_Grade'] = kl

    copy_fields('womac', (
        ('WOMAC_Pain', 'womkpr'),
        ('WOMAC_ADL', 'womadlr'),
        ('WOMAC_Stiffness', 'womtsr'),
    ))

    return info
|
|
|
|
def main(target_path, output_dir):
    """Resample every OAI-ZIB image/label pair and record per-subject metadata.

    For each of the 'train' and 'test' splits below ``target_path``:
      * every ``images/<split>/*.nii.gz`` is resampled to TARGET_SPACING with
        linear interpolation and saved to
        ``output_dir/<split>/<subject_id>/<subject_id>.nii.gz``;
      * the same-named file in ``labels/<split>``, if present, is resampled
        with nearest-neighbour interpolation and saved under the subject's
        ``segmentation`` sub-directory;
      * the subject's metadata is added to
        ``output_dir/<split>/nifti_mappings.json`` (rewritten after each
        subject, so a crash keeps everything processed so far).

    A subject that raises is reported, recorded, and skipped; the list of
    such subjects is written to ``output_dir/failed_files.json``.

    Args:
        target_path: Root of the raw dataset (expects 'images', 'labels' and
            'nonimaging/NonImaging' below it).
        output_dir: Root directory for processed outputs; created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)

    failed_files = []
    suffix = '.nii.gz'

    # Load the nonimaging tables once; they are shared by every subject.
    nonimaging_dir = os.path.join(target_path, 'nonimaging', 'NonImaging')
    print("Loading nonimaging metadata...")
    tables = load_all_nonimaging(nonimaging_dir)
    print(f" enrollee: {len(tables.get('enrollee', {}))} subjects")
    print(f" oscf (BMI): {len(tables.get('oscf', {}))} subjects")
    print(f" kl_grade (right): {len(tables.get('kl_grade', {}))} subjects")
    print(f" womac: {len(tables.get('womac', {}))} subjects")

    for split in ['train', 'test']:
        image_dir = os.path.join(target_path, 'images', split)
        label_dir = os.path.join(target_path, 'labels', split)

        if not os.path.isdir(image_dir):
            print(f"Image directory not found: {image_dir}")
            continue

        split_output_dir = os.path.join(output_dir, split)
        os.makedirs(split_output_dir, exist_ok=True)

        # Each run starts from an empty mapping file. The mapping is kept in
        # memory and the file rewritten after every subject, instead of
        # re-reading and re-parsing the whole file each time.
        json_output_path = os.path.join(split_output_dir, 'nifti_mappings.json')
        mappings = {}
        with open(json_output_path, 'w') as json_file:
            json.dump(mappings, json_file)

        image_files = sorted(glob.glob(os.path.join(image_dir, '*.nii.gz')))
        print(f"\nProcessing {split} split: {len(image_files)} subjects -> {split_output_dir}")

        for image_path in tqdm(image_files, desc=f"Processing {split}"):
            filename = os.path.basename(image_path)
            # Strip only the trailing extension (the glob guarantees it is
            # there), never a '.nii.gz' that happens to occur inside the name.
            subject_id = filename[:-len(suffix)]

            try:
                # Read the image and remember its native geometry for the log line.
                sitk_img = sitk.ReadImage(image_path)
                original_size = list(sitk_img.GetSize())
                original_spacing = list(sitk_img.GetSpacing())

                # Intensity image: linear interpolation.
                sitk_img_iso = resample_to_isotropic(sitk_img, TARGET_SPACING, sitk.sitkLinear)
                resampled_size = list(sitk_img_iso.GetSize())
                resampled_spacing = list(sitk_img_iso.GetSpacing())

                meta = meta_data()
                meta.add_keyvalue('Modality', 'MRI')
                meta.add_keyvalue('OriImg_path', image_path)
                meta.add_keyvalue('Spacing_mm', min(resampled_spacing))
                meta.add_keyvalue('Size', resampled_size)
                meta.add_keyvalue('Dataset_name', 'OAI_ZIB')
                meta.add_keyvalue('ROI', 'leg')
                meta.add_keyvalue('Label_Dict', LABEL_DICT)

                output_subject_dir = os.path.join(split_output_dir, subject_id)
                output_image_file = os.path.join(output_subject_dir, f"{subject_id}.nii.gz")

                # Create the subject directory up front: whether
                # util.save_nifti creates missing directories is not visible
                # here, and the save must not depend on it.
                os.makedirs(output_subject_dir, exist_ok=True)
                util.save_nifti(sitk_img_iso, output_image_file, image_path)

                # Labelmap: nearest neighbour so label values stay intact.
                label_path = os.path.join(label_dir, filename)
                if os.path.isfile(label_path):
                    sitk_lbl = sitk.ReadImage(label_path)
                    sitk_lbl_iso = resample_to_isotropic(sitk_lbl, TARGET_SPACING, sitk.sitkNearestNeighbor)
                    process_label_dir = os.path.join(output_subject_dir, 'segmentation')
                    processed_lbl_path = os.path.join(process_label_dir, f"{subject_id}.nii.gz")
                    os.makedirs(process_label_dir, exist_ok=True)
                    util.save_nifti(sitk_lbl_iso, processed_lbl_path, label_path)

                    label_path_dict = {'knee': processed_lbl_path}
                    meta.add_keyvalue('Task', TASK_VALUE)
                    meta.add_keyvalue('Label_path', {TASK_VALUE: label_path_dict})
                else:
                    # The subject is still recorded, just without task/label entries.
                    print(f" No labelmap found for {subject_id}: {label_path}")

                print(f" {subject_id}: {original_size} @ {[f'{s:.3f}' for s in original_spacing]} -> {resampled_size} @ {[f'{s:.3f}' for s in resampled_spacing]}")

                # Attach split information and nonimaging metadata.
                extra_info = {
                    'split': split,
                    'Image_id': subject_id,
                    'nonimaging_dir': nonimaging_dir,
                }
                subject_meta = get_subject_metadata(subject_id, tables)
                extra_info.update(subject_meta)

                meta.add_extra_keyvalue('Metadata', extra_info)

                # Serialise before touching the file, so an entry that cannot
                # be serialised raises here and neither truncates the file
                # nor pollutes the in-memory mapping.
                entry = meta.get_meta_data()
                payload = json.dumps({**mappings, output_image_file: entry}, indent=4)
                with open(json_output_path, 'w') as json_file:
                    json_file.write(payload)
                mappings[output_image_file] = entry

            except Exception as e:
                # Deliberate best effort: one bad subject must not stop the
                # run; it is reported and listed in failed_files.json.
                print(f" Failed {subject_id}: {e}")
                failed_files.append(subject_id)
                continue

    failed_files_path = os.path.join(output_dir, 'failed_files.json')
    with open(failed_files_path, "w") as json_file:
        json.dump(failed_files, json_file)

    print(f"\nDone. Failed files ({len(failed_files)}): {failed_files_path}")
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: both paths are optional and fall back to the
    # defaults below.
    parser = argparse.ArgumentParser(
        description="Process OAI-ZIB dataset and save as processed NIfTI with mappings."
    )
    parser.add_argument(
        "--target_path",
        type=str,
        default="/home/dn-zhen2/rds/rds-airr-p51-TWhPgQVLKbA/Data/DATASETS/OAI_ZIB",
        help="Path to raw OAI-ZIB dataset directory.",
    )
    parser.add_argument(
        "--output_dir",
        type=str,
        default="/home/dn-zhen2/rds/rds-airr-p51-TWhPgQVLKbA/Data/Omini3D/DATASETS_processed/OAI_ZIB",
        help="Directory to save processed NIfTI files and mappings.",
    )
    args = parser.parse_args()

    # Echo the resolved paths before starting the run.
    print(f"Input: {args.target_path}")
    print(f"Output: {args.output_dir}")
    main(target_path=args.target_path, output_dir=args.output_dir)
|
|