datvo06's picture
Fix: capture-phase keydown beats browser scroll, event delegation for grid clicks (no onclick attrs)
1d87bd8 verified
#!/usr/bin/env python3
"""MARA Results Explorer — Gradio app for browsing and critiquing runs.
Browse all pipeline results from the Basis-MARA/mara-adversarial-results
HuggingFace dataset. Compare world models across pipelines, step through
rounds, read agent reasoning/scratchpad, and annotate results.
Usage:
python scripts/critique_app.py
python scripts/critique_app.py --port 7861
Requires: pip install gradio huggingface_hub
"""
import argparse
import datetime
import json
import os
import random
import tempfile
import traceback
from pathlib import Path
import gradio as gr
from huggingface_hub import HfApi, hf_hub_download
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# HuggingFace repository that holds every pipeline's results.
HF_REPO = "Basis-MARA/mara-adversarial-results"
REPO_TYPE = "dataset"
# In-process cache of repo listings, keyed by strings such as "pipelines",
# "envs:<pipeline>", "video_dirs" and "video_envs". Entries are never evicted.
_env_cache: dict = {}
# Optional access token read from the environment; None when HF_TOKEN is unset.
_hf_token = os.environ.get("HF_TOKEN")
# Shared API client used for tree listings and annotation uploads.
_api = HfApi(token=_hf_token)
# Folders to exclude from pipeline listing
EXCLUDE_FOLDERS = {"planning_videos", "annotations", "__pycache__"}
# Pipeline type detection
def _pipeline_type(name: str) -> str:
"""Classify a pipeline folder into a type for appropriate UI handling."""
if "solver" in name or name.startswith("direct_solver"):
return "solver"
if "autumn_synth" in name:
return "autumn_synth"
if "adversarial" in name:
return "adversarial"
return "unknown"
# Pipeline metadata (label + model). Auto-discovered pipelines not here get generic labels.
# Maps pipeline folder name -> (display label, model name).
# The star marks the inference-with-buffer run; it was previously stored as
# mis-decoded bytes and is restored here to the intended character.
PIPELINE_META = {
    "adversarial_results": ("Adversarial (early)", "Claude Sonnet"),
    "adversarial_results_raw": ("Adversarial (Claude, raw)", "Claude Sonnet"),
    "adversarial_results_inference": ("Adversarial (inference)", "Claude Sonnet"),
    "adversarial_results_raw_remaining_envs": ("Adversarial (Claude, remaining)", "Claude Sonnet"),
    "adversarial_results_raw_inference_with_buffer": ("Adversarial (Claude, inf+buffer) ★", "Claude Sonnet"),
    "adversarial_results_raw_inference": ("Adversarial (Claude, inference)", "Claude Sonnet"),
    "adversarial_results_raw_inference_boed": ("Adversarial (Claude, inf+BOED)", "Claude Sonnet"),
    "adversarial_results_raw_stochastic": ("Adversarial (Claude, stochastic)", "Claude Sonnet"),
    "adversarial_results_raw_stochastic_boed": ("Adversarial (Claude, stoch+BOED)", "Claude Sonnet"),
    "adversarial_results_raw_effectful_gpt-4o": ("Adversarial (GPT-4o)", "GPT-4o"),
    "adversarial_results_raw_effectful_gpt-5.4": ("Adversarial (GPT-5.4)", "GPT-5.4"),
    "adversarial_results_raw_inference_with_buffer_effectful_gpt-4o": ("Adversarial (GPT-4o, inf+buf)", "GPT-4o"),
    "adversarial_results_raw_inference_with_buffer_effectful_gpt-5.4": ("Adversarial (GPT-5.4, inf+buf)", "GPT-5.4"),
    "adversarial_synthesis_solver_results_raw_inference_v9": ("Adv + Solver (v9)", "Claude Sonnet"),
    "adversarial_synthesis_solver_results_raw_protocol_v9_roundselect": ("Adv + Solver (v9 round-select)", "Claude Sonnet"),
    "adversarial_synthesis_solver_results_raw_remaining_envs": ("Adv + Solver (remaining)", "Claude Sonnet"),
    "autumn_synth_results": ("AutumnSynth (bottom-up)", "Claude Sonnet"),
    "autumn_synth_results_encoder_next": ("AutumnSynth (encoder-next)", "Claude Sonnet"),
    "autumn_synth_results_old": ("AutumnSynth (old)", "Claude Sonnet"),
    "direct_solver_results_v1": ("Direct Solver v1", "Claude Sonnet"),
    "direct_solver_results_v2": ("Direct Solver v2", "Claude Sonnet"),
}
# ---------------------------------------------------------------------------
# Data loading helpers
# ---------------------------------------------------------------------------
def discover_pipelines() -> list[str]:
    """Return the sorted top-level pipeline folders of the results repo.

    The first successful listing is stored in ``_env_cache`` and reused.
    Entries are treated as folders when they are not hidden, not in
    ``EXCLUDE_FOLDERS`` and do not end in a known file extension. On any
    failure a single-element list carrying the error text is returned
    (and nothing is cached).
    """
    cache_key = "pipelines"
    if cache_key in _env_cache:
        return _env_cache[cache_key]

    # Root entries ending in one of these are files, not pipeline folders.
    file_suffixes = (
        ".json", ".py", ".png", ".csv", ".pkl", ".log", ".gif",
        ".gitattributes", ".md", ".txt",
    )
    try:
        entries = list(_api.list_repo_tree(HF_REPO, repo_type=REPO_TYPE, path_in_repo=""))
        folders = []
        for entry in entries:
            path = entry.path
            if path.startswith(".") or path in EXCLUDE_FOLDERS:
                continue
            if path.endswith(file_suffixes):
                continue
            folders.append(path)
        folders.sort()
        _env_cache[cache_key] = folders
        return folders
    except Exception as e:
        return [f"Error: {e}"]
def pipeline_label(pipeline: str) -> str:
    """Return the display label for a pipeline, or its folder name when it
    has no entry in ``PIPELINE_META``."""
    entry = PIPELINE_META.get(pipeline)
    if entry:
        return entry[0]
    return pipeline
def list_envs_for_pipeline(pipeline: str) -> list[str]:
    """Return the sorted environment folder names under ``pipeline``.

    Results (including the empty list produced when the listing fails) are
    cached in ``_env_cache`` under ``"envs:<pipeline>"``.
    """
    cache_key = f"envs:{pipeline}"
    if cache_key in _env_cache:
        return _env_cache[cache_key]

    # Entries ending in one of these are files rather than environment dirs.
    file_suffixes = (
        ".json", ".py", ".png", ".csv", ".pkl", ".log", ".gif",
        ".gitattributes", ".md", ".txt",
    )
    try:
        entries = list(_api.list_repo_tree(HF_REPO, repo_type=REPO_TYPE, path_in_repo=pipeline))
        names = []
        for entry in entries:
            path = entry.path
            if "/" not in path or path.endswith(file_suffixes):
                continue
            if "__pycache__" in path or ".DS_Store" in path:
                continue
            names.append(path.split("/")[-1])
        names.sort()
        _env_cache[cache_key] = names
        return names
    except Exception:
        _env_cache[cache_key] = []
        return []
def list_solver_tasks(pipeline: str, env: str) -> list[str]:
    """Return the sorted solver task folders (cd, mfp, planning) present
    under ``pipeline/env``."""
    known_tasks = ("cd", "mfp", "planning")
    found = []
    for entry in safe_list_tree(f"{pipeline}/{env}"):
        if "/" not in entry.path:
            continue
        leaf = entry.path.split("/")[-1]
        if leaf in known_tasks:
            found.append(leaf)
    return sorted(found)
