agent-cost-optimizer / validate_cascade.py
narcolepticchicken's picture
Upload validate_cascade.py
366b543 verified
"""
Cascade Validation Runner — proves the static cascade on SWE-bench.
This script runs the cascade agent on selected SWE-bench instances and
verifies the patches by applying test_patch and running FAIL_TO_PASS tests.
Strategy:
1. Clone repo, set up conda environment from environment_setup_commit
2. Run cascade agent (T1 Llama-3.1-8B → T2 Llama-3.3-70B)
3. Apply model patch + test_patch
4. Run FAIL_TO_PASS tests via pytest
5. Record: resolved, cost, model tier used, token counts
The critical question this answers:
"Do the 10 cascade-only (T1/T2 solves where both T4 models fail)
instances produce valid patches, or are they weak-test passes?"
Requirements:
- conda (mamba preferred for speed)
- git
- HF_TOKEN (free inference via huggingface_hub)
- No Docker needed
Usage:
python validate_cascade.py --instance django__django-12308
python validate_cascade.py --batch 10 --target cascade-only
python validate_cascade.py --batch 50 --target all
"""
import argparse
import json
import os
import re
import subprocess
import sys
import tempfile
import time
import traceback
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple, Dict, List
# ============================================================
# SWE-BENCH CONSTANTS
# ============================================================
# The 10 cascade-only instances (T1 or T2 solves, both T4 models fail)
# Extracted from the trace simulation in CORRECTED_REPORT.md
CASCADE_ONLY_INSTANCES = [
"astropy__astropy-14365",
"astropy__astropy-14995",
"django__django-11815",
"django__django-13089",
"django__django-13807",
"django__django-14315",
"matplotlib__matplotlib-25224",
"matplotlib__matplotlib-25311",
"sympy__sympy-19487",
"sympy__sympy-20590",
]
# The full set includes 14 frontier-retry-only instances for comparison
FRONTIER_ONLY_INSTANCES = [
"django__django-12453",
"django__django-14030",
"django__django-14349",
"django__django-14855",
"django__django-15098",
"django__django-16235",
"matplotlib__matplotlib-26020",
"psf__requests-6028",
"pylint-dev__pylint-7080",
"scikit-learn__scikit-learn-13439",
"scikit-learn__scikit-learn-14087",
"sphinx-doc__sphinx-10323",
"sphinx-doc__sphinx-10466",
"sphinx-doc__sphinx-10614",
]
REPO_URLS = {
"django/django": "https://github.com/django/django.git",
"pytest-dev/pytest": "https://github.com/pytest-dev/pytest.git",
"scikit-learn/scikit-learn": "https://github.com/scikit-learn/scikit-learn.git",
"sympy/sympy": "https://github.com/sympy/sympy.git",
"matplotlib/matplotlib": "https://github.com/matplotlib/matplotlib.git",
"sphinx-doc/sphinx": "https://github.com/sphinx-doc/sphinx.git",
"astropy/astropy": "https://github.com/astropy/astropy.git",
"psf/requests": "https://github.com/psf/requests.git",
"pylint-dev/pylint": "https://github.com/pylint-dev/pylint.git",
}
def run(cmd: list, cwd: str = None, timeout: int = 180, env: dict = None) -> Tuple[int, str, str]:
"""Run a command, return (returncode, stdout, stderr)."""
try:
result = subprocess.run(
cmd, cwd=cwd, capture_output=True, text=True,
timeout=timeout, env=env or os.environ
)
return result.returncode, result.stdout, result.stderr
except subprocess.TimeoutExpired:
return 124, "", "TIMEOUT"
except Exception as e:
return -1, "", str(e)
# ============================================================
# CASCADE AGENT (using free HF Inference API)
# ============================================================
def call_hf_model(
model_id: str,
messages: list,
max_tokens: int = 4096,
temperature: float = 0.2
) -> Tuple[str, int, int]:
"""
Call a model via HF Inference API (free).
Returns (response_text, input_tokens, output_tokens).
"""
from huggingface_hub import InferenceClient
client = InferenceClient(model_id)
completion = client.chat.completions.create(
model=model_id,
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
)
response = completion.choices[0].message.content
input_tokens = getattr(completion.usage, "prompt_tokens", 0) if hasattr(completion, "usage") and completion.usage else len(messages) * 100
output_tokens = getattr(completion.usage, "completion_tokens", 0) if hasattr(completion, "usage") and completion.usage else len(response) // 4
return response, input_tokens, output_tokens
def build_cascade_messages(
instance: dict,
repo_dir: Path,
previous_failure: Optional[str] = None,
) -> list:
"""Build the message list for the cascade agent."""
problem = instance.get("problem_statement", "")
hint = instance.get("hints_text", "")
system_prompt = """You are a software engineer fixing a bug in an open-source project.
Your task is to produce a correct patch that fixes the issue described.
You have access to a bash shell in the repository directory. Use it to:
- Explore the codebase (ls, find, grep, git log)
- Read files (cat, head)
- Run existing tests (pytest)
- Edit files with sed or write tools
Output format:
- For bash commands: <bash>command here</bash>
- For your final patch: <patch>diff here</patch>
- When done and tests pass: <submit>Done</submit>
Be thorough. Read the relevant code, understand the bug, make a minimal fix,
and verify it passes the tests."""
# Build the task description
task = f"""Repository: {instance['repo']}
Base commit: {instance['base_commit'][:12]}
PROBLEM:
{problem}"""
if hint:
task += f"\n\nHINT: {hint}"
task += f"""
The repository is at {repo_dir}. Your bash commands will run from that directory.
Start by exploring the codebase to understand the issue, then implement and test your fix."""
if previous_failure:
system_prompt += f"\n\nYour previous attempt failed with the following issues:\n{previous_failure}\nPlease fix your approach."
return [
{"role": "system", "content": system_prompt},
{"role": "user", "content": task},
]
def extract_patch(response: str) -> Optional[str]:
"""Extract a git patch from the agent's response."""
# Try <patch> tags
m = re.search(r'<patch>(.*?)</patch>', response, re.DOTALL)
if m:
return m.group(1).strip()
# Try ```diff blocks
m = re.search(r'```diff\s*\n(.*?)```', response, re.DOTALL)
if m:
return m.group(1).strip()
# Try ```patch blocks
m = re.search(r'```patch\s*\n(.*?)```', response, re.DOTALL)
if m:
return m.group(1).strip()
# Try to find diff content directly
diff_match = re.search(r'(diff --git.*?)(?:\n```|\n<patch>|\n<submit>|\Z)', response, re.DOTALL)
if diff_match:
return diff_match.group(1).strip()
return None
def run_cascade_agent(
instance: dict,
repo_dir: Path,
max_turns: int = 30,
) -> dict:
"""
Run the cascade agent on one instance.
Tiers:
T1: meta-llama/Llama-3.1-8B-Instruct (free, fast, weak)
T2: meta-llama/Llama-3.3-70B-Instruct (free, moderate)
Returns: {patch, resolved, tier, tokens, turns, cost}
"""
TIERS = [
{"name": "T1", "model": "meta-llama/Llama-3.1-8B-Instruct", "max_turns": max_turns, "cost_per_1k": 0.0},
{"name": "T2", "model": "meta-llama/Llama-3.3-70B-Instruct", "max_turns": max_turns, "cost_per_1k": 0.0},
]
result = {
"instance_id": instance["instance_id"],
"patch": None,
"resolved": False,
"tier_used": None,
"total_turns": 0,
"total_input_tokens": 0,
"total_output_tokens": 0,
"error": None,
}
previous_failures = []
for tier in TIERS:
print(f"\n [{tier['name']}] {tier['model']} (max {tier['max_turns']} turns)")
messages = build_cascade_messages(
instance, repo_dir,
previous_failure="\n".join(previous_failures) if previous_failures else None
)
tier_input_tokens = 0
tier_output_tokens = 0
for turn in range(tier['max_turns']):
try:
response, in_tok, out_tok = call_hf_model(
tier['model'], messages,
max_tokens=4096,
temperature=0.2
)
tier_input_tokens += in_tok
tier_output_tokens += out_tok
# Add response to conversation
messages.append({"role": "assistant", "content": response})
print(f" Turn {turn+1}: {in_tok}+{out_tok} tokens")
# Check for patch submission
patch = extract_patch(response)
if patch:
print(f" → Patch found! ({len(patch)} chars)")
result["patch"] = patch
result["tier_used"] = tier["name"]
result["total_turns"] = turn + 1
result["total_input_tokens"] = tier_input_tokens
result["total_output_tokens"] = tier_output_tokens
return result
# Check for bash commands
bash_commands = re.findall(r'<bash>(.*?)</bash>', response, re.DOTALL)
for cmd in bash_commands:
cmd = cmd.strip()
print(f" $ {cmd[:100]}...")
rc, stdout, stderr = run(
["bash", "-c", cmd],
cwd=str(repo_dir),
timeout=60
)
output = stdout.strip()[:2000]
if stderr.strip():
output += f"\n[stderr: {stderr.strip()[:500]}]"
if rc != 0:
output += f"\n[EXIT CODE: {rc}]"
messages.append({"role": "user", "content": f"<output>\n{output}\n</output>"})
# Check for submit
if "<submit>" in response:
print(f" → Submitted but no patch found")
break
except Exception as e:
print(f" Error: {e}")
previous_failures.append(f"[{tier['name']}] turn {turn+1} error: {str(e)[:200]}")
break
previous_failures.append(f"[{tier['name']}] failed after {tier['max_turns']} turns — no patch produced")
result["error"] = "All tiers exhausted without producing a patch"
return result
# ============================================================
# PATCH VERIFICATION
# ============================================================
def verify_cascade_patch(
instance: dict,
model_patch: str,
repo_dir: Path,
env_name: str = None,
) -> dict:
"""
Verify that a model-generated patch passes SWE-bench tests.
1. Reset repo to base_commit
2. Apply model patch
3. Apply test_patch
4. Run FAIL_TO_PASS tests
5. Run PASS_TO_PASS tests (regression check)
"""
result = {
"resolved": False,
"all_f2p_pass": False,
"all_p2p_pass": False,
"f2p_failures": [],
"p2p_failures": [],
"error": None,
}
try:
# Reset to base_commit
base_commit = instance.get("base_commit", "")
if base_commit:
rc, _, _ = run(["git", "checkout", "-f", base_commit], cwd=str(repo_dir))
if rc != 0:
result["error"] = f"could not checkout base_commit {base_commit[:12]}"
return result
# Apply model patch
patch_file = repo_dir / "aco_model.patch"
patch_file.write_text(model_patch)
rc, out, err = run(
["git", "apply", "--check", str(patch_file)],
cwd=str(repo_dir)
)
if rc != 0:
result["error"] = f"patch --check failed: {err[:300]}"
return result
rc, out, err = run(
["git", "apply", str(patch_file)],
cwd=str(repo_dir)
)
if rc != 0:
# Try with --reject
rc, out, err = run(
["git", "apply", "--reject", str(patch_file)],
cwd=str(repo_dir)
)
if rc != 0:
result["error"] = f"patch apply failed: {err[:300]}"
return result
# Apply test_patch
test_patch = instance.get("test_patch", "")
if not test_patch:
result["error"] = "no test_patch in instance"
return result
test_file = repo_dir / "aco_test.patch"
test_file.write_text(test_patch)
rc, out, err = run(
["git", "apply", "--check", str(test_file)],
cwd=str(repo_dir)
)
if rc == 0:
rc, out, err = run(
["git", "apply", str(test_file)],
cwd=str(repo_dir)
)
if rc != 0:
rc, out, err = run(
["git", "apply", "--reject", str(test_file)],
cwd=str(repo_dir)
)
# Run FAIL_TO_PASS tests
f2p = instance.get("FAIL_TO_PASS", [])
if not f2p:
result["error"] = "no FAIL_TO_PASS tests"
return result
print(f" Running {len(f2p)} FAIL_TO_PASS tests...")
cmd_prefix = ["conda", "run", "-n", env_name] if env_name else []
cmd = cmd_prefix + ["python", "-m", "pytest", "-v", "--tb=short", "-x"] + f2p
rc, out, err = run(cmd, cwd=str(repo_dir), timeout=300)
if rc == 0:
result["all_f2p_pass"] = True
# Run PASS_TO_PASS regression tests
p2p = instance.get("PASS_TO_PASS", [])
if p2p:
print(f" Running {len(p2p)} PASS_TO_PASS regression tests...")
cmd2 = cmd_prefix + ["python", "-m", "pytest", "-v", "--tb=short", "-x"] + p2p[:20]
rc2, out2, err2 = run(cmd2, cwd=str(repo_dir), timeout=300)
if rc2 == 0:
result["all_p2p_pass"] = True
result["resolved"] = True
else:
result["error"] = f"P2P regression: {(out2+err2)[:300]}"
result["p2p_failures"] = [l.strip() for l in (out2+err2).split('\n') if 'FAILED' in l and '::' in l]
else:
result["resolved"] = True
else:
result["error"] = f"F2P failures: {(out+err)[:500]}"
result["f2p_failures"] = [l.strip() for l in (out+err).split('\n') if 'FAILED' in l and '::' in l]
return result
except Exception as e:
result["error"] = f"verification error: {str(e)[:300]}"
return result
# ============================================================
# MAIN VALIDATION PIPELINE
# ============================================================
def validate_one(instance: dict) -> dict:
"""
Full validation pipeline for one instance:
1. Clone repo
2. Set up conda environment
3. Run cascade agent
4. Verify patch
5. Report results
"""
inst_id = instance["instance_id"]
repo = instance.get("repo", "")
env_setup_commit = instance.get("environment_setup_commit", "")
base_commit = instance.get("base_commit", "")
result = {
"instance_id": inst_id,
"repo": repo,
"timestamp": datetime.now().isoformat(),
"stages": {},
"final_resolved": False,
"tier_used": None,
"total_cost": 0.0,
"error": None,
}
print(f"\n{'='*70}")
print(f"VALIDATING: {inst_id}")
print(f" Repo: {repo} Base: {base_commit[:12]} EnvSetup: {env_setup_commit[:12]}")
print(f"{'='*70}")
with tempfile.TemporaryDirectory(prefix=f"aco_valid_{inst_id.replace('/', '_')}_") as tmpdir:
work_dir = Path(tmpdir)
repo_dir = work_dir / "repo"
env_name = f"aco_{inst_id.replace('__', '_').replace('-', '_')[:40]}"
# STAGE 1: Clone repo
print("\n--- Stage 1: Clone repo ---")
repo_url = REPO_URLS.get(repo, f"https://github.com/{repo}.git")
t0 = time.time()
rc, out, err = run(["git", "clone", "--depth", "50", repo_url, str(repo_dir)], timeout=300)
if rc != 0:
# Retry without depth limit
rc, out, err = run(["git", "clone", repo_url, str(repo_dir)], timeout=600)
result["stages"]["clone"] = {"success": rc == 0, "duration": time.time() - t0}
if rc != 0:
result["error"] = f"clone failed: {err[:300]}"
return result
# STAGE 2: Set up environment
print("\n--- Stage 2: Set up conda environment ---")
t0 = time.time()
# Checkout env_setup_commit to find environment.yml
if env_setup_commit:
run(["git", "fetch", "origin", env_setup_commit], cwd=str(repo_dir), timeout=60)
run(["git", "checkout", env_setup_commit], cwd=str(repo_dir), timeout=30)
# Find environment.yml
env_candidates = [
"environment.yml", "dev/environment.yml", ".github/environment.yml",
"ci/environment.yml", ".azure-pipelines/environment.yml",
]
env_yml = None
for c in env_candidates:
p = repo_dir / c
if p.exists():
env_yml = p
break
if not env_yml:
for p in repo_dir.rglob("environment.yml"):
if p.stat().st_size > 10:
env_yml = p
break
env_setup_ok = False
if env_yml:
print(f" Using environment.yml: {env_yml.relative_to(repo_dir)}")
rc, out, err = run(
["conda", "env", "create", "-f", str(env_yml), "-n", env_name, "--quiet"],
timeout=600
)
env_setup_ok = (rc == 0)
else:
print(f" No environment.yml found, creating python=3.10 env")
rc, out, err = run(
["conda", "create", "-n", env_name, "python=3.10", "pip", "-y", "--quiet"],
timeout=300
)
env_setup_ok = (rc == 0)
if not env_setup_ok:
result["stages"]["environment"] = {"success": False, "error": err[:300], "duration": time.time() - t0}
result["error"] = f"env setup failed: {err[:300]}"
return result
# Checkout base_commit and install
if base_commit:
run(["git", "fetch", "origin", base_commit], cwd=str(repo_dir), timeout=60)
run(["git", "checkout", base_commit], cwd=str(repo_dir), timeout=30)
run(["conda", "run", "-n", env_name, "pip", "install", "-e", ".", "--quiet"],
cwd=str(repo_dir), timeout=300)
result["stages"]["environment"] = {"success": True, "duration": time.time() - t0}
print(f" Environment ready in {time.time() - t0:.1f}s")
# STAGE 3: Run cascade agent
print("\n--- Stage 3: Run cascade agent ---")
t0 = time.time()
agent_result = run_cascade_agent(instance, repo_dir, max_turns=30)
result["stages"]["agent"] = {
"success": agent_result["patch"] is not None,
"tier": agent_result["tier_used"],
"turns": agent_result["total_turns"],
"input_tokens": agent_result["total_input_tokens"],
"output_tokens": agent_result["total_output_tokens"],
"duration": time.time() - t0,
}
if not agent_result["patch"]:
result["error"] = "No patch produced by any tier"
return result
# STAGE 4: Verify patch
print("\n--- Stage 4: Verify patch ---")
t0 = time.time()
verify_result = verify_cascade_patch(instance, agent_result["patch"], repo_dir, env_name)
result["stages"]["verify"] = {
"resolved": verify_result["resolved"],
"all_f2p_pass": verify_result.get("all_f2p_pass", False),
"all_p2p_pass": verify_result.get("all_p2p_pass", False),
"error": verify_result.get("error"),
"duration": time.time() - t0,
}
result["final_resolved"] = verify_result["resolved"]
result["tier_used"] = agent_result["tier_used"]
if verify_result.get("error"):
result["error"] = verify_result["error"]
# Cleanup
print(f"\n Cleaning up conda env {env_name}...")
run(["conda", "env", "remove", "-n", env_name, "-y", "--quiet"], timeout=30)
return result
def main():
parser = argparse.ArgumentParser(description="Cascade Validation Runner")
parser.add_argument("--instance", type=str, help="Single instance ID")
parser.add_argument("--batch", type=int, default=10, help="Number of instances to validate")
parser.add_argument("--target", choices=["cascade-only", "frontier-only", "all"], default="cascade-only")
parser.add_argument("--output", type=str, default="validation_results.jsonl")
args = parser.parse_args()
from datasets import load_dataset
print("Loading SWE-bench_Verified...")
ds = load_dataset("princeton-nlp/SWE-bench_Verified", split="test")
# Select instances
if args.instance:
instances = [dict(row) for row in ds if row["instance_id"] == args.instance]
if not instances:
print(f"Instance {args.instance} not found")
sys.exit(1)
elif args.target == "cascade-only":
instances = [dict(row) for row in ds if row["instance_id"] in CASCADE_ONLY_INSTANCES]
instances = instances[:args.batch]
elif args.target == "frontier-only":
instances = [dict(row) for row in ds if row["instance_id"] in FRONTIER_ONLY_INSTANCES]
instances = instances[:args.batch]
else:
instances = [dict(row) for row in ds][:args.batch]
print(f"Selected {len(instances)} instances for validation\n")
results = []
for i, instance in enumerate(instances):
print(f"\n{'#'*70}")
print(f" [{i+1}/{len(instances)}] {instance['instance_id']}")
print(f"{'#'*70}")
try:
result = validate_one(instance)
results.append(result)
# Print summary
status = "✅ RESOLVED" if result["final_resolved"] else "❌ FAILED"
print(f"\n {status} | Tier: {result['tier_used']} | error: {result.get('error', 'none')}")
except Exception as e:
print(f"\n ❌ CRASH: {e}")
traceback.print_exc()
results.append({
"instance_id": instance["instance_id"],
"final_resolved": False,
"error": str(e),
})
# Save incrementally
with open(args.output, "w") as f:
for r in results:
f.write(json.dumps(r) + "\n")
print(f"\n ← Saved to {args.output} ({len(results)} results so far)")
# Final report
resolved = [r for r in results if r["final_resolved"]]
t1_resolved = [r for r in resolved if r.get("tier_used") == "T1"]
t2_resolved = [r for r in resolved if r.get("tier_used") == "T2"]
print(f"\n{'='*70}")
print(f"VALIDATION COMPLETE")
print(f"{'='*70}")
print(f" Total: {len(results)}")
print(f" Resolved: {len(resolved)} ({len(resolved)/max(len(results),1)*100:.1f}%)")
print(f" T1 (Llama-3.1-8B): {len(t1_resolved)}")
print(f" T2 (Llama-3.3-70B): {len(t2_resolved)}")
print(f" Failed: {len(results) - len(resolved)}")
print(f" Results: {args.output}")
if __name__ == "__main__":
main()