"""Standalone Video-MME mini eval for DW-KhotTaeVL-2B-QueryFrames. This script reproduces the MCQ-mode (no task_type) QA-frame numbers reported in the model card. It is fully self-contained — only depends on the `dw_queryframes.py` module shipped in this same directory plus publicly-available datasets / models from Hugging Face. Usage:: pip install torch transformers pillow decord huggingface_hub pandas pyarrow # MCQ mode (query-aware frame selection, no task_type) python eval_videomme.py --mode mcq --n-questions 50 # Stock baseline (uniform 8 frames; matches the stock numbers # in the model card) python eval_videomme.py --mode stock-uniform --n-questions 50 For task-aware MCQ mode (uses Video-MME's own task_type label to route Object/Temporal Reasoning questions to uniform sampling), run both modes above then combine via ``build_hybrid.py``. The legacy CLI value ``--mode wild`` is accepted as a deprecated alias for ``--mode mcq``. Outputs JSON with ``summary`` + ``results`` keys. """ from __future__ import annotations import argparse import json import os import re import sys import time import zipfile from pathlib import Path import pandas as pd from huggingface_hub import hf_hub_download from PIL import Image # --------------------------------------------------------------------------- # Public Video-MME mini assets (lmms-lab/Video-MME on Hugging Face). # --------------------------------------------------------------------------- REPO_ID = "lmms-lab/Video-MME" REPO_TYPE = "dataset" DEFAULT_CHUNKS = ["videos_chunked_01.zip"] PARQUET_NAME = "videomme/test-00000-of-00001.parquet" # Cache lives next to this script so a fresh ``git clone`` of the HF # repo can reproduce results without touching the user's home directory. 
CACHE_DIR = Path(__file__).resolve().parent / "cache" / "videomme_mini"
CACHE_DIR.mkdir(parents=True, exist_ok=True)

PROMPT_TEMPLATE = (
    "This is a representative frame from a video.\n"
    "Select the best answer based on the video.\n\n"
    "Question: {question}\n"
    "Options:\n{options}\n"
    "Answer with only the letter."
)
ANSWER_RE = re.compile(r"\b([ABCD])\b", re.IGNORECASE)
ALPTD_ANSWER_RE = re.compile(r"Answer:\s*([ABCD])\b", re.IGNORECASE)
# Case-sensitive variant of ANSWER_RE: a standalone *capital* A-D is a much
# stronger answer signal than a lowercase one (the article "a" is lowercase).
_UPPER_ANSWER_RE = re.compile(r"\b([ABCD])\b")


# ---------------------------------------------------------------------------
# Asset management — fetch + unzip into CACHE_DIR.
# ---------------------------------------------------------------------------
def download_assets(chunks: list[str]) -> tuple[Path, list[Path]]:
    """Fetch the question parquet and the requested video zips from the Hub.

    Args:
        chunks: File names of the video archives inside ``REPO_ID``.

    Returns:
        ``(parquet_path, zip_paths)`` — local paths as returned by
        ``hf_hub_download``; ``zip_paths`` follows the order of ``chunks``.
    """
    hub_cache = str(CACHE_DIR / "hf")

    def _fetch(filename: str) -> Path:
        return Path(hf_hub_download(
            repo_id=REPO_ID,
            repo_type=REPO_TYPE,
            filename=filename,
            cache_dir=hub_cache,
        ))

    print(f"[eval] ensuring {PARQUET_NAME} ...")
    pq_path = _fetch(PARQUET_NAME)
    zip_paths = [_fetch(name) for name in chunks]
    return pq_path, zip_paths


def unzip_chunks(zip_paths: list[Path]) -> Path:
    """Extract every not-yet-present ``.mp4`` from the archives.

    Members are flattened into ``CACHE_DIR / "video"`` by base name; a
    member is skipped when a file with the same stem already exists there.

    Each video is streamed into a ``<name>.mp4.part`` file and renamed to
    its final name only once fully written. An interrupted extraction
    therefore never leaves a truncated ``.mp4`` behind that a later run
    would mistake for a complete video, and whole videos are no longer
    held in memory.

    Args:
        zip_paths: Local paths of the downloaded zip archives.

    Returns:
        The directory containing the extracted videos.
    """
    import shutil  # function-scope: not part of the file-level import block

    video_dir = CACHE_DIR / "video"
    video_dir.mkdir(parents=True, exist_ok=True)
    for zp in zip_paths:
        # Re-scan per archive so videos extracted from an earlier archive
        # are not extracted again from a later one.
        existing = {p.stem for p in video_dir.glob("*.mp4")}
        with zipfile.ZipFile(zp, "r") as zf:
            to_extract = [
                m for m in zf.namelist()
                if m.endswith(".mp4") and Path(m).stem not in existing
            ]
            if to_extract:
                print(f"[eval] extracting {len(to_extract)} mp4s from {zp.name}")
            for m in to_extract:
                target = video_dir / Path(m).name
                # "*.mp4.part" does not match the "*.mp4" glob above, so a
                # leftover partial file is simply overwritten on the next run.
                partial = target.with_name(target.name + ".part")
                with zf.open(m) as src, open(partial, "wb") as dst:
                    shutil.copyfileobj(src, dst)
                partial.replace(target)
    return video_dir


