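# NOTE: the lines below are page residue from the Hugging Face file viewer;
# they are kept as comments so the module still parses.
# studyOverflow's picture
# init: minimal Gradio annotation template (6 models, 3504 items)
# 1357e34 verified
# raw
# history blame
# 11 kB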
"""MBench-V annotation UI (Gradio Space).
Streams videos from the `studyOverflow/TempMemoryData` dataset repo and
writes annotations back to the same repo under `annotations/`, batched via
`CommitScheduler`.
Design notes
------------
- Videos are NOT copied into this Space. We build CDN URLs with
`hf_hub_url(..., repo_type="dataset")` and let the browser stream them.
- Submissions are appended to a per-process JSONL file under `annotations/`;
`CommitScheduler` pushes the directory to the dataset repo every 5 min.
- Allocation is intentionally simple in this template: at start-up we build
a single shuffled pool of `(model, task_id)` pairs, and each user session
maintains its own index into that pool. Multi-annotator deduplication is
out of scope for the first iteration.
"""
from __future__ import annotations

import json
import os
import random
import threading
import time
import uuid
from pathlib import Path
from typing import Any

import gradio as gr
from huggingface_hub import CommitScheduler, hf_hub_download, hf_hub_url
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Source dataset repo (videos + merged.json) and annotation destination.
DATASET_REPO = "studyOverflow/TempMemoryData"
MERGED_JSON_PATH = "MBench-V/merged.json"

# Models whose outputs are fully reorganized on HF (584 videos each).
# `skyreels` and `longcat` stay out until their 0422 runs finish.
MODELS: list[str] = [
    "causal_forcing", "self_forcing", "cosmos",
    "helios", "longlive", "memflow",
]

# Needed for writes; configure it as a Space secret.
HF_TOKEN = os.getenv("HF_TOKEN")

# Local staging folder that the CommitScheduler mirrors into the dataset repo.
ANN_DIR = Path("annotations_local")
ANN_DIR.mkdir(exist_ok=True)

# A random per-process suffix gives every replica its own JSONL file, so
# concurrent replicas never append to the same file. The scheduler uploads
# the whole folder.
PROCESS_ID = uuid.uuid4().hex[:8]
ANN_FILE = ANN_DIR.joinpath(f"ann_{PROCESS_ID}.jsonl")

COMMIT_INTERVAL_MIN = 5
# ---------------------------------------------------------------------------
# Load merged.json (584 task records) once at startup
# ---------------------------------------------------------------------------
def _load_merged() -> list[dict[str, Any]]:
    """Fetch ``MERGED_JSON_PATH`` from the dataset repo and return the parsed JSON.

    ``hf_hub_download`` hands back a local file path. That file is read as
    UTF-8 and decoded with ``json``. The result is annotated as a list of task
    dicts; its structure is not validated here.
    """
    cached_path = hf_hub_download(
        repo_id=DATASET_REPO,
        filename=MERGED_JSON_PATH,
        repo_type="dataset",
        token=HF_TOKEN,
    )
    return json.loads(Path(cached_path).read_text(encoding="utf-8"))


# Loaded once at import time and shared by every session.
TASKS: list[dict[str, Any]] = _load_merged()
# task_id -> task record. If an id repeats, the last record for it wins.
TASK_BY_ID: dict[str, dict[str, Any]] = {record["task_id"]: record for record in TASKS}
def _extract_prompt(task: dict[str, Any]) -> str:
"""Return the first non-empty prompt string found in the task record."""
gp = task.get("generation_prompts") or {}
prompts = gp.get("prompts") or {}
for level in ("level_1", "level_2", "level_3"):
val = prompts.get(level)
if isinstance(val, list) and val:
return val[0]
if isinstance(val, str) and val:
return val
return "(no prompt found)"
# ---------------------------------------------------------------------------
# Build the (model, task_id) pool
# ---------------------------------------------------------------------------
def _build_pool() -> list[tuple[str, str]]:
    """Return every ``(model, task_id)`` pair in model-major order.

    All tasks for ``MODELS[0]`` come first, in ``TASKS`` order, then all
    tasks for ``MODELS[1]``, and so on.
    """
    return [(model, task["task_id"]) for model in MODELS for task in TASKS]


# Fixed at import time; sessions shuffle indices into it rather than the pool itself.
POOL: list[tuple[str, str]] = _build_pool()
print(f"[mbench-ann] loaded {len(TASKS)} tasks × {len(MODELS)} models = {len(POOL)} items")
def _video_url(model: str, task_id: str) -> str:
    """Return the hub URL of ``model``'s generated video for ``task_id``.

    The URL is handed to the browser, which streams the mp4 directly from the
    dataset repo; nothing is downloaded into the Space.
    """
    repo_path = f"MBench-V/{model}/videos/{task_id}.mp4"
    return hf_hub_url(repo_id=DATASET_REPO, filename=repo_path, repo_type="dataset")
# ---------------------------------------------------------------------------
# CommitScheduler — pushes annotations_local/ to DATASET_REPO every 5 min
# ---------------------------------------------------------------------------
# Background uploader for ANN_DIR. It stays None without a token, in which
# case annotations only accumulate in local files.
scheduler: CommitScheduler | None
if not HF_TOKEN:
    scheduler = None
    print("[mbench-ann] WARNING: HF_TOKEN not set — annotations will stay local only")
else:
    scheduler = CommitScheduler(
        repo_id=DATASET_REPO,
        repo_type="dataset",
        folder_path=str(ANN_DIR),
        path_in_repo="annotations",
        every=COMMIT_INTERVAL_MIN,
        token=HF_TOKEN,
        private=False,
        squash_history=False,
    )
    print(f"[mbench-ann] CommitScheduler started (every {COMMIT_INTERVAL_MIN} min)")
# Fallback lock for local-only mode (no CommitScheduler lock to borrow).
# Gradio runs callbacks on a worker pool (`default_concurrency_limit=8` at
# launch), so appends from concurrent sessions still need serializing.
_LOCAL_WRITE_LOCK = threading.Lock()


def _append_annotation(record: dict[str, Any]) -> None:
    """Append ``record`` as one JSON line to this process's annotation file.

    The record is serialized before the lock is taken, so the critical section
    covers only the file write.

    When a ``CommitScheduler`` is active, its ``lock`` is held so the write
    cannot overlap a scheduled push. Otherwise a module-level lock
    serializes concurrent callers.

    Raises:
        TypeError: if ``record`` contains values ``json.dumps`` cannot encode.
    """
    line = json.dumps(record, ensure_ascii=False) + "\n"
    lock = scheduler.lock if scheduler is not None else _LOCAL_WRITE_LOCK
    with lock, ANN_FILE.open("a", encoding="utf-8") as f:
        f.write(line)
# ---------------------------------------------------------------------------
# UI helpers
# ---------------------------------------------------------------------------
def _format_meta(model: str, task: dict[str, Any], idx: int, total: int) -> str:
lines = [
f"**Progress**: {idx + 1} / {total}",
f"**Model**: `{model}`",
f"**task_id**: `{task['task_id']}`",
f"**category**: `{task.get('category', '?')}` • **subcategory**: `{task.get('subcategory', '?')}`",
f"**source_task**: `{task.get('source_task', '?')}`",
]
if task.get("task_type"):
lines.append(f"**task_type**: `{task['task_type']}`")
return "\n\n".join(lines)
def _load_item(pool_order: list[int], idx: int) -> tuple[str, str, str]:
"""Return (video_url, meta_markdown, prompt_text) for position `idx`."""
if idx < 0 or idx >= len(pool_order):
return "", "**All done!** No more items.", ""
model, task_id = POOL[pool_order[idx]]
task = TASK_BY_ID[task_id]
return (
_video_url(model, task_id),
_format_meta(model, task, idx, len(pool_order)),
_extract_prompt(task),
)
# ---------------------------------------------------------------------------
# Gradio callbacks
# ---------------------------------------------------------------------------
def start_session(annotator: str, state: dict | None):
    """Log an annotator in and show their first item.

    Args:
        annotator: Name typed into the login box; surrounding whitespace is
            stripped.
        state: Current session state; returned unchanged when the name is
            blank.

    Returns:
        A 7-tuple for ``[session_state, login_panel, ann_panel, video,
        meta_md, prompt_tb, status]``. On success the new state is
        ``{"annotator": name, "order": <shuffled POOL indices>, "idx": 0}``.
    """
    name = (annotator or "").strip()
    if name:
        order = list(range(len(POOL)))
        # Seeding with the name plus the wall-clock second varies the order
        # between logins.
        random.Random(f"{name}-{int(time.time())}").shuffle(order)
        video_url, meta_text, prompt_text = _load_item(order, 0)
        return (
            {"annotator": name, "order": order, "idx": 0},
            gr.update(visible=False),  # hide login panel
            gr.update(visible=True),  # reveal annotation panel
            video_url,
            meta_text,
            prompt_text,
            gr.update(value=f"Logged in as `{name}`"),
        )
    return (
        state,
        gr.update(visible=True),  # keep login panel
        gr.update(visible=False),  # keep annotation panel hidden
        "",
        "",
        "",
        gr.update(value="Please enter a name first."),
    )
def _advance(state: dict, record_submitted: bool):
    """Move the session to the next pool item and build the refreshed UI values.

    Mutates ``state`` in place. ``idx`` always advances by one. When
    ``record_submitted`` is true, the ``submitted`` counter is also bumped.

    Returns:
        A 7-tuple for ``[session_state, video, meta_md, prompt_tb, score,
        note, status]``: the state, the next item's video URL, metadata and
        prompt, the score reset to 3, an empty note, and a status line.
    """
    if record_submitted:
        # Count submissions separately: ``idx`` also advances on skips, so
        # using it here would overstate how many items were actually rated.
        # ``.get`` keeps states created without this key working.
        state["submitted"] = state.get("submitted", 0) + 1
    state["idx"] += 1
    video, meta, prompt = _load_item(state["order"], state["idx"])
    if record_submitted:
        status = f"Submitted ({state['submitted']} done). Next →"
    else:
        status = "Skipped. Next →"
    # Reset score + note controls for the next item.
    return state, video, meta, prompt, 3, "", status
def submit_and_next(state: dict | None, score: int, note: str):
    """Save the score for the current item, then advance to the next one.

    Appends one JSON record with ``timestamp``, ``timestamp_iso`` (UTC),
    ``annotator``, ``process_id``, ``model``, ``task_id``, ``score`` and a
    stripped ``note``.

    Returns:
        The same 7-tuple shape as ``_advance``. If there is no session yet,
        or the pool is exhausted, nothing is written and a status message is
        returned instead.
    """
    if state is None or state.get("idx") is None:
        return state, "", "", "", 3, "", "Not logged in."
    order = state["order"]
    idx = state["idx"]
    if idx >= len(order):
        return state, "", "**All done!**", "", 3, "", "No more items."
    model, task_id = POOL[order[idx]]
    # One clock reading for both fields: two separate calls could straddle a
    # second boundary and make the epoch and ISO timestamps disagree.
    now = time.time()
    record = {
        "timestamp": now,
        "timestamp_iso": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(now)),
        "annotator": state["annotator"],
        "process_id": PROCESS_ID,
        "model": model,
        "task_id": task_id,
        "score": int(score),
        "note": (note or "").strip(),
    }
    _append_annotation(record)
    return _advance(state, record_submitted=True)
def skip_and_next(state: dict | None):
    """Advance past the current item without recording a score.

    This mirrors ``submit_and_next``'s guards, so skipping after the pool is
    exhausted reports "No more items." and leaves ``idx`` alone instead of
    incrementing it forever.

    Returns:
        The same 7-tuple shape as ``_advance``.
    """
    if state is None or state.get("idx") is None:
        return state, "", "", "", 3, "", "Not logged in."
    if state["idx"] >= len(state["order"]):
        return state, "", "**All done!**", "", 3, "", "No more items."
    return _advance(state, record_submitted=False)
# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------
THEME = gr.themes.Soft(primary_hue="indigo")

# Tell annotators where their work actually goes. The scheduler only exists
# when HF_TOKEN is set; otherwise writes stay in ANN_DIR on this machine. The
# interval text follows COMMIT_INTERVAL_MIN instead of a hard-coded "5".
if scheduler is not None:
    _SAVE_NOTE = (
        "Your submissions are auto-committed to the dataset repo "
        f"every {COMMIT_INTERVAL_MIN} minutes."
    )
else:
    _SAVE_NOTE = (
        "HF_TOKEN is not set, so submissions are only saved locally in this "
        "Space and are **not** committed to the dataset repo."
    )

with gr.Blocks(theme=THEME, title="MBench-V Annotation") as demo:
    gr.Markdown(
        f"""
# 🎬 MBench-V Annotation
Watch each generated video and rate it **1–5** (5 = best). Click **Submit & Next** to save.
{_SAVE_NOTE}
"""
    )
    # Per-session dict built by `start_session`; None until login.
    session_state = gr.State(value=None)

    # ---- Login panel ----
    with gr.Group(visible=True) as login_panel:
        with gr.Row():
            annotator_in = gr.Textbox(
                label="Annotator name",
                placeholder="e.g. alice",
                scale=4,
                autofocus=True,
            )
            login_btn = gr.Button("Start annotating", variant="primary", scale=1)

    # ---- Annotation panel (revealed by start_session) ----
    with gr.Group(visible=False) as ann_panel:
        with gr.Row():
            with gr.Column(scale=3):
                video = gr.Video(label="Generated video", autoplay=True, loop=True)
            with gr.Column(scale=2):
                meta_md = gr.Markdown()
                prompt_tb = gr.Textbox(
                    label="Generation prompt",
                    lines=10,
                    max_lines=20,
                    interactive=False,
                )
            with gr.Column(scale=1):
                score = gr.Slider(1, 5, value=3, step=1, label="Score (1 worst – 5 best)")
                note = gr.Textbox(label="Note (optional)", lines=4)
                submit_btn = gr.Button("✅ Submit & Next", variant="primary")
                skip_btn = gr.Button("⏭️ Skip")
                status = gr.Markdown("")

    # ---- Wiring ----
    # Output lists must match the order of the tuples the callbacks return.
    login_inputs = [annotator_in, session_state]
    login_outputs = [session_state, login_panel, ann_panel, video, meta_md, prompt_tb, status]
    # Clicking the button and pressing Enter in the name box do the same thing.
    for login_event in (login_btn.click, annotator_in.submit):
        login_event(start_session, inputs=login_inputs, outputs=login_outputs)

    item_outputs = [session_state, video, meta_md, prompt_tb, score, note, status]
    submit_btn.click(submit_and_next, inputs=[session_state, score, note], outputs=item_outputs)
    skip_btn.click(skip_and_next, inputs=[session_state], outputs=item_outputs)
if __name__ == "__main__":
    # Enable the request queue with a default per-event concurrency limit of 8.
    demo.queue(default_concurrency_limit=8)
    demo.launch()