import json
import os
import re
import shutil
import tempfile
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import gradio as gr
import numpy as np
import pandas as pd
from huggingface_hub import HfApi, snapshot_download

from constants import (
    ALL_COLUMNS,
    LEADERBOARD_INTRO,
    LEADERBOARD_REPO,
    LOCAL_LEADERBOARD_DIR,
    METRIC_COLUMNS,
    MODEL_TYPE_CHOICES,
    RESULTS_CSV,
    SUBMIT_INTRO,
)
from scripts.validate_submission import validate_submission_json

# Filesystem locations used throughout the app.
SPACE_ROOT = Path(__file__).resolve().parent
LOCAL_LEADERBOARD_PATH = Path(LOCAL_LEADERBOARD_DIR).resolve()
RESULTS_PATH = Path(RESULTS_CSV).resolve()
SEED_RESULTS_PATH = SPACE_ROOT / "seed" / "results.csv"
PENDING_DIR = LOCAL_LEADERBOARD_PATH / "submissions" / "pending"
VERIFIED_DIR = LOCAL_LEADERBOARD_PATH / "submissions" / "verified"

# Columns that are coerced to numbers when a results CSV is read.
NUMERIC_COLUMNS = [
    "Total M-Score",
    "Entity Score",
    "Environment Score",
    "Causal Score",
    *METRIC_COLUMNS,
]

# Only these paths are pushed when the local checkout is uploaded.
UPLOAD_ALLOW_PATTERNS = ["results.csv", "submissions/**"]

# Metrics ticked in the selector when the page first loads.
DEFAULT_VISIBLE_METRICS: list[str] = METRIC_COLUMNS.copy()

# Non-metric columns shown in the leaderboard table, in display order.
DISPLAY_INFO_COLUMNS = [
    "Rank",
    "Model Name",
    "Model Type",
    "Total M-Score",
    "Entity Score",
    "Environment Score",
    "Causal Score",
    "Certification",
    "Accessibility",
    "Date",
    "Model Link",
    "Sampled by",
    "Evaluated by",
]


def empty_results() -> pd.DataFrame:
    """Return a zero-row frame that carries every leaderboard column."""
    return pd.DataFrame(columns=ALL_COLUMNS)


def ensure_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* restricted to ``ALL_COLUMNS``, adding any that are missing.

    Missing numeric columns (and ``"Rank"``) are filled with ``0``; every other
    missing column is filled with the empty string. The input is not mutated.
    """
    completed = df.copy()
    zero_filled = {*NUMERIC_COLUMNS, "Rank"}
    for name in ALL_COLUMNS:
        if name in completed.columns:
            continue
        completed[name] = 0 if name in zero_filled else ""
    return completed[ALL_COLUMNS]


def clean_numeric_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with every ``NUMERIC_COLUMNS`` entry coerced to numbers.

    Values that cannot be parsed become ``0.0``.
    """
    cleaned = df.copy()
    for name in NUMERIC_COLUMNS:
        as_number = pd.to_numeric(cleaned[name], errors="coerce")
        cleaned[name] = as_number.fillna(0.0)
    return cleaned


def read_results_csv(path: Path) -> pd.DataFrame:
    """Read the CSV at *path* and normalise its columns and numeric values."""
    raw = pd.read_csv(path)
    with_all_columns = ensure_columns(raw)
    return clean_numeric_columns(with_all_columns)


def load_seed_results(reason: str) -> tuple[pd.DataFrame, str]:
    """Fall back to the bundled seed CSV.

    Args:
        reason: Explanation of why the fallback is needed; it is placed at the
            start of the returned status message.

    Returns:
        ``(frame, status)``. The frame is empty when the seed file is missing
        or cannot be read.
    """
    if not SEED_RESULTS_PATH.exists():
        return empty_results(), f"{reason}\n\nBundled seed results are missing."

    try:
        seed_frame = read_results_csv(SEED_RESULTS_PATH)
    except Exception as exc:
        return empty_results(), f"{reason}\n\nCould not read bundled seed results: {exc}"

    return (
        seed_frame,
        f"{reason}\n\nUsing bundled seed results from Table 2 of the MBench paper.",
    )


