"""MBench-V annotation UI (Gradio Space). Reads videos streaming from the `studyOverflow/TempMemoryData` dataset repo, writes annotations back to the same repo under `annotations/`, batched via `CommitScheduler`. Design notes ------------ - Videos are NOT copied into this Space. We build CDN URLs with `hf_hub_url(..., repo_type="dataset")` and let the browser stream them. - Submissions are appended to a per-process JSONL file under `annotations/`; `CommitScheduler` pushes the directory to the dataset repo every 5 min. - Allocation is intentionally simple in this template: at start-up we build a single shuffled pool of `(model, task_id)` pairs, and each user session maintains its own index into that pool. Multi-annotator deduplication is out of scope for the first iteration. """ from __future__ import annotations import json import os import random import time import uuid from pathlib import Path from typing import Any import gradio as gr from huggingface_hub import CommitScheduler, hf_hub_download, hf_hub_url # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- DATASET_REPO = "studyOverflow/TempMemoryData" MERGED_JSON_PATH = "MBench-V/merged.json" # 6 models that are already fully reorganized on HF (584 videos each). # `skyreels` and `longcat` are excluded until their 0422 runs finish. MODELS: list[str] = [ "causal_forcing", "self_forcing", "cosmos", "helios", "longlive", "memflow", ] HF_TOKEN = os.environ.get("HF_TOKEN") # must be set in Space secrets for writes # Local staging directory that CommitScheduler will sync to the dataset repo. ANN_DIR = Path("annotations_local") ANN_DIR.mkdir(exist_ok=True) # Each Space process writes to its own JSONL so concurrent replicas don't # clobber each other's writes. `CommitScheduler` pushes the whole directory. 
PROCESS_ID = uuid.uuid4().hex[:8]
ANN_FILE = ANN_DIR / f"ann_{PROCESS_ID}.jsonl"
COMMIT_INTERVAL_MIN = 5


# ---------------------------------------------------------------------------
# Load merged.json (584 task records) once at startup
# ---------------------------------------------------------------------------
def _load_merged() -> list[dict[str, Any]]:
    """Download ``MERGED_JSON_PATH`` from the dataset repo and parse it.

    Returns:
        The decoded JSON. The rest of this module treats it as a list of task
        records, each carrying at least a ``"task_id"`` key.
    """
    local = hf_hub_download(
        repo_id=DATASET_REPO,
        filename=MERGED_JSON_PATH,
        repo_type="dataset",
        token=HF_TOKEN,
    )
    with open(local, encoding="utf-8") as f:
        return json.load(f)


TASKS: list[dict[str, Any]] = _load_merged()
TASK_BY_ID: dict[str, dict[str, Any]] = {t["task_id"]: t for t in TASKS}


def _extract_prompt(task: dict[str, Any]) -> str:
    """Return the first non-empty prompt string found in the task record.

    Looks under ``task["generation_prompts"]["prompts"]`` at ``level_1``,
    ``level_2`` and then ``level_3``. Each level may hold a single string or a
    list of strings. Returns a placeholder when nothing usable is found.
    """
    gp = task.get("generation_prompts") or {}
    prompts = gp.get("prompts") or {}
    for level in ("level_1", "level_2", "level_3"):
        val = prompts.get(level)
        candidates = val if isinstance(val, list) else [val]
        for cand in candidates:
            # Skip blanks/non-strings so an empty first entry cannot shadow a
            # real prompt later in the list or at a deeper level.
            if isinstance(cand, str) and cand:
                return cand
    return "(no prompt found)"


# ---------------------------------------------------------------------------
# Build the (model, task_id) pool
# ---------------------------------------------------------------------------
def _build_pool() -> list[tuple[str, str]]:
    """Return every ``(model, task_id)`` pair, grouped by model in MODELS order."""
    return [(m, t["task_id"]) for m in MODELS for t in TASKS]


POOL: list[tuple[str, str]] = _build_pool()
print(f"[mbench-ann] loaded {len(TASKS)} tasks × {len(MODELS)} models = {len(POOL)} items")


def _video_url(model: str, task_id: str) -> str:
    """Return the hub URL the browser streams for ``model``'s ``task_id`` video."""
    return hf_hub_url(
        DATASET_REPO,
        filename=f"MBench-V/{model}/videos/{task_id}.mp4",
        repo_type="dataset",
    )


