shinka-backup / scripts /dev /monitor_frontier_cs.py
JustinTX's picture
Add files using upload-large-folder tool
3f6526a verified
#!/usr/bin/env python3
"""
Live progress monitor for Frontier-CS parallel experiments.

Scans result directories and prints a refreshing status table showing
each problem's current generation, best score, and latest score.

Usage:
    python scripts/dev/monitor_frontier_cs.py                      # default results dir
    python scripts/dev/monitor_frontier_cs.py --interval 10        # refresh every 10s
    python scripts/dev/monitor_frontier_cs.py --results-dir path   # custom dir
    python scripts/dev/monitor_frontier_cs.py --once               # print once and exit
"""
import argparse
import json
import os
import sys
import time
from pathlib import Path
def _gen_index(path: Path):
    """Return N for a ``gen_<N>`` directory, or None if *path* is not one."""
    if not path.is_dir():
        return None
    try:
        return int(path.name.split("_")[1])
    except (IndexError, ValueError):
        return None


def _scan_problem_dir(exp_dir: Path, pid: str, target_gens: int = 50):
    """Scan a single problem directory and return its status.

    Generations are the ``gen_<N>`` subdirectories of *exp_dir*. Each
    generation's score is the ``combined_score`` value read from
    ``gen_<N>/results/metrics.json``.

    Args:
        exp_dir: Directory that holds the ``gen_<N>`` subdirectories.
        pid: Problem identifier, copied verbatim into the result.
        target_gens: Number of generation directories at which the problem
            is reported as ``"done"``.

    Returns:
        A dict with the keys ``pid``, ``gens`` (number of generation
        directories), ``best`` (highest score seen, never below 0.0),
        ``latest`` and ``latest_gen`` (score and index of the
        highest-numbered generation with a usable score), and ``status``
        (``"starting"``, ``"running"`` or ``"done"``).
    """
    # Ignore stray entries such as ``gen_tmp`` or ``gen_0.log`` instead of
    # letting int() blow up while sorting.
    generations = []
    for path in exp_dir.glob("gen_*"):
        idx = _gen_index(path)
        if idx is not None:
            generations.append((idx, path))
    generations.sort(key=lambda item: item[0])

    n_gens = len(generations)
    if n_gens == 0:
        return {
            "pid": pid,
            "gens": 0,
            "best": 0.0,
            "latest": 0.0,
            "latest_gen": 0,
            "status": "starting",
        }

    best_score = 0.0
    latest_score = 0.0
    latest_gen = 0
    for gen_num, gen_dir in generations:
        metrics_path = gen_dir / "results" / "metrics.json"
        try:
            with open(metrics_path, encoding="utf-8") as f:
                metrics = json.load(f)
        except (OSError, ValueError):
            # Missing, unreadable or half-written metrics: the generation is
            # most likely still in progress, so it contributes no score yet.
            continue

        if not isinstance(metrics, dict):
            continue
        score = metrics.get("combined_score", 0.0)
        # A null or non-numeric score cannot be compared or formatted.
        if isinstance(score, bool) or not isinstance(score, (int, float)):
            continue

        if score > best_score:
            best_score = score
        if gen_num >= latest_gen:
            latest_gen = gen_num
            latest_score = score

    status = "done" if n_gens >= target_gens else "running"
    return {
        "pid": pid,
        "gens": n_gens,
        "best": best_score,
        "latest": latest_score,
        "latest_gen": latest_gen,
        "status": status,
    }
def _problem_number(name: str):
    """Return N when *name* is exactly ``p<N>``, otherwise None."""
    digits = name[1:]
    if name.startswith("p") and digits.isdecimal():
        return int(digits)
    return None


def scan_experiments(results_root: Path):
    """Scan experiment directories and return status for each problem.

    Supports both layouts:
      - New: results_root is a run dir containing p0/, p1/, ...
      - Legacy: results_root contains p0_batch_g50_xxx/, p1_agent_g50_xxx/, ...

    Args:
        results_root: Directory whose immediate subdirectories are scanned.

    Returns:
        A list with one status dict per problem, as produced by
        ``_scan_problem_dir``. Problems are ordered by problem number
        (p2 before p10); legacy directories whose prefix is not ``p<N>``
        follow in name order.
    """
    subdirs = [d for d in results_root.iterdir() if d.is_dir()]

    # Detect layout: if any subdir is named exactly p<N>, it's a run dir.
    numbered = []
    for d in subdirs:
        number = _problem_number(d.name)
        if number is not None:
            numbered.append((number, d.name, d))

    if numbered:
        # New layout: results_root/p0/, p1/, ... (other subdirs are ignored).
        numbered.sort(key=lambda item: item[:2])
        return [_scan_problem_dir(d, d.name) for _, _, d in numbered]

    # Legacy layout: results_root/p0_batch_g50_xxx/ -- the problem id is the
    # part of the directory name before the first underscore.
    legacy = []
    for d in subdirs:
        pid = d.name.split("_")[0]
        number = _problem_number(pid)
        if number is None:
            sort_key = (1, 0, d.name)
        else:
            sort_key = (0, number, d.name)
        legacy.append((sort_key, d, pid))
    legacy.sort(key=lambda item: item[0])
    return [_scan_problem_dir(d, pid) for _, d, pid in legacy]
