# MMOU-Eval / app.py
from __future__ import annotations
import json
import os
import time
from collections import defaultdict
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Any
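# Choose a writable temp directory for Gradio uploads before gradio is imported,
# trying a repo-local folder first and falling back to the working directory or /tmp.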
if "GRADIO_TEMP_DIR" not in os.environ:
for candidate in (
Path(__file__).resolve().parent / ".gradio_tmp",
Path.cwd() / ".gradio_tmp",
Path("/tmp") / "gradio",
):
try:
candidate.mkdir(parents=True, exist_ok=True)
probe = candidate / ".write_probe"
probe.write_text("ok", encoding="utf-8")
probe.unlink()
os.environ["GRADIO_TEMP_DIR"] = str(candidate)
break
except OSError:
continue
import gradio as gr
import pandas as pd
from huggingface_hub import hf_hub_download
DEFAULT_GT_LOCAL_PATH = ""
DEFAULT_GT_REPO_ID = "nvidia/mmou-gt"
DEFAULT_GT_FILENAME = "MMOU.json"
DEFAULT_GT_REPO_TYPE = "dataset"
DEFAULT_GT_TOKEN_ENV = "HF_TOKEN"
DOMAINS_ORDER = [
"Sports",
"Travel",
"Video Games",
"Daily Life",
"Academic Lectures",
"Film",
"Pranks",
"Music",
"Animation",
"News",
]
DURATION_BUCKET_ORDER = ["< 5", "5–10", "10–20", "20–30", "> 30", "Overall"]
GT_LETTER_KEYS = (
"correct_option_letter",
"correct_answer_letter",
"label",
"gold_label",
"answer_letter",
)
GT_DOMAIN_KEYS = ("domain", "category")
GT_DURATION_KEYS = ("video_duration", "video_duration_sec", "duration", "duration_sec")
GT_SKILL_KEYS = ("question_type", "skills", "skill", "question_types")
OPTION_LETTERS = set("ABCDEFGHIJ")
APP_INTRO = """
# MMOU Evaluator
Upload a `.json` or `.jsonl` file where each entry contains `question_id` and `answer`.
"""
FORMAT_GUIDE = """
### Submission Format
Each entry must contain:
- `question_id`
- `answer`
`answer` must be a single letter from `A` to `J`. Letter matching is case-insensitive. Extra keys are ignored.
Rows with empty or `null` answers are ignored.
Example JSON:
```json
[
{"question_id": "54aaef4d-2c22-476e-a7e7-37efabde2520", "answer": "C"},
{"question_id": "a7f8790d-7828-4ece-a63a-a5d13edf9026", "answer": "B"}
]
```
Example JSONL:
```json
{"question_id": "54aaef4d-2c22-476e-a7e7-37efabde2520", "answer": "C"}
{"question_id": "a7f8790d-7828-4ece-a63a-a5d13edf9026", "answer": "B"}
```
"""
READY_STATUS_MARKDOWN = "### Ready\nUpload a prediction file and click `Evaluate`."
EMPTY_SUMMARY_MARKDOWN = """
### Summary
Run an evaluation to populate the aggregate summary.
"""
LAYOUT_CSS = """
.gradio-container {
max-width: 1100px !important;
margin: 0 auto !important;
padding-left: 1rem !important;
padding-right: 1rem !important;
font-size: 16px !important;
}
.gradio-container .prose,
.gradio-container .gr-markdown,
.gradio-container .gr-dataframe,
.gradio-container label,
.gradio-container button,
.gradio-container input,
.gradio-container textarea {
font-size: 1rem !important;
}
"""
@dataclass(frozen=True)
class GroundTruthEntry:
correct_letter: str
domain: str
video_duration_sec: float | None
skills: tuple[str, ...]
def stringify(value: Any) -> str:
if value is None:
return ""
if isinstance(value, str):
return value.strip()
if isinstance(value, (int, float, bool)):
return str(value)
return json.dumps(value, ensure_ascii=True)
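# Quick reference for how values are normalized (derived from the function above):
#   stringify("  hi ")   -> "hi"
#   stringify(3)         -> "3"
#   stringify(None)      -> ""
#   stringify({"a": 1})  -> '{"a": 1}'   (non-scalars are JSON-encoded)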
def coerce_float(value: Any) -> float | None:
if value is None or value == "":
return None
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
try:
return float(value.strip())
except ValueError:
return None
return None
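# Examples (derived from the function above):
#   coerce_float("12.5")  -> 12.5
#   coerce_float(90)      -> 90.0
#   coerce_float("n/a")   -> None   (unparseable strings fall back to None)
#   coerce_float(None)    -> None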
def first_present(record: dict[str, Any], keys: tuple[str, ...]) -> Any:
return next((record[key] for key in keys if record.get(key) not in (None, "", [])), None)
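# Example: first_present({"category": "Sports"}, GT_DOMAIN_KEYS) -> "Sports",
# while a record with none of the keys (or only empty values) yields None.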
def parse_skill_list(value: Any) -> tuple[str, ...]:
items = value if isinstance(value, list) else ([] if value is None else [value])
cleaned: list[str] = []
seen: set[str] = set()
for item in items:
text = stringify(item).strip().strip("\"'")
if text and text not in seen:
seen.add(text)
cleaned.append(text)
return tuple(cleaned)
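# Examples (derived from the function above):
#   parse_skill_list(["OCR", "OCR", "Counting"]) -> ("OCR", "Counting")   (order kept, duplicates dropped)
#   parse_skill_list("OCR")                      -> ("OCR",)              (scalars are wrapped)
#   parse_skill_list(None)                       -> ()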
def safe_pct(correct: int, total: int) -> float:
return (100.0 * correct / total) if total else 0.0
def duration_bucket(minutes: float) -> str:
if minutes < 5:
return "< 5"
if minutes < 10:
return "5–10"
if minutes < 20:
return "10–20"
if minutes < 30:
return "20–30"
return "> 30"
def normalize_answer(value: Any) -> str:
answer = stringify(value).upper()
if not answer:
return ""
if len(answer) != 1 or answer not in OPTION_LETTERS:
raise ValueError("Each `answer` must be a single letter from A to J.")
return answer
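# Examples (derived from the function above):
#   normalize_answer(" c ") -> "C"   (whitespace stripped, case-insensitive)
#   normalize_answer(None)  -> ""    (empty answers are skipped upstream)
#   normalize_answer("AB")  -> raises ValueError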
def load_records(path: str | Path, *, allow_data_key: bool = False) -> tuple[list[dict[str, Any]], str]:
file_path = Path(path)
suffix = file_path.suffix.lower()
if suffix in {".jsonl", ".ndjson"}:
records: list[dict[str, Any]] = []
with file_path.open("r", encoding="utf-8") as handle:
for line_number, line in enumerate(handle, start=1):
if not line.strip():
continue
record = json.loads(line)
if not isinstance(record, dict):
raise ValueError(f"Line {line_number} in JSONL must be an object.")
records.append(record)
return records, "jsonl"
with file_path.open("r", encoding="utf-8") as handle:
payload = json.load(handle)
if isinstance(payload, list):
records = payload
elif allow_data_key and isinstance(payload, dict) and isinstance(payload.get("data"), list):
records = payload["data"]
else:
raise ValueError("JSON file must contain a list of objects.")
if not all(isinstance(item, dict) for item in records):
raise ValueError("JSON file must contain only objects.")
return records, "json"
def materialize_ground_truth_file() -> Path:
local_path = os.getenv("MMOU_GT_PATH", DEFAULT_GT_LOCAL_PATH).strip()
if local_path:
path = Path(local_path)
if not path.exists():
raise FileNotFoundError(
"MMOU_GT_PATH is set, but the file does not exist. "
"Update the configured path or mount the private file correctly."
)
return path
repo_id = os.getenv("MMOU_GT_REPO_ID", DEFAULT_GT_REPO_ID).strip()
filename = os.getenv("MMOU_GT_FILENAME", DEFAULT_GT_FILENAME).strip()
if repo_id and filename:
repo_type = os.getenv("MMOU_GT_REPO_TYPE", DEFAULT_GT_REPO_TYPE).strip() or "dataset"
token_env = os.getenv("MMOU_GT_TOKEN_ENV", DEFAULT_GT_TOKEN_ENV).strip() or "HF_TOKEN"
token = os.getenv(token_env) or os.getenv("HF_TOKEN", "")
return Path(
hf_hub_download(
repo_id=repo_id,
filename=filename,
repo_type=repo_type,
token=token or None,
)
)
raise RuntimeError(
"Ground truth is not configured. Set MMOU_GT_PATH or "
"MMOU_GT_REPO_ID/MMOU_GT_FILENAME before launching the app."
)
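# Example configuration (illustrative values, not real paths or credentials):
#   MMOU_GT_PATH=/data/MMOU.json                              -> option 1: read a mounted local file
#   MMOU_GT_REPO_ID=nvidia/mmou-gt, MMOU_GT_FILENAME=MMOU.json, HF_TOKEN=hf_xxx
#                                                             -> option 2: download from the Hugging Face Hub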
@lru_cache(maxsize=1)
def load_ground_truth() -> dict[str, GroundTruthEntry]:
records, _ = load_records(materialize_ground_truth_file(), allow_data_key=True)
entries: dict[str, GroundTruthEntry] = {}
for record in records:
question_id = stringify(record.get("question_id"))
if not question_id:
continue
correct_letter = next(
(
letter
for key in GT_LETTER_KEYS
if (letter := stringify(record.get(key)).upper()) in OPTION_LETTERS
),
"",
)
if not correct_letter:
continue
entries[question_id] = GroundTruthEntry(
correct_letter=correct_letter,
domain=stringify(first_present(record, GT_DOMAIN_KEYS)) or "Unknown",
video_duration_sec=coerce_float(first_present(record, GT_DURATION_KEYS)),
skills=parse_skill_list(first_present(record, GT_SKILL_KEYS)),
)
if not entries:
raise RuntimeError("No usable ground-truth question IDs were found.")
return entries
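# A minimal ground-truth record this loader accepts (field values are illustrative):
#   {"question_id": "q1", "correct_option_letter": "B",
#    "domain": "Sports", "video_duration_sec": 312.0, "question_type": ["Counting"]}
# Records without a question_id or without a recognizable gold letter are skipped.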
def build_prediction_map(records: list[dict[str, Any]]) -> tuple[dict[str, str], int, int]:
predictions: dict[str, str] = {}
duplicates = 0
skipped_empty_answers = 0
for index, record in enumerate(records, start=1):
question_id = stringify(record.get("question_id"))
if not question_id:
raise ValueError(f"Row {index} is missing `question_id`.")
answer = normalize_answer(record.get("answer"))
if not answer:
skipped_empty_answers += 1
continue
if question_id in predictions:
duplicates += 1
predictions[question_id] = answer
return predictions, duplicates, skipped_empty_answers
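# Duplicate question_ids are counted and the last occurrence wins, e.g. two rows with
# the same id and answers "A" then "B" score as "B" with duplicates == 1.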
def bump(stats: dict[str, dict[str, int]], keys: list[str], field: str) -> None:
for key in keys:
stats[key][field] += 1
def make_breakdown_dataframe(
stats: dict[str, dict[str, int]],
label: str,
ordered_labels: list[str] | None = None,
) -> pd.DataFrame:
rows = [
{
label: name,
"Official Accuracy (%)": round(safe_pct(counts["correct"], counts["total"]), 2),
"Answered Accuracy (%)": round(safe_pct(counts["correct"], counts["answered"]), 2),
"Coverage (%)": round(safe_pct(counts["answered"], counts["total"]), 2),
"Correct": counts["correct"],
"Answered": counts["answered"],
"Total": counts["total"],
}
for name, counts in stats.items()
]
if not rows:
return pd.DataFrame(
columns=[
label,
"Official Accuracy (%)",
"Answered Accuracy (%)",
"Coverage (%)",
"Correct",
"Answered",
"Total",
]
)
frame = pd.DataFrame(rows)
if ordered_labels:
rank = {name: idx for idx, name in enumerate(ordered_labels)}
frame["_rank"] = frame[label].map(lambda name: rank.get(name, len(rank)))
return frame.sort_values(["_rank", label]).drop(columns="_rank").reset_index(drop=True)
return frame.sort_values(["Answered Accuracy (%)", "Total"], ascending=[False, False]).reset_index(drop=True)
def build_metrics_markdown(summary: dict[str, Any]) -> str:
return "\n".join(
[
"### Metrics",
f"- Official accuracy: `{summary['official_accuracy_pct']:.2f}%` "
f"(`{summary['correct']} / {summary['total_ground_truth']}`)",
f"- Answered accuracy: `{summary['answered_accuracy_pct']:.2f}%` "
f"(`{summary['correct']} / {summary['answered_predictions']}`)",
f"- Coverage: `{summary['coverage_pct']:.2f}%`",
f"- Matched IDs: `{summary['matched_prediction_ids']}`",
f"- Missing IDs: `{summary['missing_prediction_ids']}`",
f"- Extra IDs: `{summary['extra_prediction_ids']}`",
f"- Duplicate IDs: `{summary['duplicate_prediction_ids']}`",
]
)
def build_summary_markdown(domain_df: pd.DataFrame, duration_df: pd.DataFrame, skill_df: pd.DataFrame) -> str:
accuracy_column = "Answered Accuracy (%)"
best_domain = "n/a"
best_duration = "n/a"
lowest_skill = "n/a"
if not domain_df.empty:
row = domain_df.sort_values([accuracy_column, "Total"], ascending=[False, False]).iloc[0]
best_domain = f"{row['Domain']} ({row[accuracy_column]:.2f}%)"
if not duration_df.empty:
rows = duration_df[duration_df["Duration Bucket"] != "Overall"]
if not rows.empty:
row = rows.sort_values([accuracy_column, "Total"], ascending=[False, False]).iloc[0]
best_duration = f"{row['Duration Bucket']} ({row[accuracy_column]:.2f}%)"
if not skill_df.empty:
rows = skill_df[skill_df["Total"] >= 10]
if rows.empty:
rows = skill_df
row = rows.sort_values([accuracy_column, "Total"], ascending=[True, False]).iloc[0]
lowest_skill = f"{row['Skill']} ({row[accuracy_column]:.2f}%)"
return "\n".join(
[
"### Summary",
f"- Best domain by answered accuracy: `{best_domain}`",
f"- Best duration bucket by answered accuracy: `{best_duration}`",
f"- Lowest skill bucket by answered accuracy: `{lowest_skill}`",
]
)
def empty_result(status: str) -> tuple[str, str, str, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
return status, "", EMPTY_SUMMARY_MARKDOWN, pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
def evaluate_submission(
prediction_file: str | None,
) -> tuple[str, str, str, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
if not prediction_file:
return empty_result(
"### Upload required\nPlease upload a `.json` or `.jsonl` prediction file before evaluating."
)
started_at = time.time()
try:
ground_truth = load_ground_truth()
records, file_format = load_records(prediction_file)
if not records:
raise ValueError("No valid prediction records were found in the uploaded file.")
predictions, duplicate_prediction_ids, skipped_empty_answers = build_prediction_map(records)
domain_stats: dict[str, dict[str, int]] = defaultdict(lambda: {"correct": 0, "answered": 0, "total": 0})
duration_stats: dict[str, dict[str, int]] = defaultdict(lambda: {"correct": 0, "answered": 0, "total": 0})
skill_stats: dict[str, dict[str, int]] = defaultdict(lambda: {"correct": 0, "answered": 0, "total": 0})
correct = 0
answered = 0
gt_ids = set(ground_truth)
pred_ids = set(predictions)
for question_id, gt in ground_truth.items():
duration_key = duration_bucket(gt.video_duration_sec / 60.0) if gt.video_duration_sec is not None else None
scopes = [
(domain_stats, [gt.domain]),
(duration_stats, [duration_key] if duration_key else []),
(skill_stats, list(gt.skills)),
]
for stats, keys in scopes:
bump(stats, keys, "total")
answer = predictions.get(question_id)
if not answer:
continue
answered += 1
for stats, keys in scopes:
bump(stats, keys, "answered")
if answer == gt.correct_letter:
correct += 1
for stats, keys in scopes:
bump(stats, keys, "correct")
total_ground_truth = len(ground_truth)
duration_stats["Overall"] = {"total": total_ground_truth, "answered": answered, "correct": correct}
summary = {
"correct": correct,
"answered_predictions": answered,
"total_ground_truth": total_ground_truth,
"official_accuracy_pct": safe_pct(correct, total_ground_truth),
"answered_accuracy_pct": safe_pct(correct, answered),
"coverage_pct": safe_pct(answered, total_ground_truth),
"matched_prediction_ids": len(pred_ids & gt_ids),
"missing_prediction_ids": total_ground_truth - len(pred_ids & gt_ids),
"extra_prediction_ids": len(pred_ids - gt_ids),
"duplicate_prediction_ids": duplicate_prediction_ids,
}
domain_df = make_breakdown_dataframe(domain_stats, "Domain", ordered_labels=DOMAINS_ORDER)
duration_df = make_breakdown_dataframe(
duration_stats,
"Duration Bucket",
ordered_labels=DURATION_BUCKET_ORDER,
)
skill_df = make_breakdown_dataframe(skill_stats, "Skill")
status_markdown = (
"### Evaluation complete\n"
f"- Parsed file format: `{file_format}`\n"
f"- Uploaded rows: `{len(records)}`\n"
f"- Skipped empty answers: `{skipped_empty_answers}`\n"
f"- Evaluation time: `{time.time() - started_at:.2f}s`"
)
return (
status_markdown,
build_metrics_markdown(summary),
build_summary_markdown(domain_df, duration_df, skill_df),
domain_df,
duration_df,
skill_df,
)
except Exception as exc:
return empty_result(f"### Evaluation failed\n`{type(exc).__name__}: {exc}`")
def clear_outputs() -> tuple[None, str, str, str, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
return None, READY_STATUS_MARKDOWN, "", EMPTY_SUMMARY_MARKDOWN, pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
with gr.Blocks(title="MMOU Evaluator", fill_width=False) as demo:
gr.Markdown(APP_INTRO)
prediction_file = gr.File(label="Upload prediction file", file_types=[".json", ".jsonl"], type="filepath")
with gr.Row():
evaluate_button = gr.Button("Evaluate", variant="primary")
clear_button = gr.Button("Clear")
status_markdown = gr.Markdown(READY_STATUS_MARKDOWN)
metrics_markdown = gr.Markdown("")
summary_markdown = gr.Markdown(EMPTY_SUMMARY_MARKDOWN)
gr.Markdown(FORMAT_GUIDE)
with gr.Tabs():
with gr.Tab("Domain Breakdown"):
domain_dataframe = gr.Dataframe(label="Domain breakdown", interactive=False, wrap=True)
with gr.Tab("Duration Breakdown"):
duration_dataframe = gr.Dataframe(label="Duration breakdown", interactive=False, wrap=True)
with gr.Tab("Skill Breakdown"):
skill_dataframe = gr.Dataframe(label="Skill breakdown", interactive=False, wrap=True)
evaluate_button.click(
fn=evaluate_submission,
inputs=[prediction_file],
outputs=[
status_markdown,
metrics_markdown,
summary_markdown,
domain_dataframe,
duration_dataframe,
skill_dataframe,
],
)
clear_button.click(
fn=clear_outputs,
outputs=[
prediction_file,
status_markdown,
metrics_markdown,
summary_markdown,
domain_dataframe,
duration_dataframe,
skill_dataframe,
],
)
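# Run locally with `python app.py`; Gradio serves the app on http://127.0.0.1:7860 by default.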
if __name__ == "__main__":
    demo.launch()