| |
| """Evaluate Qwen2.5-VL on VideoHallucer + EventHallusion via vLLM. |
| |
| VideoHallucer : paired basic/hallucinated yes-no, regex word match, final_hit = basic AND halluc. |
| EventHallusion : binary yes-no across entire/mix/misleading, startswith('yes'/'no') match. |
| |
| Frames: 32 per video. Generation: greedy, max_new_tokens=16. |
| """ |
| from __future__ import annotations |
|
|
| import argparse |
| import hashlib |
| import json |
| import os |
| import re |
| import sys |
| import time |
| from collections import defaultdict |
| from pathlib import Path |
| from typing import Any |
|
|
|
|
def _stable_shard(key: str, num_shards: int) -> int:
    """Map ``key`` to a shard index that is the same in every process.

    The index comes from the MD5 digest of the UTF-8 encoded key instead of
    the built-in ``hash()``, so it does not vary with PYTHONHASHSEED.
    """
    # The first 16 hex characters are the leading 8 digest bytes, big-endian.
    digest_prefix = hashlib.md5(key.encode("utf-8")).hexdigest()[:16]
    return int(digest_prefix, 16) % num_shards
|
|
| |
# Default the vLLM worker start method to "spawn" unless the caller already
# exported a value; this runs before the vllm import further down.
if "VLLM_WORKER_MULTIPROC_METHOD" not in os.environ:
    os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
|
|
| import numpy as np |
| import torch |
| from transformers import AutoProcessor |
| from vllm import LLM, SamplingParams |
|
|
| |
| |
# Per-frame area bounds (height * width), written as multiples of a 28x28 tile.
VIDEO_MAX_PIXELS = 128 * 28 ** 2
VIDEO_MIN_PIXELS = 16 * 28 ** 2
# Number of frames sampled from every video.
NFRAMES = 32
|
|
|
|
| |
|
|
# VideoHallucer category folders; each is expected to hold "<category>.json"
# and a "videos" directory (see load_videohallucer).
VH_CATEGORIES = [
    "object_relation", "temporal", "semantic_detail",
    "external_factual", "external_nonfactual",
]
# Instruction appended to every VideoHallucer question.
VH_PROMPT_SUFFIX = "\nAnswer the question using 'yes' or 'no'."
|
|
|
|
def load_videohallucer(root: Path, categories: list[str] | None = None) -> list[dict[str, Any]]:
    """Flatten VideoHallucer's paired questions into one item per question.

    Args:
        root: Benchmark directory containing ``<category>/<category>.json``
            and ``<category>/videos/`` for each category.
        categories: Category folder names to load. Defaults to
            ``VH_CATEGORIES`` when omitted.

    Returns:
        A list with two entries per pair ("basic" first, then
        "hallucination"). Each entry has the keys ``bench``, ``category``,
        ``pair_id`` (``"<category>/<index>"``), ``side``, ``video`` (path as
        a string), ``question`` and ``answer`` (stripped and lower-cased).

    A category whose JSON file is missing is skipped with a warning on stderr.
    """
    selected = VH_CATEGORIES if categories is None else categories
    items: list[dict[str, Any]] = []
    for cat in selected:
        json_path = root / cat / f"{cat}.json"
        if not json_path.exists():
            print(f"[WARN] missing {json_path}", file=sys.stderr)
            continue
        # Read as UTF-8 explicitly so parsing does not depend on the locale's
        # default text encoding.
        with open(json_path, encoding="utf-8") as f:
            data = json.load(f)
        video_dir = root / cat / "videos"
        for idx, pair in enumerate(data):
            # Both sides share one pair_id so the scorer can re-join them.
            pair_id = f"{cat}/{idx}"
            for side in ("basic", "hallucination"):
                q = pair[side]
                items.append({
                    "bench": "videohallucer",
                    "category": cat,
                    "pair_id": pair_id,
                    "side": side,
                    "video": str(video_dir / q["video"]),
                    "question": q["question"],
                    "answer": q["answer"].strip().lower(),
                })
    return items
|
|
|
|
def vh_extract(pred: str, gold: str) -> bool:
    """Return True when ``gold`` occurs in ``pred`` as a whole word, ignoring case."""
    whole_word = re.compile(r"\b" + re.escape(gold) + r"\b", re.IGNORECASE)
    return whole_word.search(pred) is not None
