import json
import os
import re
import shutil
import tempfile
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import gradio as gr
import numpy as np
import pandas as pd
from huggingface_hub import HfApi, snapshot_download
from constants import (
ALL_COLUMNS,
LEADERBOARD_INTRO,
LEADERBOARD_REPO,
LOCAL_LEADERBOARD_DIR,
METRIC_COLUMNS,
MODEL_TYPE_CHOICES,
RESULTS_CSV,
SUBMIT_INTRO,
)
from scripts.validate_submission import validate_submission_json
# Directory containing this module; bundled assets are located relative to it.
SPACE_ROOT = Path(__file__).resolve().parent
# Local checkout of the leaderboard dataset repo. It is used both as the
# download target for `snapshot_download` and as the folder that is uploaded.
# A relative LOCAL_LEADERBOARD_DIR is resolved against the working directory
# at import time.
LOCAL_LEADERBOARD_PATH = Path(LOCAL_LEADERBOARD_DIR).resolve()
# Results table read for display.
# NOTE(review): presumably located inside LOCAL_LEADERBOARD_PATH so that the
# "results.csv" upload pattern picks it up -- confirm against `constants`.
RESULTS_PATH = Path(RESULTS_CSV).resolve()
# Bundled fallback table used when the downloaded results are unavailable.
SEED_RESULTS_PATH = SPACE_ROOT / "seed" / "results.csv"
# Submission folders inside the local checkout. New submissions are written
# to PENDING_DIR; VERIFIED_DIR is only created here, never written to.
PENDING_DIR = LOCAL_LEADERBOARD_PATH / "submissions" / "pending"
VERIFIED_DIR = LOCAL_LEADERBOARD_PATH / "submissions" / "verified"
# Columns coerced to numbers by `clean_numeric_columns`; when missing from a
# table they are created with a default of 0 by `ensure_columns`.
NUMERIC_COLUMNS = [
    "Total M-Score",
    "Entity Score",
    "Environment Score",
    "Causal Score",
    *METRIC_COLUMNS,
]
# Glob patterns handed to `upload_folder`: only the results table and the
# submissions tree are pushed to the dataset repo.
UPLOAD_ALLOW_PATTERNS = ["results.csv", "submissions/**"]
# Independent copy of METRIC_COLUMNS, so mutating one list does not affect the
# other. It is not referenced in the visible part of this file.
DEFAULT_VISIBLE_METRICS: list[str] = METRIC_COLUMNS.copy()
# Non-metric columns shown in the leaderboard table, in display order.
# The order is load-bearing: `prepare_leaderboard` splices the selected metric
# columns in after the first six entries, i.e. between "Environment Score"
# and "Causal Score".
DISPLAY_INFO_COLUMNS = [
    "Rank",
    "Model Name",
    "Model Type",
    "Total M-Score",
    "Entity Score",
    "Environment Score",
    "Causal Score",
    "Certification",
    "Accessibility",
    "Date",
    "Model Link",
    "Sampled by",
    "Evaluated by",
]
def empty_results() -> pd.DataFrame:
    """Return a row-less results table whose columns are ``ALL_COLUMNS``."""
    placeholder = pd.DataFrame(columns=ALL_COLUMNS)
    return placeholder
