Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """MARA Results Explorer — Gradio app for browsing and critiquing runs. | |
| Browse all pipeline results from the Basis-MARA/mara-adversarial-results | |
| HuggingFace dataset. Compare world models across pipelines, step through | |
| rounds, read agent reasoning/scratchpad, and annotate results. | |
| Usage: | |
| python scripts/critique_app.py | |
| python scripts/critique_app.py --port 7861 | |
| Requires: pip install gradio huggingface_hub | |
| """ | |
| import argparse | |
| import datetime | |
| import json | |
| import os | |
| import random | |
| import tempfile | |
| import traceback | |
| from pathlib import Path | |
| import gradio as gr | |
| from huggingface_hub import HfApi, hf_hub_download | |
| # --------------------------------------------------------------------------- | |
| # Config | |
| # --------------------------------------------------------------------------- | |
# HuggingFace Hub repository that stores every pipeline's results.
HF_REPO = "Basis-MARA/mara-adversarial-results"
REPO_TYPE = "dataset"

# In-process memo for Hub listings; keys are strings such as "pipelines"
# or "envs:<pipeline>" chosen by the functions that fill it.
_env_cache: dict = {}

# Token is optional: it is taken from the HF_TOKEN environment variable
# and is None when that variable is unset.
_hf_token = os.getenv("HF_TOKEN")
_api = HfApi(token=_hf_token)

# Top-level repo folders that are not pipelines and must not be listed.
EXCLUDE_FOLDERS = {"planning_videos", "annotations", "__pycache__"}
| # Pipeline type detection | |
| def _pipeline_type(name: str) -> str: | |
| """Classify a pipeline folder into a type for appropriate UI handling.""" | |
| if "solver" in name or name.startswith("direct_solver"): | |
| return "solver" | |
| if "autumn_synth" in name: | |
| return "autumn_synth" | |
| if "adversarial" in name: | |
| return "adversarial" | |
| return "unknown" | |
| # Pipeline metadata (label + model). Auto-discovered pipelines not here get generic labels. | |
# Maps a pipeline folder name to a (display label, model name) pair.
PIPELINE_META = {
    # Adversarial synthesis runs
    "adversarial_results": ("Adversarial (early)", "Claude Sonnet"),
    "adversarial_results_raw": ("Adversarial (Claude, raw)", "Claude Sonnet"),
    "adversarial_results_inference": ("Adversarial (inference)", "Claude Sonnet"),
    "adversarial_results_raw_remaining_envs": (
        "Adversarial (Claude, remaining)",
        "Claude Sonnet",
    ),
    "adversarial_results_raw_inference_with_buffer": (
        "Adversarial (Claude, inf+buffer) β ",
        "Claude Sonnet",
    ),
    "adversarial_results_raw_inference": (
        "Adversarial (Claude, inference)",
        "Claude Sonnet",
    ),
    "adversarial_results_raw_inference_boed": (
        "Adversarial (Claude, inf+BOED)",
        "Claude Sonnet",
    ),
    "adversarial_results_raw_stochastic": (
        "Adversarial (Claude, stochastic)",
        "Claude Sonnet",
    ),
    "adversarial_results_raw_stochastic_boed": (
        "Adversarial (Claude, stoch+BOED)",
        "Claude Sonnet",
    ),
    "adversarial_results_raw_effectful_gpt-4o": ("Adversarial (GPT-4o)", "GPT-4o"),
    "adversarial_results_raw_effectful_gpt-5.4": ("Adversarial (GPT-5.4)", "GPT-5.4"),
    "adversarial_results_raw_inference_with_buffer_effectful_gpt-4o": (
        "Adversarial (GPT-4o, inf+buf)",
        "GPT-4o",
    ),
    "adversarial_results_raw_inference_with_buffer_effectful_gpt-5.4": (
        "Adversarial (GPT-5.4, inf+buf)",
        "GPT-5.4",
    ),
    # Adversarial synthesis combined with a solver
    "adversarial_synthesis_solver_results_raw_inference_v9": (
        "Adv + Solver (v9)",
        "Claude Sonnet",
    ),
    "adversarial_synthesis_solver_results_raw_protocol_v9_roundselect": (
        "Adv + Solver (v9 round-select)",
        "Claude Sonnet",
    ),
    "adversarial_synthesis_solver_results_raw_remaining_envs": (
        "Adv + Solver (remaining)",
        "Claude Sonnet",
    ),
    # AutumnSynth runs
    "autumn_synth_results": ("AutumnSynth (bottom-up)", "Claude Sonnet"),
    "autumn_synth_results_encoder_next": ("AutumnSynth (encoder-next)", "Claude Sonnet"),
    "autumn_synth_results_old": ("AutumnSynth (old)", "Claude Sonnet"),
    # Direct solver runs
    "direct_solver_results_v1": ("Direct Solver v1", "Claude Sonnet"),
    "direct_solver_results_v2": ("Direct Solver v2", "Claude Sonnet"),
}
| # --------------------------------------------------------------------------- | |
| # Data loading helpers | |
| # --------------------------------------------------------------------------- | |
def discover_pipelines() -> list[str]:
    """Return the sorted names of all pipeline folders at the repo root.

    Entries are dropped when they start with ".", are listed in
    EXCLUDE_FOLDERS, or end with a known file extension. A successful
    listing is memoized in ``_env_cache``. If the listing raises, a
    one-element list holding an "Error: ..." string is returned and
    nothing is cached.
    """
    memo_key = "pipelines"
    if memo_key in _env_cache:
        return _env_cache[memo_key]

    # Anything ending in one of these is treated as a file, not a folder.
    file_suffixes = (
        ".json", ".py", ".png", ".csv", ".pkl", ".log", ".gif",
        ".gitattributes", ".md", ".txt",
    )
    try:
        names = []
        for entry in _api.list_repo_tree(HF_REPO, repo_type=REPO_TYPE, path_in_repo=""):
            path = entry.path
            if path.startswith(".") or path in EXCLUDE_FOLDERS:
                continue
            if path.endswith(file_suffixes):
                continue
            names.append(path)
        names.sort()
        _env_cache[memo_key] = names
        return names
    except Exception as exc:
        return [f"Error: {exc}"]
def pipeline_label(pipeline: str) -> str:
    """Return the display label for a pipeline, or its raw name if unlisted."""
    entry = PIPELINE_META.get(pipeline)
    if entry:
        return entry[0]
    return pipeline
def list_envs_for_pipeline(pipeline: str) -> list[str]:
    """Return the sorted environment folder names under a pipeline folder.

    Only nested entries (path contains "/") are kept; entries ending in a
    known file extension or mentioning "__pycache__" / ".DS_Store" are
    dropped. The result is memoized per pipeline. If the listing raises,
    an empty list is both cached and returned.
    """
    memo_key = f"envs:{pipeline}"
    if memo_key in _env_cache:
        return _env_cache[memo_key]

    file_suffixes = (
        ".json", ".py", ".png", ".csv", ".pkl", ".log", ".gif",
        ".gitattributes", ".md", ".txt",
    )
    try:
        env_names = []
        for entry in _api.list_repo_tree(HF_REPO, repo_type=REPO_TYPE, path_in_repo=pipeline):
            path = entry.path
            if "/" not in path or path.endswith(file_suffixes):
                continue
            if "__pycache__" in path or ".DS_Store" in path:
                continue
            env_names.append(path.split("/")[-1])
        env_names.sort()
    except Exception:
        # Failures are remembered as "no environments" for this pipeline.
        _env_cache[memo_key] = []
        return []
    _env_cache[memo_key] = env_names
    return env_names
def list_solver_tasks(pipeline: str, env: str) -> list[str]:
    """Return the sorted solver task folders (cd, mfp, planning) present for an env."""
    task_names = ("cd", "mfp", "planning")
    present = []
    for entry in safe_list_tree(f"{pipeline}/{env}"):
        if "/" not in entry.path:
            continue
        leaf = entry.path.split("/")[-1]
        if leaf in task_names:
            present.append(leaf)
    present.sort()
    return present
def download_file(path: str) -> str | None:
    """Fetch one repo file via ``hf_hub_download``.

    Returns whatever ``hf_hub_download`` returns on success, or None when
    it raises for any reason.
    """
    try:
        local_path = hf_hub_download(HF_REPO, path, repo_type=REPO_TYPE)
    except Exception:
        return None
    return local_path
