| """ |
| Quick single-instance cascade validation. |
| |
| Runs the cascade agent + verification on ONE instance. |
| This is the minimal proof that the cascade works end-to-end. |
| |
| Usage: |
| python quick_validate.py |
| python quick_validate.py django__django-11815 |
| """ |
|
|
| import json |
| import os |
| import re |
| import subprocess |
| import sys |
| import tempfile |
| import time |
| import traceback |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Optional, Tuple |
|
|
| from huggingface_hub import InferenceClient |
|
|
|
|
| |
| |
| |
# Instance validated when no instance id is supplied on the command line.
DEFAULT_INSTANCE: str = "django__django-14315"

# Cascade tiers: the T1 model is tried first, the T2 model second.
T1_MODEL: str = "meta-llama/Llama-3.1-8B-Instruct"
T2_MODEL: str = "meta-llama/Llama-3.3-70B-Instruct"
|
|
|
|
def run(cmd, cwd=None, timeout=120):
    """Run ``cmd`` through the shell and capture its output.

    Args:
        cmd: Shell command line; it is executed with ``shell=True`` so callers
            may use shell syntax such as ``cd dir && ...``.
        cwd: Working directory for the command, or ``None`` to inherit.
        timeout: Seconds to wait before the command is killed.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams as text. A command
        that exceeds ``timeout`` does not raise: it yields return code 124,
        the output captured so far, and a ``[TIMEOUT after Ns]`` marker
        appended to stderr.
    """

    def _as_text(stream):
        # On timeout the captured streams may be None or raw bytes.
        if stream is None:
            return ""
        if isinstance(stream, bytes):
            return stream.decode("utf-8", errors="replace")
        return stream

    # NOTE(review): shell=True executes the string verbatim; callers must only
    # pass commands they are willing to run on this machine.
    try:
        result = subprocess.run(
            cmd,
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=timeout,
            shell=True,
        )
    except subprocess.TimeoutExpired as exc:
        # Report the timeout as a failed command so one slow command cannot
        # abort the whole validation run.
        stderr = _as_text(exc.stderr) + f"[TIMEOUT after {timeout}s]"
        return 124, _as_text(exc.stdout), stderr
    return result.returncode, result.stdout, result.stderr
|
|
|
|
def call_model(client, messages, max_tokens=4096):
    """Call HF inference, return (text, input_tokens, output_tokens).

    Args:
        client: Object exposing ``model`` and ``chat.completions.create``.
        messages: Chat history passed through unchanged.
        max_tokens: Completion token limit for this call.

    Returns:
        ``(text, input_tokens, output_tokens)``. ``text`` is always a string:
        a missing/``None`` message content becomes ``""``. Token counts are 0
        when the response carries no usage data. Any exception raised by the
        call is reported as ``("[ERROR: <message>]", 0, 0)`` instead of
        propagating.
    """
    try:
        completion = client.chat.completions.create(
            model=client.model,
            messages=messages,
            max_tokens=max_tokens,
            temperature=0.2,
        )
        # Callers run regexes over the text, so never hand them None.
        text = completion.choices[0].message.content or ""
        usage = getattr(completion, "usage", None)
        itok = (getattr(usage, "prompt_tokens", 0) or 0) if usage else 0
        otok = (getattr(usage, "completion_tokens", 0) or 0) if usage else 0
        return text, itok, otok
    except Exception as e:
        # Deliberate best-effort boundary: the caller receives the failure as
        # text rather than an exception.
        return f"[ERROR: {e}]", 0, 0
