"""Grounding-style metrics for multi-target temporal localisation.

Single-shot generative VLMs emit a small set of (start, end) intervals per
video. Detection-style metrics (mAP, AR@N) heavily penalise this regime
because they assume 100+ ranked proposals per video. Grounding-style metrics
are designed for the few-output setting and are standard in
temporal-grounding literature (Charades-STA, ActivityNet Captions,
TempSamp-R1).

Metrics reported:
  - mIoU:       for each video with at least one GT segment, the mean over
                its GT segments of the best-match IoU(pred, GT); the reported
                value is the mean of those per-video scores (i.e. average
                per-GT recall using IoU as the score, macro-averaged over
                videos).
  - F1@tIoU=τ:  Hungarian 1-to-1 matching between preds and GTs, count a
                match as TP iff its IoU > τ, then compute F1 = 2·P·R/(P+R).
                Reported for τ in {0.3, 0.5, 0.7, 0.85, 0.95}. A video with
                no preds or no GTs scores 0.
  - mean_F1@τ:  average F1 over τ in {0.5, 0.75, 0.85, 0.95} (AF strict).
  - R@K @τ:     "of all GT segments in the dataset, what fraction is
                recovered by at least one of the model's top-K predictions
                @ IoU > τ". K in {1, 3, 5}; τ in {0.3, 0.5, 0.7}.

Reads the same per-rank JSONL files as evaluate_forensics_aggregate.py.
"""
import argparse
import glob
import json
import os
from collections import defaultdict

import numpy as np
from scipy.optimize import linear_sum_assignment

# IoU thresholds at which per-video F1 is reported.
F1_TAUS = (0.3, 0.5, 0.7, 0.85, 0.95)
# IoU thresholds averaged into the "AF strict" mean F1.
AF_STRICT_TAUS = (0.5, 0.75, 0.85, 0.95)
# IoU thresholds and top-K cut-offs for dataset-level recall.
RECALL_TAUS = (0.3, 0.5, 0.7)
RECALL_KS = (1, 3, 5)


# ---------------------------------------------------------------------------
# IoU primitives
# ---------------------------------------------------------------------------
def iou_1d(a, b):
    """Return the temporal IoU of two (start, end) intervals.

    Returns 0.0 when the intervals do not overlap or when both are
    zero-length at the same point (denominator would be zero).
    """
    s1, e1 = a
    s2, e2 = b
    inter = max(0.0, min(e1, e2) - max(s1, s2))
    # The hull (max end - min start) equals the true union whenever the
    # intervals overlap; when they are disjoint ``inter`` is 0, so the
    # larger denominator does not affect the result.
    union = max(e1, e2) - min(s1, s2)
    return inter / union if union > 0 else 0.0


def iou_matrix(preds, gts):
    """Return (len(preds), len(gts)) IoU matrix."""
    M = np.zeros((len(preds), len(gts)))
    for i, p in enumerate(preds):
        for j, g in enumerate(gts):
            M[i, j] = iou_1d(p, g)
    return M


def _matched_ious(preds, gts):
    """Return the IoUs of the 1-to-1 assignment that maximises total IoU."""
    M = iou_matrix(preds, gts)
    # linear_sum_assignment minimises cost, so negate to maximise IoU.
    pi, gi = linear_sum_assignment(-M)
    return M[pi, gi]


def _f1_from_matches(matched, n_pred, n_gt, tau):
    """Return F1 given matched-pair IoUs, counting a TP iff IoU > tau."""
    tp = int((matched > tau).sum())
    p = tp / n_pred
    r = tp / n_gt
    return (2 * p * r / (p + r)) if (p + r) > 0 else 0.0


def _f1_by_tau(preds, gts, taus):
    """Return {tau: F1} for one video, solving the assignment only once.

    The Hungarian assignment maximises total IoU and does not depend on
    tau, so a single solve serves every threshold.
    """
    if not preds or not gts:
        return {t: 0.0 for t in taus}
    matched = _matched_ious(preds, gts)
    return {t: _f1_from_matches(matched, len(preds), len(gts), t) for t in taus}


