Meta-SRE / app /tools /definitions.py
Anvit25's picture
Deploy Meta-SRE OpenEnv benchmark FastAPI server
ad6248e
"""
All 10 agent tools — implemented as plain Python functions wrapped in ToolDispatcher.
Each tool returns (reward_delta: float, done: bool, output: Any).
The EpisodeManager calls ToolDispatcher.dispatch(tool, params).
"""
from __future__ import annotations
import re
from typing import TYPE_CHECKING, Any, Dict, Tuple
if TYPE_CHECKING:
from app.engine.manager import EpisodeManager
# ---------------------------------------------------------------------------
# Schema exposed to the LLM (OpenEnv tool_spec format)
# ---------------------------------------------------------------------------
TOOL_SPECS = [
{
"name": "view_file",
"description": (
"Read the contents of a file in a service codebase. "
"Use this BEFORE editing to understand the code."
),
"parameters": {
"type": "object",
"properties": {
"service": {"type": "string", "enum": ["ad_ranking", "capi_pipeline", "whatsapp_sync"]},
"filename": {"type": "string", "description": "e.g. ranker.py"},
},
"required": ["service", "filename"],
},
},
{
"name": "edit_line",
"description": (
"Replace a single line in a file. SURGICAL edits only — "
"do NOT rewrite whole functions. One line at a time."
),
"parameters": {
"type": "object",
"properties": {
"service": {"type": "string"},
"filename": {"type": "string"},
"line_number": {"type": "integer", "description": "1-based line number"},
"new_code": {"type": "string", "description": "Replacement line (preserve indentation)"},
},
"required": ["service", "filename", "line_number", "new_code"],
},
},
{
"name": "run_tests",
"description": (
"Execute a test suite for a service. "
"suite options: 'unit' (fast, 1 step), "
"'integration' (2 steps), 'load' (3 steps), 'security' (2 steps)."
),
"parameters": {
"type": "object",
"properties": {
"service": {"type": "string"},
"suite": {"type": "string", "enum": ["unit", "integration", "load", "security"],
"default": "unit"},
},
"required": ["service"],
},
},
{
"name": "check_dependency",
"description": "Show the data-flow relationship between two services.",
"parameters": {
"type": "object",
"properties": {
"service_a": {"type": "string"},
"service_b": {"type": "string"},
},
"required": ["service_a", "service_b"],
},
},
{
"name": "read_logs",
"description": "Pull recent logs for a service filtered by log level.",
"parameters": {
"type": "object",
"properties": {
"service": {"type": "string"},
"log_level": {"type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG"],
"default": "ERROR"},
"last_n_lines": {"type": "integer", "default": 20},
},
"required": ["service"],
},
},
{
"name": "git_blame",
"description": "Find who/what last changed a specific line — reveals AI-generated code.",
"parameters": {
"type": "object",
"properties": {
"service": {"type": "string"},
"filename": {"type": "string"},
"line_number": {"type": "integer"},
},
"required": ["service", "filename", "line_number"],
},
},
{
"name": "rollback",
"description": (
"Roll back a service's database migration by version string. "
"HIGH COST — use only when a bad migration is the root cause."
),
"parameters": {
"type": "object",
"properties": {
"service": {"type": "string"},
"version": {"type": "string", "description": "Migration version, e.g. '003'"},
},
"required": ["service", "version"],
},
},
{
"name": "query_metrics_history",
"description": "Show how a metric changed over time — reveals when the problem started.",
"parameters": {
"type": "object",
"properties": {
"service": {"type": "string"},
"metric": {"type": "string",
"enum": ["cpu_percent", "memory_mb", "error_rate",
"p99_latency_ms", "request_queue"]},
"hours_back": {"type": "integer", "default": 6},
},
"required": ["service", "metric"],
},
},
{
"name": "ask_senior_sre",
"description": (
"Ask the on-call Senior SRE for a hint. "
"Costs 2 reward steps. Use when genuinely stuck."
),
"parameters": {
"type": "object",
"properties": {
"question": {"type": "string"},
},
"required": ["question"],
},
},
{
"name": "write_incident_report",
"description": (
"Close the incident by submitting a post-mortem report. "
"MUST be called after fixing the bug to end the episode."
),
"parameters": {
"type": "object",
"properties": {
"root_cause": {"type": "string"},
"fix_applied": {"type": "string"},
"services_affected": {"type": "array", "items": {"type": "string"}},
"severity_classification": {"type": "string", "enum": ["P0", "P1", "P2"]},
},
"required": ["root_cause", "fix_applied", "services_affected", "severity_classification"],
},
},
]
# ---------------------------------------------------------------------------
# Per-task contextual log data
# ---------------------------------------------------------------------------
_TASK_LOGS: Dict[int, Dict[str, Dict[str, str]]] = {
1: {
"ad_ranking": {
"ERROR": (
"[2026-04-24 03:14:21] ERROR ad_ranking.ranker: "
"AttributeError: 'dict' object has no attribute 'get_clicks'\n"
" File ranker.py, line 22, in score_ads\n"
" click_rate = ad.get_clicks() / max(ad.get('impressions', 1), 1)\n"
"[2026-04-24 03:14:22] ERROR ad_ranking.ranker: same error (x487 in last 60s)"
),
"DEBUG": (
"[2026-04-24 03:14:20] DEBUG ad_ranking.ranker: fetch_candidate_ads returned 12 ads\n"
"[2026-04-24 03:14:21] DEBUG ad_ranking.ranker: entering score_ads with 12 ads\n"
"[2026-04-24 03:14:21] DEBUG ad_ranking.ranker: processing ad_id=ad_001 — CRASH"
),
},
},
2: {
"capi_pipeline": {
"WARN": (
"[2026-04-24 03:00:05] WARN capi_pipeline.transformer: "
"event_time 1700000000 converted to 1700000 — check threshold\n"
"[2026-04-24 03:00:05] WARN capi_pipeline.transformer: "
"event_time 1745392000 converted to 1745392 — data from 1970-01-20"
),
"DEBUG": (
"[2026-04-24 02:14:03] DEBUG capi_pipeline.transformer: "
"_normalize_timestamp called with ts=1700000000\n"
"[2026-04-24 02:14:03] DEBUG capi_pipeline.transformer: "
"ts > 1_000_000_000 → True, returning ts // 1000 = 1700000\n"
"[2026-04-24 02:14:03] DEBUG capi_pipeline.transformer: "
"EXPECTED: ts > 1_000_000_000_000 for millisecond timestamps"
),
"ERROR": "[2026-04-24 03:00:00] INFO capi_pipeline: No errors — pipeline healthy",
},
"ad_ranking": {
"WARN": (
"[2026-04-24 03:01:00] WARN ad_ranking.ranker: "
"ROAS attribution window: events from 1970-01-20 (expected: 2023+)\n"
"[2026-04-24 03:01:01] WARN ad_ranking.attribution: "
"Conversion events all timestamped <86400 (one day in 1970)"
),
},
},
3: {
"whatsapp_sync": {
"ERROR": (
"[2026-04-24 03:10:00] ERROR whatsapp_sync.handler: "
"asyncpg.exceptions.TooManyConnectionsError: pool exhausted\n"
"[2026-04-24 03:10:02] ERROR whatsapp_sync.handler: "
"sync_user_messages acquire() blocked for user_id=8841923\n"
"[2026-04-24 03:10:05] ERROR whatsapp_sync.handler: "
"490/500 connections allocated — 0 available"
),
"DEBUG": (
"[2026-04-24 03:09:00] DEBUG whatsapp_sync.handler: "
"sync_user_messages — db_pool.acquire() called\n"
"[2026-04-24 03:09:00] DEBUG whatsapp_sync.handler: "
"sync_user_messages — conn acquired, fetching messages\n"
"[2026-04-24 03:09:00] DEBUG whatsapp_sync.handler: "
"sync_user_messages — messages fetched, returning\n"
"NOTE: No 'release' log line — connection never returned to pool"
),
},
},
4: {
"whatsapp_sync": {
"ERROR": (
"[2026-04-24 02:14:31] ERROR whatsapp_sync.db: "
"asyncpg.ForeignKeyViolationError: "
"insert into user_preferences violates FK constraint\n"
"[2026-04-24 02:14:31] ERROR whatsapp_sync.db: "
"migration 003 failed — circular FK: messages ↔ message_threads\n"
"[2026-04-24 02:14:31] ERROR whatsapp_sync.db: "
"ALTER TABLE messages failed — message_threads.id referenced before table commit"
),
},
"ad_ranking": {
"ERROR": (
"[2026-04-24 02:15:00] ERROR ad_ranking: "
"DB pool returning FK violation errors from upstream\n"
"[2026-04-24 02:15:01] WARN ad_ranking: "
"This is a SYMPTOM — root cause is in whatsapp_sync migration"
),
},
"capi_pipeline": {
"WARN": (
"[2026-04-24 02:15:00] WARN capi_pipeline: "
"Event association latency +340ms — DB pool contention\n"
"[2026-04-24 02:15:00] WARN capi_pipeline: "
"This is a SYMPTOM — root cause is in whatsapp_sync migration"
),
},
},
5: {
"capi_pipeline": {
"DEBUG": (
"[2026-04-24 02:00:00] DEBUG capi_pipeline.ingestor: "
"DEBUG_MODE=True — including raw payload in response\n"
"[2026-04-24 02:00:00] DEBUG capi_pipeline.ingestor: "
"Response size: 14,382 bytes (expected ~48 bytes)\n"
"[2026-04-24 02:00:01] DEBUG capi_pipeline.ingestor: "
"debug_data.user_emails contains plaintext email fields"
),
"ERROR": "[2026-04-24 02:00:00] INFO capi_pipeline: No errors — unit tests all pass",
},
},
}
_METRICS_HISTORY: Dict[str, Dict[str, list]] = {
"ad_ranking:error_rate": [
(0, 0.0), (1, 0.0), (2, 0.0), (3, 12.3), (4, 12.1), (5, 11.9),
],
"capi_pipeline:error_rate": [
(0, 0.0), (1, 0.0), (2, 0.0), (3, 0.0), (4, 0.0), (5, 0.0),
],
"whatsapp_sync:memory_mb": [
(0, 200), (1, 250), (2, 350), (3, 500), (4, 800), (5, 1200),
],
"whatsapp_sync:request_queue": [
(0, 5), (1, 45), (2, 130), (3, 280), (4, 420), (5, 490),
],
"capi_pipeline:p99_latency_ms": [
(0, 20), (1, 20), (2, 21), (3, 1100), (4, 1080), (5, 1090),
],
}
# ---------------------------------------------------------------------------
# Senior SRE hints — contextually aware
# ---------------------------------------------------------------------------
def _senior_sre_hint(task_id: int, question: str, sre_memory: list, step: int) -> str:
question_lower = question.lower()
memory_text = " ".join(sre_memory).lower()
if task_id == 1:
if "get_clicks" in memory_text or "attributeerror" in question_lower:
return (
"Senior SRE: The AttributeError is very specific — 'dict' has no method "
"'get_clicks'. The ad objects coming from the API are plain Python dicts. "
"You need dict accessor syntax, not method call syntax. "
"Check line 22 of ranker.py."
)
return (
"Senior SRE: Look at the stack trace carefully. "
"The error is on the line that computes click_rate. "
"How are you accessing the 'clicks' field on the ad object?"
)
if task_id == 2:
if "timestamp" in memory_text or "1970" in question_lower or "normalize" in question_lower:
return (
"Senior SRE: The timestamp normalisation logic has an off-by-three-orders-of-magnitude "
"bug. A Unix second timestamp is ~10 digits. A Unix millisecond timestamp is ~13 digits. "
"The condition in _normalize_timestamp() uses the wrong threshold. "
"What number has 13 digits?"
)
if "capi" in memory_text or "capi" in question_lower:
return (
"Senior SRE: You're on the right track — CAPI is the data source for ad attribution. "
"Check the transformer.py file. The event_time values being emitted are wrong — "
"they look like they're in 1970. Where does event_time get processed?"
)
return (
"Senior SRE: The ad ranking ROAS drop is NOT a ranking algorithm bug. "
"The ranking model is working correctly — it just has bad input data. "
"Follow the data upstream. Where do conversion events come from?"
)
if task_id == 3:
if "finally" in memory_text or "release" in question_lower or "pool" in question_lower:
return (
"Senior SRE: Yes — the connection pool is exhausted because connections are "
"acquired but never released. In Python async code, you MUST release connections "
"in a 'finally' block, otherwise an exception will skip the release call entirely. "
"Add: finally: await self.db_pool.release(conn)"
)
return (
"Senior SRE: The load test shows pool exhaustion. "
"Under normal load each sync call is short so you don't notice. "
"Under high load, the missing resource cleanup compounds. "
"Look at how sync_user_messages() handles its DB connection lifecycle."
)
if task_id == 4:
if step > 3 and "migration" not in memory_text:
return (
"Senior SRE (impatient): Stop chasing symptoms! All three services degraded "
"simultaneously at 02:14 UTC — that's when the last deploy landed. "
"Check the DB migration logs. The whatsapp_sync service ran a new migration "
"at that exact time. Look at db.py migration 003."
)
if "migration" in memory_text or "003" in question_lower:
return (
"Senior SRE: Good, you found migration 003. Look at what it does: "
"it adds a column 'thread_id' to messages that references message_threads. "
"But message_threads also references messages. "
"That's a circular FK — PostgreSQL can't resolve the constraint. "
"Remove the ALTER TABLE statement from migration 003."
)
return (
"Senior SRE: Three services failing simultaneously at 02:14 UTC is not a coincidence. "
"Check the deploy logs and DB migration history for that exact timestamp."
)
if task_id == 5:
if "debug" in memory_text or "pii" in question_lower or "response" in question_lower:
return (
"Senior SRE: DEBUG_MODE = True should never reach production. "
"Find that flag in the ingestor and disable it. "
"The security test will verify the response body no longer contains 'debug_data'."
)
if step > 2:
return (
"Senior SRE: The unit tests pass — that's the trap. This is a security bug "
"invisible to unit tests. Run the security test suite instead: "
"run_tests('capi_pipeline', 'security'). "
"Read the DEBUG logs for the ingestor — you'll see the response sizes."
)
return (
"Senior SRE: Something is leaking data in the CAPI ingestor. "
"Response sizes are 70x larger than expected. "
"What conditions cause a larger response body?"
)
return "Senior SRE: Check the logs and follow the data flow upstream."
# ---------------------------------------------------------------------------
# Tool Dispatcher
# ---------------------------------------------------------------------------
class ToolDispatcher:
def __init__(self, episode: "EpisodeManager"):
self.ep = episode
def dispatch(self, tool: str, params: Dict[str, Any]) -> Tuple[float, bool, Any]:
"""Route to the correct tool. Returns (reward_delta, done, output)."""
handlers = {
"view_file": self._view_file,
"edit_line": self._edit_line,
"run_tests": self._run_tests,
"check_dependency": self._check_dependency,
"read_logs": self._read_logs,
"git_blame": self._git_blame,
"rollback": self._rollback,
"query_metrics_history": self._query_metrics_history,
"ask_senior_sre": self._ask_senior_sre,
"write_incident_report": self._write_incident_report,
}
fn = handlers.get(tool)
if fn is None:
r = self.ep.reward.step_reward(tool)
return r, False, f"Unknown tool: {tool}"
return fn(params)
# ------------------------------------------------------------------
# 1. view_file
# ------------------------------------------------------------------
def _view_file(self, p: Dict) -> Tuple[float, bool, Any]:
service = p.get("service", "")
filename = p.get("filename", "")
found, content = self.ep.vfs.read_file(service, filename)
if not found:
r = self.ep.reward.step_reward("view_file")
return r, False, {"error": content}
lines = content.splitlines()
numbered = "\n".join(f"{i+1:4d} {line}" for i, line in enumerate(lines))
# Intermediate reward: opening the right file
task_def = {
1: ("ad_ranking", "ranker.py"),
2: ("capi_pipeline", "transformer.py"),
3: ("whatsapp_sync", "handler.py"),
4: ("whatsapp_sync", "db.py"),
5: ("capi_pipeline", "ingestor.py"),
}.get(self.ep._task_id)
r = self.ep.reward.step_reward("view_file")
if task_def and (service, filename) == task_def:
bonus = self.ep.reward.progress_reward("file_found")
r += bonus
self.ep.add_memory(f"opened root-cause file {service}/{filename}")
return r, False, {
"service": service,
"filename": filename,
"total_lines": len(lines),
"content": numbered,
}
# ------------------------------------------------------------------
# 2. edit_line
# ------------------------------------------------------------------
def _edit_line(self, p: Dict) -> Tuple[float, bool, Any]:
service = p.get("service", "")
filename = p.get("filename", "")
line_number = int(p.get("line_number", 0))
new_code = p.get("new_code", "")
# Anti-cheat: SREs cannot modify test suites during an incident.
# Prevents reward hacking (e.g. deleting asserts to make tests pass).
PROTECTED = ("tests/", "test_", "_test.py", "conftest.py")
if any(guard in filename for guard in PROTECTED):
r = self.ep.reward.step_reward("edit_line", syntax_error=True)
return r, False, {
"error": (
"Error: SREs cannot modify test suites during an incident. "
"Fix the source code, not the tests."
)
}
success, msg = self.ep.vfs.edit_line(
service, filename, line_number, new_code, self.ep._step
)
if not success:
r = self.ep.reward.step_reward("edit_line")
return r, False, {"error": msg}
# Syntax check: look for obvious Python syntax errors in the new line
syntax_error = _has_syntax_error(new_code)
r = self.ep.reward.step_reward("edit_line", syntax_error=syntax_error)
self.ep.add_memory(
f"edited {service}/{filename} line {line_number}: "
f"{new_code[:60]!r}"
)
msg_out = f"Line {line_number} updated."
if syntax_error:
msg_out += " WARNING: possible syntax error detected in replacement line."
return r, False, {"result": msg_out, "syntax_warning": syntax_error}
# ------------------------------------------------------------------
# 3. run_tests
# ------------------------------------------------------------------
def _run_tests(self, p: Dict) -> Tuple[float, bool, Any]:
service = p.get("service", "")
suite = p.get("suite", "unit")
# Suite cost (extra step penalties)
suite_cost = {"unit": 0, "integration": -0.1, "load": -0.2, "security": -0.1}
extra_cost = suite_cost.get(suite, 0)
passed, output, partial = self.ep.grader.run(self.ep._task_id, suite)
r = self.ep.reward.step_reward("run_tests") + extra_cost
self.ep._last_terminal = output
self.ep.add_memory(f"ran {suite} tests for {service}: {'PASS' if passed else 'FAIL'}")
if passed:
self.ep.metrics.mark_fixed(service)
r += self.ep.reward.progress_reward("error_drop")
return r, False, {"passed": passed, "suite": suite, "output": output}
# ------------------------------------------------------------------
# 4. check_dependency
# ------------------------------------------------------------------
def _check_dependency(self, p: Dict) -> Tuple[float, bool, Any]:
from app.engine.manager import DEPENDENCY_GRAPH
a = p.get("service_a", "")
b = p.get("service_b", "")
deps_a = DEPENDENCY_GRAPH.get(a, [])
deps_b = DEPENDENCY_GRAPH.get(b, [])
r = self.ep.reward.step_reward("check_dependency")
relationship = "no direct dependency"
if b in deps_a:
relationship = f"{a} depends on {b} (data flows: {b}{a})"
self.ep.add_memory(f"confirmed: {a} depends on {b}")
r += self.ep.reward.progress_reward("service_id")
elif a in deps_b:
relationship = f"{b} depends on {a} (data flows: {a}{b})"
return r, False, {
"service_a": a,
"service_b": b,
"relationship": relationship,
f"{a}_depends_on": deps_a,
f"{b}_depends_on": deps_b,
}
# ------------------------------------------------------------------
# 5. read_logs
# ------------------------------------------------------------------
def _read_logs(self, p: Dict) -> Tuple[float, bool, Any]:
service = p.get("service", "")
log_level = p.get("log_level", "ERROR")
n = int(p.get("last_n_lines", 20))
task_logs = _TASK_LOGS.get(self.ep._task_id, {})
svc_logs = task_logs.get(service, {})
log_text = svc_logs.get(log_level, f"[{log_level}] No {log_level} logs for {service}")
r = self.ep.reward.step_reward("read_logs")
self.ep.add_memory(f"read {log_level} logs for {service}")
# Partial reward for reading the right service's debug/error logs
right_service = {
1: "ad_ranking", 2: "capi_pipeline", 3: "whatsapp_sync",
4: "whatsapp_sync", 5: "capi_pipeline",
}.get(self.ep._task_id)
if service == right_service and log_level in ("DEBUG", "ERROR"):
r += self.ep.reward.progress_reward("service_id")
return r, False, {"service": service, "log_level": log_level, "logs": log_text}
# ------------------------------------------------------------------
# 6. git_blame
# ------------------------------------------------------------------
def _git_blame(self, p: Dict) -> Tuple[float, bool, Any]:
service = p.get("service", "")
filename = p.get("filename", "")
line_number = int(p.get("line_number", 1))
blame = self.ep.vfs.git_blame(service, filename, line_number)
r = self.ep.reward.step_reward("git_blame")
self.ep.add_memory(f"git blame {service}/{filename}:{line_number}")
return r, False, {"blame": blame}
# ------------------------------------------------------------------
# 7. rollback
# ------------------------------------------------------------------
def _rollback(self, p: Dict) -> Tuple[float, bool, Any]:
service = p.get("service", "")
version = p.get("version", "")
# Only valid for Task 4 and correct service/version
is_correct = (
self.ep._task_id == 4 and
service == "whatsapp_sync" and
version == "003"
)
if is_correct:
# Remove the circular FK from the VFS (simulate rollback)
_, content = self.ep.vfs.read_file("whatsapp_sync", "db.py")
# Strip migration 003 block
lines = content.splitlines()
new_lines = []
skip = False
for line in lines:
if '"version": "003"' in line or "'version': '003'" in line:
skip = True
if skip and line.strip().startswith("}"):
skip = False
continue
if not skip:
new_lines.append(line)
self.ep.vfs._files["whatsapp_sync"]["db.py"] = "\n".join(new_lines)
self.ep.metrics.mark_fixed("whatsapp_sync")
self.ep.metrics.mark_fixed("ad_ranking")
self.ep.metrics.mark_fixed("capi_pipeline")
self.ep.add_memory("rolled back migration 003 — circular FK removed")
r = self.ep.reward.step_reward("rollback")
r += self.ep.reward.progress_reward("error_drop")
return r, False, {
"result": "Migration 003 rolled back successfully. All three services recovering."
}
# Wrong rollback — penalise
r = self.ep.reward.step_reward("rollback", syntax_error=False)
r += self.ep.reward.ROLLBACK_PENALTY # extra penalty via RewardManager field
return r, False, {
"error": (
f"Rollback of {service} v{version} either unnecessary or incorrect. "
"Verify the root cause before rolling back."
)
}
# ------------------------------------------------------------------
# 8. query_metrics_history
# ------------------------------------------------------------------
def _query_metrics_history(self, p: Dict) -> Tuple[float, bool, Any]:
service = p.get("service", "")
metric = p.get("metric", "")
hours_back = int(p.get("hours_back", 6))
key = f"{service}:{metric}"
history = _METRICS_HISTORY.get(key, [])
r = self.ep.reward.step_reward("query_metrics_history")
self.ep.add_memory(f"queried {metric} history for {service}")
if history:
table = "\n".join(
f" T-{hours_back - i}h: {val}" for i, (_, val) in enumerate(history)
)
return r, False, {
"service": service,
"metric": metric,
"history": table,
"note": f"Spike visible at T-{hours_back - 3}h (correlates with 02:14 UTC deploy)",
}
return r, False, {
"service": service,
"metric": metric,
"history": "No historical data for this metric combination.",
}
# ------------------------------------------------------------------
# 9. ask_senior_sre
# ------------------------------------------------------------------
def _ask_senior_sre(self, p: Dict) -> Tuple[float, bool, Any]:
question = p.get("question", "")
hint = _senior_sre_hint(
self.ep._task_id,
question,
self.ep._sre_memory,
self.ep._step,
)
# 2-step penalty
r = self.ep.reward.step_reward("ask_senior_sre") * 2
self.ep.add_memory(f"asked senior SRE: {question[:60]}")
return r, False, {"hint": hint}
# ------------------------------------------------------------------
# 10. write_incident_report
# ------------------------------------------------------------------
def _write_incident_report(self, p: Dict) -> Tuple[float, bool, Any]:
from app.models import IncidentReport
report = IncidentReport(
root_cause=p.get("root_cause", ""),
fix_applied=p.get("fix_applied", ""),
services_affected=p.get("services_affected", []),
severity_classification=p.get("severity_classification", "P1"),
)
self.ep._incident_report = report
report_accuracy = self.ep.grader.grade_incident_report(self.ep._task_id, report)
task_def = {1: 15, 2: 20, 3: 20, 4: 25, 5: 20}
sla = task_def.get(self.ep._task_id, 20)
within_sla = self.ep._step <= sla
# Check if tests actually passed
passed, _, _ = self.ep.grader.run(self.ep._task_id)
no_regressions = passed
r = self.ep.reward.step_reward("write_incident_report")
r += self.ep.reward.terminal_reward(
tests_passed=passed,
report_accuracy=report_accuracy,
fixed_within_sla=within_sla,
no_regressions=no_regressions,
task_id=self.ep._task_id,
)
summary = (
f"Incident {self.ep._incident_id} closed.\n"
f"Report accuracy: {report_accuracy:.0%}\n"
f"Tests passed: {passed}\n"
f"Within SLA: {within_sla}\n"
f"Normalized score: {self.ep.reward.normalized_score():.3f}"
)
# Update difficulty controller
self.ep.dc.update(self.ep._task_id, self.ep.reward.normalized_score())
return r, True, {"summary": summary, "report_accuracy": report_accuracy}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _has_syntax_error(line: str) -> bool:
"""Quick heuristic check for obvious Python syntax mistakes in a single line."""
stripped = line.strip()
# Unmatched brackets
for open_, close_ in [("(", ")"), ("[", "]"), ("{", "}")]:
if stripped.count(open_) != stripped.count(close_):
return True
# Ends with lone colon inside dict/call (not a block statement)
# Detect obvious incomplete assignments
if re.search(r"=\s*$", stripped):
return True
return False