# server/failure_classifier.py
"""
Typed Failure Classification Engine.
Classifies agent failures into precise, actionable categories rather than
vague scores. Each failure type has a root cause, evidence, and remediation.
Failure taxonomy:
WRONG_FILE_NAVIGATION β agent read irrelevant files, missed key files
BLIND_WRITE β agent wrote code without reading first
HALLUCINATED_CODE β agent wrote syntactically/logically wrong code
NEVER_TESTED β agent submitted without running any tests
LOOPING_BEHAVIOR β agent repeated same action 3+ times
CONTEXT_OVERFLOW β agent read enormous amounts of irrelevant data
SECURITY_VIOLATION β agent wrote dangerous code
CORRECT β no failure detected
"""
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
@dataclass
class FailureInstance:
    """One classified failure event.

    All fields are plain strings/ints so an instance serializes trivially
    (see FailureReport.to_dict).
    """
    failure_type: str  # e.g. "WRONG_FILE_NAVIGATION"
    severity: str      # "critical" | "major" | "minor"
    step_number: int   # Which step triggered it (1-based)
    evidence: str      # Specific observation
    root_cause: str    # Why this happens
    remediation: str   # How to fix in next run


@dataclass
class FailureReport:
    """Full failure analysis for one episode."""
    episode_id: str
    task: str
    primary_failure: str  # Most severe failure type
    failures: List[FailureInstance] = field(default_factory=list)
    success: bool = False
    failure_summary: str = ""
    retry_hint: str = ""  # Actionable hint for the next attempt

    def to_dict(self) -> dict:
        """Serialize the report, including nested failures, to a JSON-safe dict.

        Returns:
            A dict of primitives only (str/int/bool/list/dict), suitable for
            json.dumps without a custom encoder.
        """
        return {
            "episode_id": self.episode_id,
            "task": self.task,
            "success": self.success,
            "primary_failure": self.primary_failure,
            "failure_count": len(self.failures),
            "failures": [
                {
                    "type": f.failure_type,
                    "severity": f.severity,
                    "step": f.step_number,
                    "evidence": f.evidence,
                    "root_cause": f.root_cause,
                    "remediation": f.remediation,
                }
                for f in self.failures
            ],
            "failure_summary": self.failure_summary,
            "retry_hint": self.retry_hint,
        }
# ── Severity ordering for picking the primary failure ─────────────────────────
# Higher rank wins when several failures are detected in one episode.
SEVERITY_RANK = {"critical": 3, "major": 2, "minor": 1}