def load_remote_results() -> tuple[pd.DataFrame, str]:
    """Download the leaderboard dataset repo and load its ``results.csv``.

    Any failure (download error, missing file, unreadable CSV) is turned into
    a status message and the bundled seed results are returned instead.

    Returns:
        ``(frame, status)`` where *status* describes where the data came from.
    """
    try:
        snapshot_download(
            repo_id=LEADERBOARD_REPO,
            repo_type="dataset",
            local_dir=str(LOCAL_LEADERBOARD_PATH),
            local_dir_use_symlinks=False,
        )
    except Exception as exc:
        unavailable = (
            "Leaderboard data is not available yet. Please run "
            "`python scripts/upload_seed_results.py` after setting `HF_TOKEN`."
            f"\n\nDetails: {exc}"
        )
        return load_seed_results(unavailable)

    if not RESULTS_PATH.exists():
        not_found = (
            "`results.csv` was not found in the leaderboard data repo. Please run "
            "`python scripts/upload_seed_results.py` to initialize it."
        )
        return load_seed_results(not_found)

    try:
        results = read_results_csv(RESULTS_PATH)
    except Exception as exc:
        return load_seed_results(f"Could not read `results.csv`: {exc}")

    return results, f"Loaded results from `{LEADERBOARD_REPO}`."
def prepare_leaderboard(
    model_type: str,
    selected_metrics: list[str] | None,
) -> tuple[pd.DataFrame, str]:
    """Build the table shown on the Leaderboard tab.

    Results are reloaded on every call, optionally filtered by model type,
    sorted by ``"Total M-Score"`` (descending, stable) and re-ranked from 1.

    Args:
        model_type: A model-type value to keep, or ``"All"``/empty for no filter.
        selected_metrics: Detailed metric columns to show; ``None`` shows none.

    Returns:
        ``(table, status)`` where *status* is the loader's status message.
    """
    df, status = load_remote_results()

    if model_type and model_type != "All" and not df.empty:
        df = df[df["Model Type"] == model_type].copy()

    if not df.empty:
        df = df.sort_values(
            by="Total M-Score",
            ascending=False,
            kind="mergesort",
        ).reset_index(drop=True)
        df["Rank"] = np.arange(1, len(df) + 1)

    chosen = set(selected_metrics or [])
    # Iterate METRIC_COLUMNS (not the selection) so the display order is stable.
    metrics = [metric for metric in METRIC_COLUMNS if metric in chosen]

    # Keep all summary scores together: the detailed metrics are inserted
    # right after "Causal Score", the last of the summary score columns.
    split_at = DISPLAY_INFO_COLUMNS.index("Causal Score") + 1
    columns = DISPLAY_INFO_COLUMNS[:split_at] + metrics + DISPLAY_INFO_COLUMNS[split_at:]
    return df[columns], status


def sanitize_filename(value: str) -> str:
    """Reduce *value* to ``[A-Za-z0-9_.-]`` characters for use in a file name.

    Runs of other characters become a single ``_``; leading/trailing ``.``,
    ``_`` and ``-`` are removed. Returns ``"model"`` if nothing is left.
    """
    cleaned = re.sub(r"[^A-Za-z0-9_.-]+", "_", value.strip())
    cleaned = cleaned.strip("._-")
    return cleaned or "model"


def get_uploaded_path(file_obj: Any) -> Path | None:
    """Extract a filesystem path from the value an upload component produced.

    Accepts a path-like, a dict with a ``"path"`` or ``"name"`` key, or any
    object exposing a ``name`` attribute. Returns ``None`` when no path can be
    determined.
    """
    if file_obj is None:
        return None
    if isinstance(file_obj, (str, os.PathLike)):
        return Path(file_obj)
    if isinstance(file_obj, dict):
        path = file_obj.get("path") or file_obj.get("name")
        return Path(path) if path else None
    name = getattr(file_obj, "name", None)
    return Path(name) if name else None


