| |
| """ |
| Live progress monitor for Frontier-CS parallel experiments. |
| |
| Scans result directories and prints a refreshing status table showing |
| each problem's current generation, best score, and latest score. |
| |
| Usage: |
| python scripts/dev/monitor_frontier_cs.py # default results dir |
| python scripts/dev/monitor_frontier_cs.py --interval 10 # refresh every 10s |
| python scripts/dev/monitor_frontier_cs.py --results-dir path # custom dir |
| """ |
|
|
| import argparse |
| import json |
| import os |
| import sys |
| import time |
| from pathlib import Path |
|
|
|
|
| def _scan_problem_dir(exp_dir: Path, pid: str): |
| """Scan a single problem directory and return its status.""" |
| gen_dirs = sorted(exp_dir.glob("gen_*"), key=lambda p: int(p.name.split("_")[1])) |
| n_gens = len(gen_dirs) |
|
|
| if n_gens == 0: |
| return {"pid": pid, "gens": 0, "best": 0.0, "latest": 0.0, "status": "starting"} |
|
|
| best_score = 0.0 |
| latest_score = 0.0 |
| latest_gen = 0 |
|
|
| for g in gen_dirs: |
| metrics_path = g / "results" / "metrics.json" |
| if not metrics_path.exists(): |
| continue |
| try: |
| with open(metrics_path) as f: |
| m = json.load(f) |
| score = m.get("combined_score", 0.0) |
| gen_num = int(g.name.split("_")[1]) |
| if score > best_score: |
| best_score = score |
| if gen_num >= latest_gen: |
| latest_gen = gen_num |
| latest_score = score |
| except (json.JSONDecodeError, ValueError): |
| continue |
|
|
| status = "done" if n_gens >= 50 else "running" |
| return { |
| "pid": pid, |
| "gens": n_gens, |
| "best": best_score, |
| "latest": latest_score, |
| "latest_gen": latest_gen, |
| "status": status, |
| } |
|
|
|
|
| def scan_experiments(results_root: Path): |
| """Scan experiment directories and return status for each problem. |
| |
| Supports both layouts: |
| - New: results_root is a run dir containing p0/, p1/, ... |
| - Legacy: results_root contains p0_batch_g50_xxx/, p1_agent_g50_xxx/, ... |
| """ |
| rows = [] |
|
|
| |
| subdirs = sorted(d for d in results_root.iterdir() if d.is_dir()) |
| has_problem_dirs = any(d.name.startswith("p") and d.name[1:].isdigit() for d in subdirs) |
|
|
| if has_problem_dirs: |
| |
| for d in subdirs: |
| if d.name.startswith("p") and d.name[1:].isdigit(): |
| rows.append(_scan_problem_dir(d, d.name)) |
| else: |
| |
| for d in subdirs: |
| parts = d.name.split("_") |
| pid = parts[0] if parts else d.name |
| rows.append(_scan_problem_dir(d, pid)) |
|
|
| return rows |
|
|
|
|
| def print_table(rows, total_target_gens=50): |
| """Print a formatted status table.""" |
| |
| print("\033[2J\033[H", end="") |
|
|
| now = time.strftime("%H:%M:%S") |
| running = sum(1 for r in rows if r["status"] == "running") |
| done = sum(1 for r in rows if r["status"] == "done") |
| total = len(rows) |
|
|
| print(f"Frontier-CS Progress Monitor [{now}] running={running} done={done} total={total}") |
| print("=" * 72) |
| print(f"{'Problem':<10} {'Gen':>6} {'Progress':>10} {'Best':>8} {'Latest':>8} {'Status':<8}") |
| print("-" * 72) |
|
|
| for r in rows: |
| gens = r["gens"] |
| pct = f"{gens}/{total_target_gens}" |
| bar_len = 8 |
| filled = int(bar_len * gens / total_target_gens) if total_target_gens > 0 else 0 |
| bar = "#" * filled + "." * (bar_len - filled) |
|
|
| status_str = r["status"] |
| best = f"{r['best']:.1f}" if r["best"] > 0 else "-" |
| latest = f"{r['latest']:.1f}" if r["latest"] > 0 else "-" |
|
|
| print(f"{r['pid']:<10} {gens:>6} {pct:>6} {bar} {best:>8} {latest:>8} {status_str:<8}") |
|
|
| print("-" * 72) |
|
|
| |
| scores = [r["best"] for r in rows if r["best"] > 0] |
| if scores: |
| avg = sum(scores) / len(scores) |
| print(f"Avg best score: {avg:.2f} (across {len(scores)} problems with score > 0)") |
|
|
|
|
| def main(): |
| parser = argparse.ArgumentParser(description="Monitor Frontier-CS experiments") |
| parser.add_argument("--results-dir", type=str, |
| default="results/frontier_cs_algorithmic", |
| help="Results root directory") |
| parser.add_argument("--interval", type=int, default=30, |
| help="Refresh interval in seconds") |
| parser.add_argument("--once", action="store_true", |
| help="Print once and exit (no loop)") |
| args = parser.parse_args() |
|
|
| results_root = Path(args.results_dir) |
| if not results_root.exists(): |
| print(f"Results directory not found: {results_root}") |
| sys.exit(1) |
|
|
| if args.once: |
| rows = scan_experiments(results_root) |
| print_table(rows) |
| return |
|
|
| try: |
| while True: |
| rows = scan_experiments(results_root) |
| print_table(rows) |
| time.sleep(args.interval) |
| except KeyboardInterrupt: |
| print("\nMonitor stopped.") |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|