| """ |
| Cascade Validation Runner — proves the static cascade on SWE-bench. |
| |
| This script runs the cascade agent on selected SWE-bench instances and |
| verifies the patches by applying test_patch and running FAIL_TO_PASS tests. |
| |
| Strategy: |
| 1. Clone repo, set up conda environment from environment_setup_commit |
| 2. Run cascade agent (T1 Llama-3.1-8B → T2 Llama-3.3-70B) |
| 3. Apply model patch + test_patch |
| 4. Run FAIL_TO_PASS tests via pytest |
| 5. Record: resolved, cost, model tier used, token counts |
| |
| The critical question this answers: |
| "Do the 10 cascade-only (T1/T2 solves where both T4 models fail) |
| instances produce valid patches, or are they weak-test passes?" |
| |
| Requirements: |
| - conda (mamba preferred for speed) |
| - git |
| - HF_TOKEN (free inference via huggingface_hub) |
| - No Docker needed |
| |
| Usage: |
| python validate_cascade.py --instance django__django-12308 |
| python validate_cascade.py --batch 10 --target cascade-only |
| python validate_cascade.py --batch 50 --target all |
| """ |
|
|
| import argparse |
| import json |
| import os |
| import re |
| import subprocess |
| import sys |
| import tempfile |
| import time |
| import traceback |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Optional, Tuple, Dict, List |
|
|
| |
| |
| |
|
|
| |
| |
# The 10 "cascade-only" instances: per the module docstring, these are the
# T1/T2 cascade solves where both frontier (T4) models failed. This script's
# central question is whether these are real solves or weak-test passes.
CASCADE_ONLY_INSTANCES = [
    "astropy__astropy-14365",
    "astropy__astropy-14995",
    "django__django-11815",
    "django__django-13089",
    "django__django-13807",
    "django__django-14315",
    "matplotlib__matplotlib-25224",
    "matplotlib__matplotlib-25311",
    "sympy__sympy-19487",
    "sympy__sympy-20590",
]
|
|
| |
# Comparison set selectable via `--target frontier-only`. Presumably the
# instances only the frontier (T4) models solved, used as a control group
# against CASCADE_ONLY_INSTANCES — TODO confirm against the experiment log.
FRONTIER_ONLY_INSTANCES = [
    "django__django-12453",
    "django__django-14030",
    "django__django-14349",
    "django__django-14855",
    "django__django-15098",
    "django__django-16235",
    "matplotlib__matplotlib-26020",
    "psf__requests-6028",
    "pylint-dev__pylint-7080",
    "scikit-learn__scikit-learn-13439",
    "scikit-learn__scikit-learn-14087",
    "sphinx-doc__sphinx-10323",
    "sphinx-doc__sphinx-10466",
    "sphinx-doc__sphinx-10614",
]
|
|
# Map from the SWE-bench `repo` field ("owner/name") to a clone URL.
# validate_one() falls back to f"https://github.com/{repo}.git" for any
# repo not listed here, so this table is an explicit allowlist, not a limit.
REPO_URLS = {
    "django/django": "https://github.com/django/django.git",
    "pytest-dev/pytest": "https://github.com/pytest-dev/pytest.git",
    "scikit-learn/scikit-learn": "https://github.com/scikit-learn/scikit-learn.git",
    "sympy/sympy": "https://github.com/sympy/sympy.git",
    "matplotlib/matplotlib": "https://github.com/matplotlib/matplotlib.git",
    "sphinx-doc/sphinx": "https://github.com/sphinx-doc/sphinx.git",
    "astropy/astropy": "https://github.com/astropy/astropy.git",
    "psf/requests": "https://github.com/psf/requests.git",
    "pylint-dev/pylint": "https://github.com/pylint-dev/pylint.git",
}
|
|
|
|
def run(cmd: List[str], cwd: Optional[str] = None, timeout: int = 180,
        env: Optional[dict] = None) -> Tuple[int, str, str]:
    """Run a command and capture its output.

    Args:
        cmd: Command and arguments in list form (no shell interpretation).
        cwd: Working directory for the child process (None = inherit).
        timeout: Seconds to wait before killing the process.
        env: Environment mapping; falls back to the current process env.

    Returns:
        (returncode, stdout, stderr). On timeout the return code is 124
        (mirroring GNU `timeout`) and any partial output captured before
        the kill is preserved. If the process cannot be launched at all,
        returncode is -1 and stderr holds the exception text.
    """
    try:
        result = subprocess.run(
            cmd, cwd=cwd, capture_output=True, text=True,
            timeout=timeout, env=env or os.environ
        )
        return result.returncode, result.stdout, result.stderr
    except subprocess.TimeoutExpired as e:
        # Bug fix: the original discarded everything the process printed
        # before timing out, which made hung test runs undiagnosable.
        # TimeoutExpired.stdout/.stderr may be bytes or str depending on
        # how far communicate() got — normalize to str.
        out = e.stdout.decode(errors="replace") if isinstance(e.stdout, bytes) else (e.stdout or "")
        err = e.stderr.decode(errors="replace") if isinstance(e.stderr, bytes) else (e.stderr or "")
        return 124, out, err or "TIMEOUT"
    except Exception as e:
        # Launch failures (missing binary, bad cwd, ...) become a sentinel.
        return -1, "", str(e)