def safe_extract_zip(zip_path: Path, target_dir: Path) -> None:
    """Extract *zip_path* into *target_dir*, rejecting path-traversal members.

    Every member is checked before anything is written to disk.

    Raises:
        ValueError: If any member would resolve outside *target_dir*.
    """
    target_root = target_dir.resolve()
    with zipfile.ZipFile(zip_path, "r") as archive:
        for member in archive.infolist():
            destination = (target_root / member.filename).resolve()
            if not destination.is_relative_to(target_root):
                raise ValueError("ZIP contains an unsafe path.")
        archive.extractall(target_root)


def read_first_json_from_zip(zip_path: Path) -> dict:
    """Return the object stored in the first ``*.json`` file inside the archive.

    The archive is extracted to a temporary directory; "first" means first in
    sorted path order among all JSON files found recursively.

    Raises:
        ValueError: If the file is not a ZIP, contains an unsafe path, holds no
            JSON file, or the first JSON file is unparsable or not an object.
    """
    if not zipfile.is_zipfile(zip_path):
        raise ValueError("Uploaded file must be a valid ZIP archive.")

    with tempfile.TemporaryDirectory(prefix="mbench_submission_") as tmp_dir:
        extract_dir = Path(tmp_dir)
        safe_extract_zip(zip_path, extract_dir)

        json_files = sorted(
            path for path in extract_dir.rglob("*.json") if path.is_file()
        )
        if not json_files:
            raise ValueError("No JSON file found inside the ZIP archive.")

        try:
            with json_files[0].open("r", encoding="utf-8") as handle:
                data = json.load(handle)
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            # Surface a readable message instead of a raw decoder error.
            raise ValueError(f"The first JSON file could not be parsed: {exc}") from exc

    if not isinstance(data, dict):
        raise ValueError("The first JSON file must contain a JSON object.")
    return data


def require_text(value: str | None, label: str) -> str:
    """Return *value* stripped, or raise if it is ``None`` or blank.

    Raises:
        ValueError: ``"<label> is required."`` when no text was supplied.
    """
    if value is None or not str(value).strip():
        raise ValueError(f"{label} is required.")
    return str(value).strip()


def ensure_submission_dirs() -> None:
    """Create the pending/verified submission folders, each with a ``.gitkeep``."""
    PENDING_DIR.mkdir(parents=True, exist_ok=True)
    VERIFIED_DIR.mkdir(parents=True, exist_ok=True)
    (PENDING_DIR / ".gitkeep").touch(exist_ok=True)
    (VERIFIED_DIR / ".gitkeep").touch(exist_ok=True)


def ensure_local_results_file() -> None:
    """Copy the bundled seed CSV to ``RESULTS_PATH`` if no results file exists.

    Does nothing when the results file is already present or the seed file is
    missing.
    """
    if RESULTS_PATH.exists() or not SEED_RESULTS_PATH.exists():
        return
    RESULTS_PATH.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(SEED_RESULTS_PATH, RESULTS_PATH)


