"""
Evaluate Frontier-CS baseline: score all existing LLM solutions without evolution.

For each model, evaluates all variants on all problems and reports:
- Per-problem best score (best-of-k)
- Per-problem average score (avg-of-k)
- Overall average across problems

Usage:
    .venv/bin/python scripts/dev/eval_frontier_cs_baseline.py
    .venv/bin/python scripts/dev/eval_frontier_cs_baseline.py --models gpt5 gemini3pro --concurrency 4
    .venv/bin/python scripts/dev/eval_frontier_cs_baseline.py --problems 0-49
"""

from __future__ import annotations

import argparse
import csv
import logging
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

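# Make the repository root importable so the `tasks` package resolves when this
# script is run directly from scripts/dev/.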
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))

from tasks.frontier_cs_entry.evaluate_algorithmic import main as evaluate

logger = logging.getLogger(__name__)

DEFAULT_FRONTIER_CS_DIR = "tasks/Frontier-CS"
RESULTS_DIR = "results/frontier_cs_baseline"


def find_all_solutions(frontier_cs_dir: str, problem_id: str, model: str) -> list[Path]:
    """Find all solution variants for a model on a problem."""
    solutions_dir = Path(frontier_cs_dir) / "algorithmic" / "solutions" / str(problem_id)
    if not solutions_dir.exists():
        return []
    exact = solutions_dir / f"{model}.cpp"
    variants = sorted(solutions_dir.glob(f"{model}_*.cpp"))
    results = []
    if exact.exists():
        results.append(exact)
    results.extend(variants)
    return results


def list_problem_ids(frontier_cs_dir: str) -> list[str]:
    """List all problem IDs that have both a problem dir and solutions dir."""
    problems_dir = Path(frontier_cs_dir) / "algorithmic" / "problems"
    solutions_dir = Path(frontier_cs_dir) / "algorithmic" / "solutions"
    pids = []
    for d in sorted(problems_dir.iterdir(), key=lambda p: int(p.name)):
        if d.is_dir() and (solutions_dir / d.name).is_dir():
            pids.append(d.name)
    return pids


def eval_one(problem_id: str, solution_path: Path, frontier_cs_dir: str,
             results_base: str) -> dict:
    """Evaluate a single solution. Returns result dict."""
    results_dir = os.path.join(
        results_base, f"p{problem_id}", solution_path.stem
    )
    os.makedirs(results_dir, exist_ok=True)

    try:
        result = evaluate(
            program_path=str(solution_path),
            results_dir=results_dir,
            problem_id=problem_id,
            frontier_cs_dir=frontier_cs_dir,
        )
        score = result.get("combined_score", 0.0)
    except Exception as e:
        logger.warning(f"Failed p{problem_id}/{solution_path.name}: {e}")
        score = 0.0

    return {
        "problem_id": problem_id,
        "solution": solution_path.name,
        "score": score,
    }


def parse_problem_range(s: str, all_pids: list[str]) -> list[str]:
    """Parse 'all', a range like '0-49', or a single problem ID into a list of IDs."""
    if s == "all":
        return all_pids
    if "-" in s:
        lo, hi = s.split("-", 1)
        lo, hi = int(lo), int(hi)
        return [p for p in all_pids if lo <= int(p) <= hi]
    return [s]


def main():
    parser = argparse.ArgumentParser(
        description="Evaluate Frontier-CS baseline solutions",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("--models", nargs="+", default=["gpt5", "gemini3pro"],
                        help="Model prefixes to evaluate")
    parser.add_argument("--problems", type=str, default="all",
                        help="Problem range: 'all', '0-49', or single ID")
    parser.add_argument("--concurrency", type=int, default=4,
                        help="Number of concurrent evaluations (keep low to avoid go-judge contention)")
    parser.add_argument("--frontier-cs-dir", type=str, default=DEFAULT_FRONTIER_CS_DIR)
    parser.add_argument("--output", type=str, default=None,
                        help="Output CSV path (default: auto in results dir)")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    frontier_cs_dir = args.frontier_cs_dir
    if not Path(frontier_cs_dir).is_absolute():
        project_root = Path(__file__).resolve().parents[2]
        frontier_cs_dir = str(project_root / frontier_cs_dir)

    all_pids = list_problem_ids(frontier_cs_dir)
    selected_pids = parse_problem_range(args.problems, all_pids)

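    # Build the work list: one evaluation per (model, problem, solution variant).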
    tasks = []
    for model in args.models:
        for pid in selected_pids:
            solutions = find_all_solutions(frontier_cs_dir, pid, model)
            for sol in solutions:
                tasks.append((pid, sol, model))

    print("=" * 60)
    print("Frontier-CS Baseline Evaluation")
    print("=" * 60)
    print(f" Models: {', '.join(args.models)}")
    print(f" Problems: {len(selected_pids)}")
    print(f" Total evals: {len(tasks)}")
    print(f" Concurrency: {args.concurrency}")
    print("=" * 60)
    print()

    results_base = os.path.join(RESULTS_DIR, time.strftime("%Y%m%d_%H%M%S"))
    os.makedirs(results_base, exist_ok=True)

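    # Evaluate concurrently; each future maps back to its (problem_id, solution, model) tuple.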
    all_results = []
    done = 0
    start = time.time()

    with ThreadPoolExecutor(max_workers=args.concurrency) as pool:
        futures = {
            pool.submit(eval_one, pid, sol, frontier_cs_dir, results_base): (pid, sol, model)
            for pid, sol, model in tasks
        }
        for future in as_completed(futures):
            pid, sol, model = futures[future]
            try:
                result = future.result()
                result["model"] = model
                all_results.append(result)
            except Exception as e:
                logger.error(f"Error evaluating p{pid}/{sol.name}: {e}")
                all_results.append({
                    "problem_id": pid, "solution": sol.name,
                    "model": model, "score": 0.0,
                })
            done += 1
            if done % 20 == 0 or done == len(tasks):
                elapsed = time.time() - start
                rate = done / elapsed if elapsed > 0 else 0
                print(f" [{done}/{len(tasks)}] {rate:.1f} evals/s elapsed={elapsed:.0f}s")

    print()
    print("=" * 60)
    print("Results")
    print("=" * 60)

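    # One CSV row per evaluated solution, sorted by model, then problem, then variant.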
    csv_path = args.output or os.path.join(results_base, "baseline_results.csv")
    with open(csv_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["model", "problem_id", "solution", "score"])
        writer.writeheader()
        for r in sorted(all_results, key=lambda x: (x["model"], int(x["problem_id"]), x["solution"])):
            writer.writerow(r)

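    # Per-model summary: mean of per-problem best scores (best-of-k) and per-problem averages (avg-of-k).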
    for model in args.models:
        model_results = [r for r in all_results if r["model"] == model]
        if not model_results:
            print(f"\n{model}: no results")
            continue

        by_problem = {}
        for r in model_results:
            by_problem.setdefault(r["problem_id"], []).append(r["score"])

        best_scores = []
        avg_scores = []
        for pid in sorted(by_problem, key=int):
            scores = by_problem[pid]
            best = max(scores)
            avg = sum(scores) / len(scores)
            best_scores.append(best)
            avg_scores.append(avg)

        overall_best = sum(best_scores) / len(best_scores) if best_scores else 0
        overall_avg = sum(avg_scores) / len(avg_scores) if avg_scores else 0

        print(f"\n{model}:")
        print(f" Problems evaluated: {len(by_problem)}")
        print(f" Avg score (avg-of-k): {overall_avg:.2f}")
        print(f" Avg score (best-of-k): {overall_best:.2f}")

    print(f"\nDetailed results: {csv_path}")
    print()


if __name__ == "__main__":
    main()