# ---------------------------------------------------------------------------
# CommitScheduler — pushes annotations_local/ to DATASET_REPO every 5 min
# ---------------------------------------------------------------------------
scheduler: CommitScheduler | None = None
if HF_TOKEN:
    scheduler = CommitScheduler(
        repo_id=DATASET_REPO,
        repo_type="dataset",
        folder_path=str(ANN_DIR),
        path_in_repo="annotations",
        every=COMMIT_INTERVAL_MIN,
        token=HF_TOKEN,
        private=False,
        squash_history=False,
    )
    print(f"[mbench-ann] CommitScheduler started (every {COMMIT_INTERVAL_MIN} min)")
else:
    print("[mbench-ann] WARNING: HF_TOKEN not set — annotations will stay local only")


def _append_annotation(record: dict[str, Any]) -> None:
    """Append ``record`` as one JSON line to this process's ``ANN_FILE``.

    When a CommitScheduler is running, the write holds ``scheduler.lock`` so
    the scheduler does not read the file while it is being written, as the
    huggingface_hub docs recommend. Without a scheduler no lock is taken.
    """
    from contextlib import nullcontext

    line = json.dumps(record, ensure_ascii=False)
    lock = scheduler.lock if scheduler is not None else nullcontext()
    with lock, ANN_FILE.open("a", encoding="utf-8") as f:
        f.write(line + "\n")


# ---------------------------------------------------------------------------
# UI helpers
# ---------------------------------------------------------------------------
def _format_meta(model: str, task: dict[str, Any], idx: int, total: int) -> str:
    """Render the metadata panel (progress, model, task fields) as Markdown."""
    lines = [
        f"**Progress**: {idx + 1} / {total}",
        f"**Model**: `{model}`",
        f"**task_id**: `{task['task_id']}`",
        f"**category**: `{task.get('category', '?')}` • **subcategory**: `{task.get('subcategory', '?')}`",
        f"**source_task**: `{task.get('source_task', '?')}`",
    ]
    if task.get("task_type"):
        lines.append(f"**task_type**: `{task['task_type']}`")
    return "\n\n".join(lines)


def _load_item(pool_order: list[int], idx: int) -> tuple[str, str, str]:
    """Return (video_url, meta_markdown, prompt_text) for position `idx`.

    Out-of-range positions yield an empty URL/prompt and an "All done" banner.
    """
    if idx < 0 or idx >= len(pool_order):
        return "", "**All done!** No more items.", ""
    model, task_id = POOL[pool_order[idx]]
    task = TASK_BY_ID[task_id]
    return (
        _video_url(model, task_id),
        _format_meta(model, task, idx, len(pool_order)),
        _extract_prompt(task),
    )


# ---------------------------------------------------------------------------
# Gradio callbacks
# ---------------------------------------------------------------------------
def start_session(annotator: str, state: dict | None):
    """Log an annotator in and give them a freshly shuffled pool order.

    Returns values for ``[session_state, login_panel, ann_panel, video,
    meta_md, prompt_tb, status]``. An empty name keeps the login panel visible
    and leaves ``state`` untouched.
    """
    annotator = (annotator or "").strip()
    if not annotator:
        return (
            state,
            gr.update(visible=True),  # login panel stays
            gr.update(visible=False),  # annotation panel hidden
            "",
            "",
            "",
            gr.update(value="Please enter a name first."),
        )

    # Build this user's shuffled order (seeded by name + login time, so a
    # re-login produces a new order and restarts progress).
    order = list(range(len(POOL)))
    rng = random.Random(f"{annotator}-{int(time.time())}")
    rng.shuffle(order)

    state = {"annotator": annotator, "order": order, "idx": 0, "submitted": 0}
    video, meta, prompt = _load_item(order, 0)
    return (
        state,
        gr.update(visible=False),
        gr.update(visible=True),
        video,
        meta,
        prompt,
        gr.update(value=f"Logged in as `{annotator}`"),
    )


