"""Evaluate ambiguous-segmentation samples against multi-annotator ground truth.

For every item of the test split, the sample masks saved as
``<image_id>_sample_*.png`` are compared with the item's ground-truth masks.
Per-image metrics (CI score and its three components, GED, mean Dice, mean IoU,
mean Hausdorff distance) plus a final ``AVERAGE`` row are written to a CSV file.
"""

import argparse
import glob
import itertools
import os
import sys
from typing import Dict, List, Sequence, Tuple

import numpy as np
import pandas as pd
from scipy.spatial.distance import directed_hausdorff

# Add project root to path for custom module import
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# Additive smoothing shared by the Dice and IoU ratios.
_SMOOTH = 1e-6


# --- METRIC IMPLEMENTATIONS ---

def dice_coefficient(pred, target):
    """Return the smoothed Dice overlap of two masks.

    Both inputs are converted to boolean arrays (any non-zero value counts as
    foreground). Two empty masks are treated as a perfect match and give 1.0.
    """
    pred = np.asarray(pred, dtype=bool)
    target = np.asarray(target, dtype=bool)
    if not pred.any() and not target.any():
        return 1.0
    intersection = np.sum(pred & target)
    return (2.0 * intersection + _SMOOTH) / (np.sum(pred) + np.sum(target) + _SMOOTH)


def iou_score(pred, target):
    """Return the smoothed intersection-over-union of two masks.

    Both inputs are converted to boolean arrays. Two empty masks give 1.0.
    """
    pred = np.asarray(pred, dtype=bool)
    target = np.asarray(target, dtype=bool)
    if not pred.any() and not target.any():
        return 1.0
    intersection = np.sum(pred & target)
    union = np.sum(pred | target)
    return (intersection + _SMOOTH) / (union + _SMOOTH)


def hausdorff_distance(pred, target):
    """Return the symmetric Hausdorff distance between two masks.

    The distance is taken between the coordinates of all non-zero elements of
    each mask. If either mask has no non-zero element the distance is
    undefined and ``np.nan`` is returned.
    """
    pred_points = np.argwhere(pred)
    target_points = np.argwhere(target)
    if len(pred_points) == 0 or len(target_points) == 0:
        return np.nan
    forward = directed_hausdorff(pred_points, target_points)[0]
    backward = directed_hausdorff(target_points, pred_points)[0]
    return max(forward, backward)


def calculate_combined_sensitivity(samples, gts):
    """Return the sensitivity of the union of samples w.r.t. the union of gts.

    Returns 1.0 when the combined ground truth is empty.
    """
    combined_sample = np.logical_or.reduce([np.asarray(s, dtype=bool) for s in samples])
    combined_gt = np.logical_or.reduce([np.asarray(g, dtype=bool) for g in gts])
    if not np.any(combined_gt):
        return 1.0
    true_positives = np.sum(combined_sample & combined_gt)
    return true_positives / np.sum(combined_gt)


def calculate_d_max(samples, gts):
    """Return the mean, over ground truths, of the best Dice any sample reaches.

    Returns 0.0 when there are no samples or no ground truths.
    """
    if not samples or not gts:
        return 0.0
    best_per_gt = [max(dice_coefficient(s, gt) for s in samples) for gt in gts]
    return np.mean(best_per_gt)


def _pairwise_dice_distance_range(masks) -> Tuple[float, float]:
    """Return (min, max) of ``1 - Dice`` over all unordered pairs of masks.

    Fewer than two masks have no pair; ``(0, 0)`` is returned in that case.
    """
    if len(masks) < 2:
        return 0, 0
    distances = [
        1.0 - dice_coefficient(first, second)
        for first, second in itertools.combinations(masks, 2)
    ]
    return min(distances), max(distances)


def calculate_diversity_agreement(samples, gts):
    """Return how closely the spread of the samples matches that of the gts.

    The spread of a set is summarised by the minimum and maximum pairwise
    ``1 - Dice``. The result is 1 minus the mean absolute difference of those
    two numbers between the ground-truth set and the sample set.
    """
    v_min_gt, v_max_gt = _pairwise_dice_distance_range(gts)
    v_min_sample, v_max_sample = _pairwise_dice_distance_range(samples)
    return 1.0 - (abs(v_min_gt - v_min_sample) + abs(v_max_gt - v_max_sample)) / 2.0


def calculate_ci_score(samples, gts):
    """Return ``(ci, Sc, Dmax, Da)`` for one image.

    ``ci`` is ``3 * Sc * Dmax * Da / (Sc + Dmax + Da)``, with a small epsilon
    in the denominator so that three zero components give 0 rather than NaN.
    """
    sc = calculate_combined_sensitivity(samples, gts)
    d_max = calculate_d_max(samples, gts)
    d_a = calculate_diversity_agreement(samples, gts)
    denominator = sc + d_max + d_a
    ci = (3 * sc * d_max * d_a) / (denominator + 1e-8)
    return ci, sc, d_max, d_a


def _iou_distance(first, second):
    """Return ``1 - IoU``; zero for identical masks and symmetric."""
    return 1.0 - iou_score(first, second)


def _mean_within_set_distance(masks) -> float:
    """Return the mean ``1 - IoU`` over ALL ordered pairs, self-pairs included.

    Self-pairs contribute exactly 0 and the distance is symmetric, so only the
    unordered pairs are evaluated and their sum is doubled.
    """
    count = len(masks)
    if count < 2:
        return 0.0
    off_diagonal = sum(
        _iou_distance(first, second)
        for first, second in itertools.combinations(masks, 2)
    )
    return 2.0 * off_diagonal / (count * count)


