forensics-grpo / code /evaluate_grounding_metrics.py
sdzt's picture
Add source code
33569f9 verified
Raw
History Blame Contribute Delete
6.7 kB
"""Grounding-style metrics for multi-target temporal localisation.
Single-shot generative VLMs emit a small set of (start, end) intervals per video.
Detection-style metrics (mAP, AR@N) heavily penalise this regime because they
assume 100+ ranked proposals per video. Grounding-style metrics are designed
for the few-output setting and are standard in temporal-grounding literature
(Charades-STA, ActivityNet Captions, TempSamp-R1).
Metrics reported:
- mIoU: for each video that has GT, the mean over its GT segments of the
best-match IoU(pred, GT), i.e. average per-GT recall using IoU as the score;
the reported value is the mean of these per-video scores.
- F1@tIoU=τ: Hungarian 1-to-1 matching between preds and GTs, count a
match as TP iff its IoU > τ, then compute F1 = 2·P·R/(P+R).
Reported for τ in {0.3, 0.5, 0.7, 0.85, 0.95}.
- mean_F1@τ: average F1 over τ in {0.5, 0.75, 0.85, 0.95} (AF strict).
- R@K @τ: "of all GT segments in the dataset, what fraction is recovered
by at least one of the model's top-K predictions @ IoU > τ".
K in {1, 3, 5}; τ in {0.3, 0.5, 0.7}.
Reads the same per-rank JSONL files as evaluate_forensics_aggregate.py.
"""
import argparse
import glob
import json
import os
from collections import defaultdict
import numpy as np
from scipy.optimize import linear_sum_assignment
# ---------------------------------------------------------------------------
# IoU primitives
# ---------------------------------------------------------------------------
def iou_1d(a, b):
    """Return the temporal IoU of two ``(start, end)`` intervals.

    The denominator is the span from the earliest start to the latest end.
    When the intervals overlap that span equals their union; when they do
    not, the intersection is clamped to zero, so the result is 0.0 either
    way. A non-positive span also yields 0.0.
    """
    start_a, end_a = a
    start_b, end_b = b
    overlap = max(0.0, min(end_a, end_b) - max(start_a, start_b))
    hull = max(end_a, end_b) - min(start_a, start_b)
    if hull > 0:
        return overlap / hull
    return 0.0
def iou_matrix(preds, gts):
    """Build the pairwise IoU table between predictions and ground truths.

    Returns a float array of shape ``(len(preds), len(gts))`` whose entry
    ``[i, j]`` is ``iou_1d(preds[i], gts[j])``.
    """
    table = [[iou_1d(pred_seg, gt_seg) for gt_seg in gts] for pred_seg in preds]
    # The explicit reshape keeps the 2-D (P, G) shape even when either side
    # is empty, where np.array alone would collapse to a 1-D array.
    return np.array(table, dtype=float).reshape(len(preds), len(gts))
def hungarian_f1_at_tiou(preds, gts, tau):
    """F1 of a one-to-one Hungarian matching between preds and GTs.

    Predictions and ground truths are paired so that the total IoU of the
    assignment is maximal; a pair counts as a true positive only when its
    IoU is strictly greater than ``tau``. Returns 0.0 when either side is
    empty or when there are no true positives.
    """
    if not (preds and gts):
        return 0.0

    ious = iou_matrix(preds, gts)
    # linear_sum_assignment minimises cost, so negate to maximise IoU.
    cost = -ious
    pred_idx, gt_idx = linear_sum_assignment(cost)
    tp = int(np.count_nonzero(ious[pred_idx, gt_idx] > tau))

    precision = tp / len(preds)
    recall = tp / len(gts)
    denom = precision + recall
    if denom > 0:
        return 2 * precision * recall / denom
    return 0.0
def per_gt_best_iou(preds, gts):
    """Return, per ground-truth segment, the highest IoU any prediction achieves.

    The result has one entry per element of ``gts``; with no predictions
    every entry is 0.0.
    """
    if preds:
        ious = iou_matrix(preds, gts)
        # Reduce over the prediction axis, leaving one value per GT column.
        best = np.max(ious, axis=0)
        return list(best)
    return [0.0] * len(gts)
