#!/usr/bin/env python3
"""
Live progress monitor for Frontier-CS parallel experiments.

Scans result directories and prints a refreshing status table showing
each problem's current generation, best score, and latest score.

Usage:
    python scripts/dev/monitor_frontier_cs.py                       # default results dir
    python scripts/dev/monitor_frontier_cs.py --interval 10         # refresh every 10s
    python scripts/dev/monitor_frontier_cs.py --results-dir path    # custom dir
    python scripts/dev/monitor_frontier_cs.py --target-gens 100     # custom generation target
"""

import argparse
import json
import os
import sys
import time
from pathlib import Path

# Number of generations after which a problem is reported as "done".
DEFAULT_TARGET_GENS = 50

# Table geometry.
_BAR_WIDTH = 8
_RULE_WIDTH = 72


def _gen_index(path: Path):
    """Return the integer N from a ``gen_<N>...`` entry name, or None if absent."""
    parts = path.name.split("_")
    try:
        return int(parts[1])
    except (IndexError, ValueError):
        # e.g. "gen_final" or "gen_": not a numbered generation.
        return None


def _read_score(metrics_path: Path):
    """Return ``combined_score`` from a metrics file, or None if unusable.

    A readable JSON object without the key yields 0.0. A missing or
    unreadable file, malformed JSON, a non-object payload, or a non-numeric
    score all yield None so the caller can skip that generation.
    """
    try:
        with open(metrics_path, encoding="utf-8") as f:
            metrics = json.load(f)
    except (OSError, ValueError):
        # OSError: file missing or removed mid-scan.
        # ValueError: covers json.JSONDecodeError and UnicodeDecodeError
        # (e.g. a metrics file that is still being written).
        return None
    if not isinstance(metrics, dict):
        return None
    score = metrics.get("combined_score", 0.0)
    if isinstance(score, bool) or not isinstance(score, (int, float)):
        return None
    return score


def _is_problem_dir(path: Path) -> bool:
    """Return True if the entry name has the form ``p<N>`` (e.g. ``p0``, ``p12``)."""
    name = path.name
    return name.startswith("p") and name[1:].isdecimal()


def _progress_bar(done: int, target: int, width: int = _BAR_WIDTH) -> str:
    """Render a fixed-width ``#``/``.`` bar for ``done`` out of ``target``."""
    if target <= 0:
        return "." * width
    # Clamp so that done > target cannot make the bar wider than `width`.
    filled = max(0, min(width, int(width * done / target)))
    return "#" * filled + "." * (width - filled)


def _scan_problem_dir(exp_dir: Path, pid: str, target_gens: int = DEFAULT_TARGET_GENS):
    """Scan a single problem directory and return its status.

    Args:
        exp_dir: Directory containing ``gen_<N>`` entries.
        pid: Problem identifier to report in the result.
        target_gens: Generation count at which the status becomes "done".

    Returns:
        A dict with keys ``pid``, ``gens`` (number of numbered ``gen_*``
        entries), ``best`` (highest positive score seen, else 0.0),
        ``latest`` (score of the highest-numbered generation with usable
        metrics), ``latest_gen`` and ``status`` ("starting", "running" or
        "done").
    """
    numbered = []
    for path in exp_dir.glob("gen_*"):
        index = _gen_index(path)
        if index is not None:
            numbered.append((index, path))
    numbered.sort(key=lambda pair: pair[0])

    n_gens = len(numbered)
    if n_gens == 0:
        return {
            "pid": pid,
            "gens": 0,
            "best": 0.0,
            "latest": 0.0,
            "latest_gen": 0,
            "status": "starting",
        }

    best_score = 0.0
    latest_score = 0.0
    latest_gen = 0
    for gen_num, gen_dir in numbered:
        score = _read_score(gen_dir / "results" / "metrics.json")
        if score is None:
            continue
        if score > best_score:
            best_score = score
        if gen_num >= latest_gen:
            latest_gen = gen_num
            latest_score = score

    return {
        "pid": pid,
        "gens": n_gens,
        "best": best_score,
        "latest": latest_score,
        "latest_gen": latest_gen,
        "status": "done" if n_gens >= target_gens else "running",
    }


