| """ |
| Replay viewer for completed SkyDiscover runs. |
| |
| Loads checkpoint data and serves the live monitor dashboard |
| for interactive exploration of past runs. |
| |
| Usage: |
| python -m skydiscover.extras.monitor.viewer <path> [--host HOST] [--port PORT] [--summary-model MODEL] |
| |
| <path> can be: |
| - A checkpoint directory (contains metadata.json + programs/) |
| - An output directory (contains island/checkpoints/ or checkpoints/) |
| - A directory containing program JSON files |
| """ |
|
|
| import argparse |
| import json |
| import logging |
| import os |
| import sys |
| import time |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple |
|
|
# Configure the root logger (INFO level, timestamped lines) and create the
# module-level logger used by the functions in this file.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
def _ckpt_num(name: str) -> int:
    """Return the integer that follows the last underscore in *name*.

    The text after the final ``_`` (or the whole name when it contains no
    underscore) is parsed with ``int``; if that fails the result is 0.
    """
    suffix = name.rpartition("_")[2]
    try:
        number = int(suffix)
    except ValueError:
        number = 0
    return number
|
|
|
|
def _latest_checkpoint(parent: Path) -> Optional[str]:
    """Return the highest-numbered ``checkpoint_*`` directory in *parent*, or None."""
    if not parent.is_dir():
        return None
    # Only directories can be checkpoints; a stray file such as
    # "checkpoint_notes.txt" must not be returned as a checkpoint directory.
    candidates = [c for c in parent.glob("checkpoint_*") if c.is_dir()]
    if not candidates:
        return None
    # The name is a secondary key so ties (e.g. unparsable suffixes, which all
    # map to 0) are resolved the same way regardless of glob order.
    return str(max(candidates, key=lambda c: (_ckpt_num(c.name), c.name)))


def find_checkpoint_dir(path: str) -> Optional[str]:
    """Auto-detect the best checkpoint directory from *path*.

    The following layouts are tried in order; the first match wins:

    1. *path* is itself a checkpoint: it has a ``programs/`` directory and
       either a ``metadata.json`` file or at least one ``programs/*.json``.
    2. *path* directly contains ``checkpoint_*`` directories.
    3. *path* contains ``checkpoints/checkpoint_*``.
    4. A sub-directory of *path* contains ``checkpoints/checkpoint_*``
       (sub-directories are visited in sorted order).
    5. *path* contains ``*.json`` files other than ``metadata.json``.

    Whenever several ``checkpoint_*`` directories exist, the one with the
    highest numeric suffix is chosen.

    Args:
        path: Directory to inspect.

    Returns:
        The detected directory as a string, or None when *path* is not an
        existing directory or matches none of the layouts.
    """
    root = Path(path)
    # Without this guard a missing path (or a plain file) would make
    # ``iterdir()`` below raise instead of reporting "nothing found".
    if not root.is_dir():
        return None

    # 1. Direct checkpoint directory.
    programs = root / "programs"
    if programs.is_dir() and (
        (root / "metadata.json").exists() or any(programs.glob("*.json"))
    ):
        return str(root)

    # 2. checkpoint_* directly inside the given directory.
    found = _latest_checkpoint(root)
    if found:
        return found

    # 3. <path>/checkpoints/checkpoint_*
    found = _latest_checkpoint(root / "checkpoints")
    if found:
        return found

    # 4. <path>/<subdir>/checkpoints/checkpoint_*
    for child in sorted(root.iterdir()):
        found = _latest_checkpoint(child / "checkpoints")
        if found:
            return found

    # 5. Flat directory of program JSON files.
    if any(j.name != "metadata.json" for j in root.glob("*.json")):
        return str(root)

    return None