|
|
|
|
def extract_patch(text):
    """Extract a diff/patch from model output.

    Sources are tried in this order; the first non-empty one wins:

    1. ``<patch>...</patch>`` or ``<diff>...</diff>`` tags,
    2. a fenced block opened with ```` ```diff ```` or ```` ```patch ````,
    3. a raw diff starting at ``diff --git a/`` and running up to the next
       closing fence / closing tag line, or the end of the text.

    Args:
        text: Model reply; ``None`` or empty input is accepted.

    Returns:
        The patch with surrounding whitespace stripped, or ``None`` when no
        patch is found.
    """
    if not text:
        return None

    for tag in ("patch", "diff"):
        tagged = re.search(rf"<{tag}>(.*?)</{tag}>", text, re.DOTALL)
        if tagged and tagged.group(1).strip():
            return tagged.group(1).strip()

    for block in ("diff", "patch"):
        fenced = re.search(rf"```{block}\s*\n(.*?)```", text, re.DOTALL)
        if fenced and fenced.group(1).strip():
            return fenced.group(1).strip()

    # Raw fallback: the lazy body must be bounded by a lookahead, otherwise it
    # matches nothing beyond the literal "diff --git a/" prefix.
    raw = re.search(r"diff --git a/.*?(?=\n```|\n</|\Z)", text, re.DOTALL)
    if raw:
        return raw.group(0).strip()
    return None
|
|
|
|
def run_cascade(instance, repo_dir):
    """Run the agent loop on T1, then T2, until one of them emits a patch.

    Both tiers share one conversation: T2 continues from the messages T1
    accumulated. Each turn the model reply is scanned for a patch (which ends
    the cascade) and for ``<bash>`` commands, which are executed in
    ``repo_dir`` with their truncated output fed back as user messages.

    Args:
        instance: Mapping with at least ``repo``; ``problem_statement`` is
            used when present.
        repo_dir: Directory the model's commands are executed in.

    Returns:
        Dict with keys ``patch``, ``tier``, ``turns``, ``input_tokens`` and
        ``output_tokens``. Turn and token counts cover only the tier that
        produced the patch. When no tier produces one, ``patch`` and ``tier``
        are ``None`` and the counters are 0.
    """
    problem = instance.get("problem_statement", "")

    system = f"""You are fixing a bug in {instance['repo']}. Repository is at {repo_dir}.

Output format:
- Bash commands: <bash>command</bash>
- Final patch: <patch>your diff here</patch>
- Done: <submit>Done</submit>

First explore the codebase to understand the issue, then make a minimal fix and verify it."""

    messages = [
        {"role": "system", "content": system},
        {"role": "user", "content": f"PROBLEM:\n{problem}\n\nStart by exploring the repository."},
    ]

    tiers = [
        ("T1", T1_MODEL, 30),
        ("T2", T2_MODEL, 30),
    ]
    # Give up on a tier after this many back-to-back failed model calls
    # instead of spending every remaining turn on the same error.
    max_consecutive_errors = 3

    for tier_name, model_id, max_turns in tiers:
        print(f"\n[{tier_name}] Running {model_id}...")
        client = InferenceClient(model_id)
        tier_turns = 0
        tier_itok = 0
        tier_otok = 0
        consecutive_errors = 0

        for turn in range(max_turns):
            text, itok, otok = call_model(client, messages, max_tokens=4096)
            text = text or ""
            tier_turns += 1
            tier_itok += itok
            tier_otok += otok

            # call_model reports failures as "[ERROR: ...]" text; keep those
            # out of the conversation so the model never sees them as its own
            # reply.
            if text.startswith("[ERROR:"):
                consecutive_errors += 1
                print(f"  ⚠️ Model call failed: {text[:200]}")
                if consecutive_errors >= max_consecutive_errors:
                    break
                continue
            consecutive_errors = 0
            messages.append({"role": "assistant", "content": text})

            patch = extract_patch(text)
            if patch:
                print(f"  ✅ Patch found ({len(patch)} chars) at turn {turn+1}")
                return {
                    "patch": patch,
                    "tier": tier_name,
                    "turns": tier_turns,
                    "input_tokens": tier_itok,
                    "output_tokens": tier_otok,
                }

            # NOTE(review): these commands come from the model and are run
            # through the shell as-is; only run this in a disposable sandbox.
            cmds = re.findall(r'<bash>(.*?)</bash>', text, re.DOTALL)
            for cmd in cmds:
                cmd = cmd.strip()
                print(f"  $ {cmd[:80]}...")
                rc, stdout, stderr = run(cmd, cwd=str(repo_dir), timeout=30)
                output = (stdout + stderr)[:1500]
                if rc != 0:
                    output += f" [EXIT:{rc}]"
                messages.append({"role": "user", "content": f"<output>\n{output}\n</output>"})

            if "<submit>" in text:
                break

            if not cmds:
                # Nothing actionable in the reply: add a user turn so the next
                # call is not sent a history ending in an assistant message.
                messages.append({
                    "role": "user",
                    "content": (
                        "No <bash> command or <patch> found in your reply. "
                        "Use <bash>command</bash> to keep exploring, or "
                        "<patch>your diff here</patch> to submit the fix."
                    ),
                })

    return {"patch": None, "tier": None, "turns": 0, "input_tokens": 0, "output_tokens": 0}
