forensics-grpo / code /evaluate_forensics_official.py
sdzt's picture
Add source code
33569f9 verified
Raw
History Blame Contribute Delete
5.77 kB
"""Aggregate per-rank JSONL eval outputs using the OFFICIAL ActivityForensics
metric: compute_AP_AR (mAP@tIoU [0.5..0.95] + AR@{1,5,10}, the AR points
requested by evaluate_subset below).
Our generative model emits a single <answer> per video, so every predicted
segment gets the same uniform score (1.0). This is a fair starting point;
full AR@N coverage at larger N would need multi-sampling, which is noted as
a limitation.
"""
import argparse
import glob
import json
import os
import sys
from collections import defaultdict
import numpy as np
# Use the official metric implementation. Bypass libs/utils/__init__.py
# (which imports an unbuilt C++ NMS extension we don't need) by pre-registering
# fake parent modules in sys.modules so the Evaluation files load cleanly.
import importlib.util
import types
# Root of the ActivityForensics checkout that ships the official metric code.
# Defaults to the original hard-coded location; set ACTIVITYFORENSICS_ROOT to
# run this script against a checkout that lives somewhere else.
_AF_ROOT = os.environ.get(
    "ACTIVITYFORENSICS_ROOT", "/mnt/local-fast/zhangt/activityforensics")
sys.path.insert(0, _AF_ROOT)

# Register fake `libs`, `libs.utils`, `libs.utils.Evaluation` as empty packages
# so the eval modules' package-qualified imports succeed without running the
# real libs/utils/__init__.py (and therefore without triggering nms).
for pkg_name in ["libs", "libs.utils", "libs.utils.Evaluation"]:
    if pkg_name not in sys.modules:
        sys.modules[pkg_name] = types.ModuleType(pkg_name)
        sys.modules[pkg_name].__path__ = []  # mark as package