def _advance(state: dict[str, Any], record_submitted: bool):
    """Move the session to the next item and build the navigation outputs.

    Returns values for ``[session_state, video, meta_md, prompt_tb, score,
    note, status]``; score and note are reset to their defaults.
    """
    state["idx"] += 1
    if record_submitted:
        # Count real submissions separately: `idx` also includes skips.
        state["submitted"] = state.get("submitted", 0) + 1
    video, meta, prompt = _load_item(state["order"], state["idx"])

    status = f"Submitted ({state['submitted']} done)." if record_submitted else "Skipped."
    if state["idx"] < len(state["order"]):
        status += " Next →"
    else:
        status += " All items finished."
    # Reset score + note controls
    return state, video, meta, prompt, 3, "", status


def _blocked_outputs(state: dict | None):
    """Return early-exit outputs if the session cannot advance, else None."""
    if state is None or state.get("idx") is None:
        return state, "", "", "", 3, "", "Not logged in."
    if state["idx"] >= len(state["order"]):
        return state, "", "**All done!**", "", 3, "", "No more items."
    return None


def submit_and_next(state: dict | None, score: int, note: str):
    """Persist a score/note for the current item, then advance."""
    blocked = _blocked_outputs(state)
    if blocked is not None:
        return blocked

    model, task_id = POOL[state["order"][state["idx"]]]
    now = time.time()  # read the clock once so both timestamps agree
    record = {
        "timestamp": now,
        "timestamp_iso": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(now)),
        "annotator": state["annotator"],
        "process_id": PROCESS_ID,
        "model": model,
        "task_id": task_id,
        "score": int(score),
        "note": (note or "").strip(),
    }
    _append_annotation(record)
    return _advance(state, record_submitted=True)


def skip_and_next(state: dict | None):
    """Advance without recording anything (no-op once the queue is exhausted)."""
    blocked = _blocked_outputs(state)
    if blocked is not None:
        return blocked
    return _advance(state, record_submitted=False)


# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------
THEME = gr.themes.Soft(primary_hue="indigo")

with gr.Blocks(theme=THEME, title="MBench-V Annotation") as demo:
    gr.Markdown(
        f"""
        # 🎬 MBench-V Annotation
        Watch each generated video and rate it **1–5** (5 = best).
        Click **Submit & Next** to save. Your submissions are auto-committed
        to the dataset repo every {COMMIT_INTERVAL_MIN} minutes.
        """
    )

    session_state = gr.State(value=None)

    # ---- Login panel ----
    with gr.Group(visible=True) as login_panel:
        with gr.Row():
            annotator_in = gr.Textbox(
                label="Annotator name",
                placeholder="e.g. alice",
                scale=4,
                autofocus=True,
            )
            login_btn = gr.Button("Start annotating", variant="primary", scale=1)

    # ---- Annotation panel ----
    with gr.Group(visible=False) as ann_panel:
        with gr.Row():
            with gr.Column(scale=3):
                video = gr.Video(label="Generated video", autoplay=True, loop=True)
            with gr.Column(scale=2):
                meta_md = gr.Markdown()
                prompt_tb = gr.Textbox(
                    label="Generation prompt",
                    lines=10,
                    max_lines=20,
                    interactive=False,
                )
            with gr.Column(scale=1):
                score = gr.Slider(1, 5, value=3, step=1, label="Score (1 worst – 5 best)")
                note = gr.Textbox(label="Note (optional)", lines=4)
                submit_btn = gr.Button("✅ Submit & Next", variant="primary")
                skip_btn = gr.Button("⏭️ Skip")
        status = gr.Markdown("")

    # ---- Wiring ----
    # Output lists are shared so the button and Enter-key paths stay in sync.
    login_outputs = [session_state, login_panel, ann_panel, video, meta_md, prompt_tb, status]
    nav_outputs = [session_state, video, meta_md, prompt_tb, score, note, status]

    login_btn.click(
        start_session,
        inputs=[annotator_in, session_state],
        outputs=login_outputs,
    )
    annotator_in.submit(
        start_session,
        inputs=[annotator_in, session_state],
        outputs=login_outputs,
    )
    submit_btn.click(
        submit_and_next,
        inputs=[session_state, score, note],
        outputs=nav_outputs,
    )
    skip_btn.click(
        skip_and_next,
        inputs=[session_state],
        outputs=nav_outputs,
    )


if __name__ == "__main__":
    demo.queue(default_concurrency_limit=8).launch()