def load_json(path: str) -> dict | list | None:
    """Download a repo file and parse it as JSON.

    Args:
        path: Path of the file inside the results repository.

    Returns:
        The decoded JSON value, or None when the file cannot be
        downloaded, read, decoded as UTF-8, or parsed.
    """
    local = download_file(path)
    if local is None:
        return None
    try:
        # JSON is UTF-8 by specification; do not depend on the platform's
        # default locale encoding when reading it.
        raw = Path(local).read_text(encoding="utf-8")
        return json.loads(raw)
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return None
def load_text(path: str) -> str:
    """Download a repo file and return its contents as text.

    Args:
        path: Path of the file inside the results repository.

    Returns:
        The file contents decoded as UTF-8, or "" when the file cannot
        be downloaded or read.
    """
    local = download_file(path)
    if local is None:
        return ""
    try:
        # Decode explicitly as UTF-8 instead of the platform default, and
        # substitute undecodable bytes so that one stray byte in a log
        # does not blank the whole document.
        return Path(local).read_text(encoding="utf-8", errors="replace")
    except OSError:
        return ""
def safe_list_tree(path: str) -> list:
    """List the repo entries directly under ``path``.

    Any exception raised by the listing (a missing folder, for example)
    is swallowed and reported as an empty list.
    """
    try:
        entries = list(_api.list_repo_tree(HF_REPO, repo_type=REPO_TYPE, path_in_repo=path))
    except Exception:
        entries = []
    return entries
| # --------------------------------------------------------------------------- | |
| # Extract agent reasoning from thoughts JSON | |
| # --------------------------------------------------------------------------- | |
| def extract_reasoning(thoughts: dict | list | None) -> str: | |
| """Extract human-readable reasoning from a *_thoughts.json file. | |
| These files contain the full Claude Code conversation. We pull out: | |
| - thinking blocks (chain-of-thought) | |
| - text blocks (agent's spoken reasoning) | |
| - tool_use summaries (what tools were called) | |
| """ | |
| if not thoughts or not isinstance(thoughts, dict): | |
| return "(no thoughts data)" | |
| msgs = thoughts.get("messages", []) | |
| if not msgs: | |
| return "(empty messages)" | |
| sections = [] | |
| turn = 0 | |
| for m in msgs: | |
| mtype = m.get("type", "") | |
| if mtype == "assistant": | |
| msg_data = m.get("message", m) | |
| content = msg_data.get("content", []) | |
| if isinstance(content, str): | |
| if len(content.strip()) > 0: | |
| sections.append(f"**Agent:** {content[:2000]}") | |
| continue | |
| if not isinstance(content, list): | |
| continue | |
| for block in content: | |
| if not isinstance(block, dict): | |
| continue | |
| btype = block.get("type", "") | |
| if btype == "thinking": | |
| text = block.get("thinking", "") | |
| if text.strip(): | |
| display = text[:3000] + ("..." if len(text) > 3000 else "") | |
| sections.append(f"<details><summary>Thinking (turn {turn})</summary>\n\n{display}\n\n</details>") | |
| elif btype == "text": | |
| text = block.get("text", "") | |
| if text.strip(): | |
| sections.append(f"**Agent:** {text[:2000]}") | |
| elif btype == "tool_use": | |
| name = block.get("name", "?") | |
| inp = block.get("input", {}) | |
| if isinstance(inp, dict): | |
| summary = inp.get("command", inp.get("description", inp.get("pattern", str(inp)[:200]))) | |
| else: | |
| summary = str(inp)[:200] | |
| sections.append(f"Tool `{name}`: `{summary}`") | |
| turn += 1 | |
| elif mtype == "user": | |
| msg_data = m.get("message", m) | |
| content = msg_data.get("content", []) | |
| if isinstance(content, list): | |
| for block in content: | |
| if isinstance(block, dict) and block.get("type") == "tool_result": | |
| result_content = block.get("content", "") | |
| if isinstance(result_content, str) and len(result_content) > 0: | |
| preview = result_content[:500] + ("..." if len(result_content) > 500 else "") | |
| sections.append(f"<details><summary>Tool result</summary>\n\n```\n{preview}\n```\n\n</details>") | |
| if not sections: | |
| result = thoughts.get("result", "") | |
| if result: | |
| return f"**Final result:**\n\n{result[:5000]}" | |
| return "(could not extract reasoning from this format)" | |
| return "\n\n".join(sections) | |
| # --------------------------------------------------------------------------- | |
| # Overview tab — adapts to pipeline type | |
| # --------------------------------------------------------------------------- | |
def build_overview(pipeline: str) -> str:
    """Build the markdown overview page for one pipeline.

    The page has a title, a model/type/environment-count line, and a
    table whose columns depend on the pipeline type: run metrics for
    adversarial pipelines, available tasks for solver pipelines,
    component folders for AutumnSynth, and a bare environment list
    otherwise.
    """
    if not pipeline:
        return "Select a pipeline."
    envs = list_envs_for_pipeline(pipeline)
    if not envs:
        return f"No environments found for `{pipeline}`."

    title = pipeline_label(pipeline)
    meta_entry = PIPELINE_META.get(pipeline)
    model = meta_entry[1] if meta_entry else "unknown"
    kind = _pipeline_type(pipeline)
    out = [
        f"# {title}\n",
        f"**Model:** {model} | **Type:** {kind} | **Environments:** {len(envs)}\n",
    ]

    if kind == "adversarial":
        # One row per environment, taken from its experiment summary.
        out.append("| Environment | Rounds | Final Match | Cost (USD) | Discrepancies |")
        out.append("|------------|--------|-------------|------------|---------------|")
        for env in envs:
            summary = load_json(f"{pipeline}/{env}/experiment_summary.json")
            if not summary:
                out.append(f"| {env} | - | - | - | - |")
                continue
            n_rounds = summary.get("num_rounds", "?")
            match = summary.get("final_match_score", "?")
            cost = summary.get("total_cost_usd", 0)
            discs = sum(r.get("num_discrepancies", 0) for r in summary.get("rounds", []))
            # Numbers get two decimals; anything else is shown verbatim.
            match_str = f"{match:.2f}" if isinstance(match, (int, float)) else str(match)
            cost_str = f"${cost:.2f}" if isinstance(cost, (int, float)) else str(cost)
            out.append(f"| {env} | {n_rounds} | {match_str} | {cost_str} | {discs} |")
    elif kind == "solver":
        out.append("| Environment | Tasks Available |")
        out.append("|------------|----------------|")
        for env in envs:
            tasks = list_solver_tasks(pipeline, env)
            tasks_str = ", ".join(tasks) if tasks else "-"
            out.append(f"| {env} | {tasks_str} |")
    elif kind == "autumn_synth":
        out.append("| Environment | Components |")
        out.append("|------------|-----------|")
        for env in envs:
            subdirs = []
            for entry in safe_list_tree(f"{pipeline}/{env}"):
                if "/" not in entry.path:
                    continue
                if entry.path.endswith((".json", ".py", ".pkl", ".log")):
                    continue
                leaf = entry.path.split("/")[-1]
                if leaf not in ("__pycache__", ".DS_Store"):
                    subdirs.append(leaf)
            out.append(f"| {env} | {', '.join(sorted(subdirs)) if subdirs else '-'} |")
    else:
        # Unrecognized pipeline type: just list the environments.
        out.append("| Environment |")
        out.append("|------------|")
        out.extend(f"| {env} |" for env in envs)

    return "\n".join(out)
| # --------------------------------------------------------------------------- | |
| # Environment detail — adapts to pipeline type | |
| # --------------------------------------------------------------------------- | |
def load_env_detail(pipeline: str, env: str) -> tuple[str, str, str, str]:
    """Load the four detail panels for an environment.

    Dispatches on the pipeline type to the matching loader. With a
    missing pipeline or env, or an unrecognized pipeline type, only the
    first panel carries a message and the other three are empty.
    """
    if not (pipeline and env):
        return ("Select a pipeline and environment.", "", "", "")

    kind = _pipeline_type(pipeline)
    if kind not in ("adversarial", "solver", "autumn_synth"):
        return (f"## {env}\n\nUnknown pipeline type for `{pipeline}`.", "", "", "")

    loaders = {
        "adversarial": _load_adversarial_detail,
        "solver": _load_solver_overview,
        "autumn_synth": _load_autumn_synth_detail,
    }
    return loaders[kind](pipeline, env)
