"""Aggregate per-rank JSONL eval outputs with the official ActivityForensics metric.

The metric is ``compute_AP_AR`` from the ActivityForensics repository. This
script requests mAP at tIoU thresholds 0.50..0.95 (step 0.05) and average
recall at the proposal counts passed as ``ar_points`` (1, 5, 10).

Our generative model emits a single answer per video, so per-segment scores
are all uniform (1.0). This is a fair starting point; for full AR@N coverage
we would need multi-sampling, noted as a limitation.

The location of the official repository checkout can be overridden with the
``ACTIVITYFORENSICS_ROOT`` environment variable; it defaults to the path the
script was originally written against.
"""
import argparse
import glob
import importlib.util
import json
import os
import sys
import types
from collections import defaultdict

import numpy as np

# Root of the official ActivityForensics checkout (single source of truth for
# every path below).
_AF_ROOT = os.environ.get(
    "ACTIVITYFORENSICS_ROOT", "/mnt/local-fast/zhangt/activityforensics")

# Use the official metric implementation. Bypass libs/utils/__init__.py
# (which imports an unbuilt C++ NMS extension we don't need) by pre-registering
# fake parent modules in sys.modules so the Evaluation files load cleanly.
sys.path.insert(0, _AF_ROOT)

# Register fake `libs`, `libs.utils`, `libs.utils.Evaluation` as empty packages
# so eval modules' relative-style imports succeed without triggering nms.
for pkg_name in ["libs", "libs.utils", "libs.utils.Evaluation"]:
    if pkg_name not in sys.modules:
        sys.modules[pkg_name] = types.ModuleType(pkg_name)
        sys.modules[pkg_name].__path__ = []  # mark as package

_eval_root = os.path.join(_AF_ROOT, "libs", "utils", "Evaluation")
for mod_name in ["utils", "eval_detection", "eval_proposal"]:
    spec = importlib.util.spec_from_file_location(
        f"libs.utils.Evaluation.{mod_name}",
        os.path.join(_eval_root, f"{mod_name}.py"))
    mod = importlib.util.module_from_spec(spec)
    # Register before executing so sibling imports inside the module resolve.
    sys.modules[spec.name] = mod
    spec.loader.exec_module(mod)

# Expose at-package-level for detect_eval's import statement
_eval_pkg = sys.modules["libs.utils.Evaluation"]
_eval_pkg.compute_average_precision_detection = (
    sys.modules["libs.utils.Evaluation.eval_detection"]
    .compute_average_precision_detection)
_eval_pkg.average_recall_vs_avg_nr_proposals = (
    sys.modules["libs.utils.Evaluation.eval_proposal"]
    .average_recall_vs_avg_nr_proposals)

spec = importlib.util.spec_from_file_location(
    "libs.utils.detect_eval",
    os.path.join(_AF_ROOT, "libs", "utils", "detect_eval.py"))
detect_eval = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = detect_eval
spec.loader.exec_module(detect_eval)
compute_AP_AR = detect_eval.compute_AP_AR


def _as_segments(raw):
    """Return *raw* as a float32 array reshaped to two columns.

    ``None`` and empty inputs become a ``(0, 2)`` array, so videos without
    predictions (or without ground truth) use the same dtype as all others.
    """
    if raw is None:
        raw = []
    return np.asarray(raw, dtype=np.float32).reshape(-1, 2)


def _tiou_key(t):
    """Build the result-dict key for tIoU threshold *t* (e.g. ``mAP@0.5``).

    Trailing zeros and a dangling decimal point are stripped, which matches
    the ``mAP@0.5`` / ``mAP@0.75`` / ``mAP@0.95`` keys read in the
    per-generator table.
    """
    return "mAP@" + str(round(t, 2)).rstrip("0").rstrip(".")


def _json_default(obj):
    """``json.dump`` fallback converting NumPy scalars/arrays to builtins."""
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    raise TypeError(
        f"Object of type {type(obj).__name__} is not JSON serializable")


def load_records(out_dir):
    """Load every record from the ``rank_*.jsonl`` files in *out_dir*.

    Files are read in sorted filename order. Blank lines (for example a
    trailing newline at the end of a file) are skipped instead of raising a
    JSON decode error.

    Returns:
        A list with one decoded JSON object per non-blank line.
    """
    rows = []
    for path in sorted(glob.glob(os.path.join(out_dir, "rank_*.jsonl"))):
        with open(path, encoding="utf-8") as f:
            rows.extend(json.loads(line) for line in f if line.strip())
    return rows


