opd_zt / scripts /eval_hallu.py
sdzt's picture
Add files using upload-large-folder tool
bf46e5d verified
Raw
History Blame Contribute Delete
14.9 kB
#!/usr/bin/env python3
"""Evaluate Qwen2.5-VL on VideoHallucer + EventHallusion via vLLM.
VideoHallucer : paired basic/hallucinated yes-no, regex word match, final_hit = basic AND halluc.
EventHallusion : binary yes-no across entire/mix/misleading, startswith('yes'/'no') match.
Frames: 32 per video. Generation: greedy, max_new_tokens=16.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import re
import sys
import time
from collections import defaultdict
from pathlib import Path
from typing import Any
def _stable_shard(key: str, num_shards: int) -> int:
    """Map *key* to a shard index in ``[0, num_shards)`` reproducibly.

    The index is derived from the MD5 digest of the UTF-8 encoded key, so it
    is identical in every process (unlike the built-in ``hash``, which is
    affected by PYTHONHASHSEED).
    """
    hex_digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    # The first 16 hex characters are the first 8 digest bytes, big-endian.
    bucket_source = int(hex_digest[:16], 16)
    return bucket_source % num_shards
# vLLM workers must use 'spawn': torch is imported in this (parent) process just
# below, before the vLLM engine is created.
# NOTE(review): the original note blamed qwen_vl_utils, which this file does not
# import — confirm the rationale still holds.
# setdefault keeps any value the caller already exported in the environment.
os.environ.setdefault("VLLM_WORKER_MULTIPROC_METHOD", "spawn")
import numpy as np
import torch
from transformers import AutoProcessor
from vllm import LLM, SamplingParams
# Constrain per-frame patches so 32 frames + text fit comfortably in max_model_len.
# 128 * 28 * 28 ≈ 100k pixels per frame -> ~128 vision tokens / frame -> ~4096 video tokens.
# NOTE(review): the token estimate assumes one token per 28x28 pixel tile and no
# merging across frames — TODO confirm against the model's vision config.
VIDEO_MAX_PIXELS = 128 * 28 * 28
# Lower bound on per-frame pixels; smaller frames are upscaled in _decode_video_32.
VIDEO_MIN_PIXELS = 16 * 28 * 28
# Number of frames sampled from every video, regardless of its length.
NFRAMES = 32
# ---------- VideoHallucer ----------
# VideoHallucer categories. Each name is used both as a sub-directory of the
# benchmark root and as the stem of that directory's JSON file, and fixes the
# order in which categories appear in the score summary.
VH_CATEGORIES = [
    "object_relation",
    "temporal",
    "semantic_detail",
    "external_factual",
    "external_nonfactual",
]
# Appended verbatim to every VideoHallucer question before it is sent to the model.
VH_PROMPT_SUFFIX = "\nAnswer the question using 'yes' or 'no'."
def load_videohallucer(root: Path) -> list[dict[str, Any]]:
    """Flatten VideoHallucer question pairs into one item per (pair, side).

    For every category in ``VH_CATEGORIES`` this reads
    ``root/<category>/<category>.json`` and emits two items per entry, one for
    the ``"basic"`` side and one for the ``"hallucination"`` side. A category
    whose JSON file does not exist is reported on stderr and skipped.

    Args:
        root: Directory containing one sub-directory per category.

    Returns:
        A list of dicts with the keys ``bench``, ``category``, ``pair_id``
        (``"<category>/<index>"``), ``side``, ``video`` (path under
        ``root/<category>/videos``), ``question`` and ``answer`` (stripped and
        lower-cased gold answer).

    Raises:
        KeyError: If an entry lacks a side or a side lacks one of
            ``video`` / ``question`` / ``answer``.
    """
    items: list[dict[str, Any]] = []
    for cat in VH_CATEGORIES:
        json_path = root / cat / f"{cat}.json"
        if not json_path.exists():
            print(f"[WARN] missing {json_path}", file=sys.stderr)
            continue
        # JSON is UTF-8 by specification; do not rely on the locale's default
        # encoding, which differs between machines.
        with open(json_path, encoding="utf-8") as f:
            data = json.load(f)
        video_dir = root / cat / "videos"
        for idx, pair in enumerate(data):
            # Both sides share one pair_id so they can be re-joined when scoring.
            pair_id = f"{cat}/{idx}"
            for side in ("basic", "hallucination"):
                q = pair[side]
                items.append({
                    "bench": "videohallucer",
                    "category": cat,
                    "pair_id": pair_id,
                    "side": side,
                    "video": str(video_dir / q["video"]),
                    "question": q["question"],
                    "answer": q["answer"].strip().lower(),
                })
    return items
def vh_extract(pred: str, gold: str) -> bool:
    """Return True when *gold* occurs in *pred* as a whole word, ignoring case."""
    whole_word = r"\b" + re.escape(gold) + r"\b"
    matcher = re.compile(whole_word, re.IGNORECASE)
    return matcher.search(pred) is not None
def vh_score(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Score VideoHallucer predictions per category and overall.

    Rows are re-joined by ``pair_id``; a pair only counts when both its
    ``"basic"`` and ``"hallucination"`` rows are present. For each category in
    ``VH_CATEGORIES`` that has at least one complete pair the summary holds
    ``n_pairs``, ``basic_acc``, ``halluc_acc`` and ``pair_acc`` (both sides
    correct). ``summary["overall"]`` holds the unweighted mean of the
    per-category pair accuracies and the number of categories averaged.
    """
    # pair_id -> {side -> row}
    grouped: dict[str, dict[str, dict[str, Any]]] = defaultdict(dict)
    for row in rows:
        grouped[row["pair_id"]][row["side"]] = row

    tallies: dict[str, dict[str, int]] = defaultdict(
        lambda: {"n": 0, "basic_hit": 0, "halluc_hit": 0, "pair_hit": 0}
    )
    for pair_id, sides in grouped.items():
        basic_row = sides.get("basic")
        halluc_row = sides.get("hallucination")
        if basic_row is None or halluc_row is None:
            # Incomplete pair (one side missing): excluded from every metric.
            continue
        basic_ok = vh_extract(basic_row["prediction"], basic_row["answer"])
        halluc_ok = vh_extract(halluc_row["prediction"], halluc_row["answer"])
        # The category is the part of the pair_id before the first "/".
        tally = tallies[pair_id.split("/")[0]]
        tally["n"] += 1
        tally["basic_hit"] += int(basic_ok)
        tally["halluc_hit"] += int(halluc_ok)
        tally["pair_hit"] += int(basic_ok and halluc_ok)

    summary: dict[str, Any] = {}
    pair_accs = []
    for cat in VH_CATEGORIES:
        if cat not in tallies:
            continue
        tally = tallies[cat]
        denom = max(tally["n"], 1)
        pair_acc = tally["pair_hit"] / denom
        summary[cat] = {
            "n_pairs": tally["n"],
            "basic_acc": tally["basic_hit"] / denom,
            "halluc_acc": tally["halluc_hit"] / denom,
            "pair_acc": pair_acc,
        }
        pair_accs.append(pair_acc)

    summary["overall"] = {
        "pair_acc_macro": sum(pair_accs) / max(len(pair_accs), 1),
        "n_categories": len(pair_accs),
    }
    return summary