def _load_adversarial_detail(pipeline: str, env: str) -> tuple[str, str, str, str]:
    """Load the adversarial-synthesis panels for one environment.

    Returns a 4-tuple of (summary markdown, final world code,
    discrepancies markdown, per-round metrics markdown).
    """
    root = f"{pipeline}/{env}"

    # Panel 1: the raw experiment summary as pretty-printed JSON.
    summary = load_json(f"{root}/experiment_summary.json")
    if summary:
        summary_md = f"## {env}\n\n```json\n{json.dumps(summary, indent=2)}\n```"
    else:
        summary_md = f"## {env}\n\nNo experiment_summary.json found."

    # Panel 2: final world code, trying each known location in turn.
    code = (
        load_text(f"{root}/final_world.py")
        or load_text(f"{root}/code/world.py")
        or "(no world.py found)"
    )

    # Panel 3: challenger discrepancies, scanning at most 20 rounds.
    disc_lines = []
    for round_n in range(20):
        found = safe_list_tree(f"{root}/round_{round_n}/challenger/discrepancies")
        for item in found:
            if not item.path.endswith(".json"):
                continue
            disc = load_json(item.path)
            if disc and isinstance(disc, dict):
                disc_lines.append(f"### Round {round_n} β {item.path.split('/')[-1]}")
                disc_lines.append(f"**Description:** {disc.get('description', '(none)')}\n")
                disc_lines.append(f"**Actions:** `{disc.get('actions', '')}`\n")
        # Round 0 may be empty; any later empty round ends the scan.
        if round_n > 0 and not found:
            break
    discrepancies_md = "\n".join(disc_lines) if disc_lines else "No discrepancies found."

    # Panel 4: per-round metrics, stopping at the first missing round.
    round_lines = []
    for round_n in range(20):
        metrics = load_json(f"{root}/round_{round_n}/round_metrics.json")
        if metrics is None:
            break
        match_score = metrics.get("observation_match_score", "?")
        n_disc = metrics.get("num_discrepancies", "?")
        cost_s = metrics.get("synthesizer_cost_usd", 0)
        cost_c = metrics.get("challenger_cost_usd", 0)
        cov = metrics.get("coverage", {})
        round_lines.extend([
            f"### Round {round_n}",
            f"- Match score: **{match_score}**",
            f"- Discrepancies: {n_disc}",
        ])
        # The cost line is shown only when both costs are numeric.
        if isinstance(cost_s, (int, float)) and isinstance(cost_c, (int, float)):
            round_lines.append(f"- Cost: synth ${cost_s:.2f} + challenger ${cost_c:.2f}")
        if cov:
            round_lines.append(
                f"- Coverage: {cov.get('action_types_used', '?')} action types, "
                f"{cov.get('unique_states_seen', '?')} unique states, "
                f"{cov.get('total_steps', '?')} steps"
            )
        round_lines.append("")
    rounds_md = "\n".join(round_lines) if round_lines else "No round data found."

    return (summary_md, code, discrepancies_md, rounds_md)
def _load_solver_overview(pipeline: str, env: str) -> tuple[str, str, str, str]:
    """Load the solver panels for one environment across all its tasks.

    Returns a 4-tuple of markdown strings: (task prompts, scratchpads,
    answers, instructions). Each has one "### TASK" section per task
    found; long content is clipped.
    """
    prompts = [f"## {env} β Solver Tasks\n"]
    scratchpads = []
    answers = []
    instructions = []

    tasks = list_solver_tasks(pipeline, env)
    if not tasks:
        prompts.append("No solver tasks (cd/mfp/planning) found for this environment.")

    for task in tasks:
        base = f"{pipeline}/{env}/{task}"
        heading = f"### {task.upper()}"
        prompts.append(heading)

        prompt = load_json(f"{base}/task_prompt.json")
        if prompt:
            prompts.append(f"```json\n{json.dumps(prompt, indent=2)[:3000]}\n```\n")
        else:
            prompts.append("(no task_prompt.json)\n")

        scratch = load_text(f"{base}/scratchpad.md")
        scratch_body = scratch[:5000] if scratch else "(no scratchpad.md)"
        scratchpads.append(f"{heading}\n\n{scratch_body}\n")

        answer = load_json(f"{base}/answer.json")
        if answer:
            answers.append(f"{heading}\n\n```json\n{json.dumps(answer, indent=2)[:3000]}\n```\n")
        else:
            answers.append(f"{heading}\n\n(no answer.json)\n")

        # Instructions get a section only when the file exists.
        inst = load_text(f"{base}/INSTRUCTIONS.md")
        if inst:
            instructions.append(f"{heading}\n\n{inst[:5000]}\n")

    return (
        "\n".join(prompts),
        "\n".join(scratchpads) if scratchpads else "(no scratchpads found)",
        "\n".join(answers) if answers else "(no answers found)",
        "\n".join(instructions) if instructions else "(no instructions found)",
    )
def _load_autumn_synth_detail(pipeline: str, env: str) -> tuple[str, str, str, str]:
    """Load the AutumnSynth panels for one environment.

    Returns a 4-tuple of markdown strings: (coverage report, encoders,
    transitions, logs). The encoder and transition panels list every
    ``.py`` file in their folder and show the first one as a sample.
    """
    root = f"{pipeline}/{env}"

    def python_folder_panel(folder: str, title: str, missing: str) -> str:
        # List the .py files in a folder and embed the first as a sample.
        names = [
            entry.path.split("/")[-1]
            for entry in safe_list_tree(f"{root}/{folder}")
            if entry.path.endswith(".py")
        ]
        if not names:
            return missing
        sample = load_text(f"{root}/{folder}/{names[0]}")
        listing = ", ".join(f"`{n}`" for n in names)
        return (
            f"### {title} ({len(names)})\n\n"
            + listing + "\n\n"
            + f"#### Sample: {names[0]}\n```python\n{sample[:5000]}\n```"
        )

    report = load_json(f"{root}/coverage_report.json")
    if report:
        summary_md = f"## {env} β AutumnSynth\n\n```json\n{json.dumps(report, indent=2)[:5000]}\n```"
    else:
        summary_md = f"## {env} β AutumnSynth\n\nNo coverage_report.json found."

    code_md = python_folder_panel("encoders", "Encoders", "(no encoders found)")
    transitions_md = python_folder_panel("transitions", "Transitions", "(no transitions found)")

    # Concatenate whichever of the known log files exist, clipped each.
    log_sections = []
    for logname in ("joint_synthesis.log", "joint_dependency_synthesis.log", "dependency_visualize.log"):
        log = load_text(f"{root}/{logname}")
        if log:
            log_sections.append(f"### {logname}\n\n```\n{log[:5000]}\n```\n\n")
    logs_md = "".join(log_sections) if log_sections else "(no logs found)"

    return (summary_md, code_md, transitions_md, logs_md)
| # --------------------------------------------------------------------------- | |
| # Agent reasoning / scratchpad | |
| # --------------------------------------------------------------------------- | |
def load_agent_reasoning(pipeline: str, env: str, round_n: int, agent_type: str) -> str:
    """Load and format agent reasoning for a specific round.

    Args:
        pipeline: Pipeline folder name.
        env: Environment folder name.
        round_n: Round index; ignored for solver pipelines.
        agent_type: Agent name used in the "<agent>_thoughts.json" file
            name. For solver pipelines it is the task folder instead.

    Returns:
        Markdown. Solver pipelines show the task's scratchpad, falling
        back to its instructions. Other pipelines show a stats header
        plus the extracted reasoning from the thoughts file, falling
        back to the round's INSTRUCTIONS.md, then to a "not found" note.
    """
    if not pipeline or not env:
        return "Select a pipeline and environment."

    if _pipeline_type(pipeline) == "solver":
        # For solver pipelines the selector carries the task folder name.
        task_type = agent_type
        sp = load_text(f"{pipeline}/{env}/{task_type}/scratchpad.md")
        if sp:
            return f"## {env} / {task_type} β Scratchpad\n\n{sp}"
        inst = load_text(f"{pipeline}/{env}/{task_type}/INSTRUCTIONS.md")
        if inst:
            return f"## {env} / {task_type} β Instructions\n\n{inst[:10000]}"
        return f"No scratchpad or instructions found for {env}/{task_type}."

    thoughts = load_json(f"{pipeline}/{env}/round_{round_n}/{agent_type}_thoughts.json")
    # load_json may return a list; only a dict has the fields read below,
    # so any other shape is treated like a missing file.
    if thoughts and isinstance(thoughts, dict):
        header = f"## {agent_type.capitalize()} β Round {round_n}\n\n"
        n_turns = thoughts.get("num_turns", "?")
        cost = thoughts.get("total_cost_usd", 0)
        is_error = thoughts.get("is_error", False)
        error_msg = thoughts.get("error", "")
        stop = thoughts.get("stop_reason", "")
        if isinstance(cost, (int, float)):
            header += f"**Turns:** {n_turns} | **Cost:** ${cost:.2f} | **Error:** {is_error}"
        else:
            header += f"**Turns:** {n_turns} | **Error:** {is_error}"
        if error_msg:
            header += f" (`{error_msg}`)"
        if stop:
            header += f" | **Stop:** {stop}"
        header += "\n\n---\n\n"
        return header + extract_reasoning(thoughts)

    # No usable thoughts file: fall back to the agent's instructions.
    instructions = load_text(f"{pipeline}/{env}/round_{round_n}/{agent_type}/INSTRUCTIONS.md")
    if instructions:
        return f"## {agent_type.capitalize()} Instructions β Round {round_n}\n\n{instructions[:10000]}"
    return f"No {agent_type} data found for round {round_n}."