def download_file(path: str) -> str | None:
try:
return hf_hub_download(HF_REPO, path, repo_type=REPO_TYPE)
except Exception:
return None
def load_json(path: str) -> dict | list | None:
    """Download ``path`` from the results repo and parse it as JSON.

    Returns the parsed document, or ``None`` when the file cannot be
    downloaded, read, decoded or parsed.
    """
    local = download_file(path)
    if local is None:
        return None
    try:
        # JSON is UTF-8 by specification; do not rely on the platform's
        # default encoding, which garbles non-ASCII text on some hosts.
        return json.loads(Path(local).read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return None
def load_text(path: str) -> str:
    """Download ``path`` from the results repo and return its text.

    Returns an empty string when the file cannot be downloaded, read or
    decoded, so callers can use plain truthiness to detect "missing".
    """
    local = download_file(path)
    if local is None:
        return ""
    try:
        # Read explicitly as UTF-8 instead of the platform default so
        # non-ASCII characters in logs/markdown survive on every host.
        return Path(local).read_text(encoding="utf-8")
    except (OSError, ValueError):
        # ValueError covers UnicodeDecodeError for non-UTF-8 files.
        return ""
def safe_list_tree(path: str) -> list:
    """List the repo entries at ``path``.

    Any exception raised by the listing (not only a missing path) is
    swallowed and reported as an empty list.
    """
    try:
        entries = list(_api.list_repo_tree(HF_REPO, repo_type=REPO_TYPE, path_in_repo=path))
    except Exception:
        entries = []
    return entries
# ---------------------------------------------------------------------------
# Extract agent reasoning from thoughts JSON
# ---------------------------------------------------------------------------
def extract_reasoning(thoughts: dict | list | None) -> str:
"""Extract human-readable reasoning from a *_thoughts.json file.
These files contain the full Claude Code conversation. We pull out:
- thinking blocks (chain-of-thought)
- text blocks (agent's spoken reasoning)
- tool_use summaries (what tools were called)
"""
if not thoughts or not isinstance(thoughts, dict):
return "(no thoughts data)"
msgs = thoughts.get("messages", [])
if not msgs:
return "(empty messages)"
sections = []
turn = 0
for m in msgs:
mtype = m.get("type", "")
if mtype == "assistant":
msg_data = m.get("message", m)
content = msg_data.get("content", [])
if isinstance(content, str):
if len(content.strip()) > 0:
sections.append(f"**Agent:** {content[:2000]}")
continue
if not isinstance(content, list):
continue
for block in content:
if not isinstance(block, dict):
continue
btype = block.get("type", "")
if btype == "thinking":
text = block.get("thinking", "")
if text.strip():
display = text[:3000] + ("..." if len(text) > 3000 else "")
sections.append(f"<details><summary>Thinking (turn {turn})</summary>\n\n{display}\n\n</details>")
elif btype == "text":
text = block.get("text", "")
if text.strip():
sections.append(f"**Agent:** {text[:2000]}")
elif btype == "tool_use":
name = block.get("name", "?")
inp = block.get("input", {})
if isinstance(inp, dict):
summary = inp.get("command", inp.get("description", inp.get("pattern", str(inp)[:200])))
else:
summary = str(inp)[:200]
sections.append(f"Tool `{name}`: `{summary}`")
turn += 1
elif mtype == "user":
msg_data = m.get("message", m)
content = msg_data.get("content", [])
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_result":
result_content = block.get("content", "")
if isinstance(result_content, str) and len(result_content) > 0:
preview = result_content[:500] + ("..." if len(result_content) > 500 else "")
sections.append(f"<details><summary>Tool result</summary>\n\n```\n{preview}\n```\n\n</details>")
if not sections:
result = thoughts.get("result", "")
if result:
return f"**Final result:**\n\n{result[:5000]}"
return "(could not extract reasoning from this format)"
return "\n\n".join(sections)
# ---------------------------------------------------------------------------
# Overview tab — adapts to pipeline type
# ---------------------------------------------------------------------------
def build_overview(pipeline: str) -> str:
    """Build the markdown overview table for one pipeline.

    The table layout depends on the pipeline type:

    - adversarial: rounds, final match score, cost and total discrepancies
      read from each environment's ``experiment_summary.json``
    - solver: which task folders (cd/mfp/planning) exist per environment
    - autumn_synth: which component sub-folders exist per environment
    - anything else: a bare list of environments

    Returns:
        Markdown text, or a short hint when no pipeline is selected or the
        pipeline has no environments.
    """
    if not pipeline:
        return "Select a pipeline."
    envs = list_envs_for_pipeline(pipeline)
    if not envs:
        return f"No environments found for `{pipeline}`."

    label = pipeline_label(pipeline)
    meta = PIPELINE_META.get(pipeline)
    model = meta[1] if meta else "unknown"
    ptype = _pipeline_type(pipeline)
    lines = [f"# {label}\n", f"**Model:** {model} | **Type:** {ptype} | **Environments:** {len(envs)}\n"]

    if ptype == "adversarial":
        # Adversarial synthesis: show rounds, match score, cost.
        lines.append("| Environment | Rounds | Final Match | Cost (USD) | Discrepancies |")
        lines.append("|------------|--------|-------------|------------|---------------|")
        for env in envs:
            summary = load_json(f"{pipeline}/{env}/experiment_summary.json")
            # load_json may return a list; only a non-empty dict is usable.
            if not (isinstance(summary, dict) and summary):
                lines.append(f"| {env} | - | - | - | - |")
                continue
            n_rounds = summary.get("num_rounds", "?")
            match = summary.get("final_match_score", "?")
            cost = summary.get("total_cost_usd", 0)
            rounds = summary.get("rounds") or []
            discs = sum(r.get("num_discrepancies", 0) for r in rounds if isinstance(r, dict))
            match_str = f"{match:.2f}" if isinstance(match, (int, float)) else str(match)
            cost_str = f"${cost:.2f}" if isinstance(cost, (int, float)) else str(cost)
            lines.append(f"| {env} | {n_rounds} | {match_str} | {cost_str} | {discs} |")
    elif ptype == "solver":
        # Solver: show which task types exist.
        lines.append("| Environment | Tasks Available |")
        lines.append("|------------|----------------|")
        for env in envs:
            tasks = list_solver_tasks(pipeline, env)
            tasks_str = ", ".join(tasks) if tasks else "-"
            lines.append(f"| {env} | {tasks_str} |")
    elif ptype == "autumn_synth":
        # AutumnSynth: show which component folders exist.
        lines.append("| Environment | Components |")
        lines.append("|------------|-----------|")
        for env in envs:
            items = safe_list_tree(f"{pipeline}/{env}")
            subdirs = [
                f.path.split("/")[-1] for f in items
                if "/" in f.path and not f.path.endswith((".json", ".py", ".pkl", ".log"))
            ]
            subdirs = [s for s in subdirs if s not in ("__pycache__", ".DS_Store")]
            lines.append(f"| {env} | {', '.join(sorted(subdirs)) if subdirs else '-'} |")
    else:
        # Generic fallback
        lines.append("| Environment |")
        lines.append("|------------|")
        for env in envs:
            lines.append(f"| {env} |")
    return "\n".join(lines)
# ---------------------------------------------------------------------------
# Environment detail — adapts to pipeline type
# ---------------------------------------------------------------------------
def load_env_detail(pipeline: str, env: str) -> tuple[str, str, str, str]:
    """Return the four detail panes for ``env``, delegating to the loader
    that matches the pipeline's type.

    An unselected pipeline/environment or an unrecognised pipeline type
    yields a message in the first pane and empty strings in the rest.
    """
    if not (pipeline and env):
        return ("Select a pipeline and environment.", "", "", "")

    loaders = {
        "adversarial": _load_adversarial_detail,
        "solver": _load_solver_overview,
        "autumn_synth": _load_autumn_synth_detail,
    }
    loader = loaders.get(_pipeline_type(pipeline))
    if loader is None:
        return (f"## {env}\n\nUnknown pipeline type for `{pipeline}`.", "", "", "")
    return loader(pipeline, env)
def _load_adversarial_detail(pipeline: str, env: str) -> tuple[str, str, str, str]:
    """Load the adversarial-synthesis detail panes for one environment.

    Returns:
        ``(summary_md, code, discrepancies_md, rounds_md)`` where

        - ``summary_md`` is the pretty-printed ``experiment_summary.json``
        - ``code`` is ``final_world.py`` (falling back to ``code/world.py``)
        - ``discrepancies_md`` lists every challenger discrepancy JSON found
          under ``round_<n>/challenger/discrepancies``
        - ``rounds_md`` summarises each ``round_<n>/round_metrics.json``

        At most 20 rounds (0-19) are scanned.
    """
    # Summary
    summary = load_json(f"{pipeline}/{env}/experiment_summary.json")
    if summary:
        summary_md = f"## {env}\n\n```json\n{json.dumps(summary, indent=2)}\n```"
    else:
        summary_md = f"## {env}\n\nNo experiment_summary.json found."

    # Final world code
    code = load_text(f"{pipeline}/{env}/final_world.py")
    if not code:
        code = load_text(f"{pipeline}/{env}/code/world.py")
    if not code:
        code = "(no world.py found)"

    # Discrepancies
    disc_lines = []
    for round_n in range(20):
        disc_items = safe_list_tree(f"{pipeline}/{env}/round_{round_n}/challenger/discrepancies")
        for item in disc_items:
            if not item.path.endswith(".json"):
                continue
            disc = load_json(item.path)
            if disc and isinstance(disc, dict):
                disc_lines.append(f"### Round {round_n} — {item.path.split('/')[-1]}")
                disc_lines.append(f"**Description:** {disc.get('description', '(none)')}\n")
                disc_lines.append(f"**Actions:** `{disc.get('actions', '')}`\n")
        # Round 0 may legitimately have no discrepancies folder, so only a
        # later empty round is taken as "no more rounds".
        if not disc_items and round_n > 0:
            break
    discrepancies_md = "\n".join(disc_lines) if disc_lines else "No discrepancies found."

    # Rounds summary
    round_lines = []
    for round_n in range(20):
        metrics = load_json(f"{pipeline}/{env}/round_{round_n}/round_metrics.json")
        # A missing file ends the scan; so does a metrics file that is not a
        # JSON object, since it cannot be summarised.
        if not isinstance(metrics, dict):
            break
        match_score = metrics.get("observation_match_score", "?")
        n_disc = metrics.get("num_discrepancies", "?")
        cost_s = metrics.get("synthesizer_cost_usd", 0)
        cost_c = metrics.get("challenger_cost_usd", 0)
        cov = metrics.get("coverage", {})
        round_lines.append(f"### Round {round_n}")
        round_lines.append(f"- Match score: **{match_score}**")
        round_lines.append(f"- Discrepancies: {n_disc}")
        if isinstance(cost_s, (int, float)) and isinstance(cost_c, (int, float)):
            round_lines.append(f"- Cost: synth ${cost_s:.2f} + challenger ${cost_c:.2f}")
        if cov and isinstance(cov, dict):
            round_lines.append(
                f"- Coverage: {cov.get('action_types_used', '?')} action types, "
                f"{cov.get('unique_states_seen', '?')} unique states, "
                f"{cov.get('total_steps', '?')} steps"
            )
        round_lines.append("")
    rounds_md = "\n".join(round_lines) if round_lines else "No round data found."

    return (summary_md, code, discrepancies_md, rounds_md)
def _load_solver_overview(pipeline: str, env: str) -> tuple[str, str, str, str]:
    """Load the solver detail panes for one environment.

    For every task folder found (cd / mfp / planning) the task prompt,
    scratchpad, answer and instructions are collected; long documents are
    truncated (3000 chars for JSON, 5000 for markdown).

    Returns:
        ``(summary_md, scratchpad_md, answer_md, instructions_md)``.
    """
    tasks = list_solver_tasks(pipeline, env)
    summary_parts = [f"## {env} — Solver Tasks\n"]
    scratchpad_parts = []
    answer_parts = []
    instructions_parts = []

    if not tasks:
        summary_parts.append("No solver tasks (cd/mfp/planning) found for this environment.")

    for task in tasks:
        base = f"{pipeline}/{env}/{task}"
        heading = f"### {task.upper()}"
        summary_parts.append(heading)

        # Task prompt
        prompt = load_json(f"{base}/task_prompt.json")
        if prompt:
            summary_parts.append(f"```json\n{json.dumps(prompt, indent=2)[:3000]}\n```\n")
        else:
            summary_parts.append("(no task_prompt.json)\n")

        # Scratchpad
        sp = load_text(f"{base}/scratchpad.md")
        if sp:
            scratchpad_parts.append(f"{heading}\n\n{sp[:5000]}\n")
        else:
            scratchpad_parts.append(f"{heading}\n\n(no scratchpad.md)\n")

        # Answer
        ans = load_json(f"{base}/answer.json")
        if ans:
            answer_parts.append(f"{heading}\n\n```json\n{json.dumps(ans, indent=2)[:3000]}\n```\n")
        else:
            answer_parts.append(f"{heading}\n\n(no answer.json)\n")

        # Instructions (tasks without any are simply omitted from this pane)
        inst = load_text(f"{base}/INSTRUCTIONS.md")
        if inst:
            instructions_parts.append(f"{heading}\n\n{inst[:5000]}\n")

    summary_md = "\n".join(summary_parts)
    scratchpad_md = "\n".join(scratchpad_parts) if scratchpad_parts else "(no scratchpads found)"
    answer_md = "\n".join(answer_parts) if answer_parts else "(no answers found)"
    instructions_md = "\n".join(instructions_parts) if instructions_parts else "(no instructions found)"
    return (summary_md, scratchpad_md, answer_md, instructions_md)
def _load_autumn_synth_detail(pipeline: str, env: str) -> tuple[str, str, str, str]:
    """Load the AutumnSynth detail panes for one environment.

    Returns:
        ``(summary_md, encoders_md, transitions_md, logs_md)`` where the
        summary is the (truncated) ``coverage_report.json``, the two code
        panes list the ``.py`` files in ``encoders/`` and ``transitions/``
        with the first file shown as a sample, and the log pane
        concatenates the known synthesis logs that exist.
    """
    # Coverage report
    cov = load_json(f"{pipeline}/{env}/coverage_report.json")
    if cov:
        summary_md = f"## {env} — AutumnSynth\n\n```json\n{json.dumps(cov, indent=2)[:5000]}\n```"
    else:
        summary_md = f"## {env} — AutumnSynth\n\nNo coverage_report.json found."

    code_md = _autumn_component_md(pipeline, env, "encoders", "Encoders")
    transitions_md = _autumn_component_md(pipeline, env, "transitions", "Transitions")

    # Logs
    log_sections = []
    for logname in ("joint_synthesis.log", "joint_dependency_synthesis.log", "dependency_visualize.log"):
        log = load_text(f"{pipeline}/{env}/{logname}")
        if log:
            log_sections.append(f"### {logname}\n\n```\n{log[:5000]}\n```\n\n")
    logs_md = "".join(log_sections) or "(no logs found)"

    return (summary_md, code_md, transitions_md, logs_md)


def _autumn_component_md(pipeline: str, env: str, folder: str, title: str) -> str:
    """Render the ``.py`` files in ``pipeline/env/folder`` as markdown:
    a count heading, the file names, and the first file as a code sample."""
    items = safe_list_tree(f"{pipeline}/{env}/{folder}")
    names = [i.path.split("/")[-1] for i in items if i.path.endswith(".py")]
    if not names:
        return f"(no {folder} found)"
    # Only the first file is downloaded, to keep the pane fast to load.
    sample = load_text(f"{pipeline}/{env}/{folder}/{names[0]}")
    listing = ", ".join(f"`{n}`" for n in names)
    return (
        f"### {title} ({len(names)})\n\n"
        f"{listing}\n\n"
        f"#### Sample: {names[0]}\n```python\n{sample[:5000]}\n```"
    )
# ---------------------------------------------------------------------------
# Agent reasoning / scratchpad
# ---------------------------------------------------------------------------
def load_agent_reasoning(pipeline: str, env: str, round_n: int, agent_type: str) -> str:
    """Load and format agent reasoning for a specific round.

    For solver pipelines ``agent_type`` is interpreted as the task folder
    (cd / mfp / planning) and the task's scratchpad is shown, falling back
    to its instructions. For every other pipeline the round's
    ``<agent_type>_thoughts.json`` is rendered with a short stats header,
    falling back to the agent's ``INSTRUCTIONS.md``.

    Returns:
        Markdown text, or a one-line message when nothing is available.
    """
    if not pipeline or not env:
        return "Select a pipeline and environment."

    ptype = _pipeline_type(pipeline)
    if ptype == "solver":
        # For solver, agent_type maps to task type (cd/mfp/planning).
        task_type = agent_type
        sp = load_text(f"{pipeline}/{env}/{task_type}/scratchpad.md")
        if sp:
            return f"## {env} / {task_type} — Scratchpad\n\n{sp}"
        inst = load_text(f"{pipeline}/{env}/{task_type}/INSTRUCTIONS.md")
        if inst:
            return f"## {env} / {task_type} — Instructions\n\n{inst[:10000]}"
        return f"No scratchpad or instructions found for {env}/{task_type}."

    # Adversarial / other: load thoughts JSON
    thoughts = load_json(f"{pipeline}/{env}/round_{round_n}/{agent_type}_thoughts.json")
    # Only a JSON object carries the stats read below; any other shape is
    # treated like a missing file.
    if thoughts and isinstance(thoughts, dict):
        header = f"## {agent_type.capitalize()} — Round {round_n}\n\n"
        n_turns = thoughts.get("num_turns", "?")
        cost = thoughts.get("total_cost_usd", 0)
        is_error = thoughts.get("is_error", False)
        error_msg = thoughts.get("error", "")
        stop = thoughts.get("stop_reason", "")
        if isinstance(cost, (int, float)):
            header += f"**Turns:** {n_turns} | **Cost:** ${cost:.2f} | **Error:** {is_error}"
        else:
            header += f"**Turns:** {n_turns} | **Error:** {is_error}"
        if error_msg:
            header += f" (`{error_msg}`)"
        if stop:
            header += f" | **Stop:** {stop}"
        header += "\n\n---\n\n"
        return header + extract_reasoning(thoughts)

    # Try INSTRUCTIONS.md as fallback
    instructions = load_text(f"{pipeline}/{env}/round_{round_n}/{agent_type}/INSTRUCTIONS.md")
    if instructions:
        return f"## {agent_type.capitalize()} Instructions — Round {round_n}\n\n{instructions[:10000]}"
    return f"No {agent_type} data found for round {round_n}."
# ---------------------------------------------------------------------------
# Solver task detail (standalone)
# ---------------------------------------------------------------------------
def load_solver_detail(pipeline: str, env: str, task: str) -> tuple[str, str, str, str]:
    """Return ``(summary, scratchpad, answer, instructions)`` for a single
    solver task, substituting a placeholder for each missing document.

    The task prompt and answer are pretty-printed JSON truncated to 5000
    characters; scratchpad and instructions are returned in full.
    """
    if not (pipeline and env and task):
        return ("Select pipeline, environment, and task.", "", "", "")

    base = f"{pipeline}/{env}/{task}"
    title = f"## {env} / {task}"

    # Task prompt
    prompt = load_json(f"{base}/task_prompt.json")
    if prompt:
        summary_md = f"{title}\n\n```json\n{json.dumps(prompt, indent=2)[:5000]}\n```"
    else:
        summary_md = f"{title}\n\nNo task_prompt.json found."

    # Scratchpad
    scratchpad = load_text(f"{base}/scratchpad.md") or "(no scratchpad.md found)"

    # Answer
    answer = load_json(f"{base}/answer.json")
    if answer:
        answer_md = f"```json\n{json.dumps(answer, indent=2)[:5000]}\n```"
    else:
        answer_md = "(no answer.json found)"

    # Instructions
    instructions = load_text(f"{base}/INSTRUCTIONS.md") or "(no INSTRUCTIONS.md found)"

    return (summary_md, scratchpad, answer_md, instructions)
# ---------------------------------------------------------------------------
# Planning videos — recursive search across all subdirs
# ---------------------------------------------------------------------------
def _collect_video_dirs() -> list[str]:
    """Return ``planning_videos`` plus every sub-directory found beneath it.

    The tree is walked depth-first. An entry counts as a directory when its
    name does not end in one of the known file suffixes. The result is
    cached in ``_env_cache`` under ``"video_dirs"``.
    """
    cache_key = "video_dirs"
    if cache_key in _env_cache:
        return _env_cache[cache_key]

    root = "planning_videos"
    file_suffixes = (".gif", ".png", ".json", ".csv", ".DS_Store")
    found = [root]
    pending = [root]
    seen = set()
    while pending:
        here = pending.pop()
        if here in seen:
            continue
        seen.add(here)
        for entry in safe_list_tree(here):
            leaf = entry.path.split("/")[-1]
            # Anything with a known file suffix is a file; skip it.
            if leaf.endswith(file_suffixes):
                continue
            found.append(entry.path)
            pending.append(entry.path)

    _env_cache[cache_key] = found
    return found
def list_planning_videos(env: str) -> list[tuple[str, str]]:
    """Return ``(repo_path, file_name)`` for every ``.gif``/``.png`` under
    the video folders whose file name contains ``env``.

    NOTE(review): the match is a plain substring test, so an environment
    whose name is contained in another's will also pick up that one's files.
    """
    media_suffixes = (".gif", ".png")
    matches = []
    for dirpath in _collect_video_dirs():
        for entry in safe_list_tree(dirpath):
            fname = entry.path.split("/")[-1]
            if env in fname and fname.endswith(media_suffixes):
                matches.append((entry.path, fname))
    return matches
def list_all_video_envs() -> list[str]:
    """Return the sorted, de-duplicated environment names that have at least
    one planning video or image.

    The environment name is the part of the file name before the first of
    ``_planning``, ``_mfp`` or ``_cd`` (tested in that order); files
    containing none of these markers are ignored. Cached under
    ``"video_envs"``.
    """
    cache_key = "video_envs"
    if cache_key in _env_cache:
        return _env_cache[cache_key]

    markers = ("_planning", "_mfp", "_cd")
    names = set()
    for dirpath in _collect_video_dirs():
        for entry in safe_list_tree(dirpath):
            fname = entry.path.split("/")[-1]
            if not fname.endswith((".gif", ".png")):
                continue
            # First marker (in priority order) present in the file name.
            marker = next((m for m in markers if m in fname), None)
            if marker is not None:
                names.add(fname.split(marker)[0])

    ordered = sorted(names)
    _env_cache[cache_key] = ordered
    return ordered
def load_planning_video_md(env: str) -> str:
    """Render every planning video/image found for ``env`` as markdown.

    Each file becomes a heading tagged with its sub-folder followed by an
    inline image pointing at the file's ``resolve/main`` URL on HuggingFace.
    """
    if not env:
        return "Enter an environment name."
    videos = list_planning_videos(env)
    if not videos:
        return f"No planning videos found for `{env}`."

    out = [f"## Planning Videos for {env}\n"]
    for hf_path, fname in videos:
        # Sub-folder is everything between "planning_videos" and the file.
        segments = hf_path.split("/")
        subfolder = "/".join(segments[1:-1]) if len(segments) > 2 else "(root)"
        url = f"https://huggingface.co/datasets/{HF_REPO}/resolve/main/{hf_path}"
        # GIFs and PNGs are embedded with the same image markup.
        out.append(f"### [{subfolder}] {fname}\n![{fname}]({url})\n")
    return "\n".join(out)
# ---------------------------------------------------------------------------
# Code evolution
# ---------------------------------------------------------------------------
def show_round_code(pipeline: str, env: str, round_idx: int) -> str:
    """Return the source code to display for position ``round_idx``.

    The meaning of ``round_idx`` depends on the pipeline type:

    - adversarial: the synthesizer's world code for that round, with
      fallbacks to the final world code
    - autumn_synth: the ``round_idx``-th encoder file
    - solver: the first ``.py`` file of the ``round_idx``-th task

    Returns an empty string when no pipeline/environment is selected and a
    parenthesised placeholder when nothing is found. Negative indices are
    treated as out of range instead of wrapping around to the end.
    """
    if not pipeline or not env:
        return ""

    ptype = _pipeline_type(pipeline)

    if ptype == "adversarial":
        code = load_text(f"{pipeline}/{env}/round_{round_idx}/synthesizer_code.py")
        if not code:
            code = load_text(f"{pipeline}/{env}/round_{round_idx}/synthesizer/code/world.py")
        if not code and round_idx == 0:
            code = load_text(f"{pipeline}/{env}/final_world.py")
            if code:
                # Make it obvious that this is not per-round code.
                code = f"# (final_world.py — no per-round code found)\n{code}"
        # NOTE(review): last-resort fallback applied for every round; confirm
        # it was not meant to be limited to round 0.
        if not code:
            code = load_text(f"{pipeline}/{env}/code/world.py")
        return code if code else "(no code found for this round)"

    if ptype == "autumn_synth":
        # Show encoders for the env
        encoder_items = safe_list_tree(f"{pipeline}/{env}/encoders")
        encoder_names = [i.path.split("/")[-1] for i in encoder_items if i.path.endswith(".py")]
        if 0 <= round_idx < len(encoder_names):
            return load_text(f"{pipeline}/{env}/encoders/{encoder_names[round_idx]}")
        return "(no more encoders to show)"

    if ptype == "solver":
        # Show code from solver tasks
        tasks = list_solver_tasks(pipeline, env)
        if 0 <= round_idx < len(tasks):
            task = tasks[round_idx]
            code_items = safe_list_tree(f"{pipeline}/{env}/{task}/code")
            py_files = [i for i in code_items if i.path.endswith(".py")]
            if py_files:
                return load_text(py_files[0].path)
        return "(no solver code found)"

    return "(unsupported pipeline type for code view)"
# ---------------------------------------------------------------------------
# Cross-pipeline comparison
# ---------------------------------------------------------------------------
def compare_env_across_pipelines(env: str) -> str:
    """Build a markdown comparison of ``env`` across every discovered pipeline.

    Pipelines are grouped by type into up to three tables (adversarial
    synthesis, solver, AutumnSynth); a group is omitted when no pipeline of
    that type has results for the environment. Pipelines of unknown type are
    ignored.
    """
    if not env:
        return "Enter an environment name."

    lines = [f"# {env} — Cross-Pipeline Comparison\n"]
    adv_rows: list[str] = []
    solver_rows: list[str] = []
    autumn_rows: list[str] = []
    # Entries under an AutumnSynth env that are files, not component folders.
    file_suffixes = (".json", ".py", ".pkl", ".log")

    # One pass over the pipelines; rows keep discovery order within a group.
    for pipeline in discover_pipelines():
        ptype = _pipeline_type(pipeline)
        if ptype == "adversarial":
            summary = load_json(f"{pipeline}/{env}/experiment_summary.json")
            # load_json may return a list; only a non-empty dict is usable.
            if not (isinstance(summary, dict) and summary):
                continue
            n_rounds = summary.get("num_rounds", "?")
            match = summary.get("final_match_score", "?")
            cost = summary.get("total_cost_usd", 0)
            rounds = summary.get("rounds") or []
            discs = sum(r.get("num_discrepancies", 0) for r in rounds if isinstance(r, dict))
            match_str = f"{match:.2f}" if isinstance(match, (int, float)) else str(match)
            cost_str = f"${cost:.2f}" if isinstance(cost, (int, float)) else str(cost)
            adv_rows.append(
                f"| {pipeline_label(pipeline)} | {n_rounds} | {match_str} | {cost_str} | {discs} |"
            )
        elif ptype == "solver":
            tasks = list_solver_tasks(pipeline, env)
            if tasks:
                solver_rows.append(f"| {pipeline_label(pipeline)} | {', '.join(tasks)} |")
        elif ptype == "autumn_synth":
            items = safe_list_tree(f"{pipeline}/{env}")
            if not items:
                continue
            subdirs = [
                f.path.split("/")[-1] for f in items
                if not f.path.endswith(file_suffixes)
                and f.path.split("/")[-1] not in ("__pycache__", ".DS_Store")
            ]
            autumn_rows.append(
                f"| {pipeline_label(pipeline)} | {', '.join(sorted(subdirs)) if subdirs else '-'} |"
            )

    sections = (
        (
            "### Adversarial Synthesis\n",
            "| Pipeline | Rounds | Final Match | Cost | Discrepancies |",
            "|----------|--------|-------------|------|---------------|",
            adv_rows,
        ),
        ("### Solver Results\n", "| Pipeline | Tasks |", "|----------|-------|", solver_rows),
        ("### AutumnSynth\n", "| Pipeline | Components |", "|----------|-----------|", autumn_rows),
    )
    for title, header, rule, rows in sections:
        if rows:
            lines.extend([title, header, rule, *rows, ""])

    if not (adv_rows or solver_rows or autumn_rows):
        lines.append(f"No results found for `{env}` in any pipeline.")
    return "\n".join(lines)
# ---------------------------------------------------------------------------
# Annotations
# ---------------------------------------------------------------------------
# Repo-relative path of the single JSON file that stores all annotations.
ANNOTATION_PATH = "annotations/annotations.json"
# In-memory copy of the annotations file; None until the first load.
_annotations_cache: dict | None = None
def load_annotations() -> dict:
    """Return all annotations, downloading them on first use.

    The result is memoised in ``_annotations_cache``; a missing or
    non-dict annotations file is treated as an empty mapping.
    """
    global _annotations_cache
    if _annotations_cache is None:
        loaded = load_json(ANNOTATION_PATH)
        _annotations_cache = loaded if isinstance(loaded, dict) else {}
    return _annotations_cache
def save_annotation(pipeline: str, env: str, label: str, comment: str, reviewer: str) -> str:
    """Append an annotation for ``pipeline/env`` and persist it.

    The full annotations document is re-uploaded to the HuggingFace repo.
    If the upload fails, the document is written to
    ``annotations_local.json`` in the working directory instead. The
    in-memory cache is only updated once one of the two writes succeeded.

    Returns:
        A human-readable status message (validation hint, success message,
        or a description of what failed).
    """
    global _annotations_cache
    reviewer = (reviewer or "").strip()
    comment = (comment or "").strip()
    if not reviewer:
        return "Please enter your name / Slack handle."
    if not comment:
        return "Please enter a comment."

    key = f"{pipeline}/{env}"
    entry = {
        "label": label,
        "comment": comment,
        "reviewer": reviewer,
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    }
    # Build a new list for this key instead of appending in place: the dict
    # copy is shallow, so an in-place append would modify the cached
    # annotations before we know whether the save succeeded.
    existing = dict(load_annotations())
    existing[key] = [*existing.get(key, []), entry]
    content = json.dumps(existing, indent=2)

    try:
        _api.upload_file(
            path_or_fileobj=content.encode("utf-8"),
            path_in_repo=ANNOTATION_PATH,
            repo_id=HF_REPO,
            repo_type=REPO_TYPE,
            commit_message=f"Annotation: {label} on {key} by {reviewer}",
        )
    except Exception as e:
        # Best-effort fallback so the reviewer's text is not lost.
        try:
            Path("annotations_local.json").write_text(content, encoding="utf-8")
        except OSError as local_err:
            return f"Could not save annotation (HF push failed: {e}; local write failed: {local_err})."
        _annotations_cache = existing
        return f"Saved locally (HF push failed: {e})."

    _annotations_cache = existing
    total = sum(len(v) for v in existing.values())
    return f"Saved to HuggingFace! Total annotations: {total}"
def format_annotations(pipeline: str, env: str) -> str:
    """Render the annotations stored for ``pipeline/env`` as a markdown list.

    Each entry shows its label, comment, reviewer and the date part of its
    timestamp. Entries that are not dicts are skipped and missing fields
    are shown as placeholders instead of raising.
    """
    annotations = load_annotations()
    key = f"{pipeline}/{env}"
    entries = annotations.get(key, [])
    if not entries:
        return "No annotations yet."

    lines = [f"### Annotations ({len(entries)})\n"]
    for ann in entries:
        if not isinstance(ann, dict):
            continue
        # ISO timestamps start with YYYY-MM-DD; keep just the date.
        ts = str(ann.get("timestamp", ""))[:10]
        lines.append(
            f"- **[{ann.get('label', '?')}]** {ann.get('comment', '')} — _{ann.get('reviewer', '?')}_ ({ts})"
        )
    return "\n".join(lines)
# ---------------------------------------------------------------------------
# Play Synth World — interactive simulator
# ---------------------------------------------------------------------------
# CSS color name → hex mapping for grid rendering.
# Keys are lowercase color names; both "gray" and "grey" spellings are accepted.
COLOR_MAP = {
    "black": "#111111", "white": "#ffffff", "red": "#ff0000", "green": "#00cc00",
    "blue": "#0066ff", "yellow": "#ffff00", "gold": "#ffd700", "orange": "#ff8800",
    "darkorange": "#ff8c00", "purple": "#9933ff", "mediumpurple": "#9370db",
    "gray": "#888888", "grey": "#888888", "brown": "#8b4513", "pink": "#ff69b4",
    "cyan": "#00cccc", "magenta": "#ff00ff", "lime": "#00ff00",
    "darkgreen": "#006400", "darkblue": "#00008b", "darkred": "#8b0000",
    "lightblue": "#add8e6", "lightgreen": "#90ee90", "maroon": "#800000",
    "olive": "#808000", "teal": "#008080", "navy": "#000080",
}
def _state_to_text_grid(state) -> tuple[list[list[str]], int]:
"""Convert world state (dict or str) to a 2D color matrix + grid_size."""
if isinstance(state, str):
rows = [line.split() for line in state.strip().split("\n") if line.strip()]
gs = len(rows)
return rows, gs
elif isinstance(state, dict):
gs = state.get("GRID_SIZE", 16)
matrix = [["black"] * gs for _ in range(gs)]
for key, items in state.items():
if key == "GRID_SIZE" or not isinstance(items, list):
continue
for item in items:
if not isinstance(item, dict):
continue
pos = item.get("position", item)
x = pos.get("x", 0)
y = pos.get("y", 0)
if 0 <= x < gs and 0 <= y < gs:
matrix[y][x] = item.get("color", key).lower()
return matrix, gs
return [["black"] * 16 for _ in range(16)], 16
def render_grid_html(state, step_num: int = 0, action: str = "") -> str:
    """Render a world state as an HTML table of colored cells.

    Each cell carries ``data-x``/``data-y`` attributes and a tooltip with
    its coordinates and color name so that a delegated click handler can
    identify it. Color names are resolved through ``COLOR_MAP``; values
    starting with ``#`` are used as-is and anything else renders magenta.

    Args:
        state: World state accepted by ``_state_to_text_grid``.
        step_num: Step counter shown in the caption.
        action: Action that produced this state; when empty and
            ``step_num`` is 0 the caption reads "Initial state".

    Returns:
        The HTML fragment as a string.
    """
    from html import escape

    matrix, gs = _state_to_text_grid(state)
    # An empty text state yields gs == 0; avoid dividing by it.
    cell_px = max(16, min(40, 640 // gs)) if gs > 0 else 16

    parts = ['<div style="font-family:monospace;margin:8px 0" id="mara-grid-container">']
    if action:
        # The action text is escaped because it is interpolated into markup.
        parts.append(
            f'<div style="margin-bottom:4px"><b>Step {step_num}</b> — action: '
            f'<code>{escape(str(action))}</code></div>'
        )
    elif step_num == 0:
        parts.append('<div style="margin-bottom:4px"><b>Initial state</b> (after reset)</div>')

    parts.append('<table style="border-collapse:collapse;border:1px solid #444;cursor:crosshair">')
    for y, row in enumerate(matrix):
        parts.append("<tr>")
        for x, color in enumerate(row):
            color = str(color)
            hex_c = COLOR_MAP.get(color.lower(), color if color.startswith("#") else "#ff00ff")
            # Colors come from synthesized world code, so escape them before
            # placing them inside attribute values.
            parts.append(
                f'<td data-x="{x}" data-y="{y}" '
                f'style="width:{cell_px}px;height:{cell_px}px;'
                f'background:{escape(hex_c, quote=True)};border:1px solid #333;padding:0" '
                f'title="({x},{y}) {escape(color, quote=True)}"></td>'
            )
        parts.append("</tr>")
    parts.append("</table></div>")
    return "".join(parts)
# Source of a minimal ``stochastic`` module. It is written to disk as
# ``stochastic.py`` so that synthesized worlds which ``import stochastic`` can
# be exec'd. The text must stay valid, correctly indented Python.
_STOCHASTIC_BASE = '''
import random as _random


class StochasticWorld:
    def __init__(self, seed=42):
        self._rng = _random.Random(seed)
        self.params = {}

    def multinomial(self, options):
        items = list(options.items())
        weights = [float(w) for _, w in items]
        total = sum(weights)
        r = self._rng.random() * total
        cumul = 0.0
        for val, w in items:
            cumul += float(w)
            if r <= cumul:
                return val
        return items[-1][0]

    def uniform_int(self, lo, hi):
        return self._rng.randint(lo, hi)

    def bernoulli(self, p):
        return self._rng.random() < p

    def reseed(self, seed):
        self._rng = _random.Random(seed)


class SamplingHandler:
    def __init__(self, seed=42):
        self._rng = _random.Random(seed)

    def multinomial(self, options):
        items = list(options.items())
        weights = [float(w) for _, w in items]
        total = sum(weights)
        r = self._rng.random() * total
        cumul = 0.0
        for val, w in items:
            cumul += float(w)
            if r <= cumul:
                return val
        return items[-1][0]

    def uniform_int(self, lo, hi):
        return self._rng.randint(lo, hi)

    def bernoulli(self, p):
        return self._rng.random() < p

    def reseed(self, seed):
        self._rng = _random.Random(seed)
'''
def _load_world_from_code(code_text: str, seed: int = 42):
    """Exec synthesized world source and return an instance of its world class.

    A ``stochastic.py`` shim (from ``_STOCHASTIC_BASE``) is made importable
    first so that world code which imports it can run.

    Args:
        code_text: Python source expected to define ``SynthesizedWorld``, or
            failing that any class exposing both ``reset`` and ``step``.
        seed: Passed to the world class constructor as ``seed=``.

    Returns:
        A new instance of the located world class.

    Raises:
        RuntimeError: If the source fails to execute or defines no suitable
            class.
    """
    # Local import: the file's top-level imports do not include ``sys``.
    import sys

    # Create the shim directory once per process and reuse it. A fresh temp
    # dir per call would leak directories and grow sys.path without bound.
    shim_dir = getattr(_load_world_from_code, "_shim_dir", None)
    if shim_dir is None or not (Path(shim_dir) / "stochastic.py").is_file():
        shim_dir = tempfile.mkdtemp(prefix="mara_play_")
        (Path(shim_dir) / "stochastic.py").write_text(_STOCHASTIC_BASE, encoding="utf-8")
        _load_world_from_code._shim_dir = shim_dir
    if shim_dir not in sys.path:
        # Left on sys.path on purpose: the world may import lazily at runtime.
        sys.path.insert(0, shim_dir)

    # SECURITY: this executes arbitrary code with full builtins. Only use it
    # with world sources from a trusted repository.
    namespace = {"__builtins__": __builtins__}
    try:
        exec(compile(code_text, "<world.py>", "exec"), namespace)
    except Exception as e:
        raise RuntimeError(f"Failed to compile world code: {e}") from e

    # Prefer the conventional class name, then fall back to the first class
    # that looks like a world (has both reset and step).
    cls = namespace.get("SynthesizedWorld")
    if cls is None:
        cls = next(
            (
                obj
                for obj in namespace.values()
                if isinstance(obj, type) and hasattr(obj, "reset") and hasattr(obj, "step")
            ),
            None,
        )
    if cls is None:
        raise RuntimeError("No SynthesizedWorld class found in the code.")
    return cls(seed=seed)
# Session state for the play tab, keyed by "<pipeline>/<env>". Each value is a
# dict with the live "world" instance, its latest "state", the "step" counter,
# and the "history" list of actions. This is a module-level dict, so it is
# shared by every caller in the process.
_play_sessions: dict[str, dict] = {}
def play_load_world(pipeline: str, env: str, seed: int) -> tuple[str, str, str]:
    """Fetch a world's source, instantiate it, and register a play session.

    Args:
        pipeline: Pipeline folder name.
        env: Environment name inside the pipeline.
        seed: Seed handed to the world constructor (coerced with ``int``).

    Returns:
        ``(grid_html, status, code)``. ``grid_html`` is empty whenever the
        world could not be loaded; ``code`` is empty when no source was found.
    """
    if not (pipeline and env):
        return ("", "Select a pipeline and environment.", "")

    # Probe the known locations in order; the first non-empty source wins.
    source = None
    for rel_path in ("final_world.py", "code/world.py"):
        source = load_text(f"{pipeline}/{env}/{rel_path}")
        if source:
            break
    if not source:
        return ("", f"No world.py found for {pipeline}/{env}.", "")

    try:
        world = _load_world_from_code(source, seed=int(seed))
        initial_state = world.reset()
    except Exception as e:
        # Show only the tail of the traceback to keep the status readable.
        tb = traceback.format_exc()
        return ("", f"Error loading world: {e}\n\n```\n{tb[-1000:]}\n```", source)

    _play_sessions[f"{pipeline}/{env}"] = {
        "world": world,
        "state": initial_state,
        "step": 0,
        "history": [],
    }
    return (
        render_grid_html(initial_state, step_num=0),
        "World loaded! Grid ready. Use action buttons to step.",
        source,
    )
def play_step(pipeline: str, env: str, action: str) -> tuple[str, str]:
    """Apply one action to the loaded world.

    Args:
        pipeline: Pipeline folder name of the loaded session.
        env: Environment name of the loaded session.
        action: Action string passed verbatim to the world's ``step``.

    Returns:
        ``(grid_html, status)``. If no session exists the grid is empty. If
        ``step`` raises, the last good state is re-rendered with an error
        label and the session is left unchanged.
    """
    session = _play_sessions.get(f"{pipeline}/{env}")
    if not session:
        return ("", "No world loaded. Click 'Load World' first.")

    try:
        state = session["world"].step(action)
    except Exception as e:
        return (render_grid_html(session["state"], session["step"], f"ERROR: {action}"),
                f"Error on step: {e}")

    # Only record the step once the world accepted it.
    session["state"] = state
    session["step"] += 1
    session["history"].append(action)

    grid_html = render_grid_html(state, step_num=session["step"], action=action)
    return (grid_html, f"Step {session['step']} — action: {action}")
def play_reset(pipeline: str, env: str, seed: int) -> tuple[str, str]:
    """Reseed (when supported) and reset the loaded world.

    Args:
        pipeline: Pipeline folder name of the loaded session.
        env: Environment name of the loaded session.
        seed: Seed passed to the world's ``reseed`` if it has one.

    Returns:
        ``(grid_html, status)``; the grid is empty when no session exists or
        the reset raised.
    """
    session = _play_sessions.get(f"{pipeline}/{env}")
    if not session:
        return ("", "No world loaded. Click 'Load World' first.")

    try:
        world = session["world"]
        if hasattr(world, "reseed"):
            world.reseed(int(seed))
        fresh_state = world.reset()
        # Start the session bookkeeping over from the fresh state.
        session.update(state=fresh_state, step=0, history=[])
    except Exception as e:
        return ("", f"Error on reset: {e}")

    return (render_grid_html(fresh_state, step_num=0), "World reset.")
def play_random_steps(pipeline: str, env: str, n_steps: int) -> tuple[str, str]:
    """Apply up to ``n_steps`` randomly chosen actions to the loaded world.

    Actions are drawn uniformly from noop/left/right/up/down/click; a click is
    given random coordinates inside the grid.

    Args:
        pipeline: Pipeline folder name of the loaded session.
        env: Environment name of the loaded session.
        n_steps: Number of actions to attempt (``None`` is treated as 0).

    Returns:
        ``(grid_html, status)``. Execution stops at the first action whose
        ``step`` raises; the last good state is rendered in that case.
    """
    session = _play_sessions.get(f"{pipeline}/{env}")
    if not session:
        return ("", "No world loaded. Click 'Load World' first.")

    # Work out the click range from the current state: GRID_SIZE for dict
    # states, the row count for text grids, 16 otherwise.
    state = session["state"]
    if isinstance(state, dict):
        gs = state.get("GRID_SIZE", 16)
    elif isinstance(state, str):
        gs = sum(1 for line in state.splitlines() if line.strip())
    else:
        gs = 16
    if not isinstance(gs, int) or gs < 1:
        gs = 16  # randint(0, gs - 1) needs a positive integer size

    rng = random.Random()
    actions_taken = []
    for _ in range(int(n_steps or 0)):
        action = rng.choice(["noop", "left", "right", "up", "down", "click"])
        if action == "click":
            action += f" {rng.randint(0, gs - 1)} {rng.randint(0, gs - 1)}"
        try:
            state = session["world"].step(action)
        except Exception as e:
            grid_html = render_grid_html(session["state"], session["step"], f"ERROR on {action}")
            return (grid_html, f"Error after {len(actions_taken)} steps: {e}")
        session["state"] = state
        session["step"] += 1
        session["history"].append(action)
        actions_taken.append(action)

    grid_html = render_grid_html(state, step_num=session["step"],
                                 action=actions_taken[-1] if actions_taken else "")
    return (grid_html, f"Executed {len(actions_taken)} random steps. Total: {session['step']}")
# ---------------------------------------------------------------------------
# Gradio App
# ---------------------------------------------------------------------------
def build_app() -> gr.Blocks:
    """Build the Gradio Blocks UI for the results explorer.

    Creates one tab per view (overview, environment detail, agent reasoning,
    code evolution, play world, solver tasks, planning videos, pipeline
    comparison, annotation) and wires every control to its handler.

    Returns:
        The assembled ``gr.Blocks`` app, not yet launched.
    """
    pipelines = discover_pipelines()

    # Client-side script, run once on page load: keyboard shortcuts for the
    # Play World buttons and click delegation for grid cells.
    _keyboard_js = """
    function() {
        if (window._maraKeysAttached) return;
        window._maraKeysAttached = true;

        // Helper: set value on a Gradio textbox and trigger change
        function setGradioValue(elemId, value) {
            var container = document.getElementById(elemId);
            if (!container) return;
            var el = container.querySelector('textarea') || container.querySelector('input');
            if (!el) return;
            // Use the native setter matching the element type to bypass
            // framework wrappers. Calling the textarea setter on an <input>
            // throws, so the prototype must be picked from the tag.
            var proto = (el.tagName === 'TEXTAREA')
                ? HTMLTextAreaElement.prototype
                : HTMLInputElement.prototype;
            var desc = Object.getOwnPropertyDescriptor(proto, 'value');
            if (desc && desc.set) {
                desc.set.call(el, value);
            } else {
                el.value = value;
            }
            el.dispatchEvent(new Event('input', {bubbles: true}));
            el.dispatchEvent(new Event('change', {bubbles: true}));
        }

        // Keyboard shortcuts — capture phase to beat browser scroll
        document.addEventListener('keydown', function(e) {
            // Leave browser/OS shortcuts (Ctrl+R reload, Cmd+N, ...) alone.
            if (e.ctrlKey || e.metaKey || e.altKey) return;
            var target = e.target || e.srcElement;
            var tag = target && target.tagName;
            var editable = target && target.isContentEditable;
            if (tag === 'INPUT' || tag === 'TEXTAREA' || tag === 'SELECT' || editable) return;

            var btnId = null;
            switch (e.key) {
                case 'ArrowUp': btnId = 'btn_up'; break;
                case 'ArrowDown': btnId = 'btn_down'; break;
                case 'ArrowLeft': btnId = 'btn_left'; break;
                case 'ArrowRight': btnId = 'btn_right'; break;
                case ' ': btnId = 'btn_noop'; break;
                case 'r': case 'R': btnId = 'btn_reset'; break;
                case 'n': case 'N': btnId = 'btn_random'; break;
                default: return;
            }

            // Only take over the key while the Play World controls are
            // actually visible; on other tabs keys keep their default action.
            var btn = document.getElementById(btnId);
            if (!btn || btn.offsetParent === null) return;
            e.preventDefault();
            e.stopPropagation();
            btn.click();
        }, true);  // true = capture phase

        // Grid click — use event delegation on the document
        // Gradio sanitizes onclick attrs, so we listen for clicks on <td> with data-x/data-y
        document.addEventListener('click', function(e) {
            if (!e.target || !e.target.closest) return;
            var td = e.target.closest('td[data-x][data-y]');
            if (!td) return;
            var x = td.getAttribute('data-x');
            var y = td.getAttribute('data-y');
            if (x !== null && y !== null) {
                // Use timestamp to force change event even if same cell clicked twice
                setGradioValue('grid_click_input', x + ' ' + y + ' ' + Date.now());
            }
        });
    }
    """

    with gr.Blocks(title="MARA Results Explorer", theme=gr.themes.Soft(), js=_keyboard_js) as app:
        gr.Markdown(
            "# MARA Results Explorer\n\n"
            "Browse and critique world model synthesis results from "
            "[Basis-MARA/mara-adversarial-results]"
            "(https://huggingface.co/datasets/Basis-MARA/mara-adversarial-results). "
            "Select a pipeline, pick an environment, and explore the agent's reasoning, "
            "code evolution, and discrepancies.\n"
        )

        def update_env_choices(pipeline):
            """Refill an environment dropdown for the chosen pipeline."""
            envs = list_envs_for_pipeline(pipeline)
            return gr.update(choices=envs, value=envs[0] if envs else None)

        with gr.Tabs():
            # ── Tab 1: Overview ──
            with gr.Tab("Overview"):
                gr.Markdown("High-level view of all environments in a pipeline. "
                            "Adapts columns based on pipeline type (adversarial / solver / autumn_synth).")
                overview_pipeline = gr.Dropdown(choices=pipelines, label="Pipeline",
                                                value=pipelines[0] if pipelines else None)
                overview_output = gr.Markdown()
                overview_pipeline.change(build_overview, inputs=overview_pipeline, outputs=overview_output)

            # ── Tab 2: Environment Detail ──
            with gr.Tab("Environment Detail"):
                gr.Markdown("Detailed view of one environment. Content adapts to pipeline type:\n"
                            "- **Adversarial**: Summary, final code, discrepancies, per-round metrics\n"
                            "- **Solver**: Task prompts, scratchpads, answers, instructions\n"
                            "- **AutumnSynth**: Coverage report, encoders, transitions, logs")
                with gr.Row():
                    detail_pipeline = gr.Dropdown(choices=pipelines, label="Pipeline")
                    detail_env = gr.Dropdown(choices=[], label="Environment")
                detail_pipeline.change(update_env_choices, inputs=detail_pipeline, outputs=detail_env)
                load_btn = gr.Button("Load", variant="primary")

                # Sub-tab labels are fixed; each names the content shown for
                # the different pipeline types.
                with gr.Tabs():
                    with gr.Tab("Summary / Prompts"):
                        detail_summary = gr.Markdown()
                    with gr.Tab("Code / Scratchpads"):
                        detail_code = gr.Markdown()
                    with gr.Tab("Discrepancies / Answers"):
                        detail_disc = gr.Markdown()
                    with gr.Tab("Rounds / Instructions"):
                        detail_rounds = gr.Markdown()

                load_btn.click(
                    load_env_detail,
                    inputs=[detail_pipeline, detail_env],
                    outputs=[detail_summary, detail_code, detail_disc, detail_rounds],
                )

            # ── Tab 3: Agent Reasoning ──
            with gr.Tab("Agent Reasoning"):
                gr.Markdown("View the agent's chain-of-thought, tool calls, and reasoning.\n\n"
                            "- **Adversarial pipelines**: Select round + challenger/synthesizer\n"
                            "- **Solver pipelines**: Select task type (cd/mfp/planning) to see scratchpad")
                with gr.Row():
                    reason_pipeline = gr.Dropdown(choices=pipelines, label="Pipeline")
                    reason_env = gr.Dropdown(choices=[], label="Environment")
                reason_pipeline.change(update_env_choices, inputs=reason_pipeline, outputs=reason_env)
                with gr.Row():
                    reason_round = gr.Slider(0, 19, step=1, value=0, label="Round (adversarial only)")
                    reason_agent = gr.Radio(
                        ["challenger", "synthesizer", "cd", "mfp", "planning"],
                        value="challenger",
                        label="Agent / Task Type"
                    )
                reason_btn = gr.Button("Load Reasoning", variant="primary")
                reason_output = gr.Markdown()
                reason_btn.click(
                    load_agent_reasoning,
                    inputs=[reason_pipeline, reason_env, reason_round, reason_agent],
                    outputs=reason_output,
                )

            # ── Tab 4: Code Evolution ──
            with gr.Tab("Code Evolution"):
                gr.Markdown("Step through synthesized code versions.\n\n"
                            "- **Adversarial**: Code per round (synthesizer_code.py)\n"
                            "- **AutumnSynth**: Encoders (one per slider step)\n"
                            "- **Solver**: Code from each task type")
                with gr.Row():
                    evo_pipeline = gr.Dropdown(choices=pipelines, label="Pipeline")
                    evo_env = gr.Dropdown(choices=[], label="Environment")
                evo_pipeline.change(update_env_choices, inputs=evo_pipeline, outputs=evo_env)
                round_slider = gr.Slider(0, 19, step=1, value=0, label="Round / Index")
                evo_code = gr.Code(language="python", label="Code at this round")
                round_slider.change(
                    show_round_code,
                    inputs=[evo_pipeline, evo_env, round_slider],
                    outputs=evo_code,
                )
                # Switching environment shows round 0 of the new environment.
                evo_env.change(
                    lambda p, e: show_round_code(p, e, 0),
                    inputs=[evo_pipeline, evo_env],
                    outputs=evo_code,
                )

            # ── Tab 5: Play Synth World ──
            with gr.Tab("Play World"):
                gr.Markdown("### Interactive World Simulator\n\n"
                            "Load a synthesized `world.py` from any pipeline and "
                            "step through it interactively.\n\n"
                            "**Keyboard:** Arrow keys = move, Space = noop, R = reset, "
                            "N = 10 random steps. **Click on grid cells** to send click actions.")
                with gr.Row():
                    play_pipeline = gr.Dropdown(choices=pipelines, label="Pipeline")
                    play_env = gr.Dropdown(choices=[], label="Environment")
                    play_seed = gr.Number(value=42, label="Seed", precision=0)
                    play_load_btn = gr.Button("Load World", variant="primary")
                play_pipeline.change(update_env_choices, inputs=play_pipeline, outputs=play_env)
                play_status = gr.Markdown("Select a pipeline/env and click Load World.")

                # Grid display — full width, clickable cells
                play_grid = gr.HTML(label="Grid")
                # Hidden textbox that receives grid click coordinates from JS.
                # NOTE(review): the click script looks this element up in the
                # DOM by id; confirm the installed Gradio version still renders
                # components created with visible=False.
                grid_click_input = gr.Textbox(visible=False, elem_id="grid_click_input")

                # Controls — full-width rows
                with gr.Row():
                    btn_left = gr.Button("← Left", elem_id="btn_left")
                    btn_up = gr.Button("↑ Up", elem_id="btn_up")
                    btn_down = gr.Button("↓ Down", elem_id="btn_down")
                    btn_right = gr.Button("→ Right", elem_id="btn_right")
                    btn_noop = gr.Button("Noop (Space)", elem_id="btn_noop")
                with gr.Row():
                    random_n = gr.Slider(1, 50, value=10, step=1, label="N random steps")
                    btn_random = gr.Button("Run Random (N)", elem_id="btn_random")
                    btn_reset = gr.Button("Reset (R)", elem_id="btn_reset")
                with gr.Accordion("World Code", open=False):
                    play_code_view = gr.Code(language="python", label="world.py (read-only)", interactive=False)

                # Wire up
                play_load_btn.click(
                    play_load_world,
                    inputs=[play_pipeline, play_env, play_seed],
                    outputs=[play_grid, play_status, play_code_view],
                )
                for btn, action_str in [
                    (btn_left, "left"), (btn_right, "right"),
                    (btn_up, "up"), (btn_down, "down"), (btn_noop, "noop"),
                ]:
                    # Bind the action as a default argument so each button
                    # keeps its own value rather than the loop's last one.
                    btn.click(
                        lambda p, e, a=action_str: play_step(p, e, a),
                        inputs=[play_pipeline, play_env],
                        outputs=[play_grid, play_status],
                    )

                # Grid cell click — JS writes "x y timestamp" to hidden textbox
                def _handle_grid_click(pipeline, env, coords):
                    """Turn an "x y timestamp" string into a click action."""
                    if not coords or not coords.strip():
                        return gr.update(), ""
                    parts = coords.strip().split()
                    if len(parts) >= 2:
                        return play_step(pipeline, env, f"click {parts[0]} {parts[1]}")
                    return gr.update(), ""

                grid_click_input.change(
                    _handle_grid_click,
                    inputs=[play_pipeline, play_env, grid_click_input],
                    outputs=[play_grid, play_status],
                )
                btn_random.click(
                    play_random_steps,
                    inputs=[play_pipeline, play_env, random_n],
                    outputs=[play_grid, play_status],
                )
                btn_reset.click(
                    play_reset,
                    inputs=[play_pipeline, play_env, play_seed],
                    outputs=[play_grid, play_status],
                )

            # ── Tab 6: Solver Tasks ──
            with gr.Tab("Solver Tasks"):
                gr.Markdown("Dedicated solver task viewer. Select a `*_solver_*` or `direct_solver_*` pipeline.")
                with gr.Row():
                    solver_pipeline = gr.Dropdown(choices=pipelines, label="Pipeline")
                    solver_env = gr.Dropdown(choices=[], label="Environment")
                    solver_task = gr.Dropdown(choices=[], label="Task")
                solver_pipeline.change(update_env_choices, inputs=solver_pipeline, outputs=solver_env)

                def update_solver_tasks(pipeline, env):
                    """Refill the task dropdown for the chosen pipeline/env."""
                    tasks = list_solver_tasks(pipeline, env) if pipeline and env else []
                    return gr.update(choices=tasks, value=tasks[0] if tasks else None)

                solver_env.change(update_solver_tasks, inputs=[solver_pipeline, solver_env], outputs=solver_task)
                solver_btn = gr.Button("Load", variant="primary")
                with gr.Tabs():
                    with gr.Tab("Task Prompt"):
                        solver_summary = gr.Markdown()
                    with gr.Tab("Scratchpad"):
                        solver_scratchpad = gr.Markdown()
                    with gr.Tab("Answer"):
                        solver_answer = gr.Markdown()
                    with gr.Tab("Instructions"):
                        solver_instructions = gr.Markdown()
                solver_btn.click(
                    load_solver_detail,
                    inputs=[solver_pipeline, solver_env, solver_task],
                    outputs=[solver_summary, solver_scratchpad, solver_answer, solver_instructions],
                )

            # ── Tab 7: Planning Videos ──
            with gr.Tab("Planning Videos"):
                gr.Markdown("View planning execution videos (GIFs) and comparison images.\n\n"
                            "Videos are organized across multiple subdirectories: "
                            "root, direct_solver, direct_solver_v2, real_env, stochastic, stochastic/real_env.")
                video_env = gr.Dropdown(choices=[], label="Environment",
                                        allow_custom_value=True)
                video_btn = gr.Button("Load Videos", variant="primary")
                video_output = gr.Markdown()

                # Populate env dropdown on app load
                @app.load(outputs=video_env)
                def populate_video_envs():
                    envs = list_all_video_envs()
                    return gr.update(choices=envs, value=envs[0] if envs else None)

                video_btn.click(load_planning_video_md, inputs=video_env, outputs=video_output)

            # ── Tab 8: Compare Pipelines ──
            with gr.Tab("Compare Pipelines"):
                gr.Markdown("Compare results for one environment across all pipelines.")
                compare_env = gr.Textbox(label="Environment name", placeholder="mario")
                compare_btn = gr.Button("Compare", variant="primary")
                compare_output = gr.Markdown()
                compare_btn.click(compare_env_across_pipelines, inputs=compare_env, outputs=compare_output)

            # ── Tab 9: Annotate ──
            with gr.Tab("Annotate"):
                gr.Markdown("### Critique and annotate runs\n"
                            "Annotations are persisted to the HuggingFace dataset repo.")
                with gr.Row():
                    ann_pipeline = gr.Dropdown(choices=pipelines, label="Pipeline")
                    ann_env = gr.Dropdown(choices=[], label="Environment")
                ann_pipeline.change(update_env_choices, inputs=ann_pipeline, outputs=ann_env)
                existing_annotations = gr.Markdown("Select an environment to see annotations.")

                def show_annotations(pipeline, env):
                    """Render stored annotations once both selectors are set."""
                    return format_annotations(pipeline, env) if pipeline and env else ""

                ann_env.change(show_annotations, inputs=[ann_pipeline, ann_env], outputs=existing_annotations)

                gr.Markdown("---\n#### Add annotation")
                ann_label = gr.Dropdown(
                    choices=["correct-rule", "wrong-rule", "missing-rule", "lookup-table",
                             "information-leak", "wrong-ontology", "good-exploration",
                             "bad-exploration", "general"],
                    label="Label",
                )
                ann_comment = gr.Textbox(label="Comment", lines=3,
                                         placeholder="e.g., R3 says gray moves up but it actually chases red")
                ann_reviewer = gr.Textbox(label="Your name / Slack handle")
                ann_btn = gr.Button("Submit Annotation", variant="primary")
                ann_status = gr.Textbox(label="Status", interactive=False)

                def submit_and_refresh(pipeline, env, label, comment, reviewer):
                    """Save one annotation, then re-render the annotation list."""
                    status = save_annotation(pipeline, env, label, comment, reviewer)
                    updated = format_annotations(pipeline, env)
                    return status, updated

                ann_btn.click(
                    submit_and_refresh,
                    inputs=[ann_pipeline, ann_env, ann_label, ann_comment, ann_reviewer],
                    outputs=[ann_status, existing_annotations],
                )

    return app
def main():
    """Parse command-line options, build the app, and start the server.

    Options: ``--port`` (int, default 7860) and ``--share`` (flag).
    """
    cli = argparse.ArgumentParser(description="MARA Results Explorer")
    cli.add_argument("--port", type=int, default=7860)
    cli.add_argument("--share", action="store_true")
    opts = cli.parse_args()

    build_app().launch(server_port=opts.port, share=opts.share)
# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()