|
|
|
|
| |
| |
| |
|
|
def call_hf_model(
    model_id: str,
    messages: list,
    max_tokens: int = 4096,
    temperature: float = 0.2
) -> Tuple[str, int, int]:
    """
    Call a chat model via the (free) Hugging Face Inference API.

    Returns (response_text, input_tokens, output_tokens). When the API
    response carries no usage block, token counts are rough heuristics
    (~100 input tokens per message, ~4 chars per output token).
    """
    from huggingface_hub import InferenceClient

    completion = InferenceClient(model_id).chat.completions.create(
        model=model_id,
        messages=messages,
        max_tokens=max_tokens,
        temperature=temperature,
    )

    text = completion.choices[0].message.content

    usage = completion.usage if hasattr(completion, "usage") else None
    if usage:
        prompt_toks = getattr(usage, "prompt_tokens", 0)
        completion_toks = getattr(usage, "completion_tokens", 0)
    else:
        # No usage info returned — estimate.
        prompt_toks = len(messages) * 100
        completion_toks = len(text) // 4

    return text, prompt_toks, completion_toks
|
|
|
|
def build_cascade_messages(
    instance: dict,
    repo_dir: Path,
    previous_failure: Optional[str] = None,
) -> list:
    """
    Assemble the system/user message pair for one cascade attempt.

    When `previous_failure` is given (failure notes from earlier tiers),
    it is appended to the system prompt so the next tier can adjust.
    """
    problem = instance.get("problem_statement", "")
    hint = instance.get("hints_text", "")

    system_prompt = """You are a software engineer fixing a bug in an open-source project.
Your task is to produce a correct patch that fixes the issue described.

You have access to a bash shell in the repository directory. Use it to:
- Explore the codebase (ls, find, grep, git log)
- Read files (cat, head)
- Run existing tests (pytest)
- Edit files with sed or write tools

Output format:
- For bash commands: <bash>command here</bash>
- For your final patch: <patch>diff here</patch>
- When done and tests pass: <submit>Done</submit>

Be thorough. Read the relevant code, understand the bug, make a minimal fix,
and verify it passes the tests."""

    if previous_failure:
        system_prompt += f"\n\nYour previous attempt failed with the following issues:\n{previous_failure}\nPlease fix your approach."

    # Build the user task as blank-line-separated sections; the hint
    # section is only present when the instance actually carries hints.
    sections = [
        f"Repository: {instance['repo']}\n"
        f"Base commit: {instance['base_commit'][:12]}\n"
        f"\nPROBLEM:\n{problem}"
    ]
    if hint:
        sections.append(f"HINT: {hint}")
    sections.append(
        f"The repository is at {repo_dir}. Your bash commands will run from that directory.\n"
        "Start by exploring the codebase to understand the issue, then implement and test your fix."
    )

    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": "\n\n".join(sections)},
    ]