# Canonical remediation text per failure type; surfaced verbatim as the
# retry_hint for the next attempt.
FAILURE_REMEDIATION = {
    "WRONG_FILE_NAVIGATION": (
        "Read the failing test file first to understand the module under test, "
        "then navigate directly to the imported source files."
    ),
    "BLIND_WRITE": (
        "Always read the target file before writing. Use read_file → write_file → run_tests."
    ),
    "HALLUCINATED_CODE": (
        "Re-read the source file, understand the function signature, "
        "then write a minimal targeted fix. Run tests to verify."
    ),
    "NEVER_TESTED": (
        "Always call run_tests after writing a fix. "
        "Submit only when test pass rate has demonstrably improved."
    ),
    "LOOPING_BEHAVIOR": (
        "Stop repeating the same action. Use search_code to find the bug location, "
        "then navigate directly to it."
    ),
    "CONTEXT_OVERFLOW": (
        "Focus on files explicitly referenced in the failing test's imports. "
        "Avoid reading utility files unless the test error specifically mentions them."
    ),
    "SECURITY_VIOLATION": (
        "Do not use os.system, eval, exec, or subprocess in fixes. "
        "Write pure Python logic without shell calls."
    ),
    "CORRECT": "No remediation needed.",
}
class FailureClassifier:
    """
    Classifies agent failures from trajectory data.

    Usage:
        clf = FailureClassifier()
        report = clf.classify(
            episode_id="abc123",
            task="task1",
            trajectory_steps=[...],
            variant_meta={...},
            files_read=[...],
            files_written=[...],
            final_score=0.0,
        )
    """

    def classify(
        self,
        episode_id: str,
        task: str,
        trajectory_steps: List[dict],
        variant_meta: Dict[str, Any],
        files_read: List[str],
        files_written: List[str],
        final_score: float,
        security_violations: int = 0,
    ) -> FailureReport:
        """Run all classifiers and build a structured failure report.

        Args:
            episode_id: Identifier of the episode being analyzed.
            task: Task name for the episode.
            trajectory_steps: Ordered step dicts; each may carry "action_type",
                "action_path", "action_content_length", "test_pass_rate",
                "step_number", and "security_flags".
            variant_meta: Task metadata listing the relevant files
                ("bug_files", "interface_files", "read_first_files",
                "files_to_implement").
            files_read: Paths the agent read during the episode.
            files_written: Paths the agent wrote.
                NOTE(review): currently unused by every classifier — kept for
                API stability; confirm before removing.
            final_score: Episode score; >= 0.5 counts as success.
            security_violations: Count of flagged security issues.

        Returns:
            A FailureReport with all detected failures, the most severe one
            marked primary, and an actionable retry hint.
        """
        failures: List[FailureInstance] = []
        success = final_score >= 0.5

        # Fast path: clean success with no security flags needs no analysis.
        if success and security_violations == 0:
            return FailureReport(
                episode_id=episode_id,
                task=task,
                primary_failure="CORRECT",
                failures=[],
                success=True,
                failure_summary="Agent succeeded without errors.",
                retry_hint="",
            )

        action_sequence = [s.get("action_type", "") for s in trajectory_steps]

        # ── Classifier 1: Wrong File Navigation ───────────────────────────────
        # "Relevant" files come from the task metadata; reads under tests/ are
        # always considered acceptable exploration.
        relevant = set(
            variant_meta.get("bug_files", []) +
            variant_meta.get("interface_files", []) +
            variant_meta.get("read_first_files", []) +
            variant_meta.get("files_to_implement", [])
        )
        if relevant and files_read:
            irrelevant_reads = [f for f in files_read if f not in relevant
                                and not f.startswith("tests/")]
            # Trigger only when the agent read multiple off-target files AND
            # never touched a single relevant one.
            if len(irrelevant_reads) > 1 and not any(f in files_read for f in relevant):
                failures.append(FailureInstance(
                    failure_type="WRONG_FILE_NAVIGATION",
                    severity="critical",
                    step_number=1,
                    evidence=f"Read {len(irrelevant_reads)} irrelevant files: {irrelevant_reads[:3]}. "
                             f"Never read key files: {list(relevant)[:3]}",
                    root_cause="Agent navigated to wrong part of the codebase entirely.",
                    remediation=FAILURE_REMEDIATION["WRONG_FILE_NAVIGATION"],
                ))

        # ── Classifier 2: Blind Write ─────────────────────────────────────────
        # A write with no read_file anywhere earlier in the trajectory.
        write_indices = [i for i, a in enumerate(action_sequence) if a == "write_file"]
        for wi in write_indices:
            reads_before = [a for a in action_sequence[:wi] if a == "read_file"]
            if not reads_before:
                failures.append(FailureInstance(
                    failure_type="BLIND_WRITE",
                    severity="critical",
                    step_number=wi + 1,
                    evidence=f"write_file at step {wi+1} with zero prior read_file actions.",
                    root_cause="Agent attempted to fix code without reading it first — likely hallucinating.",
                    remediation=FAILURE_REMEDIATION["BLIND_WRITE"],
                ))

        # ── Classifier 3: Hallucinated Code ───────────────────────────────────
        # Detect a write followed by a test run with a very low pass rate.
        for i, step in enumerate(trajectory_steps):
            if step.get("action_type") == "run_tests":
                # Scan backwards for the most recent write before this test run.
                prev_write = None
                for j in range(i - 1, -1, -1):
                    if trajectory_steps[j].get("action_type") == "write_file":
                        prev_write = j
                        break
                if prev_write is not None:
                    pass_rate = step.get("test_pass_rate", None)
                    if pass_rate is not None and pass_rate < 0.3:
                        failures.append(FailureInstance(
                            failure_type="HALLUCINATED_CODE",
                            severity="major",
                            step_number=i + 1,
                            evidence=f"Test pass rate {pass_rate:.2f} after write at step {prev_write+1}. "
                                     f"Code change made things worse.",
                            root_cause="Agent wrote syntactically correct but semantically wrong code.",
                            remediation=FAILURE_REMEDIATION["HALLUCINATED_CODE"],
                        ))

        # ── Classifier 4: Never Tested ────────────────────────────────────────
        has_tests = "run_tests" in action_sequence
        has_writes = "write_file" in action_sequence
        has_submit = "submit" in action_sequence
        if has_submit and has_writes and not has_tests:
            failures.append(FailureInstance(
                failure_type="NEVER_TESTED",
                severity="major",
                step_number=len(action_sequence),
                evidence="Agent wrote code changes but submitted without running any tests.",
                root_cause="No feedback loop — agent cannot know if its fix worked.",
                remediation=FAILURE_REMEDIATION["NEVER_TESTED"],
            ))

        # ── Classifier 5: Looping Behavior ────────────────────────────────────
        # Group read_file steps by path; 3+ reads of the same file is a loop.
        read_paths = [
            (i, s.get("action_path"))
            for i, s in enumerate(trajectory_steps)
            if s.get("action_type") == "read_file" and s.get("action_path")
        ]
        path_counts: Dict[str, List[int]] = {}
        for idx, path in read_paths:
            path_counts.setdefault(path, []).append(idx)
        for path, indices in path_counts.items():
            if len(indices) >= 3:
                failures.append(FailureInstance(
                    failure_type="LOOPING_BEHAVIOR",
                    severity="major",
                    step_number=indices[2] + 1,  # the third (loop-confirming) read
                    evidence=f"Read '{path}' {len(indices)} times (steps {[i+1 for i in indices]}). "
                             f"Agent is stuck in a read loop.",
                    root_cause="Agent cannot extract the needed information and keeps retrying.",
                    remediation=FAILURE_REMEDIATION["LOOPING_BEHAVIOR"],
                ))

        # ── Classifier 6: Context Overflow ────────────────────────────────────
        total_content = sum(
            s.get("action_content_length") or 0
            for s in trajectory_steps
            if s.get("action_type") == "read_file"
        )
        if total_content > 50_000 and final_score < 0.5:
            failures.append(FailureInstance(
                failure_type="CONTEXT_OVERFLOW",
                severity="minor",
                step_number=len(trajectory_steps),
                evidence=f"Agent read {total_content:,} chars total. "
                         f"Most of this was likely irrelevant context.",
                root_cause="Agent wasted token budget reading unnecessary files.",
                remediation=FAILURE_REMEDIATION["CONTEXT_OVERFLOW"],
            ))

        # ── Classifier 7: Security Violation ──────────────────────────────────
        if security_violations > 0:
            sec_steps = [
                s for s in trajectory_steps if s.get("security_flags")
            ]
            for ss in sec_steps:
                failures.append(FailureInstance(
                    failure_type="SECURITY_VIOLATION",
                    severity="critical",
                    step_number=ss.get("step_number", 0),
                    evidence=f"Flags: {ss.get('security_flags', [])}",
                    root_cause="Agent wrote unsafe code patterns that would be dangerous in production.",
                    remediation=FAILURE_REMEDIATION["SECURITY_VIOLATION"],
                ))

        # ── Build report ──────────────────────────────────────────────────────
        if not failures:
            # Failed but no specific classifier triggered — generic low score.
            primary = "HALLUCINATED_CODE"
            summary = f"Score {final_score:.2f} — fix was written but insufficient. Re-read the source files more carefully."
            hint = "Read test file → read all src files → write targeted fix → run tests → submit."
        else:
            # Pick the most severe failure as primary (stable sort keeps
            # original detection order within a severity tier).
            failures.sort(key=lambda f: SEVERITY_RANK.get(f.severity, 0), reverse=True)
            primary = failures[0].failure_type
            summary = "; ".join(f"{f.failure_type} (step {f.step_number})" for f in failures[:3])
            hint = failures[0].remediation

        return FailureReport(
            episode_id=episode_id,
            task=task,
            primary_failure=primary,
            failures=failures,
            success=success,
            failure_summary=summary,
            retry_hint=hint,
        )