def hungarian_f1_at_tiou(preds, gts, tau):
    """1-to-1 Hungarian match preds↔gts; count TP iff IoU>τ. Compute F1.

    Args:
        preds: sequence of predicted (start, end) intervals.
        gts: sequence of ground-truth (start, end) intervals.
        tau: IoU threshold; a matched pair is a true positive iff its IoU
            is strictly greater than ``tau``.

    Returns:
        The F1 score as a float; 0.0 if either ``preds`` or ``gts`` is empty
        (including the case where both are empty).
    """
    if not preds or not gts:
        return 0.0
    return _f1_from_matches(_matched_ious(preds, gts), len(preds), len(gts), tau)


def per_gt_best_iou(preds, gts):
    """For each GT, return max IoU(pred, gt) across pred set (0 if no preds)."""
    if not preds:
        return [0.0] * len(gts)
    M = iou_matrix(preds, gts)
    return list(M.max(axis=0))  # (num_gt,)


# ---------------------------------------------------------------------------
# Aggregation
# ---------------------------------------------------------------------------
def _mean_or_zero(values):
    """Return the mean of ``values`` as a float, or 0.0 if it is empty.

    Avoids the NaN (and RuntimeWarning) that ``np.mean([])`` produces.
    """
    return float(np.mean(values)) if values else 0.0


def evaluate(rows):
    """Aggregate grounding-style metrics over a list of per-video records.

    Args:
        rows: list of dicts, each with a ``"pred"`` and a ``"gt"`` entry
            holding a sequence of (start, end) pairs. Predictions carry no
            scores, so "top-K" means the first K in the given order.

    Returns:
        A dict with keys ``mIoU``, ``F1@<τ>`` for each τ in ``F1_TAUS``,
        ``mean_F1_strict_0.5_0.75_0.85_0.95``, ``R@<K>_IoU<τ>`` for each
        K in ``RECALL_KS`` and τ in ``RECALL_TAUS``, ``n_videos`` and
        ``n_gt_total``. Averages over an empty set are reported as 0.0.
    """
    all_f1_taus = sorted(set(F1_TAUS) | set(AF_STRICT_TAUS))

    mean_iou_per_video = []
    f1_at = {t: [] for t in F1_TAUS}
    af_strict = []
    # R@K @τ counts at GT-instance level
    gt_total = 0
    recovered = {(k, t): 0 for k in RECALL_KS for t in RECALL_TAUS}

    for r in rows:
        pred = [tuple(x) for x in r["pred"]]
        gt = [tuple(x) for x in r["gt"]]

        # F1@τ is recorded for every video, including those without GT
        # (which score 0 by definition of hungarian_f1_at_tiou).
        f1 = _f1_by_tau(pred, gt, all_f1_taus)
        for t in F1_TAUS:
            f1_at[t].append(f1[t])
        af_strict.append(np.mean([f1[t] for t in AF_STRICT_TAUS]))

        # mIoU and recall are only defined for videos with at least one GT.
        if not gt:
            continue

        # mIoU per video = mean over GT of best-match IoU
        mean_iou_per_video.append(np.mean(per_gt_best_iou(pred, gt)))

        # R@K @τ: take top-K preds (no scores → first K in given order)
        for k in RECALL_KS:
            best_k = per_gt_best_iou(pred[:k], gt)
            for t in RECALL_TAUS:
                recovered[(k, t)] += sum(1 for x in best_k if x > t)
        gt_total += len(gt)

    out = {}
    out["mIoU"] = _mean_or_zero(mean_iou_per_video)
    for t in F1_TAUS:
        out[f"F1@{t}"] = _mean_or_zero(f1_at[t])
    out["mean_F1_strict_0.5_0.75_0.85_0.95"] = _mean_or_zero(af_strict)
    for (k, t), c in recovered.items():
        # max(1, …) keeps recall at 0 instead of dividing by zero when the
        # dataset has no GT segments at all.
        out[f"R@{k}_IoU{t}"] = c / max(1, gt_total)
    out["n_videos"] = len(rows)
    out["n_gt_total"] = gt_total
    return out