# ---------------------------------------------------------------------------
# Aggregation
# ---------------------------------------------------------------------------
def evaluate(rows):
    """Aggregate grounding-style metrics over a list of per-video records.

    Args:
        rows: list of dicts, each with a ``"pred"`` and a ``"gt"`` entry
            holding an iterable of ``(start, end)`` pairs. Predictions carry
            no scores, so "top-K" means the first K in the given order.

    Returns:
        dict with the keys
        - ``"mIoU"``: mean over videos that have GT of the per-video mean
          best-match IoU;
        - ``"F1@<tau>"`` for tau in {0.3, 0.5, 0.7, 0.85, 0.95}: mean over
          all videos of the Hungarian F1 at that threshold;
        - ``"mean_F1_strict_0.5_0.75_0.85_0.95"``: mean over videos of the
          per-video average F1 over tau in {0.5, 0.75, 0.85, 0.95};
        - ``"R@<K>_IoU<tau>"`` for K in {1, 3, 5}, tau in {0.3, 0.5, 0.7}:
          fraction of all GT segments recovered by the first K predictions
          at IoU > tau;
        - ``"n_videos"`` and ``"n_gt_total"``.

        Every averaged metric is 0.0 when there is nothing to average
        (e.g. ``rows`` is empty), never NaN.
    """
    F1_TAUS = [0.3, 0.5, 0.7, 0.85, 0.95]
    AF_STRICT = [0.5, 0.75, 0.85, 0.95]
    R_TAUS = [0.3, 0.5, 0.7]
    R_KS = [1, 3, 5]
    # The two threshold lists overlap; solve each distinct tau once per video.
    all_f1_taus = sorted(set(F1_TAUS) | set(AF_STRICT))

    mean_iou_per_video = []
    f1_at = {t: [] for t in F1_TAUS}
    af_strict = []
    # R@K @tau is counted at GT-instance level across the whole dataset.
    gt_total = 0
    recovered = {(k, t): 0 for k in R_KS for t in R_TAUS}

    for r in rows:
        pred = [tuple(x) for x in r["pred"]]
        gt = [tuple(x) for x in r["gt"]]

        # F1@tau: videos with no GT still contribute (with whatever
        # hungarian_f1_at_tiou returns for them) to the per-video average.
        f1_by_tau = {t: hungarian_f1_at_tiou(pred, gt, t) for t in all_f1_taus}
        for t in F1_TAUS:
            f1_at[t].append(f1_by_tau[t])
        af_strict.append(np.mean([f1_by_tau[t] for t in AF_STRICT]))

        if not gt:
            # mIoU and R@K are defined per GT segment; nothing to score.
            continue

        # mIoU per video = mean over GT of best-match IoU.
        mean_iou_per_video.append(np.mean(per_gt_best_iou(pred, gt)))

        # R@K @tau: no scores, so top-K = first K predictions as given.
        for K in R_KS:
            bgi_k = per_gt_best_iou(pred[:K], gt)
            for t in R_TAUS:
                recovered[(K, t)] += sum(1 for x in bgi_k if x > t)
        gt_total += len(gt)

    def _mean_or_zero(values):
        # np.mean([]) is NaN (with a RuntimeWarning) and NaN is not valid
        # JSON; report 0.0 instead, as mIoU and R@K already do.
        return float(np.mean(values)) if values else 0.0

    out = {}
    out["mIoU"] = _mean_or_zero(mean_iou_per_video)
    for t in F1_TAUS:
        out[f"F1@{t}"] = _mean_or_zero(f1_at[t])
    out["mean_F1_strict_0.5_0.75_0.85_0.95"] = _mean_or_zero(af_strict)
    for (K, t), c in recovered.items():
        out[f"R@{K}_IoU{t}"] = c / max(1, gt_total)
    out["n_videos"] = len(rows)
    out["n_gt_total"] = gt_total
    return out