def save_pending_submission(
    zip_path: Path,
    result_json: dict,
    model_name: str,
    model_link: str,
    team_name: str,
    contact_email: str,
    model_type: str,
    accessibility: str,
) -> tuple[Path, Path]:
    """Write a submission into the pending folder.

    Two files are created under ``PENDING_DIR``: a JSON document holding the
    form fields plus the parsed result, and a copy of the uploaded ZIP.

    Returns:
        ``(json_path, raw_zip_path)`` of the files that were written.
    """
    ensure_submission_dirs()

    # One clock reading so the file name and the recorded time always agree.
    submitted_at = datetime.now(timezone.utc)
    timestamp = submitted_at.strftime("%Y%m%d_%H%M%S")
    base_stem = f"{timestamp}_{sanitize_filename(model_name)}"

    # Never overwrite an earlier submission that landed in the same second.
    stem = base_stem
    attempt = 1
    while (PENDING_DIR / f"{stem}.json").exists() or (PENDING_DIR / f"{stem}.zip").exists():
        attempt += 1
        stem = f"{base_stem}_{attempt}"

    payload = {
        "submitted_at_utc": submitted_at.isoformat(),
        "status": "pending",
        "model_name": model_name,
        "model_link": model_link,
        "team_name": team_name,
        "contact_email": contact_email,
        "model_type": model_type,
        "accessibility": accessibility,
        "result_json": result_json,
    }

    json_path = PENDING_DIR / f"{stem}.json"
    raw_zip_path = PENDING_DIR / f"{stem}.zip"
    json_path.write_text(
        json.dumps(payload, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    shutil.copyfile(zip_path, raw_zip_path)
    return json_path, raw_zip_path


def upload_local_leaderboard(token: str, model_name: str) -> None:
    """Push the local leaderboard checkout to the dataset repo.

    The repo is created if needed, a results file is seeded locally when none
    exists, and only paths matching ``UPLOAD_ALLOW_PATTERNS`` are uploaded.

    Args:
        token: Hugging Face token used for the API calls.
        model_name: Used in the commit message only.
    """
    api = HfApi(token=token)
    api.create_repo(repo_id=LEADERBOARD_REPO, repo_type="dataset", exist_ok=True)
    ensure_local_results_file()
    api.upload_folder(
        folder_path=str(LOCAL_LEADERBOARD_PATH),
        repo_id=LEADERBOARD_REPO,
        repo_type="dataset",
        allow_patterns=UPLOAD_ALLOW_PATTERNS,
        commit_message=f"Add pending MBench submission for {model_name}",
    )


def submit_result(
    zip_file: Any,
    model_name: str,
    model_link: str,
    team_name: str,
    contact_email: str,
    model_type: str,
    accessibility: str,
) -> str:
    """Validate a submission form, store it as pending and upload it.

    Never raises: every failure is reported through the returned message.

    Returns:
        A status message for display in the Submit tab.
    """
    token = os.environ.get("HF_TOKEN")
    if not token:
        return "HF_TOKEN is not set. Please add it in Space Settings -> Secrets."

    try:
        model_name = require_text(model_name, "Model name")
        model_link = require_text(model_link, "Model link")
        contact_email = require_text(contact_email, "Contact email")
        team_name = str(team_name or "").strip()
        accessibility = str(accessibility or "Unknown").strip()

        # The first choice is the "All" filter value used on the Leaderboard tab.
        if model_type not in MODEL_TYPE_CHOICES[1:]:
            raise ValueError("Model type must be text-conditioned or action-conditioned.")

        zip_path = get_uploaded_path(zip_file)
        if zip_path is None or not zip_path.exists():
            raise ValueError("Please upload a ZIP file.")

        result_json = read_first_json_from_zip(zip_path)
        ok, message = validate_submission_json(result_json)
        if not ok:
            raise ValueError(message)

        # Refresh the local dataset checkout before adding the pending submission.
        try:
            snapshot_download(
                repo_id=LEADERBOARD_REPO,
                repo_type="dataset",
                local_dir=str(LOCAL_LEADERBOARD_PATH),
                token=token,
                local_dir_use_symlinks=False,
            )
        except Exception:
            # Best effort: if the refresh fails, continue from a local
            # skeleton (folders plus seeded results file) instead of aborting.
            ensure_submission_dirs()
            ensure_local_results_file()

        save_pending_submission(
            zip_path=zip_path,
            result_json=result_json,
            model_name=model_name,
            model_link=model_link,
            team_name=team_name,
            contact_email=contact_email,
            model_type=model_type,
            accessibility=accessibility,
        )
        upload_local_leaderboard(token, model_name)
    except Exception as exc:
        return f"Submission failed: {exc}"

    return "Submission received. It is pending official verification."


def about_markdown() -> str:
    """Return the Markdown text of the "About MBench" section."""
    return """
# About MBench

MBench is a benchmark for evaluating the memory capability of video world models. It focuses on whether a model can preserve a coherent world state across long-horizon video continuation and interaction.

The benchmark is organized around three core memory dimensions:

- **Entity Consistency:** persistent object and human identity, geometry, texture, and appearance.
- **Environment Consistency:** stable spatial layout, reprojection behavior, lighting, and style.
- **Causal Consistency:** reliable state evolution and interaction consequences over time.

*Note: Submitted leaderboard results are not automatically shown. They are saved as 'pending' and only shown here after official verification by the MBench team.*
"""


def build_header_html() -> str:
    """Return the header fragment rendered at the top of the Leaderboard tab."""
    # NOTE(review): the stylesheet in build_demo defines .mbench-hero,
    # .mbench-kicker and .mbench-links rules, but this fragment contains no
    # elements carrying those classes -- confirm the markup is complete.
    return f"""
Video World Model Memory Benchmark

🏆 MBench Leaderboard

MBench evaluates the memory capability of video world models, focusing on whether a model can preserve a coherent world state across long-horizon video continuation and interaction.
Here we display official leaderboard scores loaded from {LEADERBOARD_REPO}.

"""


def build_summary_html(df: pd.DataFrame) -> str:
    """Return a summary fragment: model count, top score and per-type counts.

    For an empty frame the counts are ``0`` and the top score is shown as ``-``.
    """
    if df.empty:
        return """
Models0
Top M-Score-
Text-conditioned0
Action-conditioned0
"""

    top_score = pd.to_numeric(df["Total M-Score"], errors="coerce").max()
    text_count = int((df["Model Type"] == "text-conditioned").sum())
    action_count = int((df["Model Type"] == "action-conditioned").sum())
    return f"""
Models{len(df)}
Top M-Score{top_score:.2f}
Text-conditioned{text_count}
Action-conditioned{action_count}
"""


def build_demo() -> gr.Blocks:
    """Assemble the Gradio app: a Leaderboard tab and a Submit tab.

    The leaderboard is loaded once up front to populate the initial table and
    status line; filter, metric and refresh events reload it.
    """
    # NOTE(review): container nesting below follows statement order; confirm
    # the row/column grouping against the rendered page.
    initial_df, initial_status = prepare_leaderboard("All", DEFAULT_VISIBLE_METRICS)

    css = """
    .mbench-hero {
        padding-bottom: 20px;
        border-bottom: 1px solid #eaeaea;
        margin-bottom: 20px;
    }
    .mbench-kicker {
        font-size: 12px;
        font-weight: bold;
        text-transform: uppercase;
        color: #888;
        margin-bottom: 5px;
    }
    .mbench-hero h1 {
        font-size: 2.5rem;
        margin: 0 0 10px 0;
        font-weight: 800;
    }
    .mbench-hero p {
        font-size: 1rem;
        color: #444;
        margin: 0 0 15px 0;
        max-width: 800px;
    }
    .mbench-links {
        display: flex;
        gap: 10px;
        flex-wrap: wrap;
    }
    .mbench-links a {
        text-decoration: none;
        padding: 6px 12px;
        border: 1px solid #ddd;
        background: #fafafa;
        border-radius: 6px;
        color: #333;
        font-weight: 500;
    }
    .mbench-links a:hover {
        background: #eee;
    }
    /* Decrease line height in the dataframe */
    #leaderboard-table table td,
    #leaderboard-table table th {
        padding: 6px 10px !important;
        line-height: 1.3 !important;
    }
    #leaderboard-table table th:nth-child(2),
    #leaderboard-table table td:nth-child(2) {
        min-width: 240px !important;
        max-width: 320px !important;
        white-space: normal !important;
        word-break: break-word !important;
    }
    /* Model Type */
    #leaderboard-table table th:nth-child(3),
    #leaderboard-table table td:nth-child(3) {
        min-width: 140px !important;
        max-width: 160px !important;
        white-space: normal !important;
    }
    /* Total M-Score */
    #leaderboard-table table th:nth-child(4),
    #leaderboard-table table td:nth-child(4) {
        min-width: 130px !important;
        font-weight: 600 !important;
    }
    #controls-row {
        align-items: end;
    }
    .toggle-btn {
        margin-bottom: 2px !important;
    }
    /* Make the whole column header clickable for sorting */
    #leaderboard-table table th {
        position: relative;
    }
    #leaderboard-table table th .sort-button::after {
        content: "";
        position: absolute;
        top: 0;
        left: 0;
        width: 100%;
        height: 100%;
        cursor: pointer;
    }
    /* Prevent the descending sort class from shrinking the clickable area by forming a new containing block */
    #leaderboard-table table th .sort-button.des {
        transform: none !important;
    }
    #leaderboard-table table th .sort-button.des svg {
        transform: scaleY(-1);
    }
    """

    with gr.Blocks(
        title="MBench Leaderboard",
        theme=gr.themes.Default(primary_hue="purple"),
        css=css,
    ) as demo:
        with gr.Tab("Leaderboard"):
            gr.HTML(build_header_html())
            gr.Markdown(about_markdown())
            status = gr.Markdown(initial_status, elem_id="status-line")

            with gr.Row(elem_id="controls-row"):
                with gr.Column(scale=5):
                    model_type_filter = gr.Radio(
                        choices=MODEL_TYPE_CHOICES,
                        value="All",
                        label="Model Type (Filter)",
                    )
                    metric_selector = gr.CheckboxGroup(
                        choices=METRIC_COLUMNS,
                        value=DEFAULT_VISIBLE_METRICS,
                        label="Detailed Metrics (Select to show in table)",
                    )
                with gr.Column(scale=1, min_width=120):
                    toggle_metrics_btn = gr.Button(
                        "✗ Deselect All",
                        size="sm",
                        elem_classes=["toggle-btn"],
                    )
                    refresh_button = gr.Button("↻ Refresh", size="sm")

            def toggle_metrics(current: list[str] | None):
                """Clear the selection when everything is ticked, else tick everything.

                Returns the selector update and the button's next label.
                """
                if len(current or []) == len(METRIC_COLUMNS):
                    return gr.update(value=[]), "✓ Select All"
                return gr.update(value=METRIC_COLUMNS), "✗ Deselect All"

            toggle_metrics_btn.click(
                fn=toggle_metrics,
                inputs=[metric_selector],
                outputs=[metric_selector, toggle_metrics_btn],
            )

            leaderboard_table = gr.Dataframe(
                value=initial_df,
                label="MBench Results",
                interactive=False,
                wrap=True,
                height=560,
                elem_id="leaderboard-table",
            )

            # Only the refresh button is exposed as a named API endpoint.
            refresh_button.click(
                fn=prepare_leaderboard,
                inputs=[model_type_filter, metric_selector],
                outputs=[leaderboard_table, status],
                api_name="refresh_leaderboard",
            )
            model_type_filter.change(
                fn=prepare_leaderboard,
                inputs=[model_type_filter, metric_selector],
                outputs=[leaderboard_table, status],
                api_name=False,
            )
            metric_selector.change(
                fn=prepare_leaderboard,
                inputs=[model_type_filter, metric_selector],
                outputs=[leaderboard_table, status],
                api_name=False,
            )

        with gr.Tab("Submit"):
            gr.Markdown(SUBMIT_INTRO, elem_id="submit-intro")
            with gr.Row(elem_id="submit-panel"):
                with gr.Column():
                    zip_input = gr.File(
                        label="Submission ZIP",
                        file_types=[".zip"],
                        type="filepath",
                    )
                    model_name_input = gr.Textbox(label="Model Name")
                    model_link_input = gr.Textbox(label="Model Link")
                    team_name_input = gr.Textbox(label="Team Name")
                    contact_email_input = gr.Textbox(label="Contact Email")
                    model_type_input = gr.Dropdown(
                        choices=MODEL_TYPE_CHOICES[1:],
                        value="text-conditioned",
                        label="Model Type",
                    )
                    accessibility_input = gr.Dropdown(
                        choices=[
                            "Open weights",
                            "API only",
                            "Closed",
                            "Research preview",
                            "Unknown",
                        ],
                        value="Unknown",
                        label="Accessibility",
                    )
                    submit_button = gr.Button(
                        "Submit",
                        variant="primary",
                        elem_id="submit-button",
                    )
                with gr.Column():
                    submit_status = gr.Markdown()

            submit_button.click(
                fn=submit_result,
                inputs=[
                    zip_input,
                    model_name_input,
                    model_link_input,
                    team_name_input,
                    contact_email_input,
                    model_type_input,
                    accessibility_input,
                ],
                outputs=submit_status,
                api_name=False,
            )

    return demo


# Built at import time so hosting environments can pick up the `demo` object.
demo = build_demo()


if __name__ == "__main__":
    demo.launch(show_api=True)