def print_block(label, m, indent=""):
    """Print a human-readable summary of a metrics dict from ``evaluate``.

    Args:
        label: heading printed on the first line.
        m: metrics dict as returned by ``evaluate``.
        indent: string prefixed to every printed line.
    """
    print(f"{indent}{label}")
    print(f"{indent} mIoU = {m['mIoU']*100:.2f}%")
    print(f"{indent} F1@0.3 / F1@0.5 / F1@0.7 = {m['F1@0.3']*100:.2f}% / {m['F1@0.5']*100:.2f}% / {m['F1@0.7']*100:.2f}%")
    print(f"{indent} F1@0.85 / F1@0.95 = {m['F1@0.85']*100:.2f}% / {m['F1@0.95']*100:.2f}%")
    print(f"{indent} mean F1@strict (AF τ=0.5/0.75/0.85/0.95) = {m['mean_F1_strict_0.5_0.75_0.85_0.95']*100:.2f}%")
    print(f"{indent} R@1 IoU>0.5 / IoU>0.7 = {m['R@1_IoU0.5']*100:.2f}% / {m['R@1_IoU0.7']*100:.2f}%")
    print(f"{indent} R@5 IoU>0.5 / IoU>0.7 = {m['R@5_IoU0.5']*100:.2f}% / {m['R@5_IoU0.7']*100:.2f}%")


def main():
    """Load ``rank_*.jsonl`` from ``--out_dir``, print and save the metrics.

    Writes ``summary_grounding.json`` into the same directory with the
    overall metrics and a per-generator breakdown.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--out_dir", required=True)
    args = p.parse_args()

    rows = []
    for path in sorted(glob.glob(os.path.join(args.out_dir, "rank_*.jsonl"))):
        with open(path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                if not line:
                    continue
                rows.append(json.loads(line))
    if not rows:
        print("no records found")
        return

    # Evaluate once and reuse the result for both printing and saving.
    overall = evaluate(rows)

    print(f"=== {len(rows)} videos — grounding-style metrics ===\n")
    print_block("OVERALL", overall)
    print()

    by_gen = defaultdict(list)
    for r in rows:
        by_gen[r["generator"]].append(r)
    # Built in first-seen order so the saved JSON keeps its key order.
    per_generator = {g: evaluate(gen_rows) for g, gen_rows in by_gen.items()}

    print(f"{'PER-GENERATOR':<70}")
    print(f"{'gen':<10} {'n':>4} {'mIoU':>7} {'F1@.5':>7} {'F1@.7':>7} {'F1@.85':>7} {'F1@.95':>7} {'R@1@.5':>8} {'R@1@.7':>8}")
    for g in sorted(per_generator):
        m = per_generator[g]
        print(
            f" {g:<8} {len(by_gen[g]):>4} "
            f"{m['mIoU']*100:>6.2f}% "
            f"{m['F1@0.5']*100:>6.2f}% "
            f"{m['F1@0.7']*100:>6.2f}% "
            f"{m['F1@0.85']*100:>6.2f}% "
            f"{m['F1@0.95']*100:>6.2f}% "
            f"{m['R@1_IoU0.5']*100:>7.2f}% "
            f"{m['R@1_IoU0.7']*100:>7.2f}%"
        )

    # save
    out = {
        "n_videos": len(rows),
        "overall": overall,
        "per_generator": per_generator,
    }
    summary_path = os.path.join(args.out_dir, "summary_grounding.json")
    with open(summary_path, "w", encoding="utf-8") as f:
        json.dump(out, f, indent=2)
    print(f"\nsaved {os.path.join(args.out_dir, 'summary_grounding.json')}")


if __name__ == "__main__":
    main()