# ---------- EventHallusion ----------
# EventHallusion splits; each has a "<split>_questions.json" file and fixes the
# order in which splits appear in the score summary.
EH_SPLITS = ["entire", "mix", "misleading"]
# The released video zip stores the "mix" split as "interleave" — IDs map mix_NNN <-> interleave_NNN.
# Split name -> video sub-directory.
EH_FOLDER = {"entire": "entire", "mix": "interleave", "misleading": "misleading"}
# Split name -> prefix of the video file name ("<prefix>_<NNN>.mp4").
EH_FILE_PREFIX = {"entire": "entire", "mix": "interleave", "misleading": "misleading"}
# Appended verbatim to every EventHallusion question before it is sent to the model.
EH_PROMPT_SUFFIX = "\nPlease answer yes or no:"
def load_eventhallusion(repo_root: Path, video_root: Path) -> list[dict[str, Any]]:
    """Flatten EventHallusion questions into one item per (video, question).

    For every split in ``EH_SPLITS`` this reads
    ``repo_root/questions/<split>_questions.json`` and emits one item per
    question of every video entry.

    Args:
        repo_root: Directory containing the ``questions`` folder.
        video_root: Directory containing one video folder per split
            (folder and file prefix come from ``EH_FOLDER`` / ``EH_FILE_PREFIX``).

    Returns:
        A list of dicts with the keys ``bench``, ``split``, ``video_id``,
        ``q_idx``, ``video`` (path to the ``.mp4``), ``question`` and
        ``answer`` (stripped gold answer, e.g. ``"Yes."`` / ``"No."``).

    Raises:
        FileNotFoundError: If a split's question file is missing.
        IndexError: If a video id contains no ``"_"`` separator.
    """
    items: list[dict[str, Any]] = []
    for split in EH_SPLITS:
        json_path = repo_root / "questions" / f"{split}_questions.json"
        # JSON is UTF-8 by specification; do not rely on the locale's default
        # encoding, which differs between machines.
        with open(json_path, encoding="utf-8") as f:
            data = json.load(f)
        for vid_entry in data:
            vid_id = vid_entry["id"]
            # Keep only the numeric part: the file prefix may differ from the
            # split name used in the id (mix_NNN -> interleave_NNN).
            num = vid_id.split("_", 1)[1]  # e.g. "001"
            video_file = video_root / EH_FOLDER[split] / f"{EH_FILE_PREFIX[split]}_{num}.mp4"
            for q_idx, q in enumerate(vid_entry["questions"]):
                items.append({
                    "bench": "eventhallusion",
                    "split": split,
                    "video_id": vid_id,
                    "q_idx": q_idx,
                    "video": str(video_file),
                    "question": q["question"],
                    "answer": q["answer"].strip(),  # "Yes." / "No."
                })
    return items
