Spaces:
Sleeping
Sleeping
| """MBench-V annotation UI (Gradio Space). | |
| Reads videos streaming from the `studyOverflow/TempMemoryData` dataset repo, | |
| writes annotations back to the same repo under `annotations/`, batched via | |
| `CommitScheduler`. | |
| Design notes | |
| ------------ | |
| - Videos are NOT copied into this Space. We build CDN URLs with | |
| `hf_hub_url(..., repo_type="dataset")` and let the browser stream them. | |
| - Submissions are appended to a per-process JSONL file under `annotations/`; | |
| `CommitScheduler` pushes the directory to the dataset repo every 5 min. | |
| - Allocation is intentionally simple in this template: at start-up we build | |
| a single shuffled pool of `(model, task_id)` pairs, and each user session | |
| maintains its own index into that pool. Multi-annotator deduplication is | |
| out of scope for the first iteration. | |
| """ | |
from __future__ import annotations

import json
import os
import random
import time
import uuid
from contextlib import nullcontext
from pathlib import Path
from typing import Any

import gradio as gr
from huggingface_hub import CommitScheduler, hf_hub_download, hf_hub_url
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Dataset repo that both serves the videos and receives the annotations.
DATASET_REPO = "studyOverflow/TempMemoryData"
MERGED_JSON_PATH = "MBench-V/merged.json"

# Models already fully reorganized on HF (584 videos each).
# `skyreels` and `longcat` stay out until their 0422 runs finish.
MODELS: list[str] = [
    "causal_forcing", "self_forcing",
    "cosmos", "helios",
    "longlive", "memflow",
]

# Write token, expected in the Space secrets; without it nothing is pushed.
HF_TOKEN = os.environ.get("HF_TOKEN")

# Local staging folder that the CommitScheduler mirrors into the dataset repo.
ANN_DIR = Path("annotations_local")
ANN_DIR.mkdir(exist_ok=True)

# One JSONL file per process (random 8-hex-char id) so that concurrent
# replicas never append to the same file; the scheduler uploads the folder.
PROCESS_ID = uuid.uuid4().hex[:8]
ANN_FILE = ANN_DIR.joinpath(f"ann_{PROCESS_ID}.jsonl")

# Push interval for the CommitScheduler, in minutes.
COMMIT_INTERVAL_MIN = 5
# ---------------------------------------------------------------------------
# Load merged.json (584 task records) once at startup
# ---------------------------------------------------------------------------
def _load_merged() -> list[dict[str, Any]]:
    """Fetch ``MERGED_JSON_PATH`` from the dataset repo and parse it as JSON.

    ``hf_hub_download`` returns a local file path; that file is decoded as
    UTF-8 and parsed. The parsed top-level value is returned as-is
    (annotated as a list of task records).
    """
    local_path = hf_hub_download(
        repo_id=DATASET_REPO,
        filename=MERGED_JSON_PATH,
        repo_type="dataset",
        token=HF_TOKEN,
    )
    raw_text = Path(local_path).read_text(encoding="utf-8")
    return json.loads(raw_text)
# Every task record, in file order, plus a lookup keyed by `task_id`
# (if two records share a task_id, the later one wins).
TASKS: list[dict[str, Any]] = _load_merged()
TASK_BY_ID: dict[str, dict[str, Any]] = {
    record["task_id"]: record
    for record in TASKS
}
def _extract_prompt(task: dict[str, Any]) -> str:
    """Return the first non-empty prompt string found in the task record.

    Looks at ``task["generation_prompts"]["prompts"]`` under ``level_1``,
    ``level_2`` and then ``level_3``. A level may hold a single string or a
    list; for a list, its first non-empty *string* element is used, so empty
    strings and non-string entries are skipped rather than returned.

    Returns ``"(no prompt found)"`` when no level yields a usable string,
    including when ``generation_prompts`` / ``prompts`` are missing or are
    not dicts.
    """
    gp = task.get("generation_prompts")
    prompts = gp.get("prompts") if isinstance(gp, dict) else None
    if not isinstance(prompts, dict):
        return "(no prompt found)"
    for level in ("level_1", "level_2", "level_3"):
        val = prompts.get(level)
        # Normalize: a bare value is treated as a one-element candidate list.
        candidates = val if isinstance(val, list) else [val]
        for candidate in candidates:
            if isinstance(candidate, str) and candidate:
                return candidate
    return "(no prompt found)"