|
|
|
|
def extract_patch(response: str) -> Optional[str]:
    """
    Pull a git patch out of an agent response, or return None.

    Accepted forms, tried in order: <patch>...</patch> tags, ```diff or
    ```patch fenced blocks, and finally a bare `diff --git` run ending at
    a fence/tag or end-of-string.
    """
    delimited_patterns = (
        r'<patch>(.*?)</patch>',
        r'```diff\s*\n(.*?)```',
        r'```patch\s*\n(.*?)```',
    )
    for pattern in delimited_patterns:
        found = re.search(pattern, response, re.DOTALL)
        if found:
            return found.group(1).strip()

    # Last resort: an unfenced diff header, captured up to a terminator.
    bare = re.search(r'(diff --git.*?)(?:\n```|\n<patch>|\n<submit>|\Z)', response, re.DOTALL)
    return bare.group(1).strip() if bare else None
|
|
|
|
def run_cascade_agent(
    instance: dict,
    repo_dir: Path,
    max_turns: int = 30,
) -> dict:
    """
    Run the cascade agent on one instance, escalating through model tiers.

    Tiers:
      T1: meta-llama/Llama-3.1-8B-Instruct (free, fast, weak)
      T2: meta-llama/Llama-3.3-70B-Instruct (free, moderate)

    Each tier gets up to `max_turns` agentic turns. Failure notes from
    earlier tiers are fed into the next tier's system prompt.

    Bug fix: token and turn totals now accumulate across ALL tiers tried.
    Previously they were only written when a tier produced a patch, so
    tokens burned by failed tiers (and by a fully-failed cascade) were
    dropped, undercounting the real cost of each solve.

    Returns a dict: {instance_id, patch, resolved, tier_used, total_turns,
    total_input_tokens, total_output_tokens, error}.
    """
    TIERS = [
        {"name": "T1", "model": "meta-llama/Llama-3.1-8B-Instruct", "max_turns": max_turns, "cost_per_1k": 0.0},
        {"name": "T2", "model": "meta-llama/Llama-3.3-70B-Instruct", "max_turns": max_turns, "cost_per_1k": 0.0},
    ]

    result = {
        "instance_id": instance["instance_id"],
        "patch": None,
        "resolved": False,
        "tier_used": None,
        "total_turns": 0,
        "total_input_tokens": 0,
        "total_output_tokens": 0,
        "error": None,
    }

    previous_failures = []

    for tier in TIERS:
        print(f"\n [{tier['name']}] {tier['model']} (max {tier['max_turns']} turns)")

        messages = build_cascade_messages(
            instance, repo_dir,
            previous_failure="\n".join(previous_failures) if previous_failures else None
        )

        for turn in range(tier['max_turns']):
            try:
                response, in_tok, out_tok = call_hf_model(
                    tier['model'], messages,
                    max_tokens=4096,
                    temperature=0.2
                )
                # Accumulate usage on every turn of every tier, not just
                # the tier that eventually succeeds.
                result["total_input_tokens"] += in_tok
                result["total_output_tokens"] += out_tok
                result["total_turns"] += 1

                messages.append({"role": "assistant", "content": response})

                print(f" Turn {turn+1}: {in_tok}+{out_tok} tokens")

                # A patch ends the cascade immediately.
                patch = extract_patch(response)
                if patch:
                    print(f" → Patch found! ({len(patch)} chars)")
                    result["patch"] = patch
                    result["tier_used"] = tier["name"]
                    return result

                # Execute any <bash> commands and feed their output back
                # to the model as the next user message.
                bash_commands = re.findall(r'<bash>(.*?)</bash>', response, re.DOTALL)
                for cmd in bash_commands:
                    cmd = cmd.strip()
                    print(f" $ {cmd[:100]}...")
                    rc, stdout, stderr = run(
                        ["bash", "-c", cmd],
                        cwd=str(repo_dir),
                        timeout=60
                    )
                    # Truncate to keep the conversation within context.
                    output = stdout.strip()[:2000]
                    if stderr.strip():
                        output += f"\n[stderr: {stderr.strip()[:500]}]"
                    if rc != 0:
                        output += f"\n[EXIT CODE: {rc}]"
                    messages.append({"role": "user", "content": f"<output>\n{output}\n</output>"})

                # Agent claims completion without a patch: escalate tier.
                if "<submit>" in response:
                    print(" → Submitted but no patch found")
                    break

            except Exception as e:
                # API/tooling error: abandon this tier, record why.
                print(f" Error: {e}")
                previous_failures.append(f"[{tier['name']}] turn {turn+1} error: {str(e)[:200]}")
                break

        previous_failures.append(f"[{tier['name']}] failed after {tier['max_turns']} turns — no patch produced")

    result["error"] = "All tiers exhausted without producing a patch"
    return result