def evaluate_subset(rows, label):
    """Run compute_AP_AR on a subset of records.

    Args:
        rows: Records with optional ``"pred"`` and ``"gt"`` entries, each a
            list of segments that can be reshaped to two columns.
        label: Subset name forwarded to ``compute_AP_AR`` as ``subset``.

    Returns:
        Whatever ``compute_AP_AR`` returns; ``main`` reads it as a dict of
        metric name to value.
    """
    pred_time, gt_time, scores = [], [], []
    for record in rows:
        segments = _as_segments(record.get("pred"))
        # A video without predictions still contributes its ground truth so
        # recall denominators stay correct; its score vector is simply empty.
        pred_time.append(segments)
        scores.append(np.ones(len(segments), dtype=np.float32))  # uniform = 1.0
        gt_time.append(_as_segments(record.get("gt")))

    return compute_AP_AR(
        pred_time, gt_time, scores,
        iou_thresholds_ap=np.linspace(0.5, 0.95, 10),
        iou_thresholds_ar=np.linspace(0.5, 0.95, 10),
        ar_points=(1, 5, 10),
        subset=label,
        max_avg_nr_proposals=100,
    )


def main():
    """Evaluate all records in ``--out_dir`` and write ``summary_official.json``.

    Prints overall metrics and a per-generator table, then saves both to
    ``summary_official.json`` inside the output directory. Returns early,
    without writing anything, when no records are found.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--out_dir", required=True)
    args = p.parse_args()

    rows = load_records(args.out_dir)
    if not rows:
        print(f"NO RESULTS in {args.out_dir}")
        return

    print(f"=== {len(rows)} evaluated videos (official ActivityForensics metrics) ===\n")

    # Overall
    print("=" * 70)
    print("OVERALL")
    print("=" * 70)
    res = evaluate_subset(rows, "all")
    # Print mAP@tIoU row
    map_row = " ".join(
        f"mAP@{t:.2f}={res.get(_tiou_key(t), float('nan')):.3f}"
        for t in np.linspace(0.5, 0.95, 10))
    print(f"average-mAP = {res['mAP']:.4f}")
    print(f" per-tIoU : {map_row}")
    print(f"average-AR = {res['mAR']:.4f}")
    ar_str = " ".join(f"{k}={v:.4f}" for k, v in res.items() if k.startswith("AR@"))
    print(f" per-N : {ar_str}")
    print()

    # Per-generator
    by_gen = defaultdict(list)
    for record in rows:
        by_gen[record["generator"]].append(record)

    print("=" * 70)
    print("PER-GENERATOR")
    print("=" * 70)
    print(f"{'gen':<12} {'n':>4} {'mAP':>8} {'mAP@.5':>8} {'mAP@.75':>9} {'mAP@.95':>9} {'AR@1':>8} {'AR@10':>8}")
    nan = float("nan")
    gen_results = {}
    for g in sorted(by_gen):
        rs = by_gen[g]
        gen_res = evaluate_subset(rs, g)
        gen_results[g] = gen_res
        print(
            f" {g:<10} {len(rs):>4} "
            f"{gen_res['mAP']:>8.4f} "
            f"{gen_res.get('mAP@0.5', nan):>8.4f} "
            f"{gen_res.get('mAP@0.75', nan):>9.4f} "
            f"{gen_res.get('mAP@0.95', nan):>9.4f} "
            f"{gen_res.get('AR@1', nan):>8.4f} "
            f"{gen_res.get('AR@10', nan):>8.4f}"
        )

    # Save
    summary = {
        "n": len(rows),
        "overall": res,
        "per_generator": gen_results,
        "note": (
            "Predictions assigned uniform score=1.0 since the generative model "
            "emits a single answer per video. AR@N is lower-bounded by the "
            "single-shot proposal count (~1-3 per video)."
        ),
    }
    out_path = os.path.join(args.out_dir, "summary_official.json")
    with open(out_path, "w", encoding="utf-8") as f:
        # default= only fires for values json cannot encode natively, such as
        # NumPy scalars or arrays coming back from the metric code.
        json.dump(summary, f, indent=2, default=_json_default)
    print(f"\nsaved {out_path}")


if __name__ == "__main__":
    main()