#!/usr/bin/env python3
"""Evaluate Qwen2.5-VL on VideoHallucer + EventHallusion via vLLM.

VideoHallucer  : paired basic/hallucinated yes-no, regex word match,
                 final_hit = basic AND halluc.
EventHallusion : binary yes-no across entire/mix/misleading,
                 startswith('yes'/'no') match.

Frames: 32 per video. Generation: greedy, max_new_tokens=16.
"""
from __future__ import annotations

import argparse
import hashlib
import json
import os
import re
import sys
import time
from collections import defaultdict
from pathlib import Path
from typing import Any


def _stable_shard(key: str, num_shards: int) -> int:
    """Map *key* to a shard index in ``[0, num_shards)``, identically in every process.

    The index is derived from an MD5 digest of the UTF-8 encoded key rather
    than from the built-in ``hash()``, which avoids PYTHONHASHSEED
    nondeterminism across worker processes.
    """
    digest = hashlib.md5(key.encode("utf-8")).digest()
    # Read the leading 8 digest bytes as one unsigned big-endian integer.
    as_int = int.from_bytes(digest[:8], byteorder="big")
    return as_int % num_shards


# Must be set before vllm is imported below: vLLM workers have to use 'spawn'
# because torch is already imported in the parent process.
os.environ.setdefault("VLLM_WORKER_MULTIPROC_METHOD", "spawn")

import numpy as np
import torch
from transformers import AutoProcessor
from vllm import LLM, SamplingParams

# Constrain per-frame patches so 32 frames + text fit comfortably in max_model_len.
# 128 * 28 * 28 ≈ 100k pixels per frame -> ~128 vision tokens / frame -> ~4096 video tokens.
VIDEO_MAX_PIXELS = 128 * 28 * 28
VIDEO_MIN_PIXELS = 16 * 28 * 28
NFRAMES = 32

# ---------- VideoHallucer ----------
VH_CATEGORIES = [
    "object_relation",
    "temporal",
    "semantic_detail",
    "external_factual",
    "external_nonfactual",
]
VH_PROMPT_SUFFIX = "\nAnswer the question using 'yes' or 'no'."
def load_videohallucer(root: Path) -> list[dict[str, Any]]:
    """Flatten VideoHallucer's paired questions into one item per (pair, side).

    For every category in ``VH_CATEGORIES`` this reads
    ``<root>/<cat>/<cat>.json`` and emits two items per pair (``basic`` and
    ``hallucination``). A category whose JSON file is missing is skipped with
    a warning on stderr.

    Args:
        root: Benchmark root directory.

    Returns:
        Item dicts with keys ``bench``, ``category``, ``pair_id``, ``side``,
        ``video``, ``question`` and ``answer`` (stripped and lower-cased).
    """
    items: list[dict[str, Any]] = []
    for cat in VH_CATEGORIES:
        json_path = root / cat / f"{cat}.json"
        if not json_path.exists():
            print(f"[WARN] missing {json_path}", file=sys.stderr)
            continue
        with open(json_path, encoding="utf-8") as f:
            data = json.load(f)
        video_dir = root / cat / "videos"
        for idx, pair in enumerate(data):
            # pair_id ties the two sides together for pair-level scoring.
            pair_id = f"{cat}/{idx}"
            for side in ("basic", "hallucination"):
                q = pair[side]
                items.append({
                    "bench": "videohallucer",
                    "category": cat,
                    "pair_id": pair_id,
                    "side": side,
                    "video": str(video_dir / q["video"]),
                    "question": q["question"],
                    "answer": q["answer"].strip().lower(),
                })
    return items


def vh_extract(pred: str, gold: str) -> bool:
    """Return True if *gold* occurs in *pred* as a whole word (case-insensitive)."""
    return bool(re.search(r"\b" + re.escape(gold) + r"\b", pred, re.IGNORECASE))