|
|
|
|
def vh_score(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Score VideoHallucer predictions per category and as a macro average.

    Rows are re-joined into pairs through ``pair_id``; a pair counts only when
    both its "basic" and "hallucination" rows are present. For each category
    in ``VH_CATEGORIES`` that has at least one complete pair the summary holds
    ``n_pairs``, ``basic_acc``, ``halluc_acc`` and ``pair_acc`` (both sides
    correct). ``summary["overall"]`` holds the unweighted mean of the
    per-category ``pair_acc`` values and the number of categories averaged.

    Intended to mirror the benchmark's evaluation_utils.py (not verified here).
    """
    # pair_id -> {side -> row}
    pairs: dict[str, dict[str, dict[str, Any]]] = defaultdict(dict)
    for row in rows:
        pairs[row["pair_id"]][row["side"]] = row

    tallies: dict[str, dict[str, int]] = defaultdict(
        lambda: {"n": 0, "basic_hit": 0, "halluc_hit": 0, "pair_hit": 0}
    )
    for pair_id, sides in pairs.items():
        basic_row = sides.get("basic")
        halluc_row = sides.get("hallucination")
        if basic_row is None or halluc_row is None:
            # Incomplete pair: leave it out of every count.
            continue
        basic_ok = vh_extract(basic_row["prediction"], basic_row["answer"])
        halluc_ok = vh_extract(halluc_row["prediction"], halluc_row["answer"])
        # The category is the part of the pair_id before the first "/".
        tally = tallies[pair_id.split("/")[0]]
        tally["n"] += 1
        tally["basic_hit"] += int(basic_ok)
        tally["halluc_hit"] += int(halluc_ok)
        tally["pair_hit"] += int(basic_ok and halluc_ok)

    summary: dict[str, Any] = {}
    macro_terms = []
    for cat in VH_CATEGORIES:
        tally = tallies.get(cat)
        if tally is None:
            continue
        denom = max(tally["n"], 1)
        pair_acc = tally["pair_hit"] / denom
        summary[cat] = {
            "n_pairs": tally["n"],
            "basic_acc": tally["basic_hit"] / denom,
            "halluc_acc": tally["halluc_hit"] / denom,
            "pair_acc": pair_acc,
        }
        macro_terms.append(pair_acc)
    summary["overall"] = {
        "pair_acc_macro": sum(macro_terms) / max(len(macro_terms), 1),
        "n_categories": len(macro_terms),
    }
    return summary
|
|
|
|
| |
|
|
# EventHallusion splits, in reporting order.
EH_SPLITS = ["entire", "mix", "misleading"]
# Split name -> video sub-directory; the "mix" split is stored as "interleave".
EH_FOLDER = dict(entire="entire", mix="interleave", misleading="misleading")
# Split name -> video file-name prefix (currently the same mapping as the folders).
EH_FILE_PREFIX = dict(EH_FOLDER)
# Instruction appended to every EventHallusion question.
EH_PROMPT_SUFFIX = "\nPlease answer yes or no:"
|
|
|
|
def load_eventhallusion(repo_root: Path, video_root: Path) -> list[dict[str, Any]]:
    """Flatten EventHallusion questions into one item per (video, question).

    For every split in ``EH_SPLITS`` this reads
    ``<repo_root>/questions/<split>_questions.json`` and maps each entry to
    ``<video_root>/<folder>/<prefix>_<num>.mp4``, where ``<num>`` is the part
    of the entry id after its first underscore.

    Args:
        repo_root: Directory that contains the ``questions`` folder.
        video_root: Directory that contains the per-split video folders.

    Returns:
        A list of dicts with the keys ``bench``, ``split``, ``video_id``,
        ``q_idx``, ``video`` (path as a string), ``question`` and ``answer``
        (whitespace-stripped, case preserved).

    Raises:
        FileNotFoundError: If a split's question file does not exist.
        IndexError: If an entry id contains no underscore.
    """
    items: list[dict[str, Any]] = []
    for split in EH_SPLITS:
        json_path = repo_root / "questions" / f"{split}_questions.json"
        # Read as UTF-8 explicitly so parsing does not depend on the locale's
        # default text encoding.
        with open(json_path, encoding="utf-8") as f:
            data = json.load(f)
        # Per-split values are constant across entries; look them up once.
        split_dir = video_root / EH_FOLDER[split]
        file_prefix = EH_FILE_PREFIX[split]
        for vid_entry in data:
            vid_id = vid_entry["id"]
            num = vid_id.split("_", 1)[1]
            video_file = split_dir / f"{file_prefix}_{num}.mp4"
            for q_idx, q in enumerate(vid_entry["questions"]):
                items.append({
                    "bench": "eventhallusion",
                    "split": split,
                    "video_id": vid_id,
                    "q_idx": q_idx,
                    "video": str(video_file),
                    "question": q["question"],
                    "answer": q["answer"].strip(),
                })
    return items
|
|
|
|
def eh_extract(pred: str) -> str | None:
    """Normalise a free-text prediction to "Yes.", "No." or None.

    The decision looks only at how the stripped, lower-cased text begins:
    a leading "yes" gives "Yes.", otherwise a leading "no" gives "No.".
    Anything else yields None.
    """
    normalized = pred.strip().lower()
    for prefix, label in (("yes", "Yes."), ("no", "No.")):
        if normalized.startswith(prefix):
            return label
    return None
|
|
|
|
def eh_score(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Compute EventHallusion accuracy per split and overall.

    Each row's prediction is normalised with ``eh_extract`` and compared
    verbatim with the row's ``answer``. Predictions that normalise to None are
    counted under ``no_match``. Splits are reported in ``EH_SPLITS`` order and
    only when they have rows; ``summary["overall"]`` pools the reported splits.
    """
    counts: dict[str, dict[str, int]] = defaultdict(lambda: {"n": 0, "correct": 0, "no_match": 0})
    for row in rows:
        bucket = counts[row["split"]]
        bucket["n"] += 1
        verdict = eh_extract(row["prediction"])
        if verdict is None:
            bucket["no_match"] += 1
        if verdict == row["answer"]:
            bucket["correct"] += 1

    summary: dict[str, Any] = {}
    seen_total = 0
    correct_total = 0
    for split in EH_SPLITS:
        if split not in counts:
            continue
        bucket = counts[split]
        summary[split] = {
            "n": bucket["n"],
            "correct": bucket["correct"],
            "acc": bucket["correct"] / max(bucket["n"], 1),
            "no_match": bucket["no_match"],
        }
        seen_total += bucket["n"]
        correct_total += bucket["correct"]
    summary["overall"] = {
        "n": seen_total,
        "correct": correct_total,
        "acc": correct_total / max(seen_total, 1),
    }
    return summary
|
|
|
|
| |
|
|
def _usable_fps(value: Any, default: float = 30.0) -> float:
    """Return ``value`` as a positive, finite float, or ``default`` when it is not one."""
    try:
        fps = float(value)
    except (TypeError, ValueError):
        return default
    if not np.isfinite(fps) or fps <= 0:
        return default
    return fps


def _uniform_frame_indices(total: int) -> list[int]:
    """Return NFRAMES indices spread evenly over ``[0, total - 1]``.

    Indices repeat when the video has fewer than NFRAMES frames.
    """
    return np.linspace(0, total - 1, NFRAMES).round().astype(int).clip(0, total - 1).tolist()


def _decode_video_32(path: str) -> tuple[torch.Tensor, float] | None:
    """Decode a video into exactly NFRAMES evenly spaced frames and resize them.

    decord is tried first. If it cannot be imported or raises while reading,
    torchvision is tried instead. The frame area is scaled towards the range
    ``[VIDEO_MIN_PIXELS, VIDEO_MAX_PIXELS]`` and each side is then rounded to
    the nearest multiple of 28 (minimum 28), so the bounds are approximate.

    Args:
        path: Path of the video file.

    Returns:
        ``(frames, sample_fps)`` where ``frames`` has layout [T, C, H, W]
        (uint8 after resizing; otherwise the decoder's dtype, presumably
        uint8 - confirm) and ``sample_fps`` is ``NFRAMES / total * video_fps``.
        Returns None when the video has no frames or neither backend can
        read it.
    """
    try:
        from decord import VideoReader, cpu
        vr = VideoReader(path, ctx=cpu(0), num_threads=1)
        total = len(vr)
        if total < 1:
            return None
        try:
            reported_fps = vr.get_avg_fps()
        except Exception:
            reported_fps = None
        # A missing, zero, negative or non-finite rate would make sample_fps
        # meaningless, so fall back to 30.0 in all of those cases.
        video_fps = _usable_fps(reported_fps)
        frames = vr.get_batch(_uniform_frame_indices(total)).asnumpy()
    except Exception:
        # decord is unavailable or failed on this file: retry with torchvision.
        try:
            import torchvision.io as tvio
            tv_frames, _, info = tvio.read_video(path, pts_unit="sec", output_format="THWC")
            total = int(tv_frames.shape[0])
            if total < 1:
                return None
            video_fps = _usable_fps(info.get("video_fps"))
            frames = tv_frames[_uniform_frame_indices(total)].numpy()
        except Exception:
            return None

    # [T, H, W, C] -> [T, C, H, W]
    t = torch.from_numpy(np.ascontiguousarray(frames)).permute(0, 3, 1, 2).contiguous()
    _, _, H, W = t.shape

    # Pick one scale factor for both sides so the aspect ratio is kept.
    pix = H * W
    if pix > VIDEO_MAX_PIXELS:
        scale = (VIDEO_MAX_PIXELS / pix) ** 0.5
    elif pix < VIDEO_MIN_PIXELS:
        scale = (VIDEO_MIN_PIXELS / pix) ** 0.5
    else:
        scale = 1.0
    new_h = max(28, int(round(H * scale / 28)) * 28)
    new_w = max(28, int(round(W * scale / 28)) * 28)
    if (new_h, new_w) != (H, W):
        import torchvision.transforms.functional as TF
        t = TF.resize(t, [new_h, new_w], antialias=True)
        t = t.to(torch.uint8)

    # Effective frame rate of the sampled clip: NFRAMES frames stand in for
    # ``total`` source frames.
    sample_fps = NFRAMES / max(total, 1) * video_fps
    return t, float(sample_fps)
|
|
|
|
def _decode_one(idx_path_q_suffix):
    """Worker: decode one video and tag the result with its item index.

    The argument is an ``(index, video_path, question, suffix)`` tuple; only
    the index and the path are used. Returns ``(index, payload, error)`` where
    ``payload`` is ``(video_tensor, sample_fps)`` on success and ``error`` is
    None, or ``payload`` is None and ``error`` is "missing" (no file at the
    path) or "decode_error" (the decoder returned nothing).
    """
    idx, video_path, _, _ = idx_path_q_suffix
    if os.path.exists(video_path):
        decoded = _decode_video_32(video_path)
        if decoded is not None:
            frames, fps = decoded
            return idx, (frames, fps), None
        failure = "decode_error"
    else:
        failure = "missing"
    return idx, None, failure
|
|
|
|
def build_inputs(items: list[dict[str, Any]], processor, suffix: str, max_workers: int = 32):
    """Decode every item's video in a thread pool and build the generation inputs.

    Threads are used on the assumption that video decoding releases the GIL
    (confirm for the installed decoder).

    Args:
        items: Benchmark items with at least ``video`` and ``question`` keys.
            Items that decode successfully get ``nframes_used`` set in place.
        processor: Object whose ``apply_chat_template`` renders the prompt.
        suffix: Text appended to every question.
        max_workers: Size of the decoding thread pool.

    Returns:
        ``(inputs, kept)``: ``inputs`` is a list of dicts with ``prompt``,
        ``multi_modal_data`` and ``mm_processor_kwargs`` entries, in item
        order. ``kept`` lists the indices into ``items`` that produced an
        input. Items whose video is missing or cannot be decoded are omitted
        from both.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    results: dict[int, Any] = {}
    missing = 0
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        # Map each future to its item index so a crashed worker can still be
        # attributed to the right video.
        fut_to_idx = {
            ex.submit(_decode_one, (i, it["video"], it["question"], suffix)): i
            for i, it in enumerate(items)
        }
        n_total = len(fut_to_idx)
        for done, fut in enumerate(as_completed(fut_to_idx), start=1):
            idx = fut_to_idx[fut]
            try:
                _, payload, err = fut.result()
            except Exception as exc:
                # One bad video must not abort the whole decode pass; record it
                # as a decode failure and keep going.
                print(f"[WARN] decode worker raised {exc!r}", file=sys.stderr)
                payload, err = None, "decode_error"
            if done % 200 == 0:
                print(f"[INFO] decoded {done}/{n_total}", file=sys.stderr, flush=True)
            if err:
                missing += 1
                if err == "decode_error":
                    print(f"[WARN] decode failed: {items[idx]['video']}", file=sys.stderr)
                continue
            video_tensor, sample_fps = payload
            items[idx]["nframes_used"] = NFRAMES
            results[idx] = (video_tensor, sample_fps)

    # Assemble prompts in the original item order, not in completion order.
    inputs = []
    kept = []
    for i, it in enumerate(items):
        if i not in results:
            continue
        video_tensor, sample_fps = results[i]
        messages = [{
            "role": "user",
            "content": [
                {"type": "video", "video": it["video"]},
                {"type": "text", "text": it["question"] + suffix},
            ],
        }]
        prompt = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs.append({
            "prompt": prompt,
            "multi_modal_data": {"video": video_tensor},
            "mm_processor_kwargs": {"fps": sample_fps},
        })
        kept.append(i)
    if missing:
        print(f"[INFO] {missing} items skipped (missing video or decode error)", file=sys.stderr)
    return inputs, kept
|
|
|
|
def main():
    """Command-line entry point: load a benchmark, generate answers, write results.

    Steps: parse arguments; load the chosen benchmark's items (optionally
    truncated by ``--limit`` and filtered to this worker's shard); decode the
    videos; run greedy generation with vLLM; write one JSON line per answered
    item to ``--out`` and the metric summary to ``--summary``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--model", required=True)
    ap.add_argument("--bench", choices=["videohallucer", "eventhallusion"], required=True)
    ap.add_argument("--vh_root", default="/mnt/local-fast/opd_zt/data/benchmarks/VideoHallucer")
    ap.add_argument("--eh_repo", default="/mnt/local-fast/opd_zt/data/benchmarks/EventHallusion")
    ap.add_argument("--eh_videos", default="/mnt/local-fast/opd_zt/data/benchmarks/EventHallusion/videos")
    ap.add_argument("--out", required=True, help="Predictions JSONL output path")
    ap.add_argument("--summary", required=True, help="Summary JSON path")
    ap.add_argument("--tp", type=int, default=4)
    ap.add_argument("--max_model_len", type=int, default=8192)
    ap.add_argument("--gpu_mem", type=float, default=0.85)
    ap.add_argument("--max_new_tokens", type=int, default=16)
    ap.add_argument("--limit", type=int, default=0, help="0=no limit; truncate items for smoke tests")
    ap.add_argument("--shard", type=int, default=0, help="This worker's shard index (0..num_shards-1)")
    ap.add_argument("--num_shards", type=int, default=1, help=">1 to data-parallel across N workers")
    args = ap.parse_args()

    if args.num_shards > 1 and not 0 <= args.shard < args.num_shards:
        # An out-of-range index matches no item and would otherwise produce a
        # quiet, empty run.
        ap.error(f"--shard must be in [0, {args.num_shards - 1}] when --num_shards={args.num_shards}")

    # Select loader, prompt suffix and scorer for the chosen benchmark.
    if args.bench == "videohallucer":
        items = load_videohallucer(Path(args.vh_root))
        suffix = VH_PROMPT_SUFFIX
        scorer = vh_score
    else:
        items = load_eventhallusion(Path(args.eh_repo), Path(args.eh_videos))
        suffix = EH_PROMPT_SUFFIX
        scorer = eh_score
    if args.limit > 0:
        items = items[: args.limit]
    if args.num_shards > 1:
        # Shard on the pair/video identifier so that all questions sharing it
        # land on the same worker.
        def _shard_key(it):
            return it.get("pair_id") or it.get("video_id") or it["video"]
        items = [it for it in items if _stable_shard(_shard_key(it), args.num_shards) == args.shard]
        print(f"[INFO] {args.bench}: shard {args.shard}/{args.num_shards}, {len(items)} items")
    else:
        print(f"[INFO] {args.bench}: {len(items)} items")

    # Decode videos and render prompts before the engine is created.
    processor = AutoProcessor.from_pretrained(args.model, trust_remote_code=True)
    inputs, kept = build_inputs(items, processor, suffix)

    llm = LLM(
        model=args.model,
        tensor_parallel_size=args.tp,
        max_model_len=args.max_model_len,
        gpu_memory_utilization=args.gpu_mem,
        limit_mm_per_prompt={"image": 0, "video": 1},
        trust_remote_code=True,
        enforce_eager=False,
        dtype="bfloat16",
    )
    # temperature=0.0 selects greedy decoding.
    sp = SamplingParams(temperature=0.0, top_p=1.0, max_tokens=args.max_new_tokens)

    t0 = time.time()
    outputs = llm.generate(inputs, sp)
    print(f"[INFO] generation: {time.time() - t0:.1f}s for {len(outputs)} prompts")

    # Attach each generation to its item; ``kept`` holds the item index of
    # every prompt, in prompt order.
    Path(args.out).parent.mkdir(parents=True, exist_ok=True)
    with open(args.out, "w", encoding="utf-8") as fo:
        for k, out in zip(kept, outputs):
            items[k]["prediction"] = out.outputs[0].text
            fo.write(json.dumps(items[k]) + "\n")

    rows = [items[k] for k in kept]
    summary = scorer(rows)
    Path(args.summary).parent.mkdir(parents=True, exist_ok=True)
    with open(args.summary, "w", encoding="utf-8") as fs:
        json.dump({"bench": args.bench, "model": args.model, "n": len(rows), "metrics": summary}, fs, indent=2)
    print(json.dumps(summary, indent=2))
|
|
|
|
# Run the evaluation only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|