""" All 10 agent tools — implemented as plain Python functions wrapped in ToolDispatcher. Each tool returns (reward_delta: float, done: bool, output: Any). The EpisodeManager calls ToolDispatcher.dispatch(tool, params). """ from __future__ import annotations import re from typing import TYPE_CHECKING, Any, Dict, Tuple if TYPE_CHECKING: from app.engine.manager import EpisodeManager # --------------------------------------------------------------------------- # Schema exposed to the LLM (OpenEnv tool_spec format) # --------------------------------------------------------------------------- TOOL_SPECS = [ { "name": "view_file", "description": ( "Read the contents of a file in a service codebase. " "Use this BEFORE editing to understand the code." ), "parameters": { "type": "object", "properties": { "service": {"type": "string", "enum": ["ad_ranking", "capi_pipeline", "whatsapp_sync"]}, "filename": {"type": "string", "description": "e.g. ranker.py"}, }, "required": ["service", "filename"], }, }, { "name": "edit_line", "description": ( "Replace a single line in a file. SURGICAL edits only — " "do NOT rewrite whole functions. One line at a time." ), "parameters": { "type": "object", "properties": { "service": {"type": "string"}, "filename": {"type": "string"}, "line_number": {"type": "integer", "description": "1-based line number"}, "new_code": {"type": "string", "description": "Replacement line (preserve indentation)"}, }, "required": ["service", "filename", "line_number", "new_code"], }, }, { "name": "run_tests", "description": ( "Execute a test suite for a service. " "suite options: 'unit' (fast, 1 step), " "'integration' (2 steps), 'load' (3 steps), 'security' (2 steps)." 
), "parameters": { "type": "object", "properties": { "service": {"type": "string"}, "suite": {"type": "string", "enum": ["unit", "integration", "load", "security"], "default": "unit"}, }, "required": ["service"], }, }, { "name": "check_dependency", "description": "Show the data-flow relationship between two services.", "parameters": { "type": "object", "properties": { "service_a": {"type": "string"}, "service_b": {"type": "string"}, }, "required": ["service_a", "service_b"], }, }, { "name": "read_logs", "description": "Pull recent logs for a service filtered by log level.", "parameters": { "type": "object", "properties": { "service": {"type": "string"}, "log_level": {"type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG"], "default": "ERROR"}, "last_n_lines": {"type": "integer", "default": 20}, }, "required": ["service"], }, }, { "name": "git_blame", "description": "Find who/what last changed a specific line — reveals AI-generated code.", "parameters": { "type": "object", "properties": { "service": {"type": "string"}, "filename": {"type": "string"}, "line_number": {"type": "integer"}, }, "required": ["service", "filename", "line_number"], }, }, { "name": "rollback", "description": ( "Roll back a service's database migration by version string. " "HIGH COST — use only when a bad migration is the root cause." ), "parameters": { "type": "object", "properties": { "service": {"type": "string"}, "version": {"type": "string", "description": "Migration version, e.g. 
'003'"}, }, "required": ["service", "version"], }, }, { "name": "query_metrics_history", "description": "Show how a metric changed over time — reveals when the problem started.", "parameters": { "type": "object", "properties": { "service": {"type": "string"}, "metric": {"type": "string", "enum": ["cpu_percent", "memory_mb", "error_rate", "p99_latency_ms", "request_queue"]}, "hours_back": {"type": "integer", "default": 6}, }, "required": ["service", "metric"], }, }, { "name": "ask_senior_sre", "description": ( "Ask the on-call Senior SRE for a hint. " "Costs 2 reward steps. Use when genuinely stuck." ), "parameters": { "type": "object", "properties": { "question": {"type": "string"}, }, "required": ["question"], }, }, { "name": "write_incident_report", "description": ( "Close the incident by submitting a post-mortem report. " "MUST be called after fixing the bug to end the episode." ), "parameters": { "type": "object", "properties": { "root_cause": {"type": "string"}, "fix_applied": {"type": "string"}, "services_affected": {"type": "array", "items": {"type": "string"}}, "severity_classification": {"type": "string", "enum": ["P0", "P1", "P2"]}, }, "required": ["root_cause", "fix_applied", "services_affected", "severity_classification"], }, }, ] # --------------------------------------------------------------------------- # Per-task contextual log data # --------------------------------------------------------------------------- _TASK_LOGS: Dict[int, Dict[str, Dict[str, str]]] = { 1: { "ad_ranking": { "ERROR": ( "[2026-04-24 03:14:21] ERROR ad_ranking.ranker: " "AttributeError: 'dict' object has no attribute 'get_clicks'\n" " File ranker.py, line 22, in score_ads\n" " click_rate = ad.get_clicks() / max(ad.get('impressions', 1), 1)\n" "[2026-04-24 03:14:22] ERROR ad_ranking.ranker: same error (x487 in last 60s)" ), "DEBUG": ( "[2026-04-24 03:14:20] DEBUG ad_ranking.ranker: fetch_candidate_ads returned 12 ads\n" "[2026-04-24 03:14:21] DEBUG ad_ranking.ranker: entering 
score_ads with 12 ads\n" "[2026-04-24 03:14:21] DEBUG ad_ranking.ranker: processing ad_id=ad_001 — CRASH" ), }, }, 2: { "capi_pipeline": { "WARN": ( "[2026-04-24 03:00:05] WARN capi_pipeline.transformer: " "event_time 1700000000 converted to 1700000 — check threshold\n" "[2026-04-24 03:00:05] WARN capi_pipeline.transformer: " "event_time 1745392000 converted to 1745392 — data from 1970-01-20" ), "DEBUG": ( "[2026-04-24 02:14:03] DEBUG capi_pipeline.transformer: " "_normalize_timestamp called with ts=1700000000\n" "[2026-04-24 02:14:03] DEBUG capi_pipeline.transformer: " "ts > 1_000_000_000 → True, returning ts // 1000 = 1700000\n" "[2026-04-24 02:14:03] DEBUG capi_pipeline.transformer: " "EXPECTED: ts > 1_000_000_000_000 for millisecond timestamps" ), "ERROR": "[2026-04-24 03:00:00] INFO capi_pipeline: No errors — pipeline healthy", }, "ad_ranking": { "WARN": ( "[2026-04-24 03:01:00] WARN ad_ranking.ranker: " "ROAS attribution window: events from 1970-01-20 (expected: 2023+)\n" "[2026-04-24 03:01:01] WARN ad_ranking.attribution: " "Conversion events all timestamped <86400 (one day in 1970)" ), }, }, 3: { "whatsapp_sync": { "ERROR": ( "[2026-04-24 03:10:00] ERROR whatsapp_sync.handler: " "asyncpg.exceptions.TooManyConnectionsError: pool exhausted\n" "[2026-04-24 03:10:02] ERROR whatsapp_sync.handler: " "sync_user_messages acquire() blocked for user_id=8841923\n" "[2026-04-24 03:10:05] ERROR whatsapp_sync.handler: " "490/500 connections allocated — 0 available" ), "DEBUG": ( "[2026-04-24 03:09:00] DEBUG whatsapp_sync.handler: " "sync_user_messages — db_pool.acquire() called\n" "[2026-04-24 03:09:00] DEBUG whatsapp_sync.handler: " "sync_user_messages — conn acquired, fetching messages\n" "[2026-04-24 03:09:00] DEBUG whatsapp_sync.handler: " "sync_user_messages — messages fetched, returning\n" "NOTE: No 'release' log line — connection never returned to pool" ), }, }, 4: { "whatsapp_sync": { "ERROR": ( "[2026-04-24 02:14:31] ERROR whatsapp_sync.db: " 
"asyncpg.ForeignKeyViolationError: " "insert into user_preferences violates FK constraint\n" "[2026-04-24 02:14:31] ERROR whatsapp_sync.db: " "migration 003 failed — circular FK: messages ↔ message_threads\n" "[2026-04-24 02:14:31] ERROR whatsapp_sync.db: " "ALTER TABLE messages failed — message_threads.id referenced before table commit" ), }, "ad_ranking": { "ERROR": ( "[2026-04-24 02:15:00] ERROR ad_ranking: " "DB pool returning FK violation errors from upstream\n" "[2026-04-24 02:15:01] WARN ad_ranking: " "This is a SYMPTOM — root cause is in whatsapp_sync migration" ), }, "capi_pipeline": { "WARN": ( "[2026-04-24 02:15:00] WARN capi_pipeline: " "Event association latency +340ms — DB pool contention\n" "[2026-04-24 02:15:00] WARN capi_pipeline: " "This is a SYMPTOM — root cause is in whatsapp_sync migration" ), }, }, 5: { "capi_pipeline": { "DEBUG": ( "[2026-04-24 02:00:00] DEBUG capi_pipeline.ingestor: " "DEBUG_MODE=True — including raw payload in response\n" "[2026-04-24 02:00:00] DEBUG capi_pipeline.ingestor: " "Response size: 14,382 bytes (expected ~48 bytes)\n" "[2026-04-24 02:00:01] DEBUG capi_pipeline.ingestor: " "debug_data.user_emails contains plaintext email fields" ), "ERROR": "[2026-04-24 02:00:00] INFO capi_pipeline: No errors — unit tests all pass", }, }, } _METRICS_HISTORY: Dict[str, Dict[str, list]] = { "ad_ranking:error_rate": [ (0, 0.0), (1, 0.0), (2, 0.0), (3, 12.3), (4, 12.1), (5, 11.9), ], "capi_pipeline:error_rate": [ (0, 0.0), (1, 0.0), (2, 0.0), (3, 0.0), (4, 0.0), (5, 0.0), ], "whatsapp_sync:memory_mb": [ (0, 200), (1, 250), (2, 350), (3, 500), (4, 800), (5, 1200), ], "whatsapp_sync:request_queue": [ (0, 5), (1, 45), (2, 130), (3, 280), (4, 420), (5, 490), ], "capi_pipeline:p99_latency_ms": [ (0, 20), (1, 20), (2, 21), (3, 1100), (4, 1080), (5, 1090), ], } # --------------------------------------------------------------------------- # Senior SRE hints — contextually aware # 
# ---------------------------------------------------------------------------
# Senior SRE hints — contextually aware
# ---------------------------------------------------------------------------