|
|
|
|
def _parse_test_list(value):
    """Return test ids as a list; accepts a list or a JSON-encoded list string."""
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except ValueError:
            # Not JSON: fall back to treating the string as a single test id.
            value = value.strip()
    if isinstance(value, str):
        return [value] if value else []
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value]
    return []


def verify_patch(instance, model_patch, repo_dir, env_name=None):
    """Apply patch + test_patch, run FAIL_TO_PASS tests.

    Steps: force-checkout ``base_commit``, apply ``model_patch``, apply the
    instance's ``test_patch``, run the FAIL_TO_PASS tests with pytest and, if
    they pass, the first 15 PASS_TO_PASS tests as a regression check.

    Args:
        instance: Mapping providing ``base_commit``, ``test_patch``,
            ``FAIL_TO_PASS`` and optionally ``PASS_TO_PASS``. The test lists
            may be real lists or JSON-encoded list strings.
        model_patch: Diff text to verify.
        repo_dir: Repository checkout to operate on (path or ``Path``).
        env_name: Conda environment to run pytest in; ``None`` runs the
            ``python`` found on PATH.

    Returns:
        Dict with ``resolved`` (bool). Failures add ``error`` and, where
        applicable, ``failures`` (up to 5 FAILED lines) or ``regressions``.
    """
    import shlex  # local import: quoting for values interpolated into shell commands

    base_commit = instance.get("base_commit", "")
    test_patch = instance.get("test_patch", "")
    f2p = _parse_test_list(instance.get("FAIL_TO_PASS", []))

    if not base_commit or not test_patch or not f2p:
        return {"resolved": False, "error": "missing base_commit/test_patch/FAIL_TO_PASS"}
    if not model_patch or not model_patch.strip():
        return {"resolved": False, "error": "empty model patch"}

    repo_dir = Path(repo_dir)
    cwd = str(repo_dir)

    def _with_newline(diff_text):
        # A diff that does not end in a newline can be rejected by git apply.
        return diff_text if diff_text.endswith("\n") else diff_text + "\n"

    # Start from the pristine base commit; bail out if that is impossible,
    # since testing against another commit would give a meaningless verdict.
    rc, out, err = run(f"git checkout -f {shlex.quote(base_commit)}", cwd=cwd)
    if rc != 0:
        return {"resolved": False, "error": f"checkout {base_commit[:12]}: {err[:200]}"}

    # Model patch.
    patch_file = repo_dir / "_aco.patch"
    patch_file.write_text(_with_newline(model_patch))
    patch_arg = shlex.quote(str(patch_file))
    rc, out, err = run(f"git apply --check {patch_arg}", cwd=cwd)
    if rc != 0:
        return {"resolved": False, "error": f"patch --check: {err[:200]}"}
    rc, out, err = run(f"git apply {patch_arg}", cwd=cwd)
    if rc != 0:
        rc, out, err = run(f"git apply --reject {patch_arg}", cwd=cwd)
        if rc != 0:
            return {"resolved": False, "error": f"patch apply: {err[:200]}"}

    # Test patch. If it does not apply, say so rather than running pytest
    # without it and reporting an unexplained failure.
    test_file = repo_dir / "_aco_test.patch"
    test_file.write_text(_with_newline(test_patch))
    test_arg = shlex.quote(str(test_file))
    rc, out, err = run(f"git apply --check {test_arg}", cwd=cwd)
    if rc != 0:
        return {"resolved": False, "error": f"test_patch --check: {err[:200]}"}
    rc, out, err = run(f"git apply {test_arg}", cwd=cwd)
    if rc != 0:
        return {"resolved": False, "error": f"test_patch apply: {err[:200]}"}

    # NOTE(review): test ids are handed to pytest unchanged. Repos whose ids
    # are not pytest node ids will presumably need a repo-specific runner —
    # confirm against the dataset.
    cmd_prefix = f"conda run -n {env_name} " if env_name else ""
    f2p_args = " ".join(shlex.quote(test_id) for test_id in f2p)
    cmd = f"{cmd_prefix}python -m pytest -v --tb=short -x {f2p_args}"
    print(f"  Running: pytest {' '.join(f2p[:2])}...")
    rc, out, err = run(cmd, cwd=cwd, timeout=300)

    if rc == 0:
        p2p = _parse_test_list(instance.get("PASS_TO_PASS", []))
        if p2p:
            # Only a sample of the PASS_TO_PASS tests is run.
            p2p_args = " ".join(shlex.quote(test_id) for test_id in p2p[:15])
            cmd2 = f"{cmd_prefix}python -m pytest -v --tb=short -x {p2p_args}"
            rc2, out2, err2 = run(cmd2, cwd=cwd, timeout=300)
            if rc2 == 0:
                return {"resolved": True, "regressions": False}
            return {
                "resolved": False,
                "error": f"regression: {(out2+err2)[:200]}",
                "regressions": True,
            }
        return {"resolved": True, "regressions": False}

    combined = out + err
    failures = [line.strip() for line in combined.split('\n') if 'FAILED' in line]
    if failures:
        error = f"{len(failures)} F2P failures"
    else:
        # Collection or usage errors produce no FAILED lines; show the tail of
        # the output instead of a "0 failures" message.
        error = f"pytest exited {rc} without FAILED lines: {combined.strip()[-200:]}"
    return {"resolved": False, "error": error, "failures": failures[:5]}