# ---------------------------------------------------------------------------
# Build the (model, task_id) pool
# ---------------------------------------------------------------------------
def _build_pool(
    models: list[str] | None = None,
    tasks: list[dict[str, Any]] | None = None,
) -> list[tuple[str, str]]:
    """Return every ``(model, task_id)`` pair, model-major.

    Args:
        models: Model names to include; defaults to the module's ``MODELS``.
        tasks: Task records (each needs a ``"task_id"`` key); defaults to
            the module's ``TASKS``. It is iterated once per model, so it must
            be re-iterable.

    Returns:
        A list ordered by model first, then by task order within each model.
    """
    if models is None:
        models = MODELS
    if tasks is None:
        tasks = TASKS
    return [(model, task["task_id"]) for model in models for task in tasks]
# Full annotation pool: every model paired with every task.
POOL: list[tuple[str, str]] = _build_pool()
# Start-up log line with the pool dimensions.
print(
    "[mbench-ann] loaded {} tasks × {} models = {} items".format(
        len(TASKS), len(MODELS), len(POOL)
    )
)
def _video_url(model: str, task_id: str) -> str:
    """Return the Hub URL of ``MBench-V/<model>/videos/<task_id>.mp4``.

    Only the URL is built here; the browser fetches the video itself, so
    nothing is downloaded into the Space.
    """
    repo_file = f"MBench-V/{model}/videos/{task_id}.mp4"
    return hf_hub_url(DATASET_REPO, filename=repo_file, repo_type="dataset")
# ---------------------------------------------------------------------------
# CommitScheduler — pushes annotations_local/ to DATASET_REPO every
# COMMIT_INTERVAL_MIN minutes (only when HF_TOKEN is available)
# ---------------------------------------------------------------------------
scheduler: CommitScheduler | None = None
if not HF_TOKEN:
    # No write token: annotations are still appended locally, never uploaded.
    print("[mbench-ann] WARNING: HF_TOKEN not set — annotations will stay local only")
else:
    scheduler = CommitScheduler(
        repo_id=DATASET_REPO,
        repo_type="dataset",
        folder_path=str(ANN_DIR),
        path_in_repo="annotations",
        every=COMMIT_INTERVAL_MIN,
        token=HF_TOKEN,
        private=False,
        squash_history=False,
    )
    print(f"[mbench-ann] CommitScheduler started (every {COMMIT_INTERVAL_MIN} min)")
def _append_annotation(record: dict[str, Any]) -> None:
    """Append *record* as one JSON line to this process's ``ANN_FILE``.

    When a CommitScheduler exists, the file is opened and written while
    holding ``scheduler.lock``. Presumably this keeps the append from racing
    a scheduled upload of the folder (confirm against the huggingface_hub
    CommitScheduler docs). In local-only mode (no scheduler), no lock is
    taken.
    """
    line = json.dumps(record, ensure_ascii=False) + "\n"
    # One code path for both modes: nullcontext() stands in for the lock.
    guard = scheduler.lock if scheduler is not None else nullcontext()
    with guard, ANN_FILE.open("a", encoding="utf-8") as fh:
        fh.write(line)
# ---------------------------------------------------------------------------
# UI helpers
# ---------------------------------------------------------------------------
def _format_meta(model: str, task: dict[str, Any], idx: int, total: int) -> str:
    """Render the metadata side panel for one item as Markdown.

    Shows 1-based progress, the model, the task id, category/subcategory and
    source task (``?`` when absent). A ``task_type`` line is appended only
    when that field is present and truthy. Entries are separated by blank
    lines so each renders as its own paragraph.
    """
    task_id = task["task_id"]
    category = task.get("category", "?")
    subcategory = task.get("subcategory", "?")
    source_task = task.get("source_task", "?")
    sections = [
        f"**Progress**: {idx + 1} / {total}",
        f"**Model**: `{model}`",
        f"**task_id**: `{task_id}`",
        f"**category**: `{category}` • **subcategory**: `{subcategory}`",
        f"**source_task**: `{source_task}`",
    ]
    task_type = task.get("task_type")
    if task_type:
        sections.append(f"**task_type**: `{task_type}`")
    return "\n\n".join(sections)