| # --------------------------------------------------------------------------- | |
| # Solver task detail (standalone) | |
| # --------------------------------------------------------------------------- | |
def load_solver_detail(pipeline: str, env: str, task: str) -> tuple[str, str, str, str]:
    """Load the four panels for a single solver task.

    Returns (task prompt markdown, scratchpad text, answer markdown,
    instructions text). Each missing file is replaced by a placeholder.
    """
    if not (pipeline and env and task):
        return ("Select pipeline, environment, and task.", "", "", "")

    base = f"{pipeline}/{env}/{task}"
    heading = f"## {env} / {task}"

    prompt = load_json(f"{base}/task_prompt.json")
    summary_md = (
        f"{heading}\n\n```json\n{json.dumps(prompt, indent=2)[:5000]}\n```"
        if prompt
        else f"{heading}\n\nNo task_prompt.json found."
    )

    scratchpad = load_text(f"{base}/scratchpad.md") or "(no scratchpad.md found)"

    answer = load_json(f"{base}/answer.json")
    answer_md = (
        f"```json\n{json.dumps(answer, indent=2)[:5000]}\n```"
        if answer
        else "(no answer.json found)"
    )

    instructions = load_text(f"{base}/INSTRUCTIONS.md") or "(no INSTRUCTIONS.md found)"

    return (summary_md, scratchpad, answer_md, instructions)
| # --------------------------------------------------------------------------- | |
| # Planning videos — recursive search across all subdirs | |
| # --------------------------------------------------------------------------- | |
def _collect_video_dirs() -> list[str]:
    """Return planning_videos/ and every folder found beneath it.

    Folders are recognized by exclusion: an entry whose name does not end
    in a known file suffix is assumed to be a directory and is descended
    into. The result is memoized in ``_env_cache``.
    """
    memo_key = "video_dirs"
    if memo_key in _env_cache:
        return _env_cache[memo_key]

    file_suffixes = (".gif", ".png", ".json", ".csv", ".DS_Store")
    root = "planning_videos"
    found = [root]
    pending = [root]  # used as a stack, so traversal is depth-first
    seen = set()
    while pending:
        folder = pending.pop()
        if folder in seen:
            continue
        seen.add(folder)
        for entry in safe_list_tree(folder):
            leaf = entry.path.split("/")[-1]
            if leaf.endswith(file_suffixes):
                continue
            found.append(entry.path)
            pending.append(entry.path)

    _env_cache[memo_key] = found
    return found
def list_planning_videos(env: str) -> list[tuple[str, str]]:
    """Find the planning GIF/PNG files whose name contains ``env``.

    Every folder reported by ``_collect_video_dirs`` is searched. Returns
    (repo path, file name) pairs in discovery order.
    """
    matches = []
    for folder in _collect_video_dirs():
        for entry in safe_list_tree(folder):
            leaf = entry.path.split("/")[-1]
            is_media = leaf.endswith((".gif", ".png"))
            if is_media and env in leaf:
                matches.append((entry.path, leaf))
    return matches
def list_all_video_envs() -> list[str]:
    """Return the sorted, de-duplicated env names that have planning videos.

    The env name is the part of a GIF/PNG file name before the first of
    the markers "_planning", "_mfp", "_cd" (checked in that order) that
    occurs in the name. Files with none of the markers are ignored. The
    result is memoized in ``_env_cache``.
    """
    memo_key = "video_envs"
    if memo_key in _env_cache:
        return _env_cache[memo_key]

    markers = ("_planning", "_mfp", "_cd")
    names = set()
    for folder in _collect_video_dirs():
        for entry in safe_list_tree(folder):
            leaf = entry.path.split("/")[-1]
            if not leaf.endswith((".gif", ".png")):
                continue
            marker = next((m for m in markers if m in leaf), None)
            if marker is not None:
                names.add(leaf.split(marker)[0])

    ordered = sorted(names)
    _env_cache[memo_key] = ordered
    return ordered
def load_planning_video_md(env: str) -> str:
    """Render the planning videos/images of an environment as markdown.

    Each file gets a heading of the form "### [<subfolder>] <file name>"
    followed by a markdown image embed pointing at the file's
    ``resolve/main`` URL in the dataset repository.

    Args:
        env: Environment name to search for in video file names.

    Returns:
        Markdown for all matching files, or a short message when ``env``
        is empty or nothing matches.
    """
    from urllib.parse import quote

    if not env:
        return "Enter an environment name."
    videos = list_planning_videos(env)
    if not videos:
        return f"No planning videos found for `{env}`."

    lines = [f"## Planning Videos for {env}\n"]
    for hf_path, fname in videos:
        # The subfolder is the path between the top-level folder and the file.
        parts = hf_path.split("/")
        subfolder = "/".join(parts[1:-1]) if len(parts) > 2 else "(root)"
        # Percent-encode the path ("/" is kept) so names with spaces or
        # other special characters still produce a valid link.
        url = f"https://huggingface.co/datasets/{HF_REPO}/resolve/main/{quote(hf_path)}"
        # GIF and PNG files both embed as markdown images. Previously the
        # URL was computed but never written into the output.
        lines.append(f"### [{subfolder}] {fname}\n\n![{fname}]({url})\n")
    return "\n".join(lines)
| # --------------------------------------------------------------------------- | |
| # Code evolution | |
| # --------------------------------------------------------------------------- | |
def show_round_code(pipeline: str, env: str, round_idx: int) -> str:
    """Return the code to display for step ``round_idx`` of an environment.

    What the index selects depends on the pipeline type: the round's
    synthesizer code (adversarial), the N-th encoder file (AutumnSynth),
    or the first ``.py`` file of the N-th task (solver).
    """
    if not (pipeline and env):
        return ""

    root = f"{pipeline}/{env}"
    kind = _pipeline_type(pipeline)

    if kind == "adversarial":
        # NOTE(review): SOURCE lost its indentation; the code/world.py
        # lookup is read as a final fallback for every round — confirm.
        code = load_text(f"{root}/round_{round_idx}/synthesizer_code.py")
        if not code:
            code = load_text(f"{root}/round_{round_idx}/synthesizer/code/world.py")
        if round_idx == 0 and not code:
            final_code = load_text(f"{root}/final_world.py")
            code = (
                f"# (final_world.py β no per-round code found)\n{final_code}"
                if final_code
                else final_code
            )
        if not code:
            code = load_text(f"{root}/code/world.py")
        return code or "(no code found for this round)"

    if kind == "autumn_synth":
        # The index steps through the encoder files in listing order.
        encoders = [
            entry.path.split("/")[-1]
            for entry in safe_list_tree(f"{root}/encoders")
            if entry.path.endswith(".py")
        ]
        if round_idx >= len(encoders):
            return "(no more encoders to show)"
        return load_text(f"{root}/encoders/{encoders[round_idx]}")

    if kind == "solver":
        # The index steps through the available task folders.
        tasks = list_solver_tasks(pipeline, env)
        if round_idx < len(tasks):
            listing = safe_list_tree(f"{root}/{tasks[round_idx]}/code")
            sources = [entry for entry in listing if entry.path.endswith(".py")]
            if sources:
                return load_text(sources[0].path)
        return "(no solver code found)"

    return "(unsupported pipeline type for code view)"
| # --------------------------------------------------------------------------- | |
| # Cross-pipeline comparison | |
| # --------------------------------------------------------------------------- | |
def compare_env_across_pipelines(env: str) -> str:
    """Build a markdown comparison of one environment across all pipelines.

    Up to three tables are produced, in this order: adversarial run
    metrics, solver tasks, and AutumnSynth components. A pipeline shows
    up only if it has data for ``env``; with no data anywhere, a single
    "no results" line follows the title.
    """
    if not env:
        return "Enter an environment name."

    out = [f"# {env} β Cross-Pipeline Comparison\n"]

    # --- Adversarial pipelines that have an experiment summary ---
    adv_rows = []
    for pipeline in discover_pipelines():
        if _pipeline_type(pipeline) != "adversarial":
            continue
        summary = load_json(f"{pipeline}/{env}/experiment_summary.json")
        if not summary:
            continue
        name = pipeline_label(pipeline)
        n_rounds = summary.get("num_rounds", "?")
        match = summary.get("final_match_score", "?")
        cost = summary.get("total_cost_usd", 0)
        discs = sum(r.get("num_discrepancies", 0) for r in summary.get("rounds", []))
        match_str = f"{match:.2f}" if isinstance(match, (int, float)) else str(match)
        cost_str = f"${cost:.2f}" if isinstance(cost, (int, float)) else str(cost)
        adv_rows.append(f"| {name} | {n_rounds} | {match_str} | {cost_str} | {discs} |")
    if adv_rows:
        out += [
            "### Adversarial Synthesis\n",
            "| Pipeline | Rounds | Final Match | Cost | Discrepancies |",
            "|----------|--------|-------------|------|---------------|",
            *adv_rows,
            "",
        ]

    # --- Solver pipelines with at least one task folder ---
    solver_rows = []
    for pipeline in discover_pipelines():
        if _pipeline_type(pipeline) != "solver":
            continue
        tasks = list_solver_tasks(pipeline, env)
        if tasks:
            solver_rows.append(f"| {pipeline_label(pipeline)} | {', '.join(tasks)} |")
    if solver_rows:
        out += [
            "### Solver Results\n",
            "| Pipeline | Tasks |",
            "|----------|-------|",
            *solver_rows,
            "",
        ]

    # --- AutumnSynth pipelines with any entry for this env ---
    autumn_rows = []
    for pipeline in discover_pipelines():
        if _pipeline_type(pipeline) != "autumn_synth":
            continue
        entries = safe_list_tree(f"{pipeline}/{env}")
        if not entries:
            continue
        name = pipeline_label(pipeline)
        subdirs = []
        for entry in entries:
            if entry.path.endswith((".json", ".py", ".pkl", ".log")):
                continue
            leaf = entry.path.split("/")[-1]
            if leaf not in ("__pycache__", ".DS_Store"):
                subdirs.append(leaf)
        autumn_rows.append(f"| {name} | {', '.join(sorted(subdirs)) if subdirs else '-'} |")
    if autumn_rows:
        out += [
            "### AutumnSynth\n",
            "| Pipeline | Components |",
            "|----------|-----------|",
            *autumn_rows,
            "",
        ]

    if not (adv_rows or solver_rows or autumn_rows):
        out.append(f"No results found for `{env}` in any pipeline.")
    return "\n".join(out)