def load_questions(pq_path: Path, video_dir: Path, limit: int,
                   start_idx: int = 0) -> pd.DataFrame:
    """Load questions filtered to videos on disk.

    ``start_idx`` skips the first N rows after the videoID filter, which is
    useful for chunked / resumable evaluation when the underlying
    accelerator (e.g. Apple MPS) corrupts state on long runs.

    Args:
        pq_path: Parquet file with the questions (needs a ``videoID`` column).
        video_dir: Directory whose ``*.mp4`` stems define the usable videos.
        limit: Maximum number of rows to keep; values ``<= 0`` keep all.
        start_idx: Rows to skip after filtering; values ``<= 0`` skip none.

    Returns:
        The selected rows. The index is 0-based within the returned frame.
    """
    df = pd.read_parquet(pq_path)
    ids = {p.stem for p in video_dir.glob("*.mp4")}
    df = df[df["videoID"].isin(ids)].reset_index(drop=True)
    total_avail = len(df)
    if start_idx > 0:
        df = df.iloc[start_idx:].reset_index(drop=True)
    if limit > 0 and len(df) > limit:
        df = df.iloc[:limit].copy()
    print(f"[eval] using {len(df)} questions "
          f"(start_idx={start_idx}, total_available={total_avail})")
    return df


def format_options(options) -> str:
    """Join the answer options one per line, stripping surrounding whitespace."""
    return "\n".join(str(o).strip() for o in options)


def extract_letter(text: str) -> str | None:
    """Pull the chosen option letter (A-D) out of a model response.

    Patterns are tried from most to least specific:

    1. an explicit ``Answer: X`` (case-insensitive);
    2. a standalone upper-case letter;
    3. a standalone letter of either case.

    Step 2 keeps the lowercase article "a" from being read as answer "A"
    when a real upper-case answer letter follows later in the text.

    Args:
        text: Raw model output; ``None`` or empty is tolerated.

    Returns:
        The upper-cased letter, or ``None`` when no candidate is found.
    """
    s = text or ""
    for pattern in (ALPTD_ANSWER_RE, _UPPER_ANSWER_RE, ANSWER_RE):
        m = pattern.search(s)
        if m:
            return m.group(1).upper()
    return None


# ---------------------------------------------------------------------------
# Frame selection lives in the local QueryFrames module.
# ---------------------------------------------------------------------------
sys.path.insert(0, str(Path(__file__).resolve().parent))


def _load_query_frames():
    """Import and return the ``QueryFrames`` class from ``dw_queryframes``.

    The import is deferred so that ``PYTORCH_ENABLE_MPS_FALLBACK`` is in the
    environment *before* the model module is loaded.
    NOTE(review): ``dw_queryframes`` presumably imports torch at import time,
    and the MPS backend is commonly reported to read this variable only when
    torch is first loaded — confirm. Setting it first is harmless either way.
    """
    os.environ.setdefault("PYTORCH_ENABLE_MPS_FALLBACK", "1")
    from dw_queryframes import QueryFrames
    return QueryFrames


def __getattr__(name: str):
    """Keep ``QueryFrames`` reachable as a (lazily imported) module attribute."""
    if name == "QueryFrames":
        return _load_query_frames()
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the evaluation script."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--base", default="Qwen/Qwen3-VL-2B-Instruct")
    ap.add_argument("--clip-model", default="openai/clip-vit-large-patch14")
    ap.add_argument("--mode", choices=["mcq", "wild", "stock-uniform"],
                    default="mcq",
                    help="'mcq' = query-aware MCQ mode (default); "
                         "'wild' = deprecated alias for 'mcq'; "
                         "'stock-uniform' = stock baseline (uniform 8 frames)")
    ap.add_argument("--tag", default="")
    ap.add_argument("--n-questions", type=int, default=50,
                    help="number of questions to score in this run (after start-idx)")
    ap.add_argument("--start-idx", type=int, default=0,
                    help="skip the first N filtered questions; useful for "
                         "chunked / resumable evaluation when the accelerator "
                         "(e.g. Apple MPS) corrupts state on long runs")
    ap.add_argument("--n-frames", type=int, default=8)
    ap.add_argument("--n-candidates", type=int, default=32)
    ap.add_argument("--max-pixels", type=int, default=262144)
    ap.add_argument("--max-new-tokens", type=int, default=8)
    ap.add_argument("--out-json", default=None,
                    help="output JSON path (auto-named if omitted)")
    ap.add_argument("--chunks", nargs="+", default=DEFAULT_CHUNKS)
    return ap


