shinka-backup / tasks /frontier_cs_entry /compare_experiments.py
JustinTX's picture
Add files using upload-large-folder tool
2facf1f verified
#!/usr/bin/env python3
"""Compare two experiment runs across multiple problems.
Usage:
# New layout: two run directories, each containing p0/, p1/, ...
python tasks/frontier_cs_entry/compare_experiments.py \
results/frontier_cs_algorithmic/batch_g50_20260327_120000 \
results/frontier_cs_algorithmic/agent_g50_20260327_130000
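# Legacy layout: shared root + variant name patterns
# (variant B is the second positional; variant A comes from --variant-a, default "batch")
python tasks/frontier_cs_entry/compare_experiments.py \
results/frontier_cs_algorithmic agent --legacy --variant-a batch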
# Options work with both layouts
python tasks/frontier_cs_entry/compare_experiments.py dir_a dir_b --problems p0 p1 p2
python tasks/frontier_cs_entry/compare_experiments.py dir_a dir_b --csv results/comparison.csv
"""
import argparse
import json
import os
import re
import sqlite3
import sys
from collections import defaultdict
from pathlib import Path
def find_experiment_dirs_new(run_dir: str) -> dict[str, Path]:
    """Collect the per-problem subdirectories of a new-layout run directory.

    In the new layout a run directory holds one subdirectory per problem,
    named ``p<digits>`` (``p0``, ``p1``, ...). Entries that are not
    directories, or whose name does not have that form, are ignored.

    Args:
        run_dir: Path of the run directory to scan.

    Returns:
        Mapping of problem id (the subdirectory name) to its path.

    Exits the process with status 1, after printing an error to stderr,
    when ``run_dir`` does not exist.
    """
    root = Path(run_dir)
    if not root.exists():
        print(f"Error: {run_dir} does not exist", file=sys.stderr)
        sys.exit(1)

    name_pattern = re.compile(r"(p\d+)$")
    found = {}
    for entry in sorted(root.iterdir()):
        # Only directories are candidates; files named like a problem are skipped.
        hit = name_pattern.match(entry.name) if entry.is_dir() else None
        if hit is not None:
            found[hit.group(1)] = entry
    return found
def find_experiment_dirs_legacy(results_dir: str, variant: str) -> dict[str, Path]:
    """Find experiment directories in the legacy flat layout.

    Directory names are expected to start with
    ``p{id}_{variant}_g{gens}_{timestamp}``, where the timestamp is one or
    more underscore-separated digit groups (e.g. ``20260327_120000``).
    Anything after the timestamp is ignored.

    When several runs exist for the same problem and variant, the one with
    the greatest timestamp is kept. Comparing directory names alone is not
    enough: with different generation counts ``g100_<newer>`` sorts before
    ``g50_<older>``, so the older run would win.

    Args:
        results_dir: Root directory holding all legacy run directories.
        variant: Variant name to select (matched literally).

    Returns:
        Mapping of problem id (``p<digits>``) to the chosen directory.

    Exits the process with status 1, after printing an error to stderr,
    when ``results_dir`` does not exist.
    """
    results_path = Path(results_dir)
    if not results_path.exists():
        print(f"Error: {results_dir} does not exist", file=sys.stderr)
        sys.exit(1)

    # Group 1: problem id. Group 2: the full "_<digits>[_<digits>...]" timestamp.
    pattern = re.compile(
        r"(p\d+)_" + re.escape(variant) + r"_g\d+((?:_\d+)+)"
    )

    latest = {}  # pid -> (timestamp key, directory)
    for d in sorted(results_path.iterdir()):
        if not d.is_dir():
            continue
        match = pattern.match(d.name)
        if not match:
            continue
        pid = match.group(1)
        # Compare timestamps numerically, group by group (date, then time, ...).
        stamp = tuple(int(part) for part in match.group(2).split("_") if part)
        # ">=" keeps the previous tie-break: on equal timestamps the
        # lexicographically last directory name wins.
        if pid not in latest or stamp >= latest[pid][0]:
            latest[pid] = (stamp, d)

    return {pid: path for pid, (_stamp, path) in latest.items()}
def get_scores_from_db(db_path: Path) -> "dict | None":
    """Extract score statistics from an evolution_db.sqlite.

    Each row of the ``programs`` table is scored with ``score_bounded`` from
    its ``public_metrics`` JSON when that is a number (to avoid unbounded
    scores >100 inflating comparisons). Otherwise the score falls back to
    ``combined_score`` (or 0.0 if that is empty). The fallback covers
    missing, unparseable or non-object metrics and a null/non-numeric
    ``score_bounded``.

    Args:
        db_path: Path to the SQLite database file.

    Returns:
        ``None`` if ``db_path`` does not exist. On success, a dict with:
          * ``best_score``: highest score seen (never below 0.0).
          * ``best_gen``: generation of the first row reaching it, or
            ``None`` when no row scored above 0.
          * ``max_gen``: highest generation present (0 for an empty table).
          * ``total_programs``: number of rows.
          * ``milestones``: cumulative best at generations 10/20/30/40/50,
            only for those generations that actually have rows.
          * ``trajectory``: list of ``(generation, cumulative best)``.
        If reading or processing fails, ``{"error": <message>}``.
    """
    if not db_path.exists():
        return None

    def _extract_score(combined_score, public_metrics_json):
        """Return the bounded score if it is a usable number, else combined_score."""
        if public_metrics_json:
            try:
                pm = json.loads(public_metrics_json)
            except (ValueError, TypeError):
                pm = None
            if isinstance(pm, dict):
                bounded = pm.get("score_bounded")
                # A null or non-numeric value would break the ">" comparisons
                # below, so treat it like a missing key.
                if isinstance(bounded, (int, float)) and not isinstance(bounded, bool):
                    return bounded
        return combined_score or 0.0

    try:
        conn = sqlite3.connect(str(db_path))
        try:
            all_rows = conn.execute(
                "SELECT generation, combined_score, public_metrics FROM programs "
                "ORDER BY generation"
            ).fetchall()
        finally:
            # Close on every path; a failed query must not leak the handle.
            conn.close()

        best_score = 0.0
        best_gen = None
        gen_best = {}  # generation -> best score within that generation
        for gen, cs, pm in all_rows:
            s = _extract_score(cs, pm)
            # Strict ">" keeps the earliest generation on ties (rows are
            # ordered by generation).
            if s > best_score:
                best_score = s
                best_gen = gen
            if gen not in gen_best or s > gen_best[gen]:
                gen_best[gen] = s

        max_gen = max((row[0] for row in all_rows), default=0)

        # Cumulative best trajectory across generations.
        cum_best = []
        running_max = float("-inf")
        for gen, score in sorted(gen_best.items()):
            if score > running_max:
                running_max = score
            cum_best.append((gen, running_max))

        # Cumulative best at the milestone generations that have data.
        milestone_gens = (10, 20, 30, 40, 50)
        milestones = {gen: cb for gen, cb in cum_best if gen in milestone_gens}

        return {
            "best_score": best_score,
            "best_gen": best_gen,
            "max_gen": max_gen,
            "total_programs": len(all_rows),
            "milestones": milestones,
            "trajectory": cum_best,
        }
    except Exception as e:
        # Best-effort by design: callers treat an error dict as "no score".
        return {"error": str(e)}
def format_score(score):
    """Render a score as text for the comparison table.

    A missing (``None``) or zero score yields a fixed placeholder string;
    any other value is formatted with two decimals in a field at least six
    characters wide.
    """
    is_blank = score is None or score == 0
    return " 0.00" if is_blank else f"{score:6.2f}"
def _outcome(diff, tie_eps=0.01):
    """Classify a B-minus-A score difference: 1 = B wins, -1 = A wins, 0 = tie."""
    if abs(diff) < tie_eps:
        return 0
    return 1 if diff > 0 else -1


def _load_stats(dirs, pid, label):
    """Load DB statistics for one problem of one run, warning on stderr if unusable.

    Returns ``None`` when the run has no directory for ``pid``; otherwise
    whatever ``get_scores_from_db`` returns (``None`` for a missing DB, an
    ``{"error": ...}`` dict for an unreadable one).
    """
    if pid not in dirs:
        return None
    stats = get_scores_from_db(dirs[pid] / "evolution_db.sqlite")
    if stats is None:
        print(
            f"Warning: {label}/{pid}: evolution_db.sqlite not found; scoring as 0",
            file=sys.stderr,
        )
    elif "error" in stats:
        print(
            f"Warning: {label}/{pid}: could not read scores ({stats['error']}); "
            "scoring as 0",
            file=sys.stderr,
        )
    return stats


def main():
    """Command-line entry point: compare two runs problem by problem.

    Steps:
      1. Parse arguments and locate the per-problem directories of both runs
         (new layout: two run directories; legacy layout: one shared root
         plus two variant names).
      2. Read the best score / best generation of each problem from its
         ``evolution_db.sqlite``. A problem that is absent, or whose DB is
         missing or unreadable, scores 0.0 (with a warning on stderr in the
         latter two cases).
      3. Print a comparison table with per-problem winner, averages, totals
         and win counts. Averages only include problems where at least one
         side scored above 0.
      4. Optionally export the per-problem rows to CSV.

    Exits with status 1 when no problem matches.
    """
    import csv  # local import: only needed by the --csv export below

    parser = argparse.ArgumentParser(
        description="Compare two experiment runs across problems"
    )
    parser.add_argument("dir_a", help="First run directory (or legacy results_dir)")
    parser.add_argument("dir_b", help="Second run directory (or legacy variant_b name)")
    parser.add_argument(
        "--legacy", action="store_true",
        help="Legacy mode: dir_a is shared results root, dir_b is variant_b name. "
        "Use --variant-a to name the first variant (default: batch).",
    )
    parser.add_argument(
        "--variant-a", type=str, default=None,
        help="Legacy: variant_a name (default: batch)",
    )
    parser.add_argument(
        "--problems", nargs="*", help="Only compare these problem IDs (e.g., p0 p1)"
    )
    parser.add_argument("--csv", help="Export results to CSV file")
    parser.add_argument(
        "--sort",
        choices=["problem", "diff", "score_a", "score_b"],
        default="problem",
        help="Sort order for output table",
    )
    args = parser.parse_args()

    # Find experiment directories and the display label of each side.
    if args.legacy:
        va = args.variant_a or "batch"
        vb = args.dir_b
        dirs_a = find_experiment_dirs_legacy(args.dir_a, va)
        dirs_b = find_experiment_dirs_legacy(args.dir_a, vb)
    else:
        va = Path(args.dir_a).name
        vb = Path(args.dir_b).name
        dirs_a = find_experiment_dirs_new(args.dir_a)
        dirs_b = find_experiment_dirs_new(args.dir_b)

    all_problems = sorted(
        set(dirs_a) | set(dirs_b),
        key=lambda x: int(x[1:]),  # Sort by numeric ID
    )
    if args.problems:
        wanted = set(args.problems)
        all_problems = [p for p in all_problems if p in wanted]
    if not all_problems:
        print("No matching problems found.", file=sys.stderr)
        sys.exit(1)

    # Collect results
    rows = []
    for pid in all_problems:
        stats_a = _load_stats(dirs_a, pid, va)
        stats_b = _load_stats(dirs_b, pid, vb)
        score_a = stats_a["best_score"] if stats_a and "best_score" in stats_a else 0.0
        score_b = stats_b["best_score"] if stats_b and "best_score" in stats_b else 0.0
        diff = score_b - score_a
        rows.append(
            {
                "pid": pid,
                "score_a": score_a,
                "score_b": score_b,
                "diff": diff,
                "gen_a": stats_a.get("best_gen") if stats_a else None,
                "gen_b": stats_b.get("best_gen") if stats_b else None,
                "outcome": _outcome(diff),
                "stats_a": stats_a,
                "stats_b": stats_b,
            }
        )

    # Sort: the non-default sort names are row keys, ordered high to low.
    if args.sort != "problem":
        rows.sort(key=lambda r: r[args.sort], reverse=True)

    winner_label = {1: vb, -1: va, 0: "tie"}

    # Print table
    header = f"{'Problem':>8} {va:>10} {'gen':>4} {vb:>10} {'gen':>4} {'diff':>8} {'winner':>8}"
    sep = "-" * len(header)
    print(f"\n Comparison: {va} vs {vb}")
    print(f" Dir A: {args.dir_a}")
    print(f" Dir B: {args.dir_b}")
    print(f" Problems: {len(rows)}\n")
    print(header)
    print(sep)

    tally = {1: 0, -1: 0, 0: 0}
    sum_a, sum_b, sum_diff = 0.0, 0.0, 0.0
    scored_count = 0  # problems where at least one side scored above 0
    no_gen = f"{'-':>4}"  # same width as the "gen" column
    for r in rows:
        gen_a_str = f"{r['gen_a']:4d}" if r["gen_a"] is not None else no_gen
        gen_b_str = f"{r['gen_b']:4d}" if r["gen_b"] is not None else no_gen
        tally[r["outcome"]] += 1
        winner = winner_label[r["outcome"]]
        diff_str = f"{r['diff']:+8.2f}"
        print(
            f"{r['pid']:>8} {format_score(r['score_a']):>10} {gen_a_str} "
            f"{format_score(r['score_b']):>10} {gen_b_str} {diff_str} {winner:>8}"
        )
        if r["score_a"] > 0 or r["score_b"] > 0:
            sum_a += r["score_a"]
            sum_b += r["score_b"]
            sum_diff += r["diff"]
            scored_count += 1
    print(sep)

    # Summary (blank placeholders keep the values under the header columns)
    blank = f"{'':>4}"
    if scored_count > 0:
        avg_a = sum_a / scored_count
        avg_b = sum_b / scored_count
        avg_diff = sum_diff / scored_count
        print(
            f"{'avg':>8} {avg_a:10.2f} {blank} {avg_b:10.2f} {blank} {avg_diff:+8.2f}"
        )
        print(
            f"{'total':>8} {sum_a:10.2f} {blank} {sum_b:10.2f} {blank} {sum_diff:+8.2f}"
        )
    print(f"\n Wins: {va}={tally[-1]}, {vb}={tally[1]}, ties={tally[0]}")
    if scored_count > 0:
        print(f" Average score: {va}={sum_a/scored_count:.2f}, {vb}={sum_b/scored_count:.2f} (diff={sum_diff/scored_count:+.2f})")

    # CSV export
    if args.csv:
        csv_path = Path(args.csv)
        csv_path.parent.mkdir(parents=True, exist_ok=True)
        # csv.writer quotes fields as needed, so run names containing commas
        # or quotes no longer corrupt the file.
        with open(csv_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f, lineterminator="\n")
            writer.writerow(
                [
                    "problem",
                    f"{va}_score",
                    f"{va}_best_gen",
                    f"{vb}_score",
                    f"{vb}_best_gen",
                    "diff",
                    "winner",
                ]
            )
            for r in rows:
                writer.writerow(
                    [
                        r["pid"],
                        f"{r['score_a']:.4f}",
                        r["gen_a"] if r["gen_a"] is not None else "",
                        f"{r['score_b']:.4f}",
                        r["gen_b"] if r["gen_b"] is not None else "",
                        f"{r['diff']:.4f}",
                        winner_label[r["outcome"]],
                    ]
                )
        print(f"\n CSV exported to {csv_path}")
    print()
# Run the comparison only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()