def _senior_sre_hint(task_id: int, question: str, sre_memory: list, step: int) -> str:
    """Pick the canned Senior-SRE hint for the current task and progress.

    Args:
        task_id: Scenario number (1-5); any other value gets a generic hint.
        question: The agent's free-text question; matched case-insensitively
            against a few keywords.
        sre_memory: Strings recorded so far in the episode; joined and
            keyword-matched to judge how far the agent has progressed.
        step: Current episode step; tasks 4 and 5 escalate the hint once the
            agent has used more than a few steps.

    Returns:
        The hint text, always prefixed with "Senior SRE".
    """
    question_lower = question.lower()
    memory_text = " ".join(sre_memory).lower()

    if task_id == 1:
        if "get_clicks" in memory_text or "attributeerror" in question_lower:
            return (
                "Senior SRE: The AttributeError is very specific — 'dict' has no method "
                "'get_clicks'. The ad objects coming from the API are plain Python dicts. "
                "You need dict accessor syntax, not method call syntax. "
                "Check line 22 of ranker.py."
            )
        return (
            "Senior SRE: Look at the stack trace carefully. "
            "The error is on the line that computes click_rate. "
            "How are you accessing the 'clicks' field on the ad object?"
        )

    if task_id == 2:
        if "timestamp" in memory_text or "1970" in question_lower or "normalize" in question_lower:
            return (
                "Senior SRE: The timestamp normalisation logic has an off-by-three-orders-of-magnitude "
                "bug. A Unix second timestamp is ~10 digits. A Unix millisecond timestamp is ~13 digits. "
                "The condition in _normalize_timestamp() uses the wrong threshold. "
                "What number has 13 digits?"
            )
        if "capi" in memory_text or "capi" in question_lower:
            return (
                "Senior SRE: You're on the right track — CAPI is the data source for ad attribution. "
                "Check the transformer.py file. The event_time values being emitted are wrong — "
                "they look like they're in 1970. Where does event_time get processed?"
            )
        return (
            "Senior SRE: The ad ranking ROAS drop is NOT a ranking algorithm bug. "
            "The ranking model is working correctly — it just has bad input data. "
            "Follow the data upstream. Where do conversion events come from?"
        )

    if task_id == 3:
        if "finally" in memory_text or "release" in question_lower or "pool" in question_lower:
            return (
                "Senior SRE: Yes — the connection pool is exhausted because connections are "
                "acquired but never released. In Python async code, you MUST release connections "
                "in a 'finally' block, otherwise an exception will skip the release call entirely. "
                "Add: finally: await self.db_pool.release(conn)"
            )
        return (
            "Senior SRE: The load test shows pool exhaustion. "
            "Under normal load each sync call is short so you don't notice. "
            "Under high load, the missing resource cleanup compounds. "
            "Look at how sync_user_messages() handles its DB connection lifecycle."
        )

    if task_id == 4:
        # Escalation comes first: an agent that has burned steps without
        # touching the migration gets the blunt pointer.
        if step > 3 and "migration" not in memory_text:
            return (
                "Senior SRE (impatient): Stop chasing symptoms! All three services degraded "
                "simultaneously at 02:14 UTC — that's when the last deploy landed. "
                "Check the DB migration logs. The whatsapp_sync service ran a new migration "
                "at that exact time. Look at db.py migration 003."
            )
        if "migration" in memory_text or "003" in question_lower:
            return (
                "Senior SRE: Good, you found migration 003. Look at what it does: "
                "it adds a column 'thread_id' to messages that references message_threads. "
                "But message_threads also references messages. "
                "That's a circular FK — PostgreSQL can't resolve the constraint. "
                "Remove the ALTER TABLE statement from migration 003."
            )
        return (
            "Senior SRE: Three services failing simultaneously at 02:14 UTC is not a coincidence. "
            "Check the deploy logs and DB migration history for that exact timestamp."
        )

    if task_id == 5:
        if "debug" in memory_text or "pii" in question_lower or "response" in question_lower:
            return (
                "Senior SRE: DEBUG_MODE = True should never reach production. "
                "Find that flag in the ingestor and disable it. "
                "The security test will verify the response body no longer contains 'debug_data'."
            )
        if step > 2:
            return (
                "Senior SRE: The unit tests pass — that's the trap. This is a security bug "
                "invisible to unit tests. Run the security test suite instead: "
                "run_tests('capi_pipeline', 'security'). "
                "Read the DEBUG logs for the ingestor — you'll see the response sizes."
            )
        return (
            "Senior SRE: Something is leaking data in the CAPI ingestor. "
            "Response sizes are 70x larger than expected. "
            "What conditions cause a larger response body?"
        )

    return "Senior SRE: Check the logs and follow the data flow upstream."


