#!/usr/bin/env python3
"""
Evaluate Frontier-CS baseline: score all existing LLM solutions without evolution.
For each model, the script evaluates every solution variant on the selected problems and reports:
- Per-problem best score (best-of-k)
- Per-problem average score (avg-of-k)
- Overall average across problems
Usage:
.venv/bin/python scripts/dev/eval_frontier_cs_baseline.py
.venv/bin/python scripts/dev/eval_frontier_cs_baseline.py --models gpt5 gemini3pro --concurrency 4
.venv/bin/python scripts/dev/eval_frontier_cs_baseline.py --problems 0-49
"""
from __future__ import annotations
import argparse
import csv
import json
import logging
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
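# Make the project root importable so the task evaluator under `tasks/` resolves when run from anywhere.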
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
from tasks.frontier_cs_entry.evaluate_algorithmic import main as evaluate
logger = logging.getLogger(__name__)
DEFAULT_FRONTIER_CS_DIR = "tasks/Frontier-CS"
RESULTS_DIR = "results/frontier_cs_baseline"


def find_all_solutions(frontier_cs_dir: str, problem_id: str, model: str) -> list[Path]:
"""Find all solution variants for a model on a problem."""
solutions_dir = Path(frontier_cs_dir) / "algorithmic" / "solutions" / str(problem_id)
if not solutions_dir.exists():
return []
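    # A model's solutions may be a single "<model>.cpp" file and/or suffixed
    # variants "<model>_*.cpp"; return the exact file first, then variants in sorted order.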
exact = solutions_dir / f"{model}.cpp"
variants = sorted(solutions_dir.glob(f"{model}_*.cpp"))
results = []
if exact.exists():
results.append(exact)
results.extend(variants)
    return results


def list_problem_ids(frontier_cs_dir: str) -> list[str]:
"""List all problem IDs that have both a problem dir and solutions dir."""
problems_dir = Path(frontier_cs_dir) / "algorithmic" / "problems"
solutions_dir = Path(frontier_cs_dir) / "algorithmic" / "solutions"
    pids = []
    if not problems_dir.is_dir():
        return pids
    # Consider only numeric problem directories; sort numerically so "2" precedes "10".
    numeric_dirs = (d for d in problems_dir.iterdir() if d.is_dir() and d.name.isdigit())
    for d in sorted(numeric_dirs, key=lambda p: int(p.name)):
        if (solutions_dir / d.name).is_dir():
            pids.append(d.name)
    return pids


def eval_one(problem_id: str, solution_path: Path, frontier_cs_dir: str,
             results_base: str) -> dict:
"""Evaluate a single solution. Returns result dict."""
results_dir = os.path.join(
results_base, f"p{problem_id}", solution_path.stem
)
os.makedirs(results_dir, exist_ok=True)
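    # Treat any evaluation failure as a score of 0.0 so one bad solution does not abort the run.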
try:
result = evaluate(
program_path=str(solution_path),
results_dir=results_dir,
problem_id=problem_id,
frontier_cs_dir=frontier_cs_dir,
)
score = result.get("combined_score", 0.0)
except Exception as e:
logger.warning(f"Failed p{problem_id}/{solution_path.name}: {e}")
score = 0.0
return {
"problem_id": problem_id,
"solution": solution_path.name,
"score": score,
    }


def parse_problem_range(s: str, all_pids: list[str]) -> list[str]:
"""Parse '0-49' or 'all' into list of problem IDs."""
if s == "all":
return all_pids
if "-" in s:
lo, hi = s.split("-", 1)
lo, hi = int(lo), int(hi)
return [p for p in all_pids if lo <= int(p) <= hi]
    return [s]


def main():
parser = argparse.ArgumentParser(
description="Evaluate Frontier-CS baseline solutions",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("--models", nargs="+", default=["gpt5", "gemini3pro"],
help="Model prefixes to evaluate")
parser.add_argument("--problems", type=str, default="all",
help="Problem range: 'all', '0-49', or single ID")
parser.add_argument("--concurrency", type=int, default=4,
help="Number of concurrent evaluations (keep low to avoid go-judge contention)")
parser.add_argument("--frontier-cs-dir", type=str, default=DEFAULT_FRONTIER_CS_DIR)
parser.add_argument("--output", type=str, default=None,
help="Output CSV path (default: auto in results dir)")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
frontier_cs_dir = args.frontier_cs_dir
if not Path(frontier_cs_dir).is_absolute():
project_root = Path(__file__).resolve().parents[2]
frontier_cs_dir = str(project_root / frontier_cs_dir)
all_pids = list_problem_ids(frontier_cs_dir)
selected_pids = parse_problem_range(args.problems, all_pids)
# Collect all (problem, solution) pairs to evaluate
tasks = []
for model in args.models:
for pid in selected_pids:
solutions = find_all_solutions(frontier_cs_dir, pid, model)
for sol in solutions:
tasks.append((pid, sol, model))
print("=" * 60)
print("Frontier-CS Baseline Evaluation")
print("=" * 60)
print(f" Models: {', '.join(args.models)}")
print(f" Problems: {len(selected_pids)}")
print(f" Total evals: {len(tasks)}")
print(f" Concurrency: {args.concurrency}")
print("=" * 60)
print()
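    # Write all run artifacts under a timestamped directory so repeated runs do not collide.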
results_base = os.path.join(RESULTS_DIR, time.strftime("%Y%m%d_%H%M%S"))
os.makedirs(results_base, exist_ok=True)
# Run evaluations in parallel
all_results = []
done = 0
start = time.time()
with ThreadPoolExecutor(max_workers=args.concurrency) as pool:
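        # Map each future back to its (problem, solution, model) triple so results can be labeled.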
futures = {
pool.submit(eval_one, pid, sol, frontier_cs_dir, results_base): (pid, sol, model)
for pid, sol, model in tasks
}
for future in as_completed(futures):
pid, sol, model = futures[future]
try:
result = future.result()
result["model"] = model
all_results.append(result)
except Exception as e:
logger.error(f"Error evaluating p{pid}/{sol.name}: {e}")
all_results.append({
"problem_id": pid, "solution": sol.name,
"model": model, "score": 0.0,
})
done += 1
if done % 20 == 0 or done == len(tasks):
elapsed = time.time() - start
rate = done / elapsed if elapsed > 0 else 0
print(f" [{done}/{len(tasks)}] {rate:.1f} evals/s elapsed={elapsed:.0f}s")
# Aggregate per model
print()
print("=" * 60)
print("Results")
print("=" * 60)
csv_path = args.output or os.path.join(results_base, "baseline_results.csv")
with open(csv_path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=["model", "problem_id", "solution", "score"])
writer.writeheader()
for r in sorted(all_results, key=lambda x: (x["model"], int(x["problem_id"]), x["solution"])):
writer.writerow(r)
for model in args.models:
model_results = [r for r in all_results if r["model"] == model]
if not model_results:
print(f"\n{model}: no results")
continue
# Group by problem
by_problem = {}
for r in model_results:
by_problem.setdefault(r["problem_id"], []).append(r["score"])
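        # best-of-k: top-scoring variant per problem; avg-of-k: mean over that problem's k variants.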
best_scores = []
avg_scores = []
for pid in sorted(by_problem, key=int):
scores = by_problem[pid]
best = max(scores)
avg = sum(scores) / len(scores)
best_scores.append(best)
avg_scores.append(avg)
overall_best = sum(best_scores) / len(best_scores) if best_scores else 0
overall_avg = sum(avg_scores) / len(avg_scores) if avg_scores else 0
print(f"\n{model}:")
print(f" Problems evaluated: {len(by_problem)}")
print(f" Avg score (avg-of-k): {overall_avg:.2f}")
print(f" Avg score (best-of-k): {overall_best:.2f}")
print(f"\nDetailed results: {csv_path}")
    print()


if __name__ == "__main__":
main()