|
|
|
|
| |
| |
| |
|
|
def verify_cascade_patch(
    instance: dict,
    model_patch: str,
    repo_dir: Path,
    env_name: Optional[str] = None,
) -> dict:
    """
    Verify that a model-generated patch passes SWE-bench tests.

    1. Reset repo to base_commit
    2. Apply model patch
    3. Apply test_patch
    4. Run FAIL_TO_PASS tests
    5. Run PASS_TO_PASS tests (regression check)

    Args:
        instance: SWE-bench row (uses base_commit, test_patch,
            FAIL_TO_PASS, PASS_TO_PASS).
        model_patch: unified diff produced by the agent.
        repo_dir: checkout of the target repository.
        env_name: conda env in which to run pytest (None = current env).

    Returns:
        dict with resolved / all_f2p_pass / all_p2p_pass flags, lists of
        failing test ids, and an error string (None on full success).
    """
    result = {
        "resolved": False,
        "all_f2p_pass": False,
        "all_p2p_pass": False,
        "f2p_failures": [],
        "p2p_failures": [],
        "error": None,
    }

    try:
        # Step 1: hard-reset the working tree to the base commit so edits
        # the agent made during exploration don't contaminate verification.
        base_commit = instance.get("base_commit", "")
        if base_commit:
            rc, _, _ = run(["git", "checkout", "-f", base_commit], cwd=str(repo_dir))
            if rc != 0:
                result["error"] = f"could not checkout base_commit {base_commit[:12]}"
                return result

        # Step 2: apply the model patch. Dry-run with --check first so the
        # failure message is clean, then apply for real.
        patch_file = repo_dir / "aco_model.patch"
        patch_file.write_text(model_patch)

        rc, out, err = run(
            ["git", "apply", "--check", str(patch_file)],
            cwd=str(repo_dir)
        )
        if rc != 0:
            result["error"] = f"patch --check failed: {err[:300]}"
            return result

        rc, out, err = run(
            ["git", "apply", str(patch_file)],
            cwd=str(repo_dir)
        )
        if rc != 0:
            # Fallback: --reject applies whatever hunks fit.
            # NOTE(review): `git apply --reject` still exits nonzero when
            # any hunk is rejected, so this retry may rarely rescue
            # anything — confirm intended behavior.
            rc, out, err = run(
                ["git", "apply", "--reject", str(patch_file)],
                cwd=str(repo_dir)
            )
            if rc != 0:
                result["error"] = f"patch apply failed: {err[:300]}"
                return result

        # Step 3: apply the gold test_patch so the FAIL_TO_PASS tests
        # actually exist in the tree.
        test_patch = instance.get("test_patch", "")
        if not test_patch:
            result["error"] = "no test_patch in instance"
            return result

        test_file = repo_dir / "aco_test.patch"
        test_file.write_text(test_patch)

        rc, out, err = run(
            ["git", "apply", "--check", str(test_file)],
            cwd=str(repo_dir)
        )
        if rc == 0:
            rc, out, err = run(
                ["git", "apply", str(test_file)],
                cwd=str(repo_dir)
            )
            if rc != 0:
                # Best-effort partial apply; failures are not fatal here —
                # the test run below will surface any missing tests.
                rc, out, err = run(
                    ["git", "apply", "--reject", str(test_file)],
                    cwd=str(repo_dir)
                )

        # Step 4: run the FAIL_TO_PASS tests; all must pass (rc == 0).
        f2p = instance.get("FAIL_TO_PASS", [])
        if not f2p:
            result["error"] = "no FAIL_TO_PASS tests"
            return result

        print(f" Running {len(f2p)} FAIL_TO_PASS tests...")
        # Run pytest inside the instance's conda env when one was set up.
        cmd_prefix = ["conda", "run", "-n", env_name] if env_name else []
        cmd = cmd_prefix + ["python", "-m", "pytest", "-v", "--tb=short", "-x"] + f2p
        rc, out, err = run(cmd, cwd=str(repo_dir), timeout=300)

        if rc == 0:
            result["all_f2p_pass"] = True

            # Step 5: regression check on PASS_TO_PASS tests, capped at
            # the first 20 to bound runtime.
            p2p = instance.get("PASS_TO_PASS", [])
            if p2p:
                print(f" Running {len(p2p)} PASS_TO_PASS regression tests...")
                cmd2 = cmd_prefix + ["python", "-m", "pytest", "-v", "--tb=short", "-x"] + p2p[:20]
                rc2, out2, err2 = run(cmd2, cwd=str(repo_dir), timeout=300)

                if rc2 == 0:
                    result["all_p2p_pass"] = True
                    result["resolved"] = True
                else:
                    result["error"] = f"P2P regression: {(out2+err2)[:300]}"
                    result["p2p_failures"] = [l.strip() for l in (out2+err2).split('\n') if 'FAILED' in l and '::' in l]
            else:
                # No P2P tests listed: F2P success alone counts as resolved.
                result["resolved"] = True
        else:
            result["error"] = f"F2P failures: {(out+err)[:500]}"
            result["f2p_failures"] = [l.strip() for l in (out+err).split('\n') if 'FAILED' in l and '::' in l]

        return result

    except Exception as e:
        # Catch-all so one broken instance can't kill a batch run.
        result["error"] = f"verification error: {str(e)[:300]}"
        return result