|
|
|
|
def _parse_instance_id(argv):
    """Pick the instance id from CLI args (positional or --instance), else the default."""
    import argparse  # local import: only the CLI entry point needs it

    parser = argparse.ArgumentParser(description="Quick single-instance cascade validation.")
    parser.add_argument("instance_id", nargs="?", default=None,
                        help="instance id to validate")
    parser.add_argument("--instance", dest="instance_flag", default=None,
                        help="instance id to validate (same as the positional form)")
    args = parser.parse_args(argv)
    return args.instance_flag or args.instance_id or DEFAULT_INSTANCE


def main():
    """Validate the cascade end-to-end on one SWE-bench Verified instance.

    Loads the instance, clones its repository into a temporary directory,
    creates a conda environment, runs the cascade agent, verifies the produced
    patch and writes ``quick_validate_<instance_id>.json`` to the current
    directory. The conda environment is removed on every exit path once its
    creation has been attempted.

    Returns:
        0 when the patch resolves the instance, 1 otherwise. Setup failures
        and a missing patch terminate the process via ``sys.exit(1)``.
    """
    import shutil  # local import: only used to clear a partial clone

    from datasets import load_dataset

    instance_id = _parse_instance_id(sys.argv[1:])
    print(f"Validating: {instance_id}")

    ds = load_dataset("princeton-nlp/SWE-bench_Verified", split="test")
    instance = next((dict(row) for row in ds if row["instance_id"] == instance_id), None)

    if not instance:
        print(f"Instance {instance_id} not found!")
        sys.exit(1)

    # The test list may be a JSON-encoded string; decode it before counting so
    # the number shown is a test count, not a character count.
    f2p = instance.get("FAIL_TO_PASS", [])
    if isinstance(f2p, str):
        try:
            f2p = json.loads(f2p)
        except ValueError:
            f2p = [f2p]

    print(f" Repo: {instance['repo']}")
    print(f" Base: {instance['base_commit'][:12]}")
    print(f" F2P: {len(f2p)} tests")

    with tempfile.TemporaryDirectory(prefix="aco_quick_") as tmpdir:
        repo_dir = Path(tmpdir) / "repo"
        env_name = f"aco_q_{instance_id.replace('__','_').replace('-','_')[:30]}"
        env_attempted = False

        try:
            repo = instance["repo"]
            url = f"https://github.com/{repo}.git"
            print(f"\n[CLONE] {url}")
            rc, out, err = run(f"git clone --depth 50 {url} {repo_dir}", timeout=180)
            if rc != 0:
                # A leftover partial checkout would make the retry fail.
                shutil.rmtree(repo_dir, ignore_errors=True)
                rc, out, err = run(f"git clone {url} {repo_dir}", timeout=600)
                if rc != 0:
                    print(f"CLONE FAILED: {err[:300]}")
                    sys.exit(1)

            # Check out the environment-setup commit (when given) so the
            # environment file lookup below sees that revision's files.
            env_commit = instance.get("environment_setup_commit", "")
            if env_commit:
                run(f"cd {repo_dir} && git fetch origin {env_commit}", timeout=60)
                run(f"cd {repo_dir} && git checkout {env_commit}", timeout=30)

            candidates = ["environment.yml", "dev/environment.yml", ".github/environment.yml"]
            env_yml = next((c for c in candidates if (repo_dir / c).exists()), None)

            print(f"\n[ENV] Creating conda env '{env_name}'...")
            # Flag before creating: a failed create may still leave a partial
            # environment that must be removed.
            env_attempted = True
            if env_yml:
                rc, out, err = run(
                    f"cd {repo_dir} && conda env create -f {env_yml} -n {env_name} --quiet",
                    timeout=600,
                )
            else:
                rc, out, err = run(
                    f"conda create -n {env_name} python=3.10 pip -y --quiet",
                    timeout=300,
                )

            if rc != 0:
                print(f"ENV SETUP FAILED: {err[:300]}")
                sys.exit(1)

            base_commit = instance["base_commit"]
            run(f"cd {repo_dir} && git fetch origin {base_commit}", timeout=60)
            rc, out, err = run(f"cd {repo_dir} && git checkout {base_commit}", timeout=30)
            if rc != 0:
                # Running the agent on any other commit would invalidate the result.
                print(f"CHECKOUT FAILED: {err[:300]}")
                sys.exit(1)
            rc, out, err = run(
                f"cd {repo_dir} && conda run -n {env_name} pip install -e . --quiet",
                timeout=300,
            )
            if rc != 0:
                print(f"PIP INSTALL FAILED (continuing): {err[:200]}")

            print("\n[CASCADE] Running agent...")
            t0 = time.time()
            agent_result = run_cascade(instance, repo_dir)
            agent_time = time.time() - t0

            print(f" Patch: {'FOUND' if agent_result['patch'] else 'NOT FOUND'}")
            print(f" Tier: {agent_result['tier']}")
            print(f" Time: {agent_time:.1f}s")

            if not agent_result["patch"]:
                print("FAILED: No patch produced")
                sys.exit(1)

            print("\n[VERIFY] Testing patch...")
            verify_result = verify_patch(instance, agent_result["patch"], repo_dir, env_name)

            print(f"\n{'='*60}")
            print(f"RESULT: {'✅ RESOLVED' if verify_result['resolved'] else '❌ NOT RESOLVED'}")
            print(f"{'='*60}")
            print(f" Instance: {instance_id}")
            print(f" Tier: {agent_result['tier']}")
            print(f" Turns: {agent_result['turns']}")
            print(f" Tokens: {agent_result['input_tokens']} in / {agent_result['output_tokens']} out")
            print(f" Agent time: {agent_time:.1f}s")
            if not verify_result["resolved"]:
                print(f" Error: {verify_result.get('error', 'unknown')}")

            final = {
                "instance_id": instance_id,
                "repo": instance["repo"],
                "timestamp": datetime.now().isoformat(),
                "resolved": verify_result["resolved"],
                "tier": agent_result["tier"],
                "turns": agent_result["turns"],
                "input_tokens": agent_result["input_tokens"],
                "output_tokens": agent_result["output_tokens"],
                "agent_time_seconds": agent_time,
                "error": verify_result.get("error"),
            }

            result_path = f"quick_validate_{instance_id}.json"
            with open(result_path, "w") as f:
                json.dump(final, f, indent=2)
            print(f"\n Saved: {result_path}")

            return 0 if verify_result["resolved"] else 1
        finally:
            # Runs on normal return and on sys.exit(), so the conda
            # environment is not left behind.
            if env_attempted:
                run(f"conda env remove -n {env_name} -y --quiet", timeout=30)
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|
|