def calculate_ged(samples, gts):
    """Return the (squared) generalised energy distance between two mask sets.

    Computed as ``2 * E[d(S, Y)] - E[d(S, S')] - E[d(Y, Y')]`` with
    ``d = 1 - IoU`` and every expectation taken over all pairs of the
    respective sets. The within-set terms include the zero self-distances;
    leaving them out (while still returning 0 for a single mask) made the
    result negative whenever the two sets were identical and held more than
    one mask. With the self-pairs included, identical sets score exactly 0.

    Returns ``np.nan`` when either set is empty.
    """
    if len(samples) == 0 or len(gts) == 0:
        return np.nan
    d_st = np.mean([_iou_distance(s, g) for s in samples for g in gts])
    d_ss = _mean_within_set_distance(samples)
    d_gg = _mean_within_set_distance(gts)
    return 2 * d_st - d_ss - d_gg


def load_mask(path):
    """Load an image file as a boolean mask (grayscale value > 127)."""
    # Imported here so the metric functions above can be used without Pillow.
    from PIL import Image

    with Image.open(path) as img:
        return np.array(img.convert("L")) > 127


def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the evaluation script."""
    parser = argparse.ArgumentParser(description="Evaluate ambiguous segmentation samples.")
    parser.add_argument("--samples_dir", type=str, required=True,
                        help="Directory with saved sample masks.")
    parser.add_argument("--gt_data_dir", type=str, required=True,
                        help="Path to the root of the LIDC data directory.")
    parser.add_argument("--dataset_type", type=str, default="lidc",
                        choices=["lidc", "multiannotator"],
                        help="Ground-truth dataset layout.")
    parser.add_argument("--split_strategy", type=str, default="all_annotations",
                        help="Split strategy for multiannotator dataset.")
    parser.add_argument("--image_size", type=int, default=128,
                        help="Image size used during training/sampling.")
    parser.add_argument("--results_file", type=str, default="evaluation_results.csv",
                        help="Path to save the output CSV file.")
    return parser


def _evaluate_image(image_id: str, samples: Sequence, gts: Sequence) -> Dict[str, object]:
    """Compute every reported metric for one image and return it as a CSV row."""
    ci_score, sc, dmax, da = calculate_ci_score(samples, gts)
    ged = calculate_ged(samples, gts)

    pairs = [(s, g) for s in samples for g in gts]
    all_dice = [dice_coefficient(s, g) for s, g in pairs]
    all_iou = [iou_score(s, g) for s, g in pairs]
    # Pairs with an empty mask have no Hausdorff distance and are left out.
    all_hd = [d for d in (hausdorff_distance(s, g) for s, g in pairs) if not np.isnan(d)]

    return {
        "image_id": image_id,
        "CI_Score": ci_score,
        "Combined_Sensitivity": sc,
        "D_max": dmax,
        "Diversity_Agreement": da,
        "GED": ged,
        "Avg_Dice": np.mean(all_dice) if all_dice else 0,
        "Avg_IoU": np.mean(all_iou) if all_iou else 0,
        "Avg_Hausdorff": np.mean(all_hd) if all_hd else np.nan,
    }


def main():
    """Run the evaluation described by the command-line arguments.

    Images without any matching sample file are skipped with a warning. If no
    image could be evaluated, a message is printed and no CSV is written.
    """
    args = _build_parser().parse_args()

    # Imported here rather than at module level so that the metric functions
    # stay importable without tqdm or the project's dataset package. The
    # dataset import relies on the sys.path entry added at the top of the file.
    from tqdm import tqdm
    from guided_diffusion.custom_lidc_dataset import CustomLIDCDataset

    gt_dataset = CustomLIDCDataset(
        data_root=args.gt_data_dir,
        split="test",
        image_size=args.image_size,
        dataset_type=args.dataset_type,
        split_strategy=args.split_strategy,
    )

    # Escape the directory once: glob metacharacters in it must match literally.
    samples_dir = glob.escape(args.samples_dir)

    all_results: List[Dict[str, object]] = []
    print(f"Evaluating {len(gt_dataset)} images...")
    # Indexed access is kept on purpose: only len() and [] are known to be
    # supported by the dataset object.
    for i in tqdm(range(len(gt_dataset))):
        _, gts_tensor, image_id = gt_dataset[i]
        image_id = str(image_id)

        # Only the trailing "*" is meant as a wildcard; the id is escaped so
        # that characters such as "[" or "?" in it are matched literally.
        pattern = os.path.join(samples_dir, f"{glob.escape(image_id)}_sample_*.png")
        sample_paths = sorted(glob.glob(pattern))
        if not sample_paths:
            # tqdm.write keeps the message from garbling the progress bar.
            tqdm.write(f"Warning: No samples found for {image_id}. Skipping.")
            continue

        samples = [load_mask(p) for p in sample_paths]
        # Each ground-truth entry must provide .numpy() (presumably a torch
        # tensor -- confirm against CustomLIDCDataset).
        gts = [gt.numpy() for gt in gts_tensor]

        all_results.append(_evaluate_image(image_id, samples, gts))

    if not all_results:
        print("No results generated. Check paths and filenames.")
        return

    df = pd.DataFrame(all_results)
    avg_row = df.mean(numeric_only=True).to_frame().T
    avg_row['image_id'] = 'AVERAGE'
    df_final = pd.concat([df, avg_row], ignore_index=True)
    df_final.to_csv(args.results_file, index=False, float_format='%.4f')

    print(f"\nEvaluation complete. Results saved to {args.results_file}")
    print("\n--- Averages ---")
    print(avg_row.to_string(index=False, float_format='%.4f'))


if __name__ == "__main__":
    main()