|
|
|
|
def load_programs(ckpt_dir: str) -> Tuple[List[Dict], Optional[str], int]:
    """Load programs from a checkpoint directory.

    ``metadata.json`` (optional) supplies ``best_program_id`` and
    ``last_iteration``. Program records are read from ``programs/*.json``
    when that directory exists, otherwise from the ``*.json`` files directly
    inside *ckpt_dir* (``metadata.json`` excluded). Files that cannot be read
    or parsed are skipped and logged; an unreadable ``metadata.json`` is
    ignored with a warning.

    When the metadata names no best program, the program with the highest
    numeric ``metrics["combined_score"]`` is used (a missing score counts
    as 0).

    Args:
        ckpt_dir: Checkpoint directory to read.

    Returns:
        (programs_list_sorted_by_iteration, best_program_id, last_iteration)
    """
    root = Path(ckpt_dir)
    programs: Dict[str, Dict] = {}
    best_program_id: Optional[str] = None
    last_iteration = 0

    # Metadata is optional; a broken file must not prevent viewing the programs.
    meta_path = root / "metadata.json"
    if meta_path.exists():
        meta = None
        try:
            with open(meta_path, encoding="utf-8") as f:
                meta = json.load(f)
        except (OSError, ValueError) as e:  # ValueError covers JSON/Unicode errors
            logger.warning("Ignoring unreadable %s: %s", meta_path.name, e)
        if isinstance(meta, dict):
            best_program_id = meta.get("best_program_id")
            # ``or 0`` also maps an explicit null to 0.
            last_iteration = meta.get("last_iteration") or 0

    # Files are visited in sorted order so duplicate ids and score ties are
    # resolved identically on every platform.
    programs_dir = root / "programs"
    if programs_dir.is_dir():
        for jf in sorted(programs_dir.glob("*.json")):
            try:
                with open(jf, encoding="utf-8") as f:
                    data = json.load(f)
                programs[data["id"]] = data
            except Exception as e:
                logger.warning("Skipping %s: %s", jf.name, e)
    else:
        # Flat layout: JSON files that are not program records are skipped.
        for jf in sorted(root.glob("*.json")):
            if jf.name == "metadata.json":
                continue
            try:
                with open(jf, encoding="utf-8") as f:
                    data = json.load(f)
                if isinstance(data, dict) and "id" in data:
                    programs[data["id"]] = data
            except Exception:
                logger.debug("Failed to load program from %s", jf, exc_info=True)

    # Fallback: derive the best program from the scores.
    if not best_program_id and programs:
        best_score = -float("inf")
        for pid, prog in programs.items():
            s = (prog.get("metrics") or {}).get("combined_score", 0)
            if isinstance(s, (int, float)) and s > best_score:
                best_score = s
                best_program_id = pid

    # A null ``iteration_found`` sorts as 0 instead of raising TypeError.
    prog_list = sorted(programs.values(), key=lambda x: x.get("iteration_found") or 0)
    return prog_list, best_program_id, last_iteration
|
|
|
|
def _to_monitor_format(prog: Dict, all_progs: Dict[str, Dict]) -> Dict:
    """Convert a checkpoint program dict into a monitor event program dict.

    Args:
        prog: Program record loaded from a checkpoint; must contain ``"id"``.
        all_progs: All loaded programs keyed by id, used to look up the scores
            of the parent and of the context programs.

    Returns:
        A dict with the keys ``id``, ``iteration``, ``score``, ``metrics``,
        ``parent_id``, ``parent_score``, ``parent_iter``, ``context_ids``,
        ``context_scores``, ``label_type``, ``solution_snippet`` (first 500
        characters of the solution), ``island``, ``generation`` and
        ``image_path``.
    """
    # Imported at call time rather than at module import.
    from skydiscover.extras.monitor.callback import _safe_metrics

    metrics = prog.get("metrics") or {}
    score = metrics.get("combined_score", 0.0)
    if not isinstance(score, (int, float)):
        score = 0.0

    # Parent details are only available when the parent was loaded as well.
    parent_id = prog.get("parent_id")
    parent_score = None
    parent_iter = None
    if parent_id and parent_id in all_progs:
        parent = all_progs[parent_id]
        parent_score = (parent.get("metrics") or {}).get("combined_score")
        parent_iter = parent.get("iteration_found")

    # One score per context id; None marks a context program that is not loaded.
    context_ids = prog.get("other_context_ids") or []
    context_scores = [
        (all_progs[cid].get("metrics") or {}).get("combined_score")
        if cid in all_progs
        else None
        for cid in context_ids
    ]

    metadata = prog.get("metadata") or {}

    # Derive the label from the first ``parent_info`` entry (first keyword
    # found wins, in this order), falling back to the stored metadata.
    label_type = "unknown"
    parent_info = prog.get("parent_info")
    if isinstance(parent_info, (list, tuple)) and parent_info:
        label_text = str(parent_info[0]).lower()
        label_type = next(
            (kind for kind in ("diverge", "refine", "crossover") if kind in label_text),
            "unknown",
        )
    if label_type == "unknown":
        label_type = metadata.get("label_type", "unknown")

    # ``or ""`` keeps the slice below safe when the record stores a null
    # solution, matching the null handling of metrics/metadata above.
    solution = prog.get("solution") or ""

    return {
        "id": prog["id"],
        "iteration": prog.get("iteration_found", 0),
        "score": score,
        "metrics": _safe_metrics(metrics),
        "parent_id": parent_id,
        "parent_score": parent_score,
        "parent_iter": parent_iter,
        "context_ids": context_ids,
        "context_scores": context_scores,
        "label_type": label_type,
        "solution_snippet": solution[:500],
        "island": metadata.get("island"),
        "generation": prog.get("generation", 0),
        "image_path": metadata.get("image_path"),
    }