def scan_experiments(results_root: Path, target_gens: int = DEFAULT_TARGET_GENS):
    """Scan experiment directories and return status for each problem.

    Supports both layouts:
    - New: results_root is a run dir containing p0/, p1/, ...
    - Legacy: results_root contains p0_batch_g50_xxx/, p1_agent_g50_xxx/, ...

    Args:
        results_root: Directory to scan.
        target_gens: Generation count at which a problem is reported "done".

    Returns:
        A list of status dicts as produced by ``_scan_problem_dir``. In the
        new layout the rows are ordered by problem number.
    """
    subdirs = sorted(d for d in results_root.iterdir() if d.is_dir())

    # Detect layout: if any subdir is named p<N>, it's a run dir (new layout).
    problem_dirs = [d for d in subdirs if _is_problem_dir(d)]
    if problem_dirs:
        # Numeric order so that p2 is listed before p10.
        problem_dirs.sort(key=lambda d: int(d.name[1:]))
        return [_scan_problem_dir(d, d.name, target_gens) for d in problem_dirs]

    # Legacy layout: the problem id is the text before the first underscore.
    return [
        _scan_problem_dir(d, d.name.split("_", 1)[0], target_gens)
        for d in subdirs
    ]


def print_table(rows, total_target_gens=50):
    """Print a formatted status table.

    Args:
        rows: Status dicts with keys ``pid``, ``gens``, ``best``, ``latest``
            and ``status``.
        total_target_gens: Denominator for the progress column and bar.
    """
    # Clear screen and move the cursor home (ANSI escape sequences).
    print("\033[2J\033[H", end="")

    now = time.strftime("%H:%M:%S")
    running = sum(1 for r in rows if r["status"] == "running")
    done = sum(1 for r in rows if r["status"] == "done")
    total = len(rows)

    print(f"Frontier-CS Progress Monitor  [{now}]  running={running} done={done} total={total}")
    print("=" * _RULE_WIDTH)
    # The Progress cell is "<pct:>6> <bar:8>" = 15 columns wide, so the header
    # uses the same width to keep the following columns aligned.
    print(f"{'Problem':<10} {'Gen':>6} {'Progress':>15} {'Best':>8} {'Latest':>8} {'Status':<8}")
    print("-" * _RULE_WIDTH)

    for r in rows:
        gens = r["gens"]
        pct = f"{gens}/{total_target_gens}"
        bar = _progress_bar(gens, total_target_gens)
        status_str = r["status"]
        best = f"{r['best']:.1f}" if r["best"] > 0 else "-"
        latest = f"{r['latest']:.1f}" if r["latest"] > 0 else "-"
        print(f"{r['pid']:<10} {gens:>6} {pct:>6} {bar} {best:>8} {latest:>8} {status_str:<8}")

    print("-" * _RULE_WIDTH)

    # Summary stats
    scores = [r["best"] for r in rows if r["best"] > 0]
    if scores:
        avg = sum(scores) / len(scores)
        print(f"Avg best score: {avg:.2f}  (across {len(scores)} problems with score > 0)")


def main():
    """Parse command-line arguments and run the monitor once or in a loop."""
    parser = argparse.ArgumentParser(description="Monitor Frontier-CS experiments")
    parser.add_argument("--results-dir", type=str,
                        default="results/frontier_cs_algorithmic",
                        help="Results root directory")
    parser.add_argument("--interval", type=int, default=30,
                        help="Refresh interval in seconds")
    parser.add_argument("--target-gens", type=int, default=DEFAULT_TARGET_GENS,
                        help="Generations per problem before it counts as done")
    parser.add_argument("--once", action="store_true",
                        help="Print once and exit (no loop)")
    args = parser.parse_args()

    results_root = Path(args.results_dir)
    # is_dir() rather than exists(): a regular file cannot be scanned either.
    if not results_root.is_dir():
        print(f"Results directory not found: {results_root}")
        sys.exit(1)

    def refresh():
        rows = scan_experiments(results_root, args.target_gens)
        print_table(rows, args.target_gens)

    if args.once:
        refresh()
        return

    # time.sleep() rejects negative values, so clamp the interval.
    interval = max(0, args.interval)
    try:
        while True:
            refresh()
            time.sleep(interval)
    except KeyboardInterrupt:
        print("\nMonitor stopped.")


if __name__ == "__main__":
    main()