def _load_module_from_file(qualified_name, file_path):
    """Execute the file at *file_path* as the module *qualified_name*.

    The module object is stored in ``sys.modules`` before the file is
    executed, so imports of *qualified_name* made while it runs (or later)
    resolve to this object.

    Raises:
        ImportError: If no import spec/loader can be built for *file_path*.
    """
    spec = importlib.util.spec_from_file_location(qualified_name, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot build an import spec for {file_path!r}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = module
    spec.loader.exec_module(module)
    return module


_eval_root = os.path.join(_AF_ROOT, "libs", "utils", "Evaluation")
for mod_name in ["utils", "eval_detection", "eval_proposal"]:
    _load_module_from_file(
        f"libs.utils.Evaluation.{mod_name}",
        os.path.join(_eval_root, f"{mod_name}.py"))

# Expose at package level for detect_eval's import statement.
_eval_pkg = sys.modules["libs.utils.Evaluation"]
_eval_pkg.compute_average_precision_detection = (
    sys.modules["libs.utils.Evaluation.eval_detection"]
    .compute_average_precision_detection)
_eval_pkg.average_recall_vs_avg_nr_proposals = (
    sys.modules["libs.utils.Evaluation.eval_proposal"]
    .average_recall_vs_avg_nr_proposals)

detect_eval = _load_module_from_file(
    "libs.utils.detect_eval",
    os.path.join(_AF_ROOT, "libs", "utils", "detect_eval.py"))
compute_AP_AR = detect_eval.compute_AP_AR
def load_records(out_dir):
    """Collect every JSON record written to the per-rank output files.

    Reads all files matching ``rank_*.jsonl`` directly inside *out_dir*, in
    sorted path order, and parses each non-blank line as one JSON value.

    Args:
        out_dir: Directory that holds the ``rank_*.jsonl`` files.

    Returns:
        A list of the parsed records, file by file and line by line. The list
        is empty when no file matches.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    rows = []
    for path in sorted(glob.glob(os.path.join(out_dir, "rank_*.jsonl"))):
        # Decode explicitly as UTF-8 instead of relying on the locale default.
        with open(path, encoding="utf-8") as f:
            # Blank lines (e.g. a trailing newline at the end of the file)
            # carry no record and would otherwise make json.loads raise.
            rows.extend(json.loads(line) for line in f if line.strip())
    return rows
def evaluate_subset(rows, label):
    """Run compute_AP_AR on a subset of records.

    Args:
        rows: Iterable of record dicts. ``pred`` and ``gt`` are read from each
            record and reshaped to ``(-1, 2)`` arrays (presumably
            ``(start, end)`` pairs -- confirm against the producer of the
            JSONL files). A missing, null or empty value means "no segments".
        label: Passed through to compute_AP_AR as ``subset``.

    Returns:
        Whatever compute_AP_AR returns for these records.
    """
    pred_time, gt_time, scores = [], [], []
    for r in rows:
        # A JSON null must behave like a missing key; otherwise len(None) /
        # reshape(-1, 2) on a scalar would raise.
        pred = r.get("pred")
        if pred is None:
            pred = []
        gt = r.get("gt")
        if gt is None:
            gt = []

        if len(pred) == 0:
            # still need to record GT for recall denominators; pred can be empty
            pred_time.append(np.zeros((0, 2)))
            scores.append(np.zeros((0,)))
        else:
            p = np.asarray(pred, dtype=np.float32).reshape(-1, 2)
            pred_time.append(p)
            # Every predicted segment gets the same confidence (uniform = 1.0).
            scores.append(np.ones(len(p), dtype=np.float32))
        gt_time.append(np.asarray(gt, dtype=np.float32).reshape(-1, 2))

    return compute_AP_AR(
        pred_time, gt_time, scores,
        iou_thresholds_ap=np.linspace(0.5, 0.95, 10),
        iou_thresholds_ar=np.linspace(0.5, 0.95, 10),
        ar_points=(1, 5, 10),
        subset=label,
        max_avg_nr_proposals=100,
    )
def _tiou_key(t):
    """Return the result-dict key for tIoU threshold *t*, e.g. ``mAP@0.5``."""
    # Rounding to two decimals removes linspace float noise; trailing zeros
    # and a dangling dot are then trimmed (0.5 -> "0.5", 0.75 -> "0.75").
    return "mAP@" + str(round(t, 2)).rstrip("0").rstrip(".")


def _json_default(obj):
    """Convert NumPy scalars/arrays, which ``json`` cannot encode natively."""
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    raise TypeError(
        f"Object of type {type(obj).__name__} is not JSON serializable")


def main():
    """Evaluate all records in ``--out_dir`` and report the metrics.

    Loads the records with load_records, evaluates them as a whole and once
    per ``generator`` value with evaluate_subset, prints both reports, and
    writes everything to ``summary_official.json`` inside ``--out_dir``.
    Prints a notice and returns early when no records are found.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--out_dir", required=True)
    args = p.parse_args()

    rows = load_records(args.out_dir)
    if not rows:
        print(f"NO RESULTS in {args.out_dir}")
        return
    print(f"=== {len(rows)} evaluated videos (official ActivityForensics metrics) ===\n")

    nan = float("nan")
    rule = "=" * 70

    # Overall
    print(rule)
    print("OVERALL")
    print(rule)
    res = evaluate_subset(rows, "all")
    # One "mAP@<tIoU>=<value>" entry per threshold; missing keys show as nan.
    map_row = " ".join(
        f"mAP@{t:.2f}={res.get(_tiou_key(t), nan):.3f}"
        for t in np.linspace(0.5, 0.95, 10))
    print(f"average-mAP = {res['mAP']:.4f}")
    print(f" per-tIoU : {map_row}")
    print(f"average-AR = {res['mAR']:.4f}")
    ar_str = " ".join(f"{k}={v:.4f}" for k, v in res.items() if k.startswith("AR@"))
    print(f" per-N : {ar_str}")
    print()

    # Per-generator
    by_gen = defaultdict(list)
    for r in rows:
        by_gen[r["generator"]].append(r)
    print(rule)
    print("PER-GENERATOR")
    print(rule)
    print(f"{'gen':<12} {'n':>4} {'mAP':>8} {'mAP@.5':>8} {'mAP@.75':>9} {'mAP@.95':>9} {'AR@1':>8} {'AR@10':>8}")
    gen_results = {}
    for g in sorted(by_gen):
        rs = by_gen[g]
        r = evaluate_subset(rs, g)
        gen_results[g] = r
        print(
            f" {g:<10} {len(rs):>4} "
            f"{r['mAP']:>8.4f} "
            f"{r.get('mAP@0.5', nan):>8.4f} "
            f"{r.get('mAP@0.75', nan):>9.4f} "
            f"{r.get('mAP@0.95', nan):>9.4f} "
            f"{r.get('AR@1', nan):>8.4f} "
            f"{r.get('AR@10', nan):>8.4f}"
        )

    # Save
    summary = {
        "n": len(rows),
        "overall": res,
        "per_generator": gen_results,
        "note": (
            "Predictions assigned uniform score=1.0 since the generative model "
            "emits a single answer per video. AR@N is lower-bounded by the "
            "single-shot proposal count (~1-3 per video)."
        ),
    }
    out_path = os.path.join(args.out_dir, "summary_official.json")
    with open(out_path, "w", encoding="utf-8") as f:
        # The metric results may hold NumPy scalars/arrays; _json_default is
        # only consulted for values json cannot encode on its own.
        json.dump(summary, f, indent=2, default=_json_default)
    print(f"\nsaved {out_path}")


if __name__ == "__main__":
    main()