# ---------------------------------------------------------------------------
# Tool Dispatcher
# ---------------------------------------------------------------------------

class ToolDispatcher:
    """Route agent tool calls to their handlers for a single episode.

    Every handler takes the raw parameter dict supplied by the agent and
    returns ``(reward_delta, done, output)``. Parameters come from an LLM, so
    handlers coerce numeric fields defensively instead of raising.
    """

    def __init__(self, episode: "EpisodeManager"):
        # The episode supplies the collaborators the handlers use
        # (reward, vfs, grader, metrics, dc) plus episode state.
        self.ep = episode

    def dispatch(self, tool: str, params: Dict[str, Any]) -> Tuple[float, bool, Any]:
        """Route to the correct tool. Returns (reward_delta, done, output).

        An unknown tool name still incurs the step reward and returns an
        "Unknown tool" message rather than raising.
        """
        handlers = {
            "view_file": self._view_file,
            "edit_line": self._edit_line,
            "run_tests": self._run_tests,
            "check_dependency": self._check_dependency,
            "read_logs": self._read_logs,
            "git_blame": self._git_blame,
            "rollback": self._rollback,
            "query_metrics_history": self._query_metrics_history,
            "ask_senior_sre": self._ask_senior_sre,
            "write_incident_report": self._write_incident_report,
        }
        fn = handlers.get(tool)
        if fn is None:
            r = self.ep.reward.step_reward(tool)
            return r, False, f"Unknown tool: {tool}"
        # A missing/None params payload is treated as "no arguments".
        return fn(params or {})

    # ------------------------------------------------------------------
    # 1. view_file
    # ------------------------------------------------------------------

    def _view_file(self, p: Dict) -> Tuple[float, bool, Any]:
        """Return a file's content with 1-based line numbers prepended.

        Adds the "file_found" progress reward when the file is the one mapped
        to the current task.
        """
        service = p.get("service", "")
        filename = p.get("filename", "")

        found, content = self.ep.vfs.read_file(service, filename)
        if not found:
            # On failure `content` carries the message returned by the VFS.
            r = self.ep.reward.step_reward("view_file")
            return r, False, {"error": content}

        lines = content.splitlines()
        numbered = "\n".join(f"{i+1:4d} {line}" for i, line in enumerate(lines))

        # Intermediate reward: opening the right file
        task_def = {
            1: ("ad_ranking", "ranker.py"),
            2: ("capi_pipeline", "transformer.py"),
            3: ("whatsapp_sync", "handler.py"),
            4: ("whatsapp_sync", "db.py"),
            5: ("capi_pipeline", "ingestor.py"),
        }.get(self.ep._task_id)

        r = self.ep.reward.step_reward("view_file")
        if task_def and (service, filename) == task_def:
            bonus = self.ep.reward.progress_reward("file_found")
            r += bonus
            self.ep.add_memory(f"opened root-cause file {service}/{filename}")

        return r, False, {
            "service": service,
            "filename": filename,
            "total_lines": len(lines),
            "content": numbered,
        }

    # ------------------------------------------------------------------
    # 2. edit_line
    # ------------------------------------------------------------------

    def _edit_line(self, p: Dict) -> Tuple[float, bool, Any]:
        """Replace one line of a file, refusing edits to test files.

        A non-numeric ``line_number`` is coerced to 0 and left for the VFS to
        accept or reject. The replacement text is run through a heuristic
        syntax check whose result feeds both the reward and the output.
        """
        service = p.get("service", "")
        filename = p.get("filename", "")
        line_number = _coerce_int(p.get("line_number", 0), 0)
        new_code = p.get("new_code", "")

        # Anti-cheat: SREs cannot modify test suites during an incident.
        # Prevents reward hacking (e.g. deleting asserts to make tests pass).
        PROTECTED = ("tests/", "test_", "_test.py", "conftest.py")
        if any(guard in filename for guard in PROTECTED):
            # Charged via the same reward path as a syntax error.
            r = self.ep.reward.step_reward("edit_line", syntax_error=True)
            return r, False, {
                "error": (
                    "Error: SREs cannot modify test suites during an incident. "
                    "Fix the source code, not the tests."
                )
            }

        success, msg = self.ep.vfs.edit_line(
            service, filename, line_number, new_code, self.ep._step
        )
        if not success:
            r = self.ep.reward.step_reward("edit_line")
            return r, False, {"error": msg}

        # Syntax check: look for obvious Python syntax errors in the new line
        syntax_error = _has_syntax_error(new_code)
        r = self.ep.reward.step_reward("edit_line", syntax_error=syntax_error)
        self.ep.add_memory(
            f"edited {service}/{filename} line {line_number}: "
            f"{new_code[:60]!r}"
        )

        msg_out = f"Line {line_number} updated."
        if syntax_error:
            msg_out += " WARNING: possible syntax error detected in replacement line."

        return r, False, {"result": msg_out, "syntax_warning": syntax_error}

    # ------------------------------------------------------------------
    # 3. run_tests
    # ------------------------------------------------------------------

    def _run_tests(self, p: Dict) -> Tuple[float, bool, Any]:
        """Run the grader for the current task and the requested suite.

        The grader is keyed by task id and suite; ``service`` is only used
        for the memory note and for marking metrics as fixed on a pass.
        """
        service = p.get("service", "")
        suite = p.get("suite", "unit")

        # Suite cost (extra step penalties)
        suite_cost = {"unit": 0, "integration": -0.1, "load": -0.2, "security": -0.1}
        extra_cost = suite_cost.get(suite, 0)

        # The third element of the grader result is not needed here.
        passed, output, _ = self.ep.grader.run(self.ep._task_id, suite)

        r = self.ep.reward.step_reward("run_tests") + extra_cost
        self.ep._last_terminal = output
        self.ep.add_memory(f"ran {suite} tests for {service}: {'PASS' if passed else 'FAIL'}")

        if passed:
            self.ep.metrics.mark_fixed(service)
            r += self.ep.reward.progress_reward("error_drop")

        return r, False, {"passed": passed, "suite": suite, "output": output}

    # ------------------------------------------------------------------
    # 4. check_dependency
    # ------------------------------------------------------------------

    def _check_dependency(self, p: Dict) -> Tuple[float, bool, Any]:
        """Describe the dependency between two services from DEPENDENCY_GRAPH.

        Only the "a depends on b" direction records memory and earns the
        "service_id" progress reward; the reverse direction is reported only.
        """
        # Imported at call time rather than at module import.
        from app.engine.manager import DEPENDENCY_GRAPH

        a = p.get("service_a", "")
        b = p.get("service_b", "")

        deps_a = DEPENDENCY_GRAPH.get(a, [])
        deps_b = DEPENDENCY_GRAPH.get(b, [])

        r = self.ep.reward.step_reward("check_dependency")

        relationship = "no direct dependency"
        if b in deps_a:
            relationship = f"{a} depends on {b} (data flows: {b} → {a})"
            self.ep.add_memory(f"confirmed: {a} depends on {b}")
            r += self.ep.reward.progress_reward("service_id")
        elif a in deps_b:
            relationship = f"{b} depends on {a} (data flows: {a} → {b})"

        return r, False, {
            "service_a": a,
            "service_b": b,
            "relationship": relationship,
            f"{a}_depends_on": deps_a,
            f"{b}_depends_on": deps_b,
        }

    # ------------------------------------------------------------------
    # 5. read_logs
    # ------------------------------------------------------------------

    def _read_logs(self, p: Dict) -> Tuple[float, bool, Any]:
        """Return the canned log text for a service/level, tail-limited.

        ``last_n_lines`` keeps only the trailing N lines of the log text.
        Non-numeric values fall back to 20; zero or negative values disable
        the limit.
        """
        service = p.get("service", "")
        log_level = p.get("log_level", "ERROR")
        n = _coerce_int(p.get("last_n_lines", 20), 20)

        task_logs = _TASK_LOGS.get(self.ep._task_id, {})
        svc_logs = task_logs.get(service, {})
        log_text = svc_logs.get(log_level, f"[{log_level}] No {log_level} logs for {service}")
        if n > 0:
            # Honour the documented tail limit (previously parsed but ignored).
            log_text = "\n".join(log_text.splitlines()[-n:])

        r = self.ep.reward.step_reward("read_logs")
        self.ep.add_memory(f"read {log_level} logs for {service}")

        # Partial reward for reading the right service's debug/error logs
        right_service = {
            1: "ad_ranking",
            2: "capi_pipeline",
            3: "whatsapp_sync",
            4: "whatsapp_sync",
            5: "capi_pipeline",
        }.get(self.ep._task_id)
        if service == right_service and log_level in ("DEBUG", "ERROR"):
            r += self.ep.reward.progress_reward("service_id")

        return r, False, {"service": service, "log_level": log_level, "logs": log_text}

    # ------------------------------------------------------------------
    # 6. git_blame
    # ------------------------------------------------------------------

    def _git_blame(self, p: Dict) -> Tuple[float, bool, Any]:
        """Return the VFS blame record for one line (line defaults to 1)."""
        service = p.get("service", "")
        filename = p.get("filename", "")
        line_number = _coerce_int(p.get("line_number", 1), 1)

        blame = self.ep.vfs.git_blame(service, filename, line_number)
        r = self.ep.reward.step_reward("git_blame")
        self.ep.add_memory(f"git blame {service}/{filename}:{line_number}")

        return r, False, {"blame": blame}

    # ------------------------------------------------------------------
    # 7. rollback
    # ------------------------------------------------------------------

    def _rollback(self, p: Dict) -> Tuple[float, bool, Any]:
        """Simulate rolling back a migration.

        Only task 4 with service "whatsapp_sync" and version "003" counts as
        correct: the migration entry is stripped from db.py in the VFS and
        all three services are marked fixed. Any other rollback is charged
        the step reward plus ROLLBACK_PENALTY.
        """
        service = p.get("service", "")
        version = p.get("version", "")

        # Only valid for Task 4 and correct service/version
        is_correct = (
            self.ep._task_id == 4
            and service == "whatsapp_sync"
            and version == "003"
        )

        if is_correct:
            # Remove the circular FK from the VFS (simulate rollback)
            found, content = self.ep.vfs.read_file("whatsapp_sync", "db.py")
            if found:
                # Rewrite only when the read succeeded; otherwise `content`
                # is not file text and must not be written back as db.py.
                self.ep.vfs._files["whatsapp_sync"]["db.py"] = _strip_migration_block(
                    content, "003"
                )

            self.ep.metrics.mark_fixed("whatsapp_sync")
            self.ep.metrics.mark_fixed("ad_ranking")
            self.ep.metrics.mark_fixed("capi_pipeline")
            self.ep.add_memory("rolled back migration 003 — circular FK removed")

            r = self.ep.reward.step_reward("rollback")
            r += self.ep.reward.progress_reward("error_drop")
            return r, False, {
                "result": "Migration 003 rolled back successfully. All three services recovering."
            }

        # Wrong rollback — penalise
        r = self.ep.reward.step_reward("rollback", syntax_error=False)
        r += self.ep.reward.ROLLBACK_PENALTY  # extra penalty via RewardManager field
        return r, False, {
            "error": (
                f"Rollback of {service} v{version} either unnecessary or incorrect. "
                "Verify the root cause before rolling back."
            )
        }

    # ------------------------------------------------------------------
    # 8. query_metrics_history
    # ------------------------------------------------------------------

    def _query_metrics_history(self, p: Dict) -> Tuple[float, bool, Any]:
        """Render the stored history of one metric as a "T-<n>h: value" table.

        With ``hours_back`` at or above the series length, the oldest sample
        is labelled ``T-<hours_back>h`` and every sample is shown. With a
        smaller ``hours_back`` only samples inside the window are shown, so
        labels never reach zero or go negative.
        """
        service = p.get("service", "")
        metric = p.get("metric", "")
        hours_back = max(_coerce_int(p.get("hours_back", 6), 6), 1)

        key = f"{service}:{metric}"
        history = _METRICS_HISTORY.get(key, [])

        r = self.ep.reward.step_reward("query_metrics_history")
        self.ep.add_memory(f"queried {metric} history for {service}")

        if history:
            # Offset assigned to the oldest sample; never less than the
            # series length so each sample keeps a positive offset.
            anchor = max(hours_back, len(history))
            rows = [
                (anchor - i, val)
                for i, (_, val) in enumerate(history)
                if anchor - i <= hours_back
            ]
            table = "\n".join(f" T-{offset}h: {val}" for offset, val in rows)

            # The note points at the fourth sample (index 3) of the series.
            spike_offset = anchor - 3
            if spike_offset <= hours_back:
                note = f"Spike visible at T-{spike_offset}h (correlates with 02:14 UTC deploy)"
            else:
                note = (
                    f"Spike at T-{spike_offset}h is outside the requested window "
                    "(correlates with 02:14 UTC deploy)"
                )
            return r, False, {
                "service": service,
                "metric": metric,
                "history": table,
                "note": note,
            }

        return r, False, {
            "service": service,
            "metric": metric,
            "history": "No historical data for this metric combination.",
        }

    # ------------------------------------------------------------------
    # 9. ask_senior_sre
    # ------------------------------------------------------------------

    def _ask_senior_sre(self, p: Dict) -> Tuple[float, bool, Any]:
        """Return a context-aware hint at twice the normal step reward."""
        # A missing or None question is treated as an empty one so the
        # keyword matching below cannot raise.
        question = str(p.get("question") or "")
        hint = _senior_sre_hint(
            self.ep._task_id,
            question,
            self.ep._sre_memory,
            self.ep._step,
        )
        # 2-step penalty
        r = self.ep.reward.step_reward("ask_senior_sre") * 2
        self.ep.add_memory(f"asked senior SRE: {question[:60]}")
        return r, False, {"hint": hint}

    # ------------------------------------------------------------------
    # 10. write_incident_report
    # ------------------------------------------------------------------

    def _write_incident_report(self, p: Dict) -> Tuple[float, bool, Any]:
        """Submit the post-mortem, compute the terminal reward, end the episode.

        This is the only handler that returns ``done=True``.
        """
        from app.models import IncidentReport

        report = IncidentReport(
            root_cause=p.get("root_cause", ""),
            fix_applied=p.get("fix_applied", ""),
            services_affected=p.get("services_affected", []),
            severity_classification=p.get("severity_classification", "P1"),
        )
        self.ep._incident_report = report

        report_accuracy = self.ep.grader.grade_incident_report(self.ep._task_id, report)

        # Per-task SLA expressed in episode steps (default 20).
        sla_steps = {1: 15, 2: 20, 3: 20, 4: 25, 5: 20}
        sla = sla_steps.get(self.ep._task_id, 20)
        within_sla = self.ep._step <= sla

        # Check if tests actually passed
        passed, _, _ = self.ep.grader.run(self.ep._task_id)
        # The same grader result is used as the regression signal.
        no_regressions = passed

        r = self.ep.reward.step_reward("write_incident_report")
        r += self.ep.reward.terminal_reward(
            tests_passed=passed,
            report_accuracy=report_accuracy,
            fixed_within_sla=within_sla,
            no_regressions=no_regressions,
            task_id=self.ep._task_id,
        )

        # Built after the terminal reward has been requested, so the
        # normalized score is read afterwards.
        summary = (
            f"Incident {self.ep._incident_id} closed.\n"
            f"Report accuracy: {report_accuracy:.0%}\n"
            f"Tests passed: {passed}\n"
            f"Within SLA: {within_sla}\n"
            f"Normalized score: {self.ep.reward.normalized_score():.3f}"
        )

        # Update difficulty controller
        self.ep.dc.update(self.ep._task_id, self.ep.reward.normalized_score())

        return r, True, {"summary": summary, "report_accuracy": report_accuracy}


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# Matches a complete single- or double-quoted string literal on one line,
# including backslash escapes.
_STRING_LITERAL_RE = re.compile(r"""("(?:\\.|[^"\\])*"|'(?:\\.|[^'\\])*')""")


