#!/usr/bin/env python3 """ Evaluate Frontier-CS baseline: score all existing LLM solutions without evolution. For each model, evaluates all variants on all problems and reports: - Per-problem best score (best-of-k) - Per-problem average score (avg-of-k) - Overall average across problems Usage: .venv/bin/python scripts/dev/eval_frontier_cs_baseline.py .venv/bin/python scripts/dev/eval_frontier_cs_baseline.py --models gpt5 gemini3pro --concurrency 4 .venv/bin/python scripts/dev/eval_frontier_cs_baseline.py --problems 0-49 """ from __future__ import annotations import argparse import csv import json import logging import os import sys import time from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parents[2])) from tasks.frontier_cs_entry.evaluate_algorithmic import main as evaluate logger = logging.getLogger(__name__) DEFAULT_FRONTIER_CS_DIR = "tasks/Frontier-CS" RESULTS_DIR = "results/frontier_cs_baseline" def find_all_solutions(frontier_cs_dir: str, problem_id: str, model: str) -> list[Path]: """Find all solution variants for a model on a problem.""" solutions_dir = Path(frontier_cs_dir) / "algorithmic" / "solutions" / str(problem_id) if not solutions_dir.exists(): return [] exact = solutions_dir / f"{model}.cpp" variants = sorted(solutions_dir.glob(f"{model}_*.cpp")) results = [] if exact.exists(): results.append(exact) results.extend(variants) return results def list_problem_ids(frontier_cs_dir: str) -> list[str]: """List all problem IDs that have both a problem dir and solutions dir.""" problems_dir = Path(frontier_cs_dir) / "algorithmic" / "problems" solutions_dir = Path(frontier_cs_dir) / "algorithmic" / "solutions" pids = [] for d in sorted(problems_dir.iterdir(), key=lambda p: int(p.name)): if d.is_dir() and (solutions_dir / d.name).is_dir(): pids.append(d.name) return pids def eval_one(problem_id: str, solution_path: Path, frontier_cs_dir: str, results_base: str) -> dict: """Evaluate a single solution. Returns result dict.""" results_dir = os.path.join( results_base, f"p{problem_id}", solution_path.stem ) os.makedirs(results_dir, exist_ok=True) try: result = evaluate( program_path=str(solution_path), results_dir=results_dir, problem_id=problem_id, frontier_cs_dir=frontier_cs_dir, ) score = result.get("combined_score", 0.0) except Exception as e: logger.warning(f"Failed p{problem_id}/{solution_path.name}: {e}") score = 0.0 return { "problem_id": problem_id, "solution": solution_path.name, "score": score, } def parse_problem_range(s: str, all_pids: list[str]) -> list[str]: """Parse '0-49' or 'all' into list of problem IDs.""" if s == "all": return all_pids if "-" in s: lo, hi = s.split("-", 1) lo, hi = int(lo), int(hi) return [p for p in all_pids if lo <= int(p) <= hi] return [s] def main(): parser = argparse.ArgumentParser( description="Evaluate Frontier-CS baseline solutions", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument("--models", nargs="+", default=["gpt5", "gemini3pro"], help="Model prefixes to evaluate") parser.add_argument("--problems", type=str, default="all", help="Problem range: 'all', '0-49', or single ID") parser.add_argument("--concurrency", type=int, default=4, help="Number of concurrent evaluations (keep low to avoid go-judge contention)") parser.add_argument("--frontier-cs-dir", type=str, default=DEFAULT_FRONTIER_CS_DIR) parser.add_argument("--output", type=str, default=None, help="Output CSV path (default: auto in results dir)") args = parser.parse_args() logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") frontier_cs_dir = args.frontier_cs_dir if not Path(frontier_cs_dir).is_absolute(): project_root = Path(__file__).resolve().parents[2] frontier_cs_dir = str(project_root / frontier_cs_dir) all_pids = list_problem_ids(frontier_cs_dir) selected_pids = parse_problem_range(args.problems, all_pids) # Collect all (problem, solution) pairs to evaluate tasks = [] for model in args.models: for pid in selected_pids: solutions = find_all_solutions(frontier_cs_dir, pid, model) for sol in solutions: tasks.append((pid, sol, model)) print("=" * 60) print("Frontier-CS Baseline Evaluation") print("=" * 60) print(f" Models: {', '.join(args.models)}") print(f" Problems: {len(selected_pids)}") print(f" Total evals: {len(tasks)}") print(f" Concurrency: {args.concurrency}") print("=" * 60) print() results_base = os.path.join(RESULTS_DIR, time.strftime("%Y%m%d_%H%M%S")) os.makedirs(results_base, exist_ok=True) # Run evaluations in parallel all_results = [] done = 0 start = time.time() with ThreadPoolExecutor(max_workers=args.concurrency) as pool: futures = { pool.submit(eval_one, pid, sol, frontier_cs_dir, results_base): (pid, sol, model) for pid, sol, model in tasks } for future in as_completed(futures): pid, sol, model = futures[future] try: result = future.result() result["model"] = model all_results.append(result) except Exception as e: logger.error(f"Error evaluating p{pid}/{sol.name}: {e}") all_results.append({ "problem_id": pid, "solution": sol.name, "model": model, "score": 0.0, }) done += 1 if done % 20 == 0 or done == len(tasks): elapsed = time.time() - start rate = done / elapsed if elapsed > 0 else 0 print(f" [{done}/{len(tasks)}] {rate:.1f} evals/s elapsed={elapsed:.0f}s") # Aggregate per model print() print("=" * 60) print("Results") print("=" * 60) csv_path = args.output or os.path.join(results_base, "baseline_results.csv") with open(csv_path, "w", newline="") as f: writer = csv.DictWriter(f, fieldnames=["model", "problem_id", "solution", "score"]) writer.writeheader() for r in sorted(all_results, key=lambda x: (x["model"], int(x["problem_id"]), x["solution"])): writer.writerow(r) for model in args.models: model_results = [r for r in all_results if r["model"] == model] if not model_results: print(f"\n{model}: no results") continue # Group by problem by_problem = {} for r in model_results: by_problem.setdefault(r["problem_id"], []).append(r["score"]) best_scores = [] avg_scores = [] for pid in sorted(by_problem, key=int): scores = by_problem[pid] best = max(scores) avg = sum(scores) / len(scores) best_scores.append(best) avg_scores.append(avg) overall_best = sum(best_scores) / len(best_scores) if best_scores else 0 overall_avg = sum(avg_scores) / len(avg_scores) if avg_scores else 0 print(f"\n{model}:") print(f" Problems evaluated: {len(by_problem)}") print(f" Avg score (avg-of-k): {overall_avg:.2f}") print(f" Avg score (best-of-k): {overall_best:.2f}") print(f"\nDetailed results: {csv_path}") print() if __name__ == "__main__": main()