def _load_item(pool_order: list[int], idx: int) -> tuple[str, str, str]:
    """Return ``(video_url, meta_markdown, prompt_text)`` for position *idx*.

    *pool_order* is a session's permutation of indices into ``POOL``. Any
    *idx* outside ``[0, len(pool_order))``, including negatives, yields the
    "All done" placeholder with an empty URL and prompt.
    """
    if not 0 <= idx < len(pool_order):
        return "", "**All done!** No more items.", ""
    pool_index = pool_order[idx]
    model, task_id = POOL[pool_index]
    task = TASK_BY_ID[task_id]
    url = _video_url(model, task_id)
    meta = _format_meta(model, task, idx, len(pool_order))
    prompt = _extract_prompt(task)
    return url, meta, prompt
# ---------------------------------------------------------------------------
# Gradio callbacks
# ---------------------------------------------------------------------------
def start_session(annotator: str, state: dict | None):
    """Handle a login: validate the name, then open a fresh annotation session.

    Args:
        annotator: Raw text from the name box; surrounding whitespace is ignored.
        state: Current per-browser session state (``None`` before login).

    Returns:
        A 7-tuple in the order of the login handlers' outputs: (state, login
        panel update, annotation panel update, video URL, metadata markdown,
        prompt text, status update). A blank name keeps the login panel shown,
        returns *state* unchanged and asks for a name.
    """
    name = (annotator or "").strip()
    if not name:
        keep_login = gr.update(visible=True)
        hide_annotation = gr.update(visible=False)
        return (
            state,
            keep_login,
            hide_annotation,
            "",
            "",
            "",
            gr.update(value="Please enter a name first."),
        )

    # Each session gets its own permutation of POOL, seeded by name + login second.
    order = list(range(len(POOL)))
    seed = f"{name}-{int(time.time())}"
    random.Random(seed).shuffle(order)

    new_state = {"annotator": name, "order": order, "idx": 0}
    first_video, first_meta, first_prompt = _load_item(order, 0)
    show_login = gr.update(visible=False)
    show_annotation = gr.update(visible=True)
    return (
        new_state,
        show_login,
        show_annotation,
        first_video,
        first_meta,
        first_prompt,
        gr.update(value=f"Logged in as `{name}`"),
    )
def _advance(state: dict, record_submitted: bool):
    """Step the session to the next item and build the refreshed UI values.

    Increments ``state["idx"]`` in place and returns the 7-tuple wired to the
    submit/skip handlers' outputs: (state, video URL, metadata markdown,
    prompt text, score reset, note reset, status message).

    Args:
        state: Session dict created by ``start_session`` (keys "order", "idx", ...).
        record_submitted: True after a submit, False after a skip; only the
            status wording differs.
    """
    state["idx"] += 1
    video, meta, prompt = _load_item(state["order"], state["idx"])
    if record_submitted:
        # idx now counts every item handled so far (submitted and skipped).
        status = f"Submitted ({state['idx']} done). Next →"
    else:
        # Plain literal: the original used an f-string without placeholders (F541).
        status = "Skipped. Next →"
    # Score goes back to the slider's default (3); the note box is cleared.
    return state, video, meta, prompt, 3, "", status
def submit_and_next(state: dict, score: int, note: str):
    """Save the current item's score and note, then move to the next item.

    Returns the same 7-tuple shape as the skip handler: (state, video URL,
    metadata markdown, prompt text, score reset, note reset, status).
    If nobody is logged in, or the session is already past its last item,
    nothing is written and a status-only tuple is returned.
    """
    if state is None or state.get("idx") is None:
        return state, "", "", "", 3, "", "Not logged in."
    order = state["order"]
    idx = state["idx"]
    if idx >= len(order):
        return state, "", "**All done!**", "", 3, "", "No more items."
    model, task_id = POOL[order[idx]]
    # Read the clock once so both timestamps describe the same instant
    # (two separate reads could straddle a second boundary).
    now = time.time()
    record = {
        "timestamp": now,
        # UTC wall-clock time; no zone suffix in the string.
        "timestamp_iso": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(now)),
        "annotator": state["annotator"],
        "process_id": PROCESS_ID,
        "model": model,
        "task_id": task_id,
        "score": int(score),
        "note": (note or "").strip(),
    }
    _append_annotation(record)
    return _advance(state, record_submitted=True)
