""" Cascade Validation Runner — proves the static cascade on SWE-bench. This script runs the cascade agent on selected SWE-bench instances and verifies the patches by applying test_patch and running FAIL_TO_PASS tests. Strategy: 1. Clone repo, set up conda environment from environment_setup_commit 2. Run cascade agent (T1 Llama-3.1-8B → T2 Llama-3.3-70B) 3. Apply model patch + test_patch 4. Run FAIL_TO_PASS tests via pytest 5. Record: resolved, cost, model tier used, token counts The critical question this answers: "Do the 10 cascade-only (T1/T2 solves where both T4 models fail) instances produce valid patches, or are they weak-test passes?" Requirements: - conda (mamba preferred for speed) - git - HF_TOKEN (free inference via huggingface_hub) - No Docker needed Usage: python validate_cascade.py --instance django__django-12308 python validate_cascade.py --batch 10 --target cascade-only python validate_cascade.py --batch 50 --target all """ import argparse import json import os import re import subprocess import sys import tempfile import time import traceback from datetime import datetime from pathlib import Path from typing import Optional, Tuple, Dict, List # ============================================================ # SWE-BENCH CONSTANTS # ============================================================ # The 10 cascade-only instances (T1 or T2 solves, both T4 models fail) # Extracted from the trace simulation in CORRECTED_REPORT.md CASCADE_ONLY_INSTANCES = [ "astropy__astropy-14365", "astropy__astropy-14995", "django__django-11815", "django__django-13089", "django__django-13807", "django__django-14315", "matplotlib__matplotlib-25224", "matplotlib__matplotlib-25311", "sympy__sympy-19487", "sympy__sympy-20590", ] # The full set includes 14 frontier-retry-only instances for comparison FRONTIER_ONLY_INSTANCES = [ "django__django-12453", "django__django-14030", "django__django-14349", "django__django-14855", "django__django-15098", "django__django-16235", "matplotlib__matplotlib-26020", "psf__requests-6028", "pylint-dev__pylint-7080", "scikit-learn__scikit-learn-13439", "scikit-learn__scikit-learn-14087", "sphinx-doc__sphinx-10323", "sphinx-doc__sphinx-10466", "sphinx-doc__sphinx-10614", ] REPO_URLS = { "django/django": "https://github.com/django/django.git", "pytest-dev/pytest": "https://github.com/pytest-dev/pytest.git", "scikit-learn/scikit-learn": "https://github.com/scikit-learn/scikit-learn.git", "sympy/sympy": "https://github.com/sympy/sympy.git", "matplotlib/matplotlib": "https://github.com/matplotlib/matplotlib.git", "sphinx-doc/sphinx": "https://github.com/sphinx-doc/sphinx.git", "astropy/astropy": "https://github.com/astropy/astropy.git", "psf/requests": "https://github.com/psf/requests.git", "pylint-dev/pylint": "https://github.com/pylint-dev/pylint.git", } def run(cmd: list, cwd: str = None, timeout: int = 180, env: dict = None) -> Tuple[int, str, str]: """Run a command, return (returncode, stdout, stderr).""" try: result = subprocess.run( cmd, cwd=cwd, capture_output=True, text=True, timeout=timeout, env=env or os.environ ) return result.returncode, result.stdout, result.stderr except subprocess.TimeoutExpired: return 124, "", "TIMEOUT" except Exception as e: return -1, "", str(e) # ============================================================ # CASCADE AGENT (using free HF Inference API) # ============================================================ def call_hf_model( model_id: str, messages: list, max_tokens: int = 4096, temperature: float = 0.2 ) -> Tuple[str, int, int]: """ Call a model via HF Inference API (free). Returns (response_text, input_tokens, output_tokens). """ from huggingface_hub import InferenceClient client = InferenceClient(model_id) completion = client.chat.completions.create( model=model_id, messages=messages, max_tokens=max_tokens, temperature=temperature, ) response = completion.choices[0].message.content input_tokens = getattr(completion.usage, "prompt_tokens", 0) if hasattr(completion, "usage") and completion.usage else len(messages) * 100 output_tokens = getattr(completion.usage, "completion_tokens", 0) if hasattr(completion, "usage") and completion.usage else len(response) // 4 return response, input_tokens, output_tokens def build_cascade_messages( instance: dict, repo_dir: Path, previous_failure: Optional[str] = None, ) -> list: """Build the message list for the cascade agent.""" problem = instance.get("problem_statement", "") hint = instance.get("hints_text", "") system_prompt = """You are a software engineer fixing a bug in an open-source project. Your task is to produce a correct patch that fixes the issue described. You have access to a bash shell in the repository directory. Use it to: - Explore the codebase (ls, find, grep, git log) - Read files (cat, head) - Run existing tests (pytest) - Edit files with sed or write tools Output format: - For bash commands: command here - For your final patch: diff here - When done and tests pass: Done Be thorough. Read the relevant code, understand the bug, make a minimal fix, and verify it passes the tests.""" # Build the task description task = f"""Repository: {instance['repo']} Base commit: {instance['base_commit'][:12]} PROBLEM: {problem}""" if hint: task += f"\n\nHINT: {hint}" task += f""" The repository is at {repo_dir}. Your bash commands will run from that directory. Start by exploring the codebase to understand the issue, then implement and test your fix.""" if previous_failure: system_prompt += f"\n\nYour previous attempt failed with the following issues:\n{previous_failure}\nPlease fix your approach." return [ {"role": "system", "content": system_prompt}, {"role": "user", "content": task}, ] def extract_patch(response: str) -> Optional[str]: """Extract a git patch from the agent's response.""" # Try tags m = re.search(r'(.*?)', response, re.DOTALL) if m: return m.group(1).strip() # Try ```diff blocks m = re.search(r'```diff\s*\n(.*?)```', response, re.DOTALL) if m: return m.group(1).strip() # Try ```patch blocks m = re.search(r'```patch\s*\n(.*?)```', response, re.DOTALL) if m: return m.group(1).strip() # Try to find diff content directly diff_match = re.search(r'(diff --git.*?)(?:\n```|\n|\n|\Z)', response, re.DOTALL) if diff_match: return diff_match.group(1).strip() return None def run_cascade_agent( instance: dict, repo_dir: Path, max_turns: int = 30, ) -> dict: """ Run the cascade agent on one instance. Tiers: T1: meta-llama/Llama-3.1-8B-Instruct (free, fast, weak) T2: meta-llama/Llama-3.3-70B-Instruct (free, moderate) Returns: {patch, resolved, tier, tokens, turns, cost} """ TIERS = [ {"name": "T1", "model": "meta-llama/Llama-3.1-8B-Instruct", "max_turns": max_turns, "cost_per_1k": 0.0}, {"name": "T2", "model": "meta-llama/Llama-3.3-70B-Instruct", "max_turns": max_turns, "cost_per_1k": 0.0}, ] result = { "instance_id": instance["instance_id"], "patch": None, "resolved": False, "tier_used": None, "total_turns": 0, "total_input_tokens": 0, "total_output_tokens": 0, "error": None, } previous_failures = [] for tier in TIERS: print(f"\n [{tier['name']}] {tier['model']} (max {tier['max_turns']} turns)") messages = build_cascade_messages( instance, repo_dir, previous_failure="\n".join(previous_failures) if previous_failures else None ) tier_input_tokens = 0 tier_output_tokens = 0 for turn in range(tier['max_turns']): try: response, in_tok, out_tok = call_hf_model( tier['model'], messages, max_tokens=4096, temperature=0.2 ) tier_input_tokens += in_tok tier_output_tokens += out_tok # Add response to conversation messages.append({"role": "assistant", "content": response}) print(f" Turn {turn+1}: {in_tok}+{out_tok} tokens") # Check for patch submission patch = extract_patch(response) if patch: print(f" → Patch found! ({len(patch)} chars)") result["patch"] = patch result["tier_used"] = tier["name"] result["total_turns"] = turn + 1 result["total_input_tokens"] = tier_input_tokens result["total_output_tokens"] = tier_output_tokens return result # Check for bash commands bash_commands = re.findall(r'(.*?)', response, re.DOTALL) for cmd in bash_commands: cmd = cmd.strip() print(f" $ {cmd[:100]}...") rc, stdout, stderr = run( ["bash", "-c", cmd], cwd=str(repo_dir), timeout=60 ) output = stdout.strip()[:2000] if stderr.strip(): output += f"\n[stderr: {stderr.strip()[:500]}]" if rc != 0: output += f"\n[EXIT CODE: {rc}]" messages.append({"role": "user", "content": f"\n{output}\n"}) # Check for submit if "" in response: print(f" → Submitted but no patch found") break except Exception as e: print(f" Error: {e}") previous_failures.append(f"[{tier['name']}] turn {turn+1} error: {str(e)[:200]}") break previous_failures.append(f"[{tier['name']}] failed after {tier['max_turns']} turns — no patch produced") result["error"] = "All tiers exhausted without producing a patch" return result # ============================================================ # PATCH VERIFICATION # ============================================================ def verify_cascade_patch( instance: dict, model_patch: str, repo_dir: Path, env_name: str = None, ) -> dict: """ Verify that a model-generated patch passes SWE-bench tests. 1. Reset repo to base_commit 2. Apply model patch 3. Apply test_patch 4. Run FAIL_TO_PASS tests 5. Run PASS_TO_PASS tests (regression check) """ result = { "resolved": False, "all_f2p_pass": False, "all_p2p_pass": False, "f2p_failures": [], "p2p_failures": [], "error": None, } try: # Reset to base_commit base_commit = instance.get("base_commit", "") if base_commit: rc, _, _ = run(["git", "checkout", "-f", base_commit], cwd=str(repo_dir)) if rc != 0: result["error"] = f"could not checkout base_commit {base_commit[:12]}" return result # Apply model patch patch_file = repo_dir / "aco_model.patch" patch_file.write_text(model_patch) rc, out, err = run( ["git", "apply", "--check", str(patch_file)], cwd=str(repo_dir) ) if rc != 0: result["error"] = f"patch --check failed: {err[:300]}" return result rc, out, err = run( ["git", "apply", str(patch_file)], cwd=str(repo_dir) ) if rc != 0: # Try with --reject rc, out, err = run( ["git", "apply", "--reject", str(patch_file)], cwd=str(repo_dir) ) if rc != 0: result["error"] = f"patch apply failed: {err[:300]}" return result # Apply test_patch test_patch = instance.get("test_patch", "") if not test_patch: result["error"] = "no test_patch in instance" return result test_file = repo_dir / "aco_test.patch" test_file.write_text(test_patch) rc, out, err = run( ["git", "apply", "--check", str(test_file)], cwd=str(repo_dir) ) if rc == 0: rc, out, err = run( ["git", "apply", str(test_file)], cwd=str(repo_dir) ) if rc != 0: rc, out, err = run( ["git", "apply", "--reject", str(test_file)], cwd=str(repo_dir) ) # Run FAIL_TO_PASS tests f2p = instance.get("FAIL_TO_PASS", []) if not f2p: result["error"] = "no FAIL_TO_PASS tests" return result print(f" Running {len(f2p)} FAIL_TO_PASS tests...") cmd_prefix = ["conda", "run", "-n", env_name] if env_name else [] cmd = cmd_prefix + ["python", "-m", "pytest", "-v", "--tb=short", "-x"] + f2p rc, out, err = run(cmd, cwd=str(repo_dir), timeout=300) if rc == 0: result["all_f2p_pass"] = True # Run PASS_TO_PASS regression tests p2p = instance.get("PASS_TO_PASS", []) if p2p: print(f" Running {len(p2p)} PASS_TO_PASS regression tests...") cmd2 = cmd_prefix + ["python", "-m", "pytest", "-v", "--tb=short", "-x"] + p2p[:20] rc2, out2, err2 = run(cmd2, cwd=str(repo_dir), timeout=300) if rc2 == 0: result["all_p2p_pass"] = True result["resolved"] = True else: result["error"] = f"P2P regression: {(out2+err2)[:300]}" result["p2p_failures"] = [l.strip() for l in (out2+err2).split('\n') if 'FAILED' in l and '::' in l] else: result["resolved"] = True else: result["error"] = f"F2P failures: {(out+err)[:500]}" result["f2p_failures"] = [l.strip() for l in (out+err).split('\n') if 'FAILED' in l and '::' in l] return result except Exception as e: result["error"] = f"verification error: {str(e)[:300]}" return result # ============================================================ # MAIN VALIDATION PIPELINE # ============================================================ def validate_one(instance: dict) -> dict: """ Full validation pipeline for one instance: 1. Clone repo 2. Set up conda environment 3. Run cascade agent 4. Verify patch 5. Report results """ inst_id = instance["instance_id"] repo = instance.get("repo", "") env_setup_commit = instance.get("environment_setup_commit", "") base_commit = instance.get("base_commit", "") result = { "instance_id": inst_id, "repo": repo, "timestamp": datetime.now().isoformat(), "stages": {}, "final_resolved": False, "tier_used": None, "total_cost": 0.0, "error": None, } print(f"\n{'='*70}") print(f"VALIDATING: {inst_id}") print(f" Repo: {repo} Base: {base_commit[:12]} EnvSetup: {env_setup_commit[:12]}") print(f"{'='*70}") with tempfile.TemporaryDirectory(prefix=f"aco_valid_{inst_id.replace('/', '_')}_") as tmpdir: work_dir = Path(tmpdir) repo_dir = work_dir / "repo" env_name = f"aco_{inst_id.replace('__', '_').replace('-', '_')[:40]}" # STAGE 1: Clone repo print("\n--- Stage 1: Clone repo ---") repo_url = REPO_URLS.get(repo, f"https://github.com/{repo}.git") t0 = time.time() rc, out, err = run(["git", "clone", "--depth", "50", repo_url, str(repo_dir)], timeout=300) if rc != 0: # Retry without depth limit rc, out, err = run(["git", "clone", repo_url, str(repo_dir)], timeout=600) result["stages"]["clone"] = {"success": rc == 0, "duration": time.time() - t0} if rc != 0: result["error"] = f"clone failed: {err[:300]}" return result # STAGE 2: Set up environment print("\n--- Stage 2: Set up conda environment ---") t0 = time.time() # Checkout env_setup_commit to find environment.yml if env_setup_commit: run(["git", "fetch", "origin", env_setup_commit], cwd=str(repo_dir), timeout=60) run(["git", "checkout", env_setup_commit], cwd=str(repo_dir), timeout=30) # Find environment.yml env_candidates = [ "environment.yml", "dev/environment.yml", ".github/environment.yml", "ci/environment.yml", ".azure-pipelines/environment.yml", ] env_yml = None for c in env_candidates: p = repo_dir / c if p.exists(): env_yml = p break if not env_yml: for p in repo_dir.rglob("environment.yml"): if p.stat().st_size > 10: env_yml = p break env_setup_ok = False if env_yml: print(f" Using environment.yml: {env_yml.relative_to(repo_dir)}") rc, out, err = run( ["conda", "env", "create", "-f", str(env_yml), "-n", env_name, "--quiet"], timeout=600 ) env_setup_ok = (rc == 0) else: print(f" No environment.yml found, creating python=3.10 env") rc, out, err = run( ["conda", "create", "-n", env_name, "python=3.10", "pip", "-y", "--quiet"], timeout=300 ) env_setup_ok = (rc == 0) if not env_setup_ok: result["stages"]["environment"] = {"success": False, "error": err[:300], "duration": time.time() - t0} result["error"] = f"env setup failed: {err[:300]}" return result # Checkout base_commit and install if base_commit: run(["git", "fetch", "origin", base_commit], cwd=str(repo_dir), timeout=60) run(["git", "checkout", base_commit], cwd=str(repo_dir), timeout=30) run(["conda", "run", "-n", env_name, "pip", "install", "-e", ".", "--quiet"], cwd=str(repo_dir), timeout=300) result["stages"]["environment"] = {"success": True, "duration": time.time() - t0} print(f" Environment ready in {time.time() - t0:.1f}s") # STAGE 3: Run cascade agent print("\n--- Stage 3: Run cascade agent ---") t0 = time.time() agent_result = run_cascade_agent(instance, repo_dir, max_turns=30) result["stages"]["agent"] = { "success": agent_result["patch"] is not None, "tier": agent_result["tier_used"], "turns": agent_result["total_turns"], "input_tokens": agent_result["total_input_tokens"], "output_tokens": agent_result["total_output_tokens"], "duration": time.time() - t0, } if not agent_result["patch"]: result["error"] = "No patch produced by any tier" return result # STAGE 4: Verify patch print("\n--- Stage 4: Verify patch ---") t0 = time.time() verify_result = verify_cascade_patch(instance, agent_result["patch"], repo_dir, env_name) result["stages"]["verify"] = { "resolved": verify_result["resolved"], "all_f2p_pass": verify_result.get("all_f2p_pass", False), "all_p2p_pass": verify_result.get("all_p2p_pass", False), "error": verify_result.get("error"), "duration": time.time() - t0, } result["final_resolved"] = verify_result["resolved"] result["tier_used"] = agent_result["tier_used"] if verify_result.get("error"): result["error"] = verify_result["error"] # Cleanup print(f"\n Cleaning up conda env {env_name}...") run(["conda", "env", "remove", "-n", env_name, "-y", "--quiet"], timeout=30) return result def main(): parser = argparse.ArgumentParser(description="Cascade Validation Runner") parser.add_argument("--instance", type=str, help="Single instance ID") parser.add_argument("--batch", type=int, default=10, help="Number of instances to validate") parser.add_argument("--target", choices=["cascade-only", "frontier-only", "all"], default="cascade-only") parser.add_argument("--output", type=str, default="validation_results.jsonl") args = parser.parse_args() from datasets import load_dataset print("Loading SWE-bench_Verified...") ds = load_dataset("princeton-nlp/SWE-bench_Verified", split="test") # Select instances if args.instance: instances = [dict(row) for row in ds if row["instance_id"] == args.instance] if not instances: print(f"Instance {args.instance} not found") sys.exit(1) elif args.target == "cascade-only": instances = [dict(row) for row in ds if row["instance_id"] in CASCADE_ONLY_INSTANCES] instances = instances[:args.batch] elif args.target == "frontier-only": instances = [dict(row) for row in ds if row["instance_id"] in FRONTIER_ONLY_INSTANCES] instances = instances[:args.batch] else: instances = [dict(row) for row in ds][:args.batch] print(f"Selected {len(instances)} instances for validation\n") results = [] for i, instance in enumerate(instances): print(f"\n{'#'*70}") print(f" [{i+1}/{len(instances)}] {instance['instance_id']}") print(f"{'#'*70}") try: result = validate_one(instance) results.append(result) # Print summary status = "✅ RESOLVED" if result["final_resolved"] else "❌ FAILED" print(f"\n {status} | Tier: {result['tier_used']} | error: {result.get('error', 'none')}") except Exception as e: print(f"\n ❌ CRASH: {e}") traceback.print_exc() results.append({ "instance_id": instance["instance_id"], "final_resolved": False, "error": str(e), }) # Save incrementally with open(args.output, "w") as f: for r in results: f.write(json.dumps(r) + "\n") print(f"\n ← Saved to {args.output} ({len(results)} results so far)") # Final report resolved = [r for r in results if r["final_resolved"]] t1_resolved = [r for r in resolved if r.get("tier_used") == "T1"] t2_resolved = [r for r in resolved if r.get("tier_used") == "T2"] print(f"\n{'='*70}") print(f"VALIDATION COMPLETE") print(f"{'='*70}") print(f" Total: {len(results)}") print(f" Resolved: {len(resolved)} ({len(resolved)/max(len(results),1)*100:.1f}%)") print(f" T1 (Llama-3.1-8B): {len(t1_resolved)}") print(f" T2 (Llama-3.3-70B): {len(t2_resolved)}") print(f" Failed: {len(results) - len(resolved)}") print(f" Results: {args.output}") if __name__ == "__main__": main()