def eh_extract(pred: str) -> str | None:
    """Normalise a raw prediction to ``"Yes."`` / ``"No."`` by its leading word.

    The prediction is stripped and lower-cased; a text starting with ``yes``
    maps to ``"Yes."``, one starting with ``no`` maps to ``"No."``, and anything
    else yields ``None``.
    """
    head = pred.strip().lower()
    # "yes" is checked first, exactly as a plain if-chain would.
    for prefix, label in (("yes", "Yes."), ("no", "No.")):
        if head.startswith(prefix):
            return label
    return None
def eh_score(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Score EventHallusion predictions per split and overall.

    Each row's prediction is normalised with ``eh_extract`` and compared to the
    row's ``answer``. For every split in ``EH_SPLITS`` that has rows the summary
    holds ``n``, ``correct``, ``acc`` and ``no_match`` (predictions that could
    not be normalised). ``summary["overall"]`` pools the counts of all splits.
    """
    tallies: dict[str, dict[str, int]] = defaultdict(
        lambda: {"n": 0, "correct": 0, "no_match": 0}
    )
    for row in rows:
        bucket = tallies[row["split"]]
        bucket["n"] += 1
        verdict = eh_extract(row["prediction"])
        if verdict is None:
            bucket["no_match"] += 1
        # Deliberately a second, independent test rather than an elif.
        if verdict == row["answer"]:
            bucket["correct"] += 1

    summary: dict[str, Any] = {}
    n_total = 0
    n_correct = 0
    for split in EH_SPLITS:
        if split not in tallies:
            continue
        bucket = tallies[split]
        summary[split] = {
            "n": bucket["n"],
            "correct": bucket["correct"],
            "acc": bucket["correct"] / max(bucket["n"], 1),
            "no_match": bucket["no_match"],
        }
        n_total += bucket["n"]
        n_correct += bucket["correct"]

    summary["overall"] = {
        "n": n_total,
        "correct": n_correct,
        "acc": n_correct / max(n_total, 1),
    }
    return summary
# ---------- vLLM driver ----------
def _decode_video_32(path: str) -> tuple[torch.Tensor, float] | None:
    """Decode a video into exactly ``NFRAMES`` uniformly spaced, resized frames.

    Decoding is attempted with decord first and falls back to
    ``torchvision.io.read_video`` if decord cannot be imported or raises.
    Frame indices come from ``np.linspace``, so videos shorter than ``NFRAMES``
    repeat frames.

    Args:
        path: Path of the video file.

    Returns:
        ``(frames, sample_fps)`` where ``frames`` is a uint8 tensor laid out
        ``[T, C, H, W]`` with H and W multiples of 28, and ``sample_fps`` is
        ``NFRAMES / total_frames * source_fps``. Returns ``None`` when the video
        has no frames or both decoders fail.
    """
    try:
        from decord import VideoReader, cpu

        vr = VideoReader(path, ctx=cpu(0), num_threads=1)
        total = len(vr)
        if total < 1:
            return None
        try:
            video_fps = float(vr.get_avg_fps())
        except Exception:
            video_fps = 30.0
        # np.linspace allows repeats when total<NFRAMES.
        indices = np.linspace(0, total - 1, NFRAMES).round().astype(int).clip(0, total - 1).tolist()
        frames = vr.get_batch(indices).asnumpy()  # [T, H, W, C] uint8
    except Exception:
        # Best-effort fallback decoder; any failure here means "undecodable".
        try:
            import torchvision.io as tvio

            tv_frames, _, info = tvio.read_video(path, pts_unit="sec", output_format="THWC")
            if tv_frames.shape[0] < 1:
                return None
            total = int(tv_frames.shape[0])
            video_fps = float(info.get("video_fps", 30.0))
            indices = np.linspace(0, total - 1, NFRAMES).round().astype(int).clip(0, total - 1).tolist()
            frames = tv_frames[indices].numpy()
        except Exception:
            return None

    # Broken container metadata can report a zero, negative, NaN or infinite
    # frame rate. That would yield an unusable sample_fps downstream, so fall
    # back to the same 30.0 default used when the rate cannot be read at all.
    # (NaN fails both comparisons, so it is caught here too.)
    if not (0.0 < video_fps < float("inf")):
        video_fps = 30.0

    t = torch.from_numpy(np.ascontiguousarray(frames)).permute(0, 3, 1, 2).contiguous()  # [T, C, H, W]
    T, C, H, W = t.shape
    # Scale towards the [VIDEO_MIN_PIXELS, VIDEO_MAX_PIXELS] per-frame budget, then
    # snap H and W to multiples of 28. Because each side is rounded to the nearest
    # multiple, the final pixel count only approximates the bounds.
    pix = H * W
    if pix > VIDEO_MAX_PIXELS:
        scale = (VIDEO_MAX_PIXELS / pix) ** 0.5
    elif pix < VIDEO_MIN_PIXELS:
        scale = (VIDEO_MIN_PIXELS / pix) ** 0.5
    else:
        scale = 1.0
    new_h = max(28, int(round(H * scale / 28)) * 28)
    new_w = max(28, int(round(W * scale / 28)) * 28)
    if (new_h, new_w) != (H, W):
        import torchvision.transforms.functional as TF

        t = TF.resize(t, [new_h, new_w], antialias=True)
    t = t.to(torch.uint8)
    # Sample fps for temporal positional encoding (matches Qwen2.5-VL convention: nframes / total * orig_fps).
    sample_fps = NFRAMES / max(total, 1) * video_fps
    return t, float(sample_fps)
def _decode_one(idx_path_q_suffix):
    """Worker: decode the video of one work item.

    Takes a 4-tuple ``(idx, video_path, question, suffix)``; only the first two
    fields are used. Returns ``(idx, payload, error)`` where exactly one of
    ``payload`` (``(video_tensor, sample_fps)``) and ``error`` (``"missing"`` or
    ``"decode_error"``) is not ``None``.
    """
    idx, video_path, _unused_question, _unused_suffix = idx_path_q_suffix
    if os.path.exists(video_path):
        decoded = _decode_video_32(video_path)
        if decoded is not None:
            frames, fps = decoded
            return idx, (frames, fps), None
        return idx, None, "decode_error"
    return idx, None, "missing"
def build_inputs(items: list[dict[str, Any]], processor, suffix: str, max_workers: int = 32):
    """Decode the videos of *items* in parallel and build vLLM prompt inputs.

    Each distinct video path is decoded once, however many items (questions)
    refer to it, and the decoded frames are shared between those items. This
    avoids repeated decoding and duplicate frame tensors in memory.

    Args:
        items: Benchmark items with at least ``video`` and ``question`` keys.
            Every item that is kept gets ``nframes_used`` set in place.
        processor: Object providing ``apply_chat_template`` for the model.
        suffix: Text appended to every question.
        max_workers: Size of the decoding thread pool.

    Returns:
        ``(inputs, kept)``: ``inputs`` is the list of vLLM request dicts in the
        original item order, and ``kept[j]`` is the index in ``items`` that
        produced ``inputs[j]``. Items whose video is missing or undecodable are
        left out of both lists.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    # dict.fromkeys de-duplicates while keeping first-seen order.
    unique_paths = list(dict.fromkeys(it["video"] for it in items))
    decoded: dict[str, Any] = {}
    # Threads rather than processes: presumably the decoder releases the GIL.
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futs = [ex.submit(_decode_one, (i, path, "", suffix)) for i, path in enumerate(unique_paths)]
        for done, fut in enumerate(as_completed(futs), start=1):
            idx, payload, err = fut.result()
            if done % 200 == 0:
                print(f"[INFO] decoded {done}/{len(unique_paths)}", file=sys.stderr, flush=True)
            if err:
                if err == "decode_error":
                    print(f"[WARN] decode failed: {unique_paths[idx]}", file=sys.stderr)
                continue
            decoded[unique_paths[idx]] = payload

    # Build vLLM inputs in original order.
    inputs = []
    kept = []
    missing = 0
    for i, it in enumerate(items):
        payload = decoded.get(it["video"])
        if payload is None:
            missing += 1
            continue
        video_tensor, sample_fps = payload
        it["nframes_used"] = NFRAMES
        messages = [{
            "role": "user",
            "content": [
                {"type": "video", "video": it["video"]},
                {"type": "text", "text": it["question"] + suffix},
            ],
        }]
        prompt = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs.append({
            "prompt": prompt,
            "multi_modal_data": {"video": video_tensor},
            "mm_processor_kwargs": {"fps": sample_fps},
        })
        kept.append(i)
    if missing:
        print(f"[INFO] {missing} items skipped (missing video or decode error)", file=sys.stderr)
    return inputs, kept
def main():
    """CLI entry point: load a benchmark, run vLLM generation, write results.

    Steps: parse arguments, load and optionally truncate/shard the items,
    decode videos into vLLM inputs, generate greedily, write one JSON line per
    answered item to ``--out``, and write the score summary to ``--summary``
    (also printed to stdout).

    Exits with an argparse error when ``--shard`` is outside
    ``[0, num_shards)``. Raises ``RuntimeError`` if the number of generated
    outputs differs from the number of prompts.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--model", required=True)
    ap.add_argument("--bench", choices=["videohallucer", "eventhallusion"], required=True)
    ap.add_argument("--vh_root", default="/mnt/local-fast/opd_zt/data/benchmarks/VideoHallucer")
    ap.add_argument("--eh_repo", default="/mnt/local-fast/opd_zt/data/benchmarks/EventHallusion")
    ap.add_argument("--eh_videos", default="/mnt/local-fast/opd_zt/data/benchmarks/EventHallusion/videos")
    ap.add_argument("--out", required=True, help="Predictions JSONL output path")
    ap.add_argument("--summary", required=True, help="Summary JSON path")
    ap.add_argument("--tp", type=int, default=4)
    ap.add_argument("--max_model_len", type=int, default=8192)
    ap.add_argument("--gpu_mem", type=float, default=0.85)
    ap.add_argument("--max_new_tokens", type=int, default=16)
    ap.add_argument("--limit", type=int, default=0, help="0=no limit; truncate items for smoke tests")
    ap.add_argument("--shard", type=int, default=0, help="This worker's shard index (0..num_shards-1)")
    ap.add_argument("--num_shards", type=int, default=1, help=">1 to data-parallel across N workers")
    args = ap.parse_args()

    # An out-of-range shard index would match no item and silently produce an
    # empty result, so reject it up front.
    if args.num_shards > 1 and not 0 <= args.shard < args.num_shards:
        ap.error(f"--shard must be in [0, {args.num_shards - 1}] for --num_shards={args.num_shards}, got {args.shard}")

    # Create output directories before the expensive decode/generation steps so
    # an unwritable path fails immediately rather than after generation.
    out_path = Path(args.out)
    summary_path = Path(args.summary)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    summary_path.parent.mkdir(parents=True, exist_ok=True)

    # Load items.
    if args.bench == "videohallucer":
        items = load_videohallucer(Path(args.vh_root))
        suffix = VH_PROMPT_SUFFIX
        scorer = vh_score
    else:
        items = load_eventhallusion(Path(args.eh_repo), Path(args.eh_videos))
        suffix = EH_PROMPT_SUFFIX
        scorer = eh_score
    if args.limit > 0:
        items = items[: args.limit]
    if args.num_shards > 1:
        # Shard by a group key so paired/related items stay together:
        #   VH: pair_id (basic+halluc must be in the same shard for pair_acc)
        #   EH: video_id (questions on the same video land in the same shard)
        def _shard_key(it):
            return it.get("pair_id") or it.get("video_id") or it["video"]

        items = [it for it in items if _stable_shard(_shard_key(it), args.num_shards) == args.shard]
        print(f"[INFO] {args.bench}: shard {args.shard}/{args.num_shards}, {len(items)} items")
    else:
        print(f"[INFO] {args.bench}: {len(items)} items")

    # Build processor + inputs.
    processor = AutoProcessor.from_pretrained(args.model, trust_remote_code=True)
    inputs, kept = build_inputs(items, processor, suffix)

    # Spin up vLLM.
    llm = LLM(
        model=args.model,
        tensor_parallel_size=args.tp,
        max_model_len=args.max_model_len,
        gpu_memory_utilization=args.gpu_mem,
        limit_mm_per_prompt={"image": 0, "video": 1},
        trust_remote_code=True,
        enforce_eager=False,
        dtype="bfloat16",
    )
    # temperature=0.0 -> greedy decoding.
    sp = SamplingParams(temperature=0.0, top_p=1.0, max_tokens=args.max_new_tokens)
    t0 = time.time()
    outputs = llm.generate(inputs, sp)
    print(f"[INFO] generation: {time.time() - t0:.1f}s for {len(outputs)} prompts")

    # zip() would silently truncate on a mismatch and attach predictions to the
    # wrong subset of items; fail loudly instead.
    if len(outputs) != len(kept):
        raise RuntimeError(f"vLLM returned {len(outputs)} outputs for {len(kept)} prompts")

    # Attach predictions and write.
    with open(out_path, "w", encoding="utf-8") as fo:
        for k, out in zip(kept, outputs):
            items[k]["prediction"] = out.outputs[0].text
            fo.write(json.dumps(items[k]) + "\n")

    rows = [items[k] for k in kept]
    summary = scorer(rows)
    with open(summary_path, "w", encoding="utf-8") as fs:
        json.dump({"bench": args.bench, "model": args.model, "n": len(rows), "metrics": summary}, fs, indent=2)
    print(json.dumps(summary, indent=2))


if __name__ == "__main__":
    main()