def vh_score(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Reproduce evaluation_utils.py: per-category {basic_acc, halluc_acc, pair_acc}.

    Args:
        rows: Items carrying ``pair_id``, ``side``, ``prediction`` and ``answer``.

    Returns:
        One entry per category present in *rows* plus ``"overall"``, which
        holds the macro average of the per-category pair accuracies. Pairs
        for which only one side is present are ignored.
    """
    by_pair: dict[str, dict[str, dict[str, Any]]] = defaultdict(dict)
    for r in rows:
        by_pair[r["pair_id"]][r["side"]] = r

    per_cat: dict[str, dict[str, int]] = defaultdict(
        lambda: {"n": 0, "basic_hit": 0, "halluc_hit": 0, "pair_hit": 0}
    )
    for pair_id, sides in by_pair.items():
        # pair_id has the form "<category>/<index>".
        cat = pair_id.split("/")[0]
        b = sides.get("basic")
        h = sides.get("hallucination")
        if b is None or h is None:
            continue
        b_hit = vh_extract(b["prediction"], b["answer"])
        h_hit = vh_extract(h["prediction"], h["answer"])
        c = per_cat[cat]
        c["n"] += 1
        c["basic_hit"] += int(b_hit)
        c["halluc_hit"] += int(h_hit)
        # A pair only counts when both of its questions are answered correctly.
        c["pair_hit"] += int(b_hit and h_hit)

    summary: dict[str, Any] = {}
    cat_pair_accs = []
    for cat in VH_CATEGORIES:
        if cat not in per_cat:
            continue
        c = per_cat[cat]
        n = max(c["n"], 1)
        summary[cat] = {
            "n_pairs": c["n"],
            "basic_acc": c["basic_hit"] / n,
            "halluc_acc": c["halluc_hit"] / n,
            "pair_acc": c["pair_hit"] / n,
        }
        cat_pair_accs.append(c["pair_hit"] / n)
    summary["overall"] = {
        "pair_acc_macro": sum(cat_pair_accs) / max(len(cat_pair_accs), 1),
        "n_categories": len(cat_pair_accs),
    }
    return summary


# ---------- EventHallusion ----------
EH_SPLITS = ["entire", "mix", "misleading"]
# The released video zip stores the "mix" split as "interleave" — IDs map mix_NNN <-> interleave_NNN.
EH_FOLDER = {"entire": "entire", "mix": "interleave", "misleading": "misleading"}
EH_FILE_PREFIX = {"entire": "entire", "mix": "interleave", "misleading": "misleading"}
EH_PROMPT_SUFFIX = "\nPlease answer yes or no:"


def load_eventhallusion(repo_root: Path, video_root: Path) -> list[dict[str, Any]]:
    """Load every EventHallusion question as one item.

    Reads ``<repo_root>/questions/<split>_questions.json`` for each split in
    ``EH_SPLITS`` and resolves each video to
    ``<video_root>/<folder>/<prefix>_<num>.mp4``.

    Args:
        repo_root: Directory containing the ``questions`` folder.
        video_root: Directory containing the per-split video folders.

    Returns:
        Item dicts with keys ``bench``, ``split``, ``video_id``, ``q_idx``,
        ``video``, ``question`` and ``answer``.

    Raises:
        FileNotFoundError: If a split's question file does not exist.
    """
    items: list[dict[str, Any]] = []
    for split in EH_SPLITS:
        json_path = repo_root / "questions" / f"{split}_questions.json"
        with open(json_path, encoding="utf-8") as f:
            data = json.load(f)
        for vid_entry in data:
            vid_id = vid_entry["id"]
            num = vid_id.split("_", 1)[1]  # e.g. "001"
            video_file = video_root / EH_FOLDER[split] / f"{EH_FILE_PREFIX[split]}_{num}.mp4"
            for q_idx, q in enumerate(vid_entry["questions"]):
                items.append({
                    "bench": "eventhallusion",
                    "split": split,
                    "video_id": vid_id,
                    "q_idx": q_idx,
                    "video": str(video_file),
                    "question": q["question"],
                    "answer": q["answer"].strip(),  # "Yes." / "No."
                })
    return items


def eh_extract(pred: str) -> str | None:
    """Normalise a free-form answer to ``"Yes."`` / ``"No."``, or None if neither.

    The decision is a case-insensitive prefix test on the stripped text.
    """
    p = pred.strip().lower()
    if p.startswith("yes"):
        return "Yes."
    if p.startswith("no"):
        return "No."
    return None


def eh_score(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Compute per-split and overall EventHallusion accuracy.

    Both the prediction and the gold answer are normalised with
    ``eh_extract`` before comparison, so gold labels written as ``"yes"`` or
    ``"No"`` score the same as ``"Yes."`` / ``"No."``. A prediction that
    normalises to neither is counted under ``no_match`` and is never correct.

    Args:
        rows: Items carrying ``split``, ``prediction`` and ``answer``.

    Returns:
        ``{split: {n, correct, acc, no_match}}`` for each split present in
        *rows*, plus ``"overall": {n, correct, acc}``.
    """
    per_split: dict[str, dict[str, int]] = defaultdict(
        lambda: {"n": 0, "correct": 0, "no_match": 0}
    )
    for r in rows:
        stats = per_split[r["split"]]
        stats["n"] += 1
        pred_norm = eh_extract(r["prediction"])
        if pred_norm is None:
            stats["no_match"] += 1
            continue
        # Fall back to the raw gold string when it is not a yes/no form.
        gold_norm = eh_extract(r["answer"]) or r["answer"]
        if pred_norm == gold_norm:
            stats["correct"] += 1

    summary: dict[str, Any] = {}
    total_n = total_c = 0
    for split in EH_SPLITS:
        s = per_split.get(split)
        if not s:
            continue
        summary[split] = {
            "n": s["n"],
            "correct": s["correct"],
            "acc": s["correct"] / max(s["n"], 1),
            "no_match": s["no_match"],
        }
        total_n += s["n"]
        total_c += s["correct"]
    summary["overall"] = {"n": total_n, "correct": total_c, "acc": total_c / max(total_n, 1)}
    return summary


# ---------- vLLM driver ----------
def _decode_video_32(path: str) -> tuple[torch.Tensor, float] | None:
    """Read a video, sample exactly NFRAMES frames (with repetition for short videos), resize.

    Returns (tensor [T, C, H, W] uint8, sample_fps) or None on failure.
    """
    try:
        from decord import VideoReader, cpu

        vr = VideoReader(path, ctx=cpu(0), num_threads=1)
        total = len(vr)
        if total < 1:
            return None
        try:
            video_fps = float(vr.get_avg_fps())
        except Exception:
            video_fps = 30.0
        # A zero, NaN or infinite fps would make sample_fps unusable downstream.
        if not 0.0 < video_fps < float("inf"):
            video_fps = 30.0
        # NOTE(review): the lines from here to the H/W unpack below were
        # unreadable in the reviewed copy of this file and have been
        # reconstructed — confirm against the original implementation.
        # np.linspace allows repeats when total < NFRAMES.
        idx = np.linspace(0, total - 1, NFRAMES).round().astype(np.int64)
        # assumes decord returns frames as [T, H, W, C] uint8 — TODO confirm
        frames = vr.get_batch(idx.tolist()).asnumpy()
        t = torch.from_numpy(frames).permute(0, 3, 1, 2).contiguous()
    except Exception:
        return None

    _, _, H, W = t.shape
    # Ensure pix <= VIDEO_MAX_PIXELS, >= VIDEO_MIN_PIXELS, and H/W multiples of 28.
    pix = H * W
    if pix > VIDEO_MAX_PIXELS:
        scale = (VIDEO_MAX_PIXELS / pix) ** 0.5
    elif pix < VIDEO_MIN_PIXELS:
        scale = (VIDEO_MIN_PIXELS / pix) ** 0.5
    else:
        scale = 1.0
    new_h = max(28, int(round(H * scale / 28)) * 28)
    new_w = max(28, int(round(W * scale / 28)) * 28)
    if (new_h, new_w) != (H, W):
        import torchvision.transforms.functional as TF

        t = TF.resize(t, [new_h, new_w], antialias=True)
    t = t.to(torch.uint8)
    # Sample fps for temporal positional encoding (matches Qwen2.5-VL convention: nframes / total * orig_fps).
    sample_fps = NFRAMES / max(total, 1) * video_fps
    return t, float(sample_fps)


def _decode_one(idx_path_q_suffix):
    """Worker: decode one video to a Qwen2.5-VL-ready tensor.

    Args:
        idx_path_q_suffix: ``(index, video_path, question, suffix)``; only the
            index and the path are used.

    Returns:
        ``(index, (video_tensor, sample_fps), None)`` on success, otherwise
        ``(index, None, "missing")`` or ``(index, None, "decode_error")``.
    """
    idx, video_path, _question, _suffix = idx_path_q_suffix
    if not os.path.exists(video_path):
        return idx, None, "missing"
    out = _decode_video_32(video_path)
    if out is None:
        return idx, None, "decode_error"
    video_tensor, sample_fps = out
    return idx, (video_tensor, sample_fps), None


def build_inputs(items: list[dict[str, Any]], processor, suffix: str, max_workers: int = 32):
    """Decode all videos in a thread pool and build the vLLM request list.

    Items whose video is missing or fails to decode are dropped. Each decoded
    item gets ``nframes_used`` set in place.

    Args:
        items: Benchmark items carrying ``video`` and ``question``.
        processor: Object whose ``apply_chat_template`` renders the prompt.
        suffix: Text appended to every question.
        max_workers: Thread pool size.

    Returns:
        ``(inputs, kept)``: the vLLM input dicts in original item order, and
        the indices into *items* they correspond to.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    args = [(i, it["video"], it["question"], suffix) for i, it in enumerate(items)]
    results: dict[int, Any] = {}
    missing = 0
    done = 0
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futs = [ex.submit(_decode_one, a) for a in args]
        for fut in as_completed(futs):
            idx, payload, err = fut.result()
            done += 1
            if done % 200 == 0:
                print(f"[INFO] decoded {done}/{len(args)}", file=sys.stderr, flush=True)
            if err:
                missing += 1
                if err == "decode_error":
                    print(f"[WARN] decode failed: {items[idx]['video']}", file=sys.stderr)
                continue
            video_tensor, sample_fps = payload
            items[idx]["nframes_used"] = NFRAMES
            results[idx] = (video_tensor, sample_fps)

    # Build vLLM inputs in original order (futures complete in arbitrary order).
    inputs = []
    kept = []
    for i, it in enumerate(items):
        if i not in results:
            continue
        video_tensor, sample_fps = results[i]
        messages = [{
            "role": "user",
            "content": [
                {"type": "video", "video": it["video"]},
                {"type": "text", "text": it["question"] + suffix},
            ],
        }]
        prompt = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs.append({
            "prompt": prompt,
            "multi_modal_data": {"video": video_tensor},
            "mm_processor_kwargs": {"fps": sample_fps},
        })
        kept.append(i)
    if missing:
        print(f"[INFO] {missing} items skipped (missing video or decode error)", file=sys.stderr)
    return inputs, kept


def main():
    """CLI entry point: load one benchmark, run generation, write predictions and summary.

    Predictions go to ``--out`` as JSONL (one item per line, with a
    ``prediction`` field) and the metrics to ``--summary`` as JSON; the
    metrics are also printed to stdout.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--model", required=True)
    ap.add_argument("--bench", choices=["videohallucer", "eventhallusion"], required=True)
    ap.add_argument("--vh_root", default="/mnt/local-fast/opd_zt/data/benchmarks/VideoHallucer")
    ap.add_argument("--eh_repo", default="/mnt/local-fast/opd_zt/data/benchmarks/EventHallusion")
    ap.add_argument("--eh_videos", default="/mnt/local-fast/opd_zt/data/benchmarks/EventHallusion/videos")
    ap.add_argument("--out", required=True, help="Predictions JSONL output path")
    ap.add_argument("--summary", required=True, help="Summary JSON path")
    ap.add_argument("--tp", type=int, default=4)
    ap.add_argument("--max_model_len", type=int, default=8192)
    ap.add_argument("--gpu_mem", type=float, default=0.85)
    ap.add_argument("--max_new_tokens", type=int, default=16)
    ap.add_argument("--limit", type=int, default=0, help="0=no limit; truncate items for smoke tests")
    ap.add_argument("--shard", type=int, default=0, help="This worker's shard index (0..num_shards-1)")
    ap.add_argument("--num_shards", type=int, default=1, help=">1 to data-parallel across N workers")
    args = ap.parse_args()

    # An out-of-range shard index would otherwise silently select zero items.
    if args.num_shards < 1 or not 0 <= args.shard < args.num_shards:
        ap.error(
            f"--shard must satisfy 0 <= shard < num_shards "
            f"(got shard={args.shard}, num_shards={args.num_shards})"
        )

    # Load items.
    if args.bench == "videohallucer":
        items = load_videohallucer(Path(args.vh_root))
        suffix = VH_PROMPT_SUFFIX
        scorer = vh_score
    else:
        items = load_eventhallusion(Path(args.eh_repo), Path(args.eh_videos))
        suffix = EH_PROMPT_SUFFIX
        scorer = eh_score
    if args.limit > 0:
        items = items[: args.limit]
    if args.num_shards > 1:
        # Shard by a group key so paired/related items stay together:
        #   VH: pair_id (basic+halluc must be in the same shard for pair_acc)
        #   EH: video_id (questions on same video together for cache efficiency)
        def _shard_key(it):
            return it.get("pair_id") or it.get("video_id") or it["video"]

        items = [it for it in items if _stable_shard(_shard_key(it), args.num_shards) == args.shard]
        print(f"[INFO] {args.bench}: shard {args.shard}/{args.num_shards}, {len(items)} items")
    else:
        print(f"[INFO] {args.bench}: {len(items)} items")

    # Build processor + inputs.
    processor = AutoProcessor.from_pretrained(args.model, trust_remote_code=True)
    inputs, kept = build_inputs(items, processor, suffix)

    if inputs:
        # Spin up vLLM.
        llm = LLM(
            model=args.model,
            tensor_parallel_size=args.tp,
            max_model_len=args.max_model_len,
            gpu_memory_utilization=args.gpu_mem,
            limit_mm_per_prompt={"image": 0, "video": 1},
            trust_remote_code=True,
            enforce_eager=False,
            dtype="bfloat16",
        )
        sp = SamplingParams(temperature=0.0, top_p=1.0, max_tokens=args.max_new_tokens)

        t0 = time.time()
        outputs = llm.generate(inputs, sp)
        print(f"[INFO] generation: {time.time() - t0:.1f}s for {len(outputs)} prompts")
    else:
        # Nothing to generate: skip building the engine, still write empty results.
        print("[WARN] no decodable inputs; skipping generation", file=sys.stderr)
        outputs = []

    # zip() would silently drop rows if the two lists ever disagreed in length.
    if len(outputs) != len(kept):
        raise RuntimeError(f"expected {len(kept)} outputs, got {len(outputs)}")

    # Attach predictions and write.
    Path(args.out).parent.mkdir(parents=True, exist_ok=True)
    with open(args.out, "w", encoding="utf-8") as fo:
        for k, out in zip(kept, outputs):
            items[k]["prediction"] = out.outputs[0].text
            fo.write(json.dumps(items[k]) + "\n")

    rows = [items[k] for k in kept]
    summary = scorer(rows)
    Path(args.summary).parent.mkdir(parents=True, exist_ok=True)
    with open(args.summary, "w", encoding="utf-8") as fs:
        json.dump({"bench": args.bench, "model": args.model, "n": len(rows), "metrics": summary}, fs, indent=2)
    print(json.dumps(summary, indent=2))


if __name__ == "__main__":
    main()