def _coerce_int(value: Any, default: int) -> int:
    """Convert an agent-supplied value to int, returning *default* if it cannot be."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return default


def _strip_migration_block(content: str, version: str) -> str:
    """Remove the migration entry whose "version" equals *version* from *content*.

    Skipping starts at the line holding the version key and ends at the first
    following line that begins with "}". A lone "{" line immediately before
    the version line is removed too, so multi-line entries do not leave an
    unbalanced brace. An entry written entirely on one line is dropped alone.
    """
    markers = (f'"version": "{version}"', f"'version': '{version}'")
    kept: list = []
    skipping = False
    for line in content.splitlines():
        text = line.strip()
        if any(marker in line for marker in markers):
            if text.startswith("{") and text.rstrip(",").endswith("}"):
                # Whole entry on a single line: drop only this line.
                continue
            if not text.startswith("{") and kept and kept[-1].strip() == "{":
                # Opening brace of this entry was emitted on its own line.
                kept.pop()
            skipping = True
        if skipping and text.startswith("}"):
            skipping = False
            continue
        if not skipping:
            kept.append(line)
    return "\n".join(kept)


def _has_syntax_error(line: str) -> bool:
    """Quick heuristic check for obvious Python syntax mistakes in a single line.

    Flags unbalanced (), [] or {} and a line ending in a bare "=". Complete
    string literals and trailing comments are ignored, so text such as
    ``print(":(")`` is not reported.
    """
    stripped = line.strip()

    # Blank out complete string literals so their contents cannot be
    # mistaken for code.
    code = _STRING_LITERAL_RE.sub('""', stripped)
    # Strip a trailing comment only when no stray quote remains; with an
    # unterminated string the "#" may be inside it.
    if '"' not in code.replace('""', "") and "'" not in code:
        code = code.split("#", 1)[0].rstrip()

    # Unmatched brackets
    for open_, close_ in [("(", ")"), ("[", "]"), ("{", "}")]:
        if code.count(open_) != code.count(close_):
            return True

    # Detect obvious incomplete assignments
    if re.search(r"=\s*$", code):
        return True

    return False