|
|
|
|
| |
| |
| |
|
|
def validate_one(instance: dict) -> dict:
    """
    Full validation pipeline for one instance:
      1. Clone repo
      2. Set up conda environment
      3. Run cascade agent
      4. Verify patch
      5. Report results

    Returns a result dict with per-stage success flags and timings, the
    tier that produced the patch, and `final_resolved`.

    Bug fix: the per-instance conda environment is now removed on EVERY
    exit path via try/finally. Previously cleanup only ran at the end of
    the happy path, so early returns (env-setup failure, agent producing
    no patch) leaked one conda env per failed instance.
    """
    inst_id = instance["instance_id"]
    repo = instance.get("repo", "")
    env_setup_commit = instance.get("environment_setup_commit", "")
    base_commit = instance.get("base_commit", "")

    result = {
        "instance_id": inst_id,
        "repo": repo,
        "timestamp": datetime.now().isoformat(),
        "stages": {},
        "final_resolved": False,
        "tier_used": None,
        "total_cost": 0.0,
        "error": None,
    }

    print(f"\n{'='*70}")
    print(f"VALIDATING: {inst_id}")
    print(f" Repo: {repo} Base: {base_commit[:12]} EnvSetup: {env_setup_commit[:12]}")
    print(f"{'='*70}")

    with tempfile.TemporaryDirectory(prefix=f"aco_valid_{inst_id.replace('/', '_')}_") as tmpdir:
        work_dir = Path(tmpdir)
        repo_dir = work_dir / "repo"
        # Conda env names can't contain '/'; keep them short and unique-ish.
        env_name = f"aco_{inst_id.replace('__', '_').replace('-', '_')[:40]}"

        try:
            # --- Stage 1: clone ---
            print("\n--- Stage 1: Clone repo ---")
            repo_url = REPO_URLS.get(repo, f"https://github.com/{repo}.git")
            t0 = time.time()
            # Shallow clone for speed; fall back to a full clone if the
            # needed commits aren't reachable at depth 50.
            rc, out, err = run(["git", "clone", "--depth", "50", repo_url, str(repo_dir)], timeout=300)
            if rc != 0:
                rc, out, err = run(["git", "clone", repo_url, str(repo_dir)], timeout=600)
            result["stages"]["clone"] = {"success": rc == 0, "duration": time.time() - t0}
            if rc != 0:
                result["error"] = f"clone failed: {err[:300]}"
                return result

            # --- Stage 2: conda environment ---
            print("\n--- Stage 2: Set up conda environment ---")
            t0 = time.time()

            # Check out the commit SWE-bench designates for env setup.
            if env_setup_commit:
                run(["git", "fetch", "origin", env_setup_commit], cwd=str(repo_dir), timeout=60)
                run(["git", "checkout", env_setup_commit], cwd=str(repo_dir), timeout=30)

            # Look for an environment.yml in the usual spots, then anywhere.
            env_candidates = [
                "environment.yml", "dev/environment.yml", ".github/environment.yml",
                "ci/environment.yml", ".azure-pipelines/environment.yml",
            ]
            env_yml = None
            for c in env_candidates:
                p = repo_dir / c
                if p.exists():
                    env_yml = p
                    break
            if not env_yml:
                for p in repo_dir.rglob("environment.yml"):
                    if p.stat().st_size > 10:
                        env_yml = p
                        break

            env_setup_ok = False
            if env_yml:
                print(f" Using environment.yml: {env_yml.relative_to(repo_dir)}")
                rc, out, err = run(
                    ["conda", "env", "create", "-f", str(env_yml), "-n", env_name, "--quiet"],
                    timeout=600
                )
                env_setup_ok = (rc == 0)
            else:
                print(f" No environment.yml found, creating python=3.10 env")
                rc, out, err = run(
                    ["conda", "create", "-n", env_name, "python=3.10", "pip", "-y", "--quiet"],
                    timeout=300
                )
                env_setup_ok = (rc == 0)

            if not env_setup_ok:
                result["stages"]["environment"] = {"success": False, "error": err[:300], "duration": time.time() - t0}
                result["error"] = f"env setup failed: {err[:300]}"
                return result

            # Switch to the base commit and install the project editable.
            if base_commit:
                run(["git", "fetch", "origin", base_commit], cwd=str(repo_dir), timeout=60)
                run(["git", "checkout", base_commit], cwd=str(repo_dir), timeout=30)

            run(["conda", "run", "-n", env_name, "pip", "install", "-e", ".", "--quiet"],
                cwd=str(repo_dir), timeout=300)

            result["stages"]["environment"] = {"success": True, "duration": time.time() - t0}
            print(f" Environment ready in {time.time() - t0:.1f}s")

            # --- Stage 3: cascade agent ---
            print("\n--- Stage 3: Run cascade agent ---")
            t0 = time.time()
            agent_result = run_cascade_agent(instance, repo_dir, max_turns=30)
            result["stages"]["agent"] = {
                "success": agent_result["patch"] is not None,
                "tier": agent_result["tier_used"],
                "turns": agent_result["total_turns"],
                "input_tokens": agent_result["total_input_tokens"],
                "output_tokens": agent_result["total_output_tokens"],
                "duration": time.time() - t0,
            }

            if not agent_result["patch"]:
                result["error"] = "No patch produced by any tier"
                return result

            # --- Stage 4: verification ---
            print("\n--- Stage 4: Verify patch ---")
            t0 = time.time()
            verify_result = verify_cascade_patch(instance, agent_result["patch"], repo_dir, env_name)
            result["stages"]["verify"] = {
                "resolved": verify_result["resolved"],
                "all_f2p_pass": verify_result.get("all_f2p_pass", False),
                "all_p2p_pass": verify_result.get("all_p2p_pass", False),
                "error": verify_result.get("error"),
                "duration": time.time() - t0,
            }
            result["final_resolved"] = verify_result["resolved"]
            result["tier_used"] = agent_result["tier_used"]

            if verify_result.get("error"):
                result["error"] = verify_result["error"]

            return result
        finally:
            # Best-effort cleanup on every exit path. Removing an env that
            # was never created simply fails quietly inside run().
            print(f"\n Cleaning up conda env {env_name}...")
            run(["conda", "env", "remove", "-n", env_name, "-y", "--quiet"], timeout=30)