def ensure_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` restricted to, and ordered as, ``ALL_COLUMNS``.

    Columns that are absent from ``df`` are created: numeric columns and
    "Rank" are filled with ``0``, every other column with an empty string.
    The input frame is not modified.
    """
    zero_filled = {*NUMERIC_COLUMNS, "Rank"}
    table = df.copy()
    absent = [name for name in ALL_COLUMNS if name not in table.columns]
    for name in absent:
        table[name] = 0 if name in zero_filled else ""
    # Selecting by ALL_COLUMNS both drops extra columns and fixes the order.
    return table[ALL_COLUMNS]
def clean_numeric_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with every ``NUMERIC_COLUMNS`` entry numeric.

    Values that cannot be parsed as numbers become ``0.0``. Every column in
    ``NUMERIC_COLUMNS`` must already exist in ``df``.
    """
    cleaned = df.copy()
    for name in NUMERIC_COLUMNS:
        # "coerce" turns unparsable cells into NaN, which is then zeroed.
        as_number = pd.to_numeric(cleaned[name], errors="coerce")
        cleaned[name] = as_number.fillna(0.0)
    return cleaned
def read_results_csv(path: Path) -> pd.DataFrame:
    """Load the CSV at ``path`` and normalise it to the leaderboard schema.

    The table is first given the full ``ALL_COLUMNS`` layout and then its
    numeric columns are coerced. Errors raised by ``pd.read_csv`` propagate.
    """
    raw_table = pd.read_csv(path)
    with_schema = ensure_columns(raw_table)
    return clean_numeric_columns(with_schema)
def load_seed_results(reason: str) -> tuple[pd.DataFrame, str]:
    """Fall back to the bundled seed table and explain why.

    Args:
        reason: Message describing why the fallback is needed. It is placed
            at the start of the returned status text.

    Returns:
        ``(table, status)``. The table is empty when the seed file is missing
        or cannot be read; the status says which case applied.
    """
    if not SEED_RESULTS_PATH.exists():
        return empty_results(), f"{reason}\n\nBundled seed results are missing."

    try:
        seed_table = read_results_csv(SEED_RESULTS_PATH)
    except Exception as exc:
        # Any read/parse failure degrades to an empty table, not a crash.
        return empty_results(), f"{reason}\n\nCould not read bundled seed results: {exc}"

    status = f"{reason}\n\nUsing bundled seed results from Table 2 of the MBench paper."
    return seed_table, status
def load_remote_results() -> tuple[pd.DataFrame, str]:
    """Download the leaderboard dataset and load its results table.

    The dataset repo is downloaded into ``LOCAL_LEADERBOARD_PATH`` and
    ``RESULTS_PATH`` is then read. If the download fails, the file is
    missing, or it cannot be parsed, the bundled seed results are returned
    instead, together with a status message describing the problem.

    Returns:
        ``(table, status)`` where ``status`` is a human-readable message.
    """
    download_options = {
        "repo_id": LEADERBOARD_REPO,
        "repo_type": "dataset",
        "local_dir": str(LOCAL_LEADERBOARD_PATH),
        "local_dir_use_symlinks": False,
    }
    try:
        snapshot_download(**download_options)
    except Exception as exc:
        unavailable = (
            "Leaderboard data is not available yet. Please run "
            "`python scripts/upload_seed_results.py` after setting `HF_TOKEN`."
            f"\n\nDetails: {exc}"
        )
        return load_seed_results(unavailable)

    if not RESULTS_PATH.exists():
        not_initialised = (
            "`results.csv` was not found in the leaderboard data repo. Please run "
            "`python scripts/upload_seed_results.py` to initialize it."
        )
        return load_seed_results(not_initialised)

    try:
        table = read_results_csv(RESULTS_PATH)
    except Exception as exc:
        return load_seed_results(f"Could not read `results.csv`: {exc}")

    return table, f"Loaded results from `{LEADERBOARD_REPO}`."
def prepare_leaderboard(
    model_type: str,
    selected_metrics: list[str] | None,
) -> tuple[pd.DataFrame, str]:
    """Build the table shown on the leaderboard tab.

    Args:
        model_type: Value of the "Model Type" column to keep. An empty value
            or ``"All"`` disables filtering.
        selected_metrics: Metric columns to display. ``None`` or an empty
            list shows no metric columns. Names not in ``METRIC_COLUMNS`` are
            ignored, and the ``METRIC_COLUMNS`` order is used.

    Returns:
        ``(table, status)``. The table is sorted by "Total M-Score"
        (descending) with "Rank" renumbered from 1; ``status`` is the message
        produced while loading the data.
    """
    table, status = load_remote_results()

    filter_requested = bool(model_type) and model_type != "All"
    if filter_requested and not table.empty:
        table = table[table["Model Type"] == model_type].copy()

    if not table.empty:
        # mergesort is stable, so tied scores keep their loaded order.
        ranked = table.sort_values(
            by="Total M-Score",
            ascending=False,
            kind="mergesort",
        )
        table = ranked.reset_index(drop=True)
        table["Rank"] = np.arange(1, len(table) + 1)

    requested = selected_metrics or []
    visible_metrics = [name for name in METRIC_COLUMNS if name in requested]

    # NOTE(review): splitting at index 6 puts the metric columns between
    # "Environment Score" and "Causal Score" -- confirm this is intended.
    leading = DISPLAY_INFO_COLUMNS[:6]
    trailing = DISPLAY_INFO_COLUMNS[6:]
    return table[leading + visible_metrics + trailing], status
def sanitize_filename(value: str) -> str:
    """Turn ``value`` into a string that is safe to use in a file name.

    Surrounding whitespace is removed. Each run of characters other than
    ASCII letters, digits, ``_``, ``.`` and ``-`` becomes one underscore.
    Leading and trailing ``.``, ``_`` and ``-`` are then stripped. If nothing
    is left, ``"model"`` is returned.
    """
    collapsed = re.sub(r"[^A-Za-z0-9_.-]+", "_", value.strip())
    trimmed = collapsed.strip("._-")
    if not trimmed:
        return "model"
    return trimmed
def get_uploaded_path(file_obj: Any) -> Path | None:
if file_obj is None:
return None
if isinstance(file_obj, (str, os.PathLike)):
return Path(file_obj)
if isinstance(file_obj, dict):
path = file_obj.get("path") or file_obj.get("name")
return Path(path) if path else None
name = getattr(file_obj, "name", None)
return Path(name) if name else None
def safe_extract_zip(zip_path: Path, target_dir: Path) -> None:
    """Extract ``zip_path`` into ``target_dir``, refusing path traversal.

    Every member is checked before anything is written. If any member would
    resolve to a location outside ``target_dir``, the whole archive is
    rejected and nothing is extracted.

    Raises:
        ValueError: If a member path escapes ``target_dir``.
    """
    target_root = target_dir.resolve()
    with zipfile.ZipFile(zip_path, "r") as archive:
        for member in archive.infolist():
            destination = (target_root / member.filename).resolve()
            if not destination.is_relative_to(target_root):
                raise ValueError("ZIP contains an unsafe path.")
        archive.extractall(target_root)


def _is_archive_metadata(relative_path: Path) -> bool:
    """Tell whether ``relative_path`` is macOS archive metadata, not payload."""
    # Finder-created ZIPs carry AppleDouble side files: a `__MACOSX/` tree
    # and `._<name>` entries. They reuse the real file's extension but hold
    # binary data.
    return "__MACOSX" in relative_path.parts or relative_path.name.startswith("._")


def read_first_json_from_zip(zip_path: Path) -> dict:
    """Return the JSON object stored in the first ``*.json`` file of a ZIP.

    The archive is extracted into a temporary directory that is removed
    before returning. "First" means first in sorted path order. macOS
    metadata entries (``__MACOSX/...`` and ``._*``) are skipped, because
    they would otherwise sort ahead of the real result file and fail to
    parse.

    Raises:
        ValueError: If the file is not a ZIP archive, contains an unsafe
            path, holds no usable JSON file, the first JSON file cannot be
            decoded or parsed, or its top-level value is not an object.
    """
    if not zipfile.is_zipfile(zip_path):
        raise ValueError("Uploaded file must be a valid ZIP archive.")

    with tempfile.TemporaryDirectory(prefix="mbench_submission_") as tmp_dir:
        extract_dir = Path(tmp_dir)
        safe_extract_zip(zip_path, extract_dir)

        json_files = sorted(
            path
            for path in extract_dir.rglob("*.json")
            if path.is_file()
            and not _is_archive_metadata(path.relative_to(extract_dir))
        )
        if not json_files:
            raise ValueError("No JSON file found inside the ZIP archive.")

        first_json = json_files[0]
        try:
            with first_json.open("r", encoding="utf-8") as handle:
                data = json.load(handle)
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            # Name the offending file so the submitter knows what to fix.
            shown_name = first_json.relative_to(extract_dir).as_posix()
            raise ValueError(
                f"Could not parse `{shown_name}` as UTF-8 JSON: {exc}"
            ) from exc

    if not isinstance(data, dict):
        raise ValueError("The first JSON file must contain a JSON object.")
    return data
def require_text(value: str, label: str) -> str:
    """Return ``value`` as stripped text, rejecting missing or blank input.

    Args:
        value: The submitted value. It is converted with ``str`` before
            stripping; ``None`` counts as missing.
        label: Field name used in the error message.

    Raises:
        ValueError: If ``value`` is ``None`` or blank after stripping.
    """
    text = "" if value is None else str(value).strip()
    if not text:
        raise ValueError(f"{label} is required.")
    return text
def ensure_submission_dirs() -> None:
    """Create the pending and verified submission folders if needed.

    A ``.gitkeep`` file is also created in each folder. Existing folders and
    files are left as they are.
    """
    submission_dirs = (PENDING_DIR, VERIFIED_DIR)
    for directory in submission_dirs:
        directory.mkdir(parents=True, exist_ok=True)
    for directory in submission_dirs:
        (directory / ".gitkeep").touch(exist_ok=True)
def ensure_local_results_file() -> None:
    """Copy the bundled seed table to ``RESULTS_PATH`` if none exists yet.

    Nothing happens when ``RESULTS_PATH`` already exists or when there is no
    bundled seed file to copy from.
    """
    should_seed = not RESULTS_PATH.exists() and SEED_RESULTS_PATH.exists()
    if should_seed:
        RESULTS_PATH.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(SEED_RESULTS_PATH, RESULTS_PATH)
def save_pending_submission(
    zip_path: Path,
    result_json: dict,
    model_name: str,
    model_link: str,
    team_name: str,
    contact_email: str,
    model_type: str,
    accessibility: str,
) -> tuple[Path, Path]:
    """Store a submission in the pending folder as a JSON + ZIP pair.

    Both files share the stem ``<UTC timestamp>_<sanitized model name>``.
    The JSON file records the form fields, the parsed result and a
    ``"pending"`` status. The ZIP file is a copy of the uploaded archive.

    Args:
        zip_path: Location of the uploaded archive to copy.
        result_json: Parsed result object taken from the archive.
        model_name, model_link, team_name, contact_email, model_type,
            accessibility: Form fields stored verbatim in the metadata.

    Returns:
        ``(json_path, raw_zip_path)`` of the two files that were written.

    Raises:
        OSError: If either file cannot be written. Any partially written
            file of the pair is removed first.
    """
    ensure_submission_dirs()

    # One clock read feeds both the file stem and `submitted_at_utc`, so the
    # two can never disagree across a second boundary.
    submitted_at = datetime.now(timezone.utc)
    timestamp = submitted_at.strftime("%Y%m%d_%H%M%S")
    stem = f"{timestamp}_{sanitize_filename(model_name)}"

    payload = {
        "submitted_at_utc": submitted_at.isoformat(),
        "status": "pending",
        "model_name": model_name,
        "model_link": model_link,
        "team_name": team_name,
        "contact_email": contact_email,
        "model_type": model_type,
        "accessibility": accessibility,
        "result_json": result_json,
    }
    json_path = PENDING_DIR / f"{stem}.json"
    raw_zip_path = PENDING_DIR / f"{stem}.zip"

    try:
        json_path.write_text(
            json.dumps(payload, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )
        shutil.copyfile(zip_path, raw_zip_path)
    except OSError:
        # Everything under submissions/ is uploaded later, so a half-written
        # pair must not be left behind.
        for leftover in (json_path, raw_zip_path):
            leftover.unlink(missing_ok=True)
        raise
    return json_path, raw_zip_path
def upload_local_leaderboard(token: str, model_name: str) -> None:
    """Push the local leaderboard folder to the dataset repo.

    The repo is created if it does not exist, and a local results file is
    seeded when none is present. Only files matching
    ``UPLOAD_ALLOW_PATTERNS`` are uploaded.

    Args:
        token: Hugging Face token used for the API client.
        model_name: Model name included in the commit message.
    """
    hub = HfApi(token=token)
    hub.create_repo(repo_id=LEADERBOARD_REPO, repo_type="dataset", exist_ok=True)
    ensure_local_results_file()

    commit_message = f"Add pending MBench submission for {model_name}"
    hub.upload_folder(
        folder_path=str(LOCAL_LEADERBOARD_PATH),
        repo_id=LEADERBOARD_REPO,
        repo_type="dataset",
        allow_patterns=UPLOAD_ALLOW_PATTERNS,
        commit_message=commit_message,
    )
def submit_result(
    zip_file: Any,
    model_name: str,
    model_link: str,
    team_name: str,
    contact_email: str,
    model_type: str,
    accessibility: str,
) -> str:
    """Validate a submission, store it as pending and upload it.

    Steps, in order: check that ``HF_TOKEN`` is set, validate the form
    fields, read and validate the first JSON file in the uploaded ZIP,
    refresh the local dataset checkout, write the pending submission files,
    and upload the local folder.

    Returns:
        A user-facing status message. Failures are reported in the returned
        text instead of being raised.
    """
    hf_token = os.environ.get("HF_TOKEN")
    if not hf_token:
        return "HF_TOKEN is not set. Please add it in Space Settings -> Secrets."

    try:
        name = require_text(model_name, "Model name")
        link = require_text(model_link, "Model link")
        email = require_text(contact_email, "Contact email")
        team = str(team_name or "").strip()
        access = str(accessibility or "Unknown").strip()

        # The first choice is excluded -- presumably the "All" filter option;
        # confirm against `constants.MODEL_TYPE_CHOICES`.
        submittable_types = MODEL_TYPE_CHOICES[1:]
        if model_type not in submittable_types:
            raise ValueError("Model type must be text-conditioned or action-conditioned.")

        archive_path = get_uploaded_path(zip_file)
        if archive_path is None or not archive_path.exists():
            raise ValueError("Please upload a ZIP file.")

        parsed_result = read_first_json_from_zip(archive_path)
        is_valid, problem = validate_submission_json(parsed_result)
        if not is_valid:
            raise ValueError(problem)

        # Bring the local dataset checkout up to date before adding to it.
        try:
            snapshot_download(
                repo_id=LEADERBOARD_REPO,
                repo_type="dataset",
                local_dir=str(LOCAL_LEADERBOARD_PATH),
                token=hf_token,
                local_dir_use_symlinks=False,
            )
        except Exception:
            # NOTE(review): the refresh error is discarded here. The upload
            # below then pushes whatever `results.csv` is on local disk,
            # which may be stale or the seed copy -- confirm this is the
            # intended fallback for every failure mode.
            ensure_submission_dirs()
            ensure_local_results_file()

        save_pending_submission(
            zip_path=archive_path,
            result_json=parsed_result,
            model_name=name,
            model_link=link,
            team_name=team,
            contact_email=email,
            model_type=model_type,
            accessibility=access,
        )
        upload_local_leaderboard(hf_token, name)
    except Exception as exc:
        return f"Submission failed: {exc}"

    return "Submission received. It is pending official verification."
def about_markdown() -> str:
    """Return the Markdown text describing MBench and its review policy."""
    about_text = """
# About MBench
MBench is a benchmark for evaluating the memory capability of video world models. It focuses on whether a model can preserve a coherent world state across long-horizon video continuation and interaction.
The benchmark is organized around three core memory dimensions:
- **Entity Consistency:** persistent object and human identity, geometry, texture, and appearance.
- **Environment Consistency:** stable spatial layout, reprojection behavior, lighting, and style.
- **Causal Consistency:** reliable state evolution and interaction consequences over time.
*Note: Submitted leaderboard results are not automatically shown. They are saved as 'pending' and only shown here after official verification by the MBench team.*
"""
    return about_text
def build_header_html() -> str:
return f"""
MBench evaluates the memory capability of video world models, focusing on whether a model can preserve a coherent world state across long-horizon video continuation and interaction.🏆 MBench Leaderboard
Here we display official leaderboard scores loaded from {LEADERBOARD_REPO}.