| # --------------------------------------------------------------------------- | |
| # Annotations | |
| # --------------------------------------------------------------------------- | |
# Repo-relative path of the single JSON file that stores all annotations.
ANNOTATION_PATH = "annotations/annotations.json"

# In-process copy of the annotations; None until the first load.
_annotations_cache: dict | None = None


def load_annotations() -> dict:
    """Return the annotations mapping, fetching it on first use only.

    The file at ANNOTATION_PATH is loaded once; if it is missing or is
    not a JSON object, an empty dict is cached instead. The cached object
    itself is returned, not a copy.
    """
    global _annotations_cache
    if _annotations_cache is None:
        fetched = load_json(ANNOTATION_PATH)
        _annotations_cache = fetched if isinstance(fetched, dict) else {}
    return _annotations_cache
def save_annotation(pipeline: str, env: str, label: str, comment: str, reviewer: str) -> str:
    """Append one annotation for ``pipeline/env`` and persist the full set.

    The updated annotations are uploaded to ANNOTATION_PATH in the
    results repo. If the upload fails they are written to
    ``annotations_local.json`` in the working directory instead. The
    in-process cache is replaced only after one of those writes happened.

    Args:
        pipeline: Pipeline folder name.
        env: Environment folder name.
        label: Annotation label stored with the entry.
        comment: Free-text comment; must not be blank.
        reviewer: Reviewer name or handle; must not be blank.

    Returns:
        A status message for display.
    """
    global _annotations_cache
    # Treat a missing value like an empty one instead of raising.
    reviewer_name = (reviewer or "").strip()
    comment_text = (comment or "").strip()
    if not reviewer_name:
        return "Please enter your name / Slack handle."
    if not comment_text:
        return "Please enter a comment."

    key = f"{pipeline}/{env}"
    entry = {
        "label": label,
        "comment": comment_text,
        "reviewer": reviewer_name,
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    }

    current = load_annotations()
    # Build a new mapping AND a new list for this key. A shallow copy
    # alone would share the list with the cache, so appending to it
    # would change the cache before anything has been persisted.
    updated = dict(current)
    updated[key] = [*current.get(key, []), entry]
    content = json.dumps(updated, indent=2)

    try:
        _api.upload_file(
            path_or_fileobj=content.encode("utf-8"),
            path_in_repo=ANNOTATION_PATH,
            repo_id=HF_REPO,
            repo_type=REPO_TYPE,
            commit_message=f"Annotation: {label} on {key} by {reviewer_name}",
        )
    except Exception as e:
        # Best-effort fallback so the reviewer's comment is not lost.
        Path("annotations_local.json").write_text(content, encoding="utf-8")
        _annotations_cache = updated
        return f"Saved locally (HF push failed: {e})."

    _annotations_cache = updated
    total = sum(len(v) for v in updated.values())
    return f"Saved to HuggingFace! Total annotations: {total}"
def format_annotations(pipeline: str, env: str) -> str:
    """Render the stored annotations for ``pipeline/env`` as a Markdown list.

    Entries that are not dicts are skipped and missing fields are replaced by
    placeholders, so a hand-edited or partially written annotation file cannot
    break the Annotate tab.

    Returns:
        ``"No annotations yet."`` when nothing usable is stored, otherwise a
        Markdown heading followed by one bullet per annotation.
    """
    stored = load_annotations().get(f"{pipeline}/{env}")
    entries = [a for a in stored if isinstance(a, dict)] if isinstance(stored, list) else []
    if not entries:
        return "No annotations yet."

    lines = [f"### Annotations ({len(entries)})\n"]
    for ann in entries:
        # Timestamps are ISO-8601; the first 10 characters are the date.
        ts = str(ann.get("timestamp") or "")[:10]
        label = ann.get("label") or "unlabeled"
        comment = ann.get("comment") or ""
        reviewer = ann.get("reviewer") or "unknown"
        lines.append(f"- **[{label}]** {comment} — _{reviewer}_ ({ts})")
    return "\n".join(lines)