|
|
|
|
def main():
    """CLI entry point: pick instances, validate each, stream JSONL results."""
    parser = argparse.ArgumentParser(description="Cascade Validation Runner")
    parser.add_argument("--instance", type=str, help="Single instance ID")
    parser.add_argument("--batch", type=int, default=10, help="Number of instances to validate")
    parser.add_argument("--target", choices=["cascade-only", "frontier-only", "all"], default="cascade-only")
    parser.add_argument("--output", type=str, default="validation_results.jsonl")
    args = parser.parse_args()

    from datasets import load_dataset

    print("Loading SWE-bench_Verified...")
    ds = load_dataset("princeton-nlp/SWE-bench_Verified", split="test")

    # Select instances: a single named one, a curated subset, or everything.
    if args.instance:
        instances = [dict(row) for row in ds if row["instance_id"] == args.instance]
        if not instances:
            print(f"Instance {args.instance} not found")
            sys.exit(1)
    else:
        subset = {
            "cascade-only": set(CASCADE_ONLY_INSTANCES),
            "frontier-only": set(FRONTIER_ONLY_INSTANCES),
        }.get(args.target)
        if subset is None:
            instances = [dict(row) for row in ds][:args.batch]
        else:
            instances = [dict(row) for row in ds if row["instance_id"] in subset][:args.batch]

    print(f"Selected {len(instances)} instances for validation\n")

    results = []
    for idx, inst in enumerate(instances, start=1):
        print(f"\n{'#'*70}")
        print(f" [{idx}/{len(instances)}] {inst['instance_id']}")
        print(f"{'#'*70}")

        try:
            outcome = validate_one(inst)
        except Exception as exc:
            print(f"\n ❌ CRASH: {exc}")
            traceback.print_exc()
            outcome = {
                "instance_id": inst["instance_id"],
                "final_resolved": False,
                "error": str(exc),
            }
        else:
            status = "✅ RESOLVED" if outcome["final_resolved"] else "❌ FAILED"
            print(f"\n {status} | Tier: {outcome['tier_used']} | error: {outcome.get('error', 'none')}")
        results.append(outcome)

        # Rewrite the full JSONL after every instance so a crash or an
        # interrupted batch loses nothing already computed.
        with open(args.output, "w") as fh:
            fh.writelines(json.dumps(r) + "\n" for r in results)

        print(f"\n ← Saved to {args.output} ({len(results)} results so far)")

    total = len(results)
    resolved_count = sum(1 for r in results if r["final_resolved"])
    t1_count = sum(1 for r in results if r["final_resolved"] and r.get("tier_used") == "T1")
    t2_count = sum(1 for r in results if r["final_resolved"] and r.get("tier_used") == "T2")

    print(f"\n{'='*70}")
    print("VALIDATION COMPLETE")
    print(f"{'='*70}")
    print(f" Total: {total}")
    print(f" Resolved: {resolved_count} ({resolved_count/max(total,1)*100:.1f}%)")
    print(f" T1 (Llama-3.1-8B): {t1_count}")
    print(f" T2 (Llama-3.3-70B): {t2_count}")
    print(f" Failed: {total - resolved_count}")
    print(f" Results: {args.output}")
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|