def skip_and_next(state: dict):
    """Move to the next item without writing an annotation.

    Mirrors ``submit_and_next``'s guards: if nobody is logged in, or the
    session is already past its last item, a status-only tuple is returned
    and ``state["idx"]`` is left unchanged. Without the second guard, every
    extra Skip click would keep incrementing ``idx`` and report
    "Skipped. Next →" after the pool is exhausted.
    """
    if state is None or state.get("idx") is None:
        return state, "", "", "", 3, "", "Not logged in."
    if state["idx"] >= len(state["order"]):
        return state, "", "**All done!**", "", 3, "", "No more items."
    return _advance(state, record_submitted=False)
# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------
THEME = gr.themes.Soft(primary_hue="indigo")

# The intro must describe what really happens to submissions: the interval
# comes from COMMIT_INTERVAL_MIN (not a hard-coded "5"), and without HF_TOKEN
# there is no CommitScheduler, so nothing is pushed to the dataset repo.
if scheduler is not None:
    SYNC_NOTE = (
        "Your submissions are auto-committed to the dataset repo "
        f"every {COMMIT_INTERVAL_MIN} minutes."
    )
else:
    SYNC_NOTE = (
        "⚠️ HF_TOKEN is not set — your submissions are saved locally only "
        "and are **not** pushed to the dataset repo."
    )

INTRO_MD = "\n".join(
    [
        "# 🎬 MBench-V Annotation",
        "Watch each generated video and rate it **1–5** (5 = best). "
        "Click **Submit & Next** to save.",
        SYNC_NOTE,
    ]
)

with gr.Blocks(theme=THEME, title="MBench-V Annotation") as demo:
    gr.Markdown(INTRO_MD)
    # Per-browser session dict; None until start_session succeeds.
    session_state = gr.State(value=None)

    # ---- Login panel ----
    with gr.Group(visible=True) as login_panel:
        with gr.Row():
            annotator_in = gr.Textbox(
                label="Annotator name", placeholder="e.g. alice",
                scale=4, autofocus=True,
            )
            login_btn = gr.Button("Start annotating", variant="primary", scale=1)

    # ---- Annotation panel ----
    with gr.Group(visible=False) as ann_panel:
        with gr.Row():
            with gr.Column(scale=3):
                video = gr.Video(label="Generated video", autoplay=True, loop=True)
            with gr.Column(scale=2):
                meta_md = gr.Markdown()
                prompt_tb = gr.Textbox(
                    label="Generation prompt",
                    lines=10, max_lines=20, interactive=False,
                )
            with gr.Column(scale=1):
                score = gr.Slider(1, 5, value=3, step=1, label="Score (1 worst – 5 best)")
                note = gr.Textbox(label="Note (optional)", lines=4)
                submit_btn = gr.Button("✅ Submit & Next", variant="primary")
                skip_btn = gr.Button("⏭️ Skip")

    # Kept outside both panels so login messages (e.g. "Please enter a name
    # first.") stay visible while the annotation panel is hidden.
    status = gr.Markdown("")

    # ---- Wiring ----
    # Button click and Enter in the name box share one handler and I/O spec;
    # the outputs order matches the tuple start_session returns.
    login_io = dict(
        inputs=[annotator_in, session_state],
        outputs=[session_state, login_panel, ann_panel, video, meta_md, prompt_tb, status],
    )
    login_btn.click(start_session, **login_io)
    annotator_in.submit(start_session, **login_io)

    # Submit and skip both return (state, video, meta, prompt, score, note, status).
    step_outputs = [session_state, video, meta_md, prompt_tb, score, note, status]
    submit_btn.click(
        submit_and_next,
        inputs=[session_state, score, note],
        outputs=step_outputs,
    )
    skip_btn.click(
        skip_and_next,
        inputs=[session_state],
        outputs=step_outputs,
    )

if __name__ == "__main__":
    demo.queue(default_concurrency_limit=8).launch()