| # --------------------------------------------------------------------------- | |
# Play Synth World — interactive simulator
| # --------------------------------------------------------------------------- | |
# CSS color name -> hex mapping for grid rendering.
# Keys are lowercase; render_grid_html() lowercases a cell's color before the
# lookup and falls back to magenta (#ff00ff) for names that are not listed.
COLOR_MAP = {
    "black": "#111111", "white": "#ffffff", "red": "#ff0000", "green": "#00cc00",
    "blue": "#0066ff", "yellow": "#ffff00", "gold": "#ffd700", "orange": "#ff8800",
    "darkorange": "#ff8c00", "purple": "#9933ff", "mediumpurple": "#9370db",
    "gray": "#888888", "grey": "#888888", "brown": "#8b4513", "pink": "#ff69b4",
    "cyan": "#00cccc", "magenta": "#ff00ff", "lime": "#00ff00",
    "darkgreen": "#006400", "darkblue": "#00008b", "darkred": "#8b0000",
    "lightblue": "#add8e6", "lightgreen": "#90ee90", "maroon": "#800000",
    "olive": "#808000", "teal": "#008080", "navy": "#000080",
}
| def _state_to_text_grid(state) -> tuple[list[list[str]], int]: | |
| """Convert world state (dict or str) to a 2D color matrix + grid_size.""" | |
| if isinstance(state, str): | |
| rows = [line.split() for line in state.strip().split("\n") if line.strip()] | |
| gs = len(rows) | |
| return rows, gs | |
| elif isinstance(state, dict): | |
| gs = state.get("GRID_SIZE", 16) | |
| matrix = [["black"] * gs for _ in range(gs)] | |
| for key, items in state.items(): | |
| if key == "GRID_SIZE" or not isinstance(items, list): | |
| continue | |
| for item in items: | |
| if not isinstance(item, dict): | |
| continue | |
| pos = item.get("position", item) | |
| x = pos.get("x", 0) | |
| y = pos.get("y", 0) | |
| if 0 <= x < gs and 0 <= y < gs: | |
| matrix[y][x] = item.get("color", key).lower() | |
| return matrix, gs | |
| return [["black"] * 16 for _ in range(16)], 16 | |
def render_grid_html(state, step_num: int = 0, action: str = "") -> str:
    """Render a world state as an HTML table with clickable colored cells.

    Args:
        state: World state accepted by ``_state_to_text_grid`` (text or dict).
        step_num: Step counter shown in the header line.
        action: Action that produced this state; when empty, a header is only
            shown for step 0 ("Initial state").

    Returns:
        An HTML fragment.  Every ``<td>`` carries ``data-x`` / ``data-y``
        attributes so a delegated click handler can recover the coordinates.
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    matrix, gs = _state_to_text_grid(state)
    # max(gs, 1): an empty state yields gs == 0, which must not divide by zero.
    cell_px = max(16, min(40, 640 // max(gs, 1)))

    parts = ['<div style="font-family:monospace;margin:8px 0" id="mara-grid-container">']
    if action:
        # The action text can come from user input; escape it before embedding.
        parts.append(
            f'<div style="margin-bottom:4px"><b>Step {step_num}</b> — action: '
            f'<code>{escape(str(action))}</code></div>'
        )
    elif step_num == 0:
        parts.append('<div style="margin-bottom:4px"><b>Initial state</b> (after reset)</div>')

    parts.append('<table style="border-collapse:collapse;border:1px solid #444;cursor:crosshair">')
    for y, row in enumerate(matrix):
        parts.append("<tr>")
        for x, color in enumerate(row):
            name = str(color)
            # Known names map to hex, "#..." values pass through, anything
            # else is drawn magenta so unknown colors are easy to spot.
            fill = COLOR_MAP.get(name.lower(), name if name.startswith("#") else "#ff00ff")
            # Color names originate from generated world code; escape them so
            # they cannot break out of the attribute values.
            parts.append(
                f'<td data-x="{x}" data-y="{y}" '
                f'style="width:{cell_px}px;height:{cell_px}px;'
                f'background:{escape(fill, quote=True)};border:1px solid #333;padding:0" '
                f'title="({x},{y}) {escape(name, quote=True)}"></td>'
            )
        parts.append("</tr>")
    parts.append("</table></div>")
    return "".join(parts)
# Minimal stochastic base class for worlds that import it.
# This is Python *source text*, not live code: _load_world_from_code() makes it
# available as a module named `stochastic`. It defines two classes,
# StochasticWorld and SamplingHandler, each wrapping a seeded random.Random
# with multinomial / uniform_int / bernoulli / reseed helpers.
_STOCHASTIC_BASE = '''
import random as _random

class StochasticWorld:
    def __init__(self, seed=42):
        self._rng = _random.Random(seed)
        self.params = {}

    def multinomial(self, options):
        items = list(options.items())
        weights = [float(w) for _, w in items]
        total = sum(weights)
        r = self._rng.random() * total
        cumul = 0.0
        for val, w in items:
            cumul += float(w)
            if r <= cumul:
                return val
        return items[-1][0]

    def uniform_int(self, lo, hi):
        return self._rng.randint(lo, hi)

    def bernoulli(self, p):
        return self._rng.random() < p

    def reseed(self, seed):
        self._rng = _random.Random(seed)

class SamplingHandler:
    def __init__(self, seed=42):
        self._rng = _random.Random(seed)

    def multinomial(self, options):
        items = list(options.items())
        weights = [float(w) for _, w in items]
        total = sum(weights)
        r = self._rng.random() * total
        cumul = 0.0
        for val, w in items:
            cumul += float(w)
            if r <= cumul:
                return val
        return items[-1][0]

    def uniform_int(self, lo, hi):
        return self._rng.randint(lo, hi)

    def bernoulli(self, p):
        return self._rng.random() < p

    def reseed(self, seed):
        self._rng = _random.Random(seed)
'''
| def _load_world_from_code(code_text: str, seed: int = 42): | |
| """Load a SynthesizedWorld class from code text, exec it, return instance.""" | |
| # Write stochastic.py to a temp dir so imports work | |
| tmpdir = tempfile.mkdtemp(prefix="mara_play_") | |
| stochastic_path = Path(tmpdir) / "stochastic.py" | |
| stochastic_path.write_text(_STOCHASTIC_BASE) | |
| import sys | |
| if tmpdir not in sys.path: | |
| sys.path.insert(0, tmpdir) | |
| namespace = {"__builtins__": __builtins__} | |
| try: | |
| exec(compile(code_text, "<world.py>", "exec"), namespace) | |
| except Exception as e: | |
| raise RuntimeError(f"Failed to compile world code: {e}") | |
| finally: | |
| # Clean up sys.path but leave tmpdir for imports during runtime | |
| pass | |
| # Find the world class | |
| cls = namespace.get("SynthesizedWorld") | |
| if cls is None: | |
| for name, obj in namespace.items(): | |
| if isinstance(obj, type) and hasattr(obj, "reset") and hasattr(obj, "step"): | |
| cls = obj | |
| break | |
| if cls is None: | |
| raise RuntimeError("No SynthesizedWorld class found in the code.") | |
| return cls(seed=seed) | |
# Session state for play tab: one entry per "pipeline/env" key holding the live
# world instance ("world"), its latest state ("state"), the step counter
# ("step") and the list of actions taken ("history").
# NOTE(review): module-level, so presumably shared by all browser sessions of
# this process — confirm that is intended.
_play_sessions: dict[str, dict] = {}
def play_load_world(pipeline: str, env: str, seed: int) -> tuple[str, str, str]:
    """Fetch a world's source, instantiate it, and start a play session.

    Looks for ``final_world.py`` first and ``code/world.py`` second under
    ``pipeline/env``.  On success the session for that key is (re)created in
    ``_play_sessions``.

    Returns:
        ``(grid_html, status, code)``; ``grid_html`` is empty whenever loading
        did not succeed, and ``code`` is empty when no source was found.
    """
    if not (pipeline and env):
        return ("", "Select a pipeline and environment.", "")

    base = f"{pipeline}/{env}"
    code = load_text(f"{base}/final_world.py") or load_text(f"{base}/code/world.py")
    if not code:
        return ("", f"No world.py found for {pipeline}/{env}.", "")

    try:
        world = _load_world_from_code(code, seed=int(seed))
        initial = world.reset()
    except Exception as exc:
        # Show only the tail of the traceback to keep the status panel short.
        tail = traceback.format_exc()[-1000:]
        return ("", f"Error loading world: {exc}\n\n```\n{tail}\n```", code)

    _play_sessions[base] = {"world": world, "state": initial, "step": 0, "history": []}
    return (
        render_grid_html(initial, step_num=0),
        "World loaded! Grid ready. Use action buttons to step.",
        code,
    )
def play_step(pipeline: str, env: str, action: str) -> tuple[str, str]:
    """Apply one action to the loaded world for ``pipeline/env``.

    Args:
        pipeline: Pipeline folder name (part of the session key).
        env: Environment name (part of the session key).
        action: Action string handed to the world's ``step`` method as-is.

    Returns:
        ``(grid_html, status)``.  If ``step`` raises, the last good state is
        re-rendered with an error header and the session is left unchanged.
    """
    session = _play_sessions.get(f"{pipeline}/{env}")
    if not session:
        return ("", "No world loaded. Click 'Load World' first.")

    try:
        state = session["world"].step(action)
        session["state"] = state
        session["step"] += 1
        session["history"].append(action)
    except Exception as e:
        return (
            render_grid_html(session["state"], session["step"], f"ERROR: {action}"),
            f"Error on step: {e}",
        )

    grid_html = render_grid_html(state, step_num=session["step"], action=action)
    # The separator is an em dash (the previous text had a mis-encoded character).
    return (grid_html, f"Step {session['step']} — action: {action}")
def play_reset(pipeline: str, env: str, seed: int) -> tuple[str, str]:
    """Restart the loaded world for ``pipeline/env`` from its initial state.

    The world is reseeded first when it offers a ``reseed`` method; the step
    counter and action history are cleared only if the reset succeeds.

    Returns:
        ``(grid_html, status)``; ``grid_html`` is empty when no world is
        loaded or the reset raised.
    """
    session = _play_sessions.get(f"{pipeline}/{env}")
    if not session:
        return ("", "No world loaded. Click 'Load World' first.")

    try:
        world = session["world"]
        if hasattr(world, "reseed"):
            world.reseed(int(seed))
        fresh = world.reset()
        session.update(state=fresh, step=0, history=[])
    except Exception as e:
        return ("", f"Error on reset: {e}")

    return (render_grid_html(fresh, step_num=0), "World reset.")
def play_random_steps(pipeline: str, env: str, n_steps: int) -> tuple[str, str]:
    """Apply ``n_steps`` randomly chosen actions to the loaded world.

    Actions are drawn uniformly from noop / left / right / up / down / click;
    a click gets random coordinates inside the current grid.

    Args:
        pipeline: Pipeline folder name (part of the session key).
        env: Environment name (part of the session key).
        n_steps: Number of actions to run; non-numeric or negative values are
            treated as zero.

    Returns:
        ``(grid_html, status)``.  Execution stops at the first action that
        raises, and the last good state is rendered with an error header.
    """
    session = _play_sessions.get(f"{pipeline}/{env}")
    if not session:
        return ("", "No world loaded. Click 'Load World' first.")

    # Work out the click range from the current state: dict states declare
    # GRID_SIZE, text states have one row per non-blank line.
    state = session["state"]
    if isinstance(state, dict):
        gs = state.get("GRID_SIZE", 16)
    elif isinstance(state, str):
        gs = sum(1 for line in state.splitlines() if line.strip())
    else:
        gs = 16
    if not isinstance(gs, int) or isinstance(gs, bool) or gs < 1:
        gs = 16  # randint(0, gs - 1) needs at least one cell

    try:
        count = max(0, int(n_steps))
    except (TypeError, ValueError):
        count = 0

    rng = random.Random()
    choices = ("noop", "left", "right", "up", "down", "click")
    actions_taken = []
    for _ in range(count):
        action = rng.choice(choices)
        if action == "click":
            action += f" {rng.randint(0, gs - 1)} {rng.randint(0, gs - 1)}"
        try:
            state = session["world"].step(action)
        except Exception as e:
            grid_html = render_grid_html(session["state"], session["step"], f"ERROR on {action}")
            return (grid_html, f"Error after {len(actions_taken)} steps: {e}")
        session["state"] = state
        session["step"] += 1
        session["history"].append(action)
        actions_taken.append(action)

    grid_html = render_grid_html(
        state,
        step_num=session["step"],
        action=actions_taken[-1] if actions_taken else "",
    )
    return (grid_html, f"Executed {len(actions_taken)} random steps. Total: {session['step']}")
| # --------------------------------------------------------------------------- | |
| # Gradio App | |
| # --------------------------------------------------------------------------- | |
def build_app() -> gr.Blocks:
    """Assemble the Gradio UI and wire every tab to its handler.

    Tabs: Overview, Environment Detail, Agent Reasoning, Code Evolution,
    Play World, Solver Tasks, Planning Videos, Compare Pipelines, Annotate.

    Returns:
        The configured ``gr.Blocks`` app (not yet launched).
    """
    pipelines = discover_pipelines()

    # Client-side glue, run once per page: keyboard shortcuts for the Play tab
    # and a delegated click handler that reports grid-cell clicks to Python
    # through a hidden textbox.
    _keyboard_js = """
    function() {
        if (window._maraKeysAttached) return;
        window._maraKeysAttached = true;

        // Helper: set value on a Gradio textbox and trigger change
        function setGradioValue(elemId, value) {
            var container = document.getElementById(elemId);
            if (!container) return;
            var el = container.querySelector('textarea') || container.querySelector('input');
            if (!el) return;
            // Use the native setter of the element's OWN prototype to bypass
            // framework wrappers; calling the textarea setter on an <input>
            // throws "Illegal invocation".
            var proto = (el instanceof HTMLTextAreaElement)
                ? HTMLTextAreaElement.prototype
                : HTMLInputElement.prototype;
            var setter = Object.getOwnPropertyDescriptor(proto, 'value');
            if (setter && setter.set) setter.set.call(el, value);
            else el.value = value;
            el.dispatchEvent(new Event('input', {bubbles: true}));
            el.dispatchEvent(new Event('change', {bubbles: true}));
        }

        // Keyboard shortcuts — capture phase to beat browser scroll
        document.addEventListener('keydown', function(e) {
            // Leave browser/OS shortcuts such as Ctrl+R / Cmd+R alone
            if (e.ctrlKey || e.metaKey || e.altKey) return;
            var tag = (e.target || e.srcElement).tagName;
            var editable = (e.target || e.srcElement).isContentEditable;
            if (tag === 'INPUT' || tag === 'TEXTAREA' || tag === 'SELECT' || editable) return;

            var btnId = null;
            switch(e.key) {
                case 'ArrowUp': btnId = 'btn_up'; break;
                case 'ArrowDown': btnId = 'btn_down'; break;
                case 'ArrowLeft': btnId = 'btn_left'; break;
                case 'ArrowRight': btnId = 'btn_right'; break;
                case ' ': btnId = 'btn_noop'; break;
                case 'r': case 'R': btnId = 'btn_reset'; break;
                case 'n': case 'N': btnId = 'btn_random'; break;
                default: return;
            }
            var btn = document.getElementById(btnId);
            // Only hijack the key while the Play controls are on screen, so
            // Space / arrows keep scrolling on every other tab.
            if (!btn || btn.offsetParent === null) return;
            e.preventDefault();
            e.stopPropagation();
            btn.click();
        }, true); // true = capture phase

        // Grid click — use event delegation on the document
        // Gradio sanitizes onclick attrs, so we listen for clicks on <td> with data-x/data-y
        document.addEventListener('click', function(e) {
            var td = e.target.closest('td[data-x][data-y]');
            if (!td) return;
            var x = td.getAttribute('data-x');
            var y = td.getAttribute('data-y');
            if (x !== null && y !== null) {
                // Use timestamp to force change event even if same cell clicked twice
                setGradioValue('grid_click_input', x + ' ' + y + ' ' + Date.now());
            }
        });
    }
    """

    with gr.Blocks(title="MARA Results Explorer", theme=gr.themes.Soft(), js=_keyboard_js) as app:
        gr.Markdown(
            "# MARA Results Explorer\n\n"
            "Browse and critique world model synthesis results from "
            "[Basis-MARA/mara-adversarial-results]"
            "(https://huggingface.co/datasets/Basis-MARA/mara-adversarial-results). "
            "Select a pipeline, pick an environment, and explore the agent's reasoning, "
            "code evolution, and discrepancies.\n"
        )

        def update_env_choices(pipeline):
            """Refresh an environment dropdown for the newly selected pipeline."""
            envs = list_envs_for_pipeline(pipeline)
            return gr.update(choices=envs, value=envs[0] if envs else None)

        with gr.Tabs():
            # ── Tab 1: Overview ──
            with gr.Tab("Overview"):
                gr.Markdown("High-level view of all environments in a pipeline. "
                            "Adapts columns based on pipeline type (adversarial / solver / autumn_synth).")
                overview_pipeline = gr.Dropdown(choices=pipelines, label="Pipeline",
                                                value=pipelines[0] if pipelines else None)
                overview_output = gr.Markdown()
                overview_pipeline.change(build_overview, inputs=overview_pipeline, outputs=overview_output)

            # ── Tab 2: Environment Detail ──
            with gr.Tab("Environment Detail"):
                gr.Markdown("Detailed view of one environment. Content adapts to pipeline type:\n"
                            "- **Adversarial**: Summary, final code, discrepancies, per-round metrics\n"
                            "- **Solver**: Task prompts, scratchpads, answers, instructions\n"
                            "- **AutumnSynth**: Coverage report, encoders, transitions, logs")
                with gr.Row():
                    detail_pipeline = gr.Dropdown(choices=pipelines, label="Pipeline")
                    detail_env = gr.Dropdown(choices=[], label="Environment")
                detail_pipeline.change(update_env_choices, inputs=detail_pipeline, outputs=detail_env)
                load_btn = gr.Button("Load", variant="primary")
                # Sub-tab labels are generic because the same four outputs
                # serve every pipeline type.
                with gr.Tabs():
                    with gr.Tab("Summary / Prompts"):
                        detail_summary = gr.Markdown()
                    with gr.Tab("Code / Scratchpads"):
                        detail_code = gr.Markdown()
                    with gr.Tab("Discrepancies / Answers"):
                        detail_disc = gr.Markdown()
                    with gr.Tab("Rounds / Instructions"):
                        detail_rounds = gr.Markdown()
                load_btn.click(
                    load_env_detail,
                    inputs=[detail_pipeline, detail_env],
                    outputs=[detail_summary, detail_code, detail_disc, detail_rounds],
                )

            # ── Tab 3: Agent Reasoning ──
            with gr.Tab("Agent Reasoning"):
                gr.Markdown("View the agent's chain-of-thought, tool calls, and reasoning.\n\n"
                            "- **Adversarial pipelines**: Select round + challenger/synthesizer\n"
                            "- **Solver pipelines**: Select task type (cd/mfp/planning) to see scratchpad")
                with gr.Row():
                    reason_pipeline = gr.Dropdown(choices=pipelines, label="Pipeline")
                    reason_env = gr.Dropdown(choices=[], label="Environment")
                reason_pipeline.change(update_env_choices, inputs=reason_pipeline, outputs=reason_env)
                with gr.Row():
                    reason_round = gr.Slider(0, 19, step=1, value=0, label="Round (adversarial only)")
                    reason_agent = gr.Radio(
                        ["challenger", "synthesizer", "cd", "mfp", "planning"],
                        value="challenger",
                        label="Agent / Task Type"
                    )
                reason_btn = gr.Button("Load Reasoning", variant="primary")
                reason_output = gr.Markdown()
                reason_btn.click(
                    load_agent_reasoning,
                    inputs=[reason_pipeline, reason_env, reason_round, reason_agent],
                    outputs=reason_output,
                )

            # ── Tab 4: Code Evolution ──
            with gr.Tab("Code Evolution"):
                gr.Markdown("Step through synthesized code versions.\n\n"
                            "- **Adversarial**: Code per round (synthesizer_code.py)\n"
                            "- **AutumnSynth**: Encoders (one per slider step)\n"
                            "- **Solver**: Code from each task type")
                with gr.Row():
                    evo_pipeline = gr.Dropdown(choices=pipelines, label="Pipeline")
                    evo_env = gr.Dropdown(choices=[], label="Environment")
                evo_pipeline.change(update_env_choices, inputs=evo_pipeline, outputs=evo_env)
                round_slider = gr.Slider(0, 19, step=1, value=0, label="Round / Index")
                evo_code = gr.Code(language="python", label="Code at this round")
                round_slider.change(
                    show_round_code,
                    inputs=[evo_pipeline, evo_env, round_slider],
                    outputs=evo_code,
                )
                # Picking a new environment shows its first round right away.
                evo_env.change(
                    lambda p, e: show_round_code(p, e, 0),
                    inputs=[evo_pipeline, evo_env],
                    outputs=evo_code,
                )

            # ── Tab 5: Play Synth World ──
            with gr.Tab("Play World"):
                gr.Markdown("### Interactive World Simulator\n\n"
                            "Load a synthesized `world.py` from any pipeline and "
                            "step through it interactively.\n\n"
                            "**Keyboard:** Arrow keys = move, Space = noop, R = reset, "
                            "N = 10 random steps. **Click on grid cells** to send click actions.")
                with gr.Row():
                    play_pipeline = gr.Dropdown(choices=pipelines, label="Pipeline")
                    play_env = gr.Dropdown(choices=[], label="Environment")
                    play_seed = gr.Number(value=42, label="Seed", precision=0)
                    play_load_btn = gr.Button("Load World", variant="primary")
                play_pipeline.change(update_env_choices, inputs=play_pipeline, outputs=play_env)
                play_status = gr.Markdown("Select a pipeline/env and click Load World.")
                # Grid display — full width, clickable cells
                play_grid = gr.HTML(label="Grid")
                # Hidden textbox that receives grid click coordinates from JS
                grid_click_input = gr.Textbox(visible=False, elem_id="grid_click_input")
                # Controls — full-width rows; elem_ids are looked up by the JS above
                with gr.Row():
                    btn_left = gr.Button("← Left", elem_id="btn_left")
                    btn_up = gr.Button("↑ Up", elem_id="btn_up")
                    btn_down = gr.Button("↓ Down", elem_id="btn_down")
                    btn_right = gr.Button("→ Right", elem_id="btn_right")
                    btn_noop = gr.Button("Noop (Space)", elem_id="btn_noop")
                with gr.Row():
                    random_n = gr.Slider(1, 50, value=10, step=1, label="N random steps")
                    btn_random = gr.Button("Run Random (N)", elem_id="btn_random")
                    btn_reset = gr.Button("Reset (R)", elem_id="btn_reset")
                with gr.Accordion("World Code", open=False):
                    play_code_view = gr.Code(language="python", label="world.py (read-only)", interactive=False)

                # Wire up
                play_load_btn.click(
                    play_load_world,
                    inputs=[play_pipeline, play_env, play_seed],
                    outputs=[play_grid, play_status, play_code_view],
                )
                for btn, action_str in [
                    (btn_left, "left"), (btn_right, "right"),
                    (btn_up, "up"), (btn_down, "down"), (btn_noop, "noop"),
                ]:
                    btn.click(
                        # a=action_str binds the current value (avoids late binding)
                        lambda p, e, a=action_str: play_step(p, e, a),
                        inputs=[play_pipeline, play_env],
                        outputs=[play_grid, play_status],
                    )

                # Grid cell click — JS writes "x y timestamp" to hidden textbox
                def _handle_grid_click(pipeline, env, coords):
                    """Turn an "x y timestamp" message into a click action."""
                    if not coords or not coords.strip():
                        return gr.update(), ""
                    parts = coords.strip().split()
                    if len(parts) >= 2:
                        return play_step(pipeline, env, f"click {parts[0]} {parts[1]}")
                    return gr.update(), ""

                grid_click_input.change(
                    _handle_grid_click,
                    inputs=[play_pipeline, play_env, grid_click_input],
                    outputs=[play_grid, play_status],
                )
                btn_random.click(
                    play_random_steps,
                    inputs=[play_pipeline, play_env, random_n],
                    outputs=[play_grid, play_status],
                )
                btn_reset.click(
                    play_reset,
                    inputs=[play_pipeline, play_env, play_seed],
                    outputs=[play_grid, play_status],
                )

            # ── Tab 6: Solver Tasks ──
            with gr.Tab("Solver Tasks"):
                gr.Markdown("Dedicated solver task viewer. Select a `*_solver_*` or `direct_solver_*` pipeline.")
                with gr.Row():
                    solver_pipeline = gr.Dropdown(choices=pipelines, label="Pipeline")
                    solver_env = gr.Dropdown(choices=[], label="Environment")
                    solver_task = gr.Dropdown(choices=[], label="Task")
                solver_pipeline.change(update_env_choices, inputs=solver_pipeline, outputs=solver_env)

                def update_solver_tasks(pipeline, env):
                    """Refresh the task dropdown for the selected pipeline/env."""
                    tasks = list_solver_tasks(pipeline, env) if pipeline and env else []
                    return gr.update(choices=tasks, value=tasks[0] if tasks else None)

                solver_env.change(update_solver_tasks, inputs=[solver_pipeline, solver_env], outputs=solver_task)
                solver_btn = gr.Button("Load", variant="primary")
                with gr.Tabs():
                    with gr.Tab("Task Prompt"):
                        solver_summary = gr.Markdown()
                    with gr.Tab("Scratchpad"):
                        solver_scratchpad = gr.Markdown()
                    with gr.Tab("Answer"):
                        solver_answer = gr.Markdown()
                    with gr.Tab("Instructions"):
                        solver_instructions = gr.Markdown()
                solver_btn.click(
                    load_solver_detail,
                    inputs=[solver_pipeline, solver_env, solver_task],
                    outputs=[solver_summary, solver_scratchpad, solver_answer, solver_instructions],
                )

            # ── Tab 7: Planning Videos ──
            with gr.Tab("Planning Videos"):
                gr.Markdown("View planning execution videos (GIFs) and comparison images.\n\n"
                            "Videos are organized across multiple subdirectories: "
                            "root, direct_solver, direct_solver_v2, real_env, stochastic, stochastic/real_env.")
                video_env = gr.Dropdown(choices=[], label="Environment",
                                        allow_custom_value=True)
                video_btn = gr.Button("Load Videos", variant="primary")
                video_output = gr.Markdown()

                # Populate env dropdown on app load
                def populate_video_envs():
                    """Fill the video environment dropdown with every known env."""
                    envs = list_all_video_envs()
                    return gr.update(choices=envs, value=envs[0] if envs else None)

                video_btn.click(load_planning_video_md, inputs=video_env, outputs=video_output)
                # Previously the helper above was defined but never registered,
                # so the dropdown stayed empty; hook it to the page-load event.
                app.load(populate_video_envs, outputs=video_env)

            # ── Tab 8: Compare Pipelines ──
            with gr.Tab("Compare Pipelines"):
                gr.Markdown("Compare results for one environment across all pipelines.")
                compare_env = gr.Textbox(label="Environment name", placeholder="mario")
                compare_btn = gr.Button("Compare", variant="primary")
                compare_output = gr.Markdown()
                compare_btn.click(compare_env_across_pipelines, inputs=compare_env, outputs=compare_output)

            # ── Tab 9: Annotate ──
            with gr.Tab("Annotate"):
                gr.Markdown("### Critique and annotate runs\n"
                            "Annotations are persisted to the HuggingFace dataset repo.")
                with gr.Row():
                    ann_pipeline = gr.Dropdown(choices=pipelines, label="Pipeline")
                    ann_env = gr.Dropdown(choices=[], label="Environment")
                ann_pipeline.change(update_env_choices, inputs=ann_pipeline, outputs=ann_env)
                existing_annotations = gr.Markdown("Select an environment to see annotations.")

                def show_annotations(pipeline, env):
                    """Render stored annotations once both selectors are set."""
                    return format_annotations(pipeline, env) if pipeline and env else ""

                ann_env.change(show_annotations, inputs=[ann_pipeline, ann_env], outputs=existing_annotations)
                gr.Markdown("---\n#### Add annotation")
                ann_label = gr.Dropdown(
                    choices=["correct-rule", "wrong-rule", "missing-rule", "lookup-table",
                             "information-leak", "wrong-ontology", "good-exploration",
                             "bad-exploration", "general"],
                    label="Label",
                )
                ann_comment = gr.Textbox(label="Comment", lines=3,
                                         placeholder="e.g., R3 says gray moves up but it actually chases red")
                ann_reviewer = gr.Textbox(label="Your name / Slack handle")
                ann_btn = gr.Button("Submit Annotation", variant="primary")
                ann_status = gr.Textbox(label="Status", interactive=False)

                def submit_and_refresh(pipeline, env, label, comment, reviewer):
                    """Save the annotation, then re-render the list for this env."""
                    status = save_annotation(pipeline, env, label, comment, reviewer)
                    updated = format_annotations(pipeline, env)
                    return status, updated

                ann_btn.click(
                    submit_and_refresh,
                    inputs=[ann_pipeline, ann_env, ann_label, ann_comment, ann_reviewer],
                    outputs=[ann_status, existing_annotations],
                )

    return app
def main():
    """Parse the command line (``--port``, ``--share``) and launch the app."""
    cli = argparse.ArgumentParser(description="MARA Results Explorer")
    cli.add_argument("--port", type=int, default=7860)
    cli.add_argument("--share", action="store_true")
    opts = cli.parse_args()

    build_app().launch(server_port=opts.port, share=opts.share)
# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()