# DW-KhotTaeVL-2B-QueryFrames / eval_videomme.py
# commandeaw - commit 50d1d87 (verified):
# "Add --start-idx for chunked evaluation (MPS resilience)"
"""Standalone Video-MME mini eval for DW-KhotTaeVL-2B-QueryFrames.
This script reproduces the MCQ-mode (no task_type) QA-frame numbers
reported in the model card. It is fully self-contained — only
depends on the `dw_queryframes.py` module shipped in this same
directory plus publicly-available datasets / models from Hugging Face.

Usage::

    pip install torch transformers pillow decord huggingface_hub pandas pyarrow

    # MCQ mode (query-aware frame selection, no task_type)
    python eval_videomme.py --mode mcq --n-questions 50

    # Stock baseline (uniform 8 frames; matches the stock numbers
    # in the model card)
    python eval_videomme.py --mode stock-uniform --n-questions 50

For task-aware MCQ mode (uses Video-MME's own task_type label to
route Object/Temporal Reasoning questions to uniform sampling),
run both modes above then combine via ``build_hybrid.py``.
The legacy CLI value ``--mode wild`` is accepted as a deprecated
alias for ``--mode mcq``.
Outputs JSON with ``summary`` + ``results`` keys.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import time
import zipfile
from pathlib import Path
import pandas as pd
from huggingface_hub import hf_hub_download
from PIL import Image
# ---------------------------------------------------------------------------
# Location of the public Video-MME mini assets: the lmms-lab/Video-MME
# dataset repository on Hugging Face.
# ---------------------------------------------------------------------------
REPO_TYPE = "dataset"
REPO_ID = "lmms-lab/Video-MME"
PARQUET_NAME = "videomme/test-00000-of-00001.parquet"
DEFAULT_CHUNKS = ["videos_chunked_01.zip"]

# All downloads and extracted videos are kept beside this script, so a
# fresh ``git clone`` of the HF repo reproduces results without writing
# into the user's home directory. The directory is created at import time.
CACHE_DIR = Path(__file__).resolve().parent.joinpath("cache", "videomme_mini")
CACHE_DIR.mkdir(parents=True, exist_ok=True)

# Single-frame MCQ prompt; filled in with ``question`` and ``options``.
PROMPT_TEMPLATE = (
    "This is a representative frame from a video.\n"
    "Select the best answer based on the video.\n\n"
    "Question: {question}\n"
    "Options:\n{options}\n"
    "Answer with only the letter."
)

# Fallback pattern: any standalone option letter A-D (case-insensitive).
ANSWER_RE = re.compile(r"\b([ABCD])\b", flags=re.IGNORECASE)
# Preferred pattern: an explicit "Answer: X" marker (case-insensitive).
ALPTD_ANSWER_RE = re.compile(r"Answer:\s*([ABCD])\b", flags=re.IGNORECASE)
# ---------------------------------------------------------------------------
# Asset management — fetch + unzip into CACHE_DIR.
# ---------------------------------------------------------------------------
def download_assets(chunks: list[str]) -> tuple[Path, list[Path]]:
    """Resolve the question parquet and the requested video archives.

    Every file is requested from ``REPO_ID`` through ``hf_hub_download``
    with ``CACHE_DIR / "hf"`` as its cache directory.

    Args:
        chunks: File names of the zip archives inside the dataset repo.

    Returns:
        ``(parquet_path, zip_paths)``; ``zip_paths`` follows the order of
        ``chunks``.
    """
    hub_cache = str(CACHE_DIR / "hf")

    def _fetch(filename: str) -> Path:
        # One place for the arguments shared by every download.
        local = hf_hub_download(
            repo_id=REPO_ID, repo_type=REPO_TYPE, filename=filename,
            cache_dir=hub_cache,
        )
        return Path(local)

    print(f"[eval] ensuring {PARQUET_NAME} ...")
    parquet_path = _fetch(PARQUET_NAME)
    archive_paths = [_fetch(archive) for archive in chunks]
    return parquet_path, archive_paths
def unzip_chunks(zip_paths: list[Path]) -> Path:
    """Extract the ``.mp4`` members of the given archives into the video cache.

    Members are flattened into ``CACHE_DIR / "video"`` (only the base name
    of each member is kept). A member is skipped when an ``.mp4`` with the
    same stem is already present in that directory.

    Each video is streamed to a temporary ``<name>.part`` file and renamed
    into place only once it is completely written. An interrupted run
    therefore never leaves a truncated ``.mp4`` behind that a later run
    would mistake for an already-extracted video.

    Args:
        zip_paths: Local paths of the zip archives to extract.

    Returns:
        The directory holding the extracted videos.
    """
    import shutil  # function-scope import: not among the file-level imports

    video_dir = CACHE_DIR / "video"
    video_dir.mkdir(parents=True, exist_ok=True)
    for zp in zip_paths:
        # Re-scan per archive so videos extracted from an earlier archive
        # in this same call are not extracted again.
        existing = {p.stem for p in video_dir.glob("*.mp4")}
        with zipfile.ZipFile(zp, "r") as zf:
            to_extract = [
                m for m in zf.namelist()
                if m.endswith(".mp4") and Path(m).stem not in existing
            ]
            if to_extract:
                print(f"[eval] extracting {len(to_extract)} mp4s from {zp.name}")
            for m in to_extract:
                target = video_dir / Path(m).name
                # "*.mp4" never matches the ".part" suffix, so a leftover
                # partial file is simply overwritten on the next attempt.
                partial = target.with_name(target.name + ".part")
                with zf.open(m) as src, open(partial, "wb") as dst:
                    # Stream in chunks instead of loading the whole video.
                    shutil.copyfileobj(src, dst)
                partial.replace(target)
    return video_dir
def load_questions(pq_path: Path, video_dir: Path, limit: int,
                   start_idx: int = 0) -> pd.DataFrame:
    """Return the slice of benchmark questions to score in this run.

    Rows of the parquet at ``pq_path`` are kept only when their
    ``videoID`` equals the stem of an ``.mp4`` found in ``video_dir``.
    A positive ``start_idx`` then drops that many leading rows, and a
    positive ``limit`` caps how many of the remaining rows are returned.
    The returned frame is indexed from 0.

    Skipping rows this way supports chunked / resumable evaluation when
    the underlying accelerator (e.g. Apple MPS) corrupts state on long
    runs.
    """
    questions = pd.read_parquet(pq_path)
    on_disk = {mp4.stem for mp4 in video_dir.glob("*.mp4")}
    has_video = questions["videoID"].isin(on_disk)
    questions = questions[has_video].reset_index(drop=True)
    total_avail = len(questions)

    if start_idx > 0:
        questions = questions.iloc[start_idx:].reset_index(drop=True)
    if 0 < limit < len(questions):
        questions = questions.iloc[:limit].copy()

    print(f"[eval] using {len(questions)} questions "
          f"(start_idx={start_idx}, total_available={total_avail})")
    return questions
def format_options(options) -> str:
    """Render the options one per line, each converted to ``str`` and stripped."""
    cleaned = [str(option).strip() for option in options]
    return "\n".join(cleaned)
def extract_letter(text: str) -> str | None:
    """Pull the chosen option letter out of a model response.

    ``ALPTD_ANSWER_RE`` is tried first, then ``ANSWER_RE``; the first
    pattern that matches supplies the letter, which is returned
    upper-cased. ``None`` is returned when neither pattern matches; a
    falsy ``text`` is treated as an empty string.
    """
    response = text or ""
    for pattern in (ALPTD_ANSWER_RE, ANSWER_RE):
        found = pattern.search(response)
        if found is not None:
            return found.group(1).upper()
    return None
# ---------------------------------------------------------------------------
# Frame selection lives in the local QueryFrames module.
# ---------------------------------------------------------------------------
sys.path.insert(0, str(Path(__file__).resolve().parent))
from dw_queryframes import QueryFrames # noqa: E402
def main() -> int:
    """Run the Video-MME mini evaluation from the command line.

    Parses the CLI arguments, downloads and extracts the assets, loads
    the selected slice of questions, calls ``QueryFrames.answer_mcq`` for
    each question and writes a JSON file with ``summary`` and ``results``
    keys.

    If inference raises, the loop stops and everything scored so far is
    still written, so an outer chunked runner can resume with a larger
    ``--start-idx``.

    When ``--out-json`` is omitted the file is auto-named next to the
    cache directory. Runs with ``--start-idx`` > 0 get the offset in the
    name, so equally sized chunks do not overwrite each other.

    Returns:
        ``0``, also after an aborted run. Compare ``n_questions`` with
        ``n_questions_attempted`` in the summary to detect a partial run.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--base", default="Qwen/Qwen3-VL-2B-Instruct")
    ap.add_argument("--clip-model", default="openai/clip-vit-large-patch14")
    ap.add_argument("--mode", choices=["mcq", "wild", "stock-uniform"],
                    default="mcq",
                    help="'mcq' = query-aware MCQ mode (default); "
                         "'wild' = deprecated alias for 'mcq'; "
                         "'stock-uniform' = stock baseline (uniform 8 frames)")
    ap.add_argument("--tag", default="")
    ap.add_argument("--n-questions", type=int, default=50,
                    help="number of questions to score in this run (after start-idx)")
    ap.add_argument("--start-idx", type=int, default=0,
                    help="skip the first N filtered questions; useful for "
                         "chunked / resumable evaluation when the accelerator "
                         "(e.g. Apple MPS) corrupts state on long runs")
    ap.add_argument("--n-frames", type=int, default=8)
    ap.add_argument("--n-candidates", type=int, default=32)
    ap.add_argument("--max-pixels", type=int, default=262144)
    ap.add_argument("--max-new-tokens", type=int, default=8)
    ap.add_argument("--out-json", default=None,
                    help="output JSON path (auto-named if omitted)")
    ap.add_argument("--chunks", nargs="+", default=DEFAULT_CHUNKS)
    args = ap.parse_args()

    # A negative offset would skip nothing but still shift every reported
    # absolute index, so reject it up front.
    if args.start_idx < 0:
        ap.error("--start-idx must be >= 0")

    # Legacy alias: 'wild' → 'mcq' (deprecated).
    if args.mode == "wild":
        args.mode = "mcq"

    pq_path, zip_paths = download_assets(args.chunks)
    video_dir = unzip_chunks(zip_paths)
    df = load_questions(pq_path, video_dir, args.n_questions,
                        start_idx=args.start_idx)

    # NOTE(review): this only helps if the backend reads the variable after
    # this point; torch may already be imported via dw_queryframes — confirm.
    os.environ.setdefault("PYTORCH_ENABLE_MPS_FALLBACK", "1")
    fv = QueryFrames(
        base_model=args.base,
        clip_model=args.clip_model,
        device="auto",
        max_pixels=args.max_pixels,
        max_new_tokens=args.max_new_tokens,
        n_frames=args.n_frames,
        n_candidates=args.n_candidates,
    )

    # MCQ mode = query-aware (task_type=None lets QA path run).
    # Stock-uniform = pass a known no-frame-gain task name to force
    #                 the uniform-fallback path (matches stock 8f
    #                 baseline behavior).
    forced_uniform = (args.mode == "stock-uniform")

    results = []
    correct = 0
    t0 = time.time()
    for i, row in df.iterrows():
        # Absolute index into the full filtered df (so chunks have unique idx).
        abs_idx = int(i) + args.start_idx
        video_path = video_dir / f"{row['videoID']}.mp4"
        try:
            out = fv.answer_mcq(
                video_path=video_path,
                question=row["question"],
                options=list(row["options"]),
                task_type=("Object Reasoning" if forced_uniform else None),
            )
        except Exception as e:
            # MPS / accelerator state corruption sometimes triggers
            # mid-run on long inference. Save what we have and exit so
            # an outer chunked-runner can pick up from start-idx + i.
            print(f"[eval] FATAL at q {abs_idx}: {type(e).__name__}: {e}",
                  flush=True)
            print(f"[eval] saving partial results ({len(results)}) "
                  f"and exiting so caller can resume.", flush=True)
            break

        gold = row["answer"].strip().upper()
        ok = out["pred"] == gold
        correct += int(ok)
        results.append({
            "index": abs_idx,
            "videoID": row["videoID"],
            "task_type": row.get("task_type", ""),
            "gold": gold,
            "pred": out["pred"],
            "raw": out["raw"][:200],
            "frames_used": out["frames_used"],
            "latency_clip_s": out["latency_clip_s"],
            "latency_gen_s": out["latency_gen_s"],
            "correct": ok,
        })
        # Divide by what was actually scored rather than by the loop index,
        # so the figure stays right whatever index the frame carries.
        run = correct / len(results)
        print(f"[eval] [{abs_idx+1}/{args.start_idx + len(df)}] "
              f"gold={gold} pred={out['pred']} "
              f"acc_so_far={run:.3f} clip={out['latency_clip_s']}s "
              f"gen={out['latency_gen_s']}s", flush=True)

    n = len(results)
    acc = correct / n if n else 0.0
    summary = {
        "model_base": args.base,
        "clip_model": args.clip_model,
        "mode": args.mode,
        "tag": args.tag,
        "start_idx": args.start_idx,
        "n_questions_attempted": len(df),
        "n_questions": n,
        "n_frames": args.n_frames,
        "n_candidates": args.n_candidates,
        "max_pixels": args.max_pixels,
        "max_new_tokens": args.max_new_tokens,
        "accuracy": round(acc, 4),
        "wall_time_s": round(time.time() - t0, 1),
    }

    out_path = args.out_json
    if out_path is None:
        tag = (args.tag or args.mode)
        # Without the offset, two chunks of equal size (e.g. 0-49 and
        # 50-99) would both be written to the same file. The name is
        # unchanged for start_idx == 0.
        offset = f"_from{args.start_idx}" if args.start_idx > 0 else ""
        out_path = str(CACHE_DIR.parent / f"eval_{tag}{offset}_{n}q.json")
    Path(out_path).parent.mkdir(parents=True, exist_ok=True)
    Path(out_path).write_text(json.dumps(
        {"summary": summary, "results": results}, indent=2))
    print(f"\n[eval] mode={args.mode} acc={acc:.4f} ({correct}/{n}) saved {out_path}")
    return 0
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())