def print_block(label, m, indent=""):
    """Print a labelled, human-readable summary of one metrics dict.

    Args:
        label: heading printed on the first line.
        m: metrics dict; the keys read are ``mIoU``, ``F1@0.3``, ``F1@0.5``,
            ``F1@0.7``, ``F1@0.85``, ``F1@0.95``,
            ``mean_F1_strict_0.5_0.75_0.85_0.95``, ``R@1_IoU0.5``,
            ``R@1_IoU0.7``, ``R@5_IoU0.5`` and ``R@5_IoU0.7``.
        indent: prefix prepended to every printed line.
    """

    def pct(key):
        # Metrics are stored as fractions; show them as percentages.
        return f"{m[key]*100:.2f}%"

    def joined(*keys):
        return " / ".join(pct(key) for key in keys)

    print(f"{indent}{label}")
    print(f"{indent} mIoU = {pct('mIoU')}")
    print(f"{indent} F1@0.3 / F1@0.5 / F1@0.7 = {joined('F1@0.3', 'F1@0.5', 'F1@0.7')}")
    print(f"{indent} F1@0.85 / F1@0.95 = {joined('F1@0.85', 'F1@0.95')}")
    print(
        f"{indent} mean F1@strict (AF τ=0.5/0.75/0.85/0.95) = "
        f"{pct('mean_F1_strict_0.5_0.75_0.85_0.95')}"
    )
    print(f"{indent} R@1 IoU>0.5 / IoU>0.7 = {joined('R@1_IoU0.5', 'R@1_IoU0.7')}")
    print(f"{indent} R@5 IoU>0.5 / IoU>0.7 = {joined('R@5_IoU0.5', 'R@5_IoU0.7')}")
def main():
    """CLI entry point: load per-rank JSONL records, print and save metrics.

    Reads every ``rank_*.jsonl`` file in ``--out_dir`` (one JSON record per
    line), prints the overall metrics and a per-generator table (records are
    grouped by their ``"generator"`` field), and writes the same numbers to
    ``summary_grounding.json`` in ``--out_dir``.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--out_dir", required=True)
    args = p.parse_args()

    rows = []
    for path in sorted(glob.glob(os.path.join(args.out_dir, "rank_*.jsonl"))):
        with open(path, encoding="utf-8") as f:
            for line in f:
                # Skip blank lines (e.g. a trailing newline at end of file),
                # which json.loads would reject.
                if line.strip():
                    rows.append(json.loads(line))
    if not rows:
        print("no records found")
        return

    # Compute each metric set once and reuse it for printing and saving.
    overall = evaluate(rows)
    print(f"=== {len(rows)} videos — grounding-style metrics ===\n")
    print_block("OVERALL", overall)
    print()

    by_gen = defaultdict(list)
    for r in rows:
        by_gen[r["generator"]].append(r)
    per_generator = {g: evaluate(by_gen[g]) for g in by_gen}

    print(f"{'PER-GENERATOR':<70}")
    print(f"{'gen':<10} {'n':>4} {'mIoU':>7} {'F1@.5':>7} {'F1@.7':>7} {'F1@.85':>7} {'F1@.95':>7} {'R@1@.5':>8} {'R@1@.7':>8}")
    for g in sorted(by_gen.keys()):
        m = per_generator[g]
        print(
            f" {g:<8} {len(by_gen[g]):>4} "
            f"{m['mIoU']*100:>6.2f}% "
            f"{m['F1@0.5']*100:>6.2f}% "
            f"{m['F1@0.7']*100:>6.2f}% "
            f"{m['F1@0.85']*100:>6.2f}% "
            f"{m['F1@0.95']*100:>6.2f}% "
            f"{m['R@1_IoU0.5']*100:>7.2f}% "
            f"{m['R@1_IoU0.7']*100:>7.2f}%"
        )

    # save
    out = {"n_videos": len(rows), "overall": overall,
           "per_generator": per_generator}
    summary_path = os.path.join(args.out_dir, "summary_grounding.json")
    with open(summary_path, "w", encoding="utf-8") as f:
        json.dump(out, f, indent=2)
    print(f"\nsaved {summary_path}")


if __name__ == "__main__":
    main()