|
|
|
|
def main() -> None:
    """Command-line entry point: load a finished run and serve it on the dashboard.

    Parses the arguments, locates and loads the checkpoint, starts a
    ``MonitorServer``, pushes one ``new_program`` event per loaded program
    (in iteration order) and then blocks until Ctrl+C. The server is stopped
    on the way out even if replaying the programs fails.

    Exits with status 1 and a message on stderr when no checkpoint data or no
    programs can be found.
    """
    parser = argparse.ArgumentParser(
        description="Replay viewer for completed SkyDiscover runs",
    )
    parser.add_argument("path", help="Output directory or checkpoint path")
    parser.add_argument("--port", type=int, default=8765)
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument(
        "--summary-model",
        default="",
        help="LLM model for per-program summaries (default: gpt-5-mini when "
        "OPENAI_API_KEY is set, otherwise disabled). "
        "Requires OPENAI_API_KEY env var.",
    )
    args = parser.parse_args()

    # Locate and load the checkpoint; sys.exit(str) reports on stderr with status 1.
    ckpt_dir = find_checkpoint_dir(args.path)
    if not ckpt_dir:
        sys.exit(f"Error: no checkpoint data found in '{args.path}'")

    logger.info("Loading from: %s", ckpt_dir)
    prog_list, best_id, last_iter = load_programs(ckpt_dir)
    if not prog_list:
        sys.exit(f"Error: no programs found in '{ckpt_dir}'")

    logger.info(
        "Loaded %d programs (best=%s, last_iter=%s)", len(prog_list), best_id, last_iter
    )

    all_progs = {p["id"]: p for p in prog_list}
    monitor_programs = [_to_monitor_format(p, all_progs) for p in prog_list]

    # Imported here so the server module is only loaded once data was found.
    from skydiscover.extras.monitor.server import MonitorServer

    server = MonitorServer(host=args.host, port=args.port)

    # Summaries: an explicit model wins; otherwise fall back to gpt-5-mini
    # when an API key is available, else leave summaries disabled.
    summary_model = args.summary_model
    if not summary_model and os.environ.get("OPENAI_API_KEY"):
        summary_model = "gpt-5-mini"
    if summary_model:
        server.configure_summary(model=summary_model, interval=0)

    server.start()
    try:
        # Replay every program as an event. A program is flagged as best when
        # it is the checkpoint's best program or improves on the running best.
        best_score = -float("inf")
        max_len = server.max_solution_length
        for mp in monitor_programs:
            pid = mp["id"]
            s = mp["score"]
            is_best = (pid == best_id) or (s > best_score)
            if s > best_score:
                best_score = s

            # ``or ""`` guards against records that store a null solution.
            solution = all_progs[pid].get("solution") or ""
            parent_solution = ""
            if mp["parent_id"] and mp["parent_id"] in all_progs:
                parent_solution = all_progs[mp["parent_id"]].get("solution") or ""

            server.push_event(
                {
                    "type": "new_program",
                    "program": mp,
                    "stats": {
                        "total_programs": len(monitor_programs),
                        "current_iteration": last_iter,
                        "best_score": best_score,
                        "iterations_since_improvement": 0,
                        "programs_per_min": 0,
                        "elapsed_seconds": 0,
                    },
                    "is_best": is_best,
                    "full_solution": solution[:max_len],
                    "parent_full_solution": parent_solution[:max_len],
                }
            )

        # NOTE(review): presumably lets the server settle before the banner
        # is printed — confirm.
        time.sleep(1.5)

        print(f"\n  Dashboard ready at http://localhost:{args.port}/")
        print(f"  {len(prog_list)} programs loaded from {ckpt_dir}")
        if summary_model:
            print(f"  Per-program summaries: {summary_model}")
        else:
            print("  Per-program summaries: disabled (set OPENAI_API_KEY or --summary-model)")
        print("  Press Ctrl+C to stop\n")

        # Keep the process alive until interrupted.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        server.stop()

    print("Stopped.")
|
|
|
|
# Run the viewer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|