def main() -> int:
    """Run the evaluation and write a JSON report.

    Downloads and extracts the assets, scores the selected questions with
    ``QueryFrames.answer_mcq`` and writes ``{"summary": ..., "results": ...}``
    to ``--out-json`` (or an auto-generated path next to the cache).

    If answering a question raises, the loop stops, the partial results
    are still written, and the summary carries ``aborted=True`` plus the
    error text so an outer chunked runner can detect the failure and
    resume via ``--start-idx``.

    Returns:
        Process exit code; ``0`` also after an aborted run (unchanged
        behaviour — inspect ``summary["aborted"]``).
    """
    ap = _build_arg_parser()
    args = ap.parse_args()
    if args.start_idx < 0:
        # load_questions would silently ignore a negative offset while the
        # recorded per-question indices would still be shifted by it.
        ap.error("--start-idx must be >= 0")

    # Legacy alias: 'wild' → 'mcq' (deprecated).
    if args.mode == "wild":
        args.mode = "mcq"

    pq_path, zip_paths = download_assets(args.chunks)
    video_dir = unzip_chunks(zip_paths)
    df = load_questions(pq_path, video_dir, args.n_questions,
                        start_idx=args.start_idx)

    query_frames_cls = _load_query_frames()
    fv = query_frames_cls(
        base_model=args.base,
        clip_model=args.clip_model,
        device="auto",
        max_pixels=args.max_pixels,
        max_new_tokens=args.max_new_tokens,
        n_frames=args.n_frames,
        n_candidates=args.n_candidates,
    )

    # MCQ mode       = query-aware (task_type=None lets QA path run).
    # Stock-uniform  = pass a known no-frame-gain task name to force
    #                  the uniform-fallback path (matches stock 8f
    #                  baseline behavior).
    forced_task = "Object Reasoning" if args.mode == "stock-uniform" else None
    total = args.start_idx + len(df)

    results = []
    correct = 0
    error = None
    t0 = time.time()
    for pos, (_, row) in enumerate(df.iterrows()):
        # Absolute index into the full filtered df (so chunks have unique idx).
        abs_idx = args.start_idx + pos
        video_path = video_dir / f"{row['videoID']}.mp4"
        try:
            out = fv.answer_mcq(
                video_path=video_path,
                question=row["question"],
                options=list(row["options"]),
                task_type=forced_task,
            )
        except Exception as e:
            # MPS / accelerator state corruption sometimes triggers
            # mid-run on long inference. Save what we have and exit so
            # an outer chunked-runner can pick up from start-idx + i.
            error = f"{type(e).__name__}: {e}"
            print(f"[eval] FATAL at q {abs_idx}: {error}", flush=True)
            print(f"[eval] saving partial results ({len(results)}) "
                  f"and exiting so caller can resume.", flush=True)
            break

        gold = row["answer"].strip().upper()
        ok = out["pred"] == gold
        correct += int(ok)
        results.append({
            "index": abs_idx,
            "videoID": row["videoID"],
            "task_type": row.get("task_type", ""),
            "gold": gold,
            "pred": out["pred"],
            "raw": out["raw"][:200],
            "frames_used": out["frames_used"],
            "latency_clip_s": out["latency_clip_s"],
            "latency_gen_s": out["latency_gen_s"],
            "correct": ok,
        })
        # Based on the number of scored questions rather than the
        # DataFrame index label, so it stays right for any index.
        run = correct / len(results)
        print(f"[eval] [{abs_idx+1}/{total}] "
              f"gold={gold} pred={out['pred']} "
              f"acc_so_far={run:.3f} clip={out['latency_clip_s']}s "
              f"gen={out['latency_gen_s']}s", flush=True)

    n = len(results)
    acc = correct / n if n else 0.0
    summary = {
        "model_base": args.base,
        "clip_model": args.clip_model,
        "mode": args.mode,
        "tag": args.tag,
        "start_idx": args.start_idx,
        "n_questions_attempted": len(df),
        "n_questions": n,
        "n_frames": args.n_frames,
        "n_candidates": args.n_candidates,
        "max_pixels": args.max_pixels,
        "max_new_tokens": args.max_new_tokens,
        "accuracy": round(acc, 4),
        "wall_time_s": round(time.time() - t0, 1),
        # Additive keys: let a caller tell an aborted chunk from a full one.
        "aborted": error is not None,
        "error": error,
    }
    out_path = args.out_json
    if out_path is None:
        # NOTE: the auto-generated name does not include --start-idx, so two
        # chunks with the same tag/mode and question count share one file
        # name; pass --out-json (or --tag) for chunked runs.
        tag = (args.tag or args.mode)
        out_path = str(CACHE_DIR.parent / f"eval_{tag}_{n}q.json")
    Path(out_path).parent.mkdir(parents=True, exist_ok=True)
    Path(out_path).write_text(
        json.dumps({"summary": summary, "results": results}, indent=2),
        encoding="utf-8",
    )
    print(f"\n[eval] mode={args.mode} acc={acc:.4f} ({correct}/{n})  saved {out_path}")
    return 0


if __name__ == "__main__":
    sys.exit(main())