def print_table(rows, total_target_gens=50):
    """Print a formatted status table.

    Clears the terminal with ANSI escape codes, then prints a header line
    with running/done/total counts, one line per problem, and the average
    best score across problems whose best score is positive.

    Args:
        rows: Iterable-backed list of status dicts with the keys ``pid``,
            ``gens``, ``best``, ``latest`` and ``status``.
        total_target_gens: Generation count that corresponds to a full
            progress bar.
    """
    bar_len = 8
    # The progress cell is "<gens>/<target>" (6 wide), a space, and the bar.
    progress_width = 6 + 1 + bar_len

    # Clear screen and move the cursor to the top-left corner.
    print("\033[2J\033[H", end="")

    now = time.strftime("%H:%M:%S")
    running = sum(1 for r in rows if r["status"] == "running")
    done = sum(1 for r in rows if r["status"] == "done")
    total = len(rows)

    print(f"Frontier-CS Progress Monitor [{now}] running={running} done={done} total={total}")
    print("=" * 72)
    # The header uses the same column widths as the data rows below so the
    # Best/Latest/Status headings line up with their values.
    print(
        f"{'Problem':<10} {'Gen':>6} {'Progress':>{progress_width}} "
        f"{'Best':>8} {'Latest':>8} {'Status':<8}"
    )
    print("-" * 72)

    for r in rows:
        gens = r["gens"]
        pct = f"{gens}/{total_target_gens}"
        if total_target_gens > 0:
            # Clamp so a run that exceeds the target cannot overflow the bar.
            filled = min(bar_len, max(0, int(bar_len * gens / total_target_gens)))
        else:
            filled = 0
        bar = "#" * filled + "." * (bar_len - filled)
        progress = f"{pct:>6} {bar}"
        best = f"{r['best']:.1f}" if r["best"] > 0 else "-"
        latest = f"{r['latest']:.1f}" if r["latest"] > 0 else "-"
        print(
            f"{r['pid']:<10} {gens:>6} {progress:>{progress_width}} "
            f"{best:>8} {latest:>8} {r['status']:<8}"
        )

    print("-" * 72)

    # Summary stats
    scores = [r["best"] for r in rows if r["best"] > 0]
    if scores:
        avg = sum(scores) / len(scores)
        print(f"Avg best score: {avg:.2f} (across {len(scores)} problems with score > 0)")
def main():
    """Parse command-line options and run the monitor.

    With ``--once`` the table is printed a single time. Otherwise it is
    re-scanned and re-printed every ``--interval`` seconds until the user
    interrupts with Ctrl-C.

    Exits with status 1 when the results path is missing or is not a
    directory, and with argparse's usage error when ``--interval`` is not
    a positive number of seconds.
    """
    parser = argparse.ArgumentParser(description="Monitor Frontier-CS experiments")
    parser.add_argument("--results-dir", type=str,
                        default="results/frontier_cs_algorithmic",
                        help="Results root directory")
    parser.add_argument("--interval", type=int, default=30,
                        help="Refresh interval in seconds")
    parser.add_argument("--once", action="store_true",
                        help="Print once and exit (no loop)")
    args = parser.parse_args()

    # time.sleep() rejects negative values and 0 would spin the CPU, so
    # refuse both up front with a clear usage error.
    if not args.once and args.interval < 1:
        parser.error("--interval must be at least 1 second")

    results_root = Path(args.results_dir)
    if not results_root.exists():
        print(f"Results directory not found: {results_root}", file=sys.stderr)
        sys.exit(1)
    if not results_root.is_dir():
        print(f"Results path is not a directory: {results_root}", file=sys.stderr)
        sys.exit(1)

    if args.once:
        print_table(scan_experiments(results_root))
        return

    try:
        while True:
            print_table(scan_experiments(results_root))
            time.sleep(args.interval)
    except KeyboardInterrupt:
        print("\nMonitor stopped.")
# Run the monitor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()