Spaces:
Running on Zero
Running on Zero
"""hygiene.py — extra FIXABLE signals beyond loops/rereads/clusters.

NON-NEGOTIABLE #1/#6: pure code, NO model; each detector fires only on a NAMED,
FIXABLE pattern and maps to a cited Anthropic best practice. The LLM later writes
the scoped advice; this module only decides whether the pattern occurred.

Four detectors (the first three are the "safe, high-value" set):
  * read_burst      — many DISTINCT files read in ONE turn -> use a subagent
  * unverified_edit — the session edited code but ran no test/build/lint at all
  * near_repeat     — a near-identical command re-run several times in a turn
  * unloaded_mcp    — a turn manually probed an MCP server while using no loaded
                      mcp__* tool -> restart the session instead of probing

All thresholds live here, in one place, conservative on purpose.
"""
| from __future__ import annotations | |
| import re | |
| from typing import Any, Optional | |
# Unscoped exploration: one turn that reads at least this many DISTINCT files
# should have been handed to a subagent (Anthropic: "the infinite exploration").
READ_BURST_MIN: int = 12

# The "trust-then-verify gap": at least this many edits across the session with
# ZERO verification commands anywhere. Session-level — one finding, not per-turn.
EDIT_MIN: int = 5

# One turn re-running a near-identical (normalized) command at least this many
# times earns a redirect. Deliberately above loops.py's >=2 hint bar so that
# benign iteration stays quiet.
NEAR_REPEAT_MIN: int = 4
# Minimum number of probe commands in ONE turn before the unloaded-MCP finding
# fires. A "probe" is a manual poke at an MCP server — curl to a /mcp endpoint,
# a look at .mcp.json, or a hand-written JSON-RPC call — made in a turn that uses
# NO loaded mcp__* tool, i.e. the agent is reaching for a server that is not in
# the session. MCP servers are discovered at STARTUP, so probing cannot make one
# appear; it only burns round-trips (the pattern that motivated this detector:
# the smruti MCP was not loaded, the agent curled its /mcp endpoint, and the
# human eventually had to exit and restart). Conservative on purpose: a single
# config peek will not trip it.
MCP_PROBE_MIN: int = 3

# One alternative per probe shape; joined with "|" into a single pattern.
_MCP_PROBE_RE = re.compile(
    "|".join(
        (
            # curl/http to an /mcp endpoint
            r"https?://[^\s'\"]*?/mcp(?:[/?\s\"']|$)",
            # reading the MCP server config
            r"\.mcp\.json",
            # a JSON-RPC payload
            r"[\"']?jsonrpc[\"']?\s*[:=]",
            # an explicit MCP method call
            r"method[\"']?\s*[:=]\s*[\"'](?:initialize|tools/list|tools/call)",
        )
    ),
    flags=re.IGNORECASE,
)
# Commands that count as "verifying the work" (test / build / lint / typecheck).
# Only ever used for a truthiness check via .search(), so the pattern is kept
# deliberately loose: a false positive merely keeps the unverified-edit finding
# quiet, which is the conservative direction.
_VERIFY_RE = re.compile(
    # 1) Named runners / build tools / linters, matched as whole words.
    #    `gradlew?` covers both `gradle` and the wrapper script; the boundary sits
    #    before the `g`, so `./gradlew` matches at the start of a command too.
    r"\b(?:pytest|unittest|nose2|jest|vitest|mocha|ava|rspec|phpunit|"
    r"go\s+test|cargo\s+test|cargo\s+build|go\s+build|dotnet\s+test|ctest|tox|"
    r"npm\s+(?:run\s+)?(?:test|build|lint|typecheck)|yarn\s+(?:test|build|lint)|"
    r"pnpm\s+(?:test|build|lint)|gradlew?|mvn\s+(?:test|verify|package)|"
    r"eslint|ruff|mypy|flake8|pylint|pyright)\b"
    # 2) `make` / `tsc` as a standalone command word. A lookahead is used instead
    #    of consuming the whitespace and then demanding `\b`: that combination
    #    rejects `make -j4` and `tsc --noEmit`, because there is no word boundary
    #    between a space and `-`.
    r"|\b(?:make|tsc)(?=\s|$)"
    # 3) Test scripts: `bash run_tests.sh`, `sh ./scripts/test.sh`,
    #    `./run_tests.py`. `./` carries no leading `\b` (there is never a word
    #    boundary between whitespace/start-of-string and `.`), and whitespace is
    #    allowed between the interpreter and the script path.
    r"|(?:\b(?:bash|sh)\s*|\./)\S*test\S*\.(?:sh|py|js|ts)\b",
    re.IGNORECASE,
)
def _read_files(turn) -> set:
    """Return the distinct, non-empty ``file_path`` values of a turn's Read calls.

    Walks ``turn.tools`` and keeps only calls whose ``name`` is ``"Read"``.
    Calls whose ``input`` is not a dict, or whose ``file_path`` is missing or
    falsy, contribute nothing.
    """
    paths = set()
    for call in turn.tools:
        if getattr(call, "name", "") != "Read":
            continue
        payload = call.input
        if not isinstance(payload, dict):
            continue
        path = payload.get("file_path")
        if path:
            paths.add(path)
    return paths
def detect_read_bursts(turns) -> list[dict[str, Any]]:
    """Flag turns that read at least READ_BURST_MIN distinct files.

    Returns one ``{"turn": <turn.i>, "files": <distinct file count>}`` dict per
    qualifying turn, in input order (unscoped exploration).
    """
    # Lazily pair each turn with its distinct-read count, then keep the bursts.
    counted = ((turn, len(_read_files(turn))) for turn in turns)
    return [
        {"turn": turn.i, "files": n_files}
        for turn, n_files in counted
        if n_files >= READ_BURST_MIN
    ]
def detect_unverified_edits(turns) -> Optional[dict[str, Any]]:
    """Session-level check for edits that were never verified.

    Counts every ``Edit`` / ``Write`` tool call across all turns and scans every
    ``Bash`` command against ``_VERIFY_RE``. Yields a single finding
    ``{"turns": [...], "edits": n}`` when the session has at least EDIT_MIN edits
    and no verification command anywhere; otherwise ``None``. ``turns`` lists the
    indices of turns containing edits, capped at six, for citation chips.
    """
    turns_with_edits: list[int] = []
    edit_total = 0
    saw_verification = False

    for turn in turns:
        edits_here = 0
        for call in turn.tools:
            tool = getattr(call, "name", "")
            if tool == "Bash":
                payload = call.input if isinstance(call.input, dict) else {}
                command = str(payload.get("command", "") or "")
                if _VERIFY_RE.search(command):
                    saw_verification = True
            elif tool in ("Edit", "Write"):
                edits_here += 1
        if edits_here > 0:
            edit_total += edits_here
            turns_with_edits.append(turn.i)

    # A verification command anywhere in the session suppresses the finding.
    if saw_verification or edit_total < EDIT_MIN:
        return None
    return {"turns": turns_with_edits[:6], "edits": edit_total}
def detect_near_repeats(loops_by_turn) -> list[dict[str, Any]]:
    """Report near-identical commands re-run >= NEAR_REPEAT_MIN times in a turn.

    Reads the ``near_identical`` hints already attached to each value of
    ``loops_by_turn`` (never the real-loop set, which is advised separately).
    Turns are visited in sorted key order; each finding carries the turn key,
    the normalized command truncated to 80 characters, and the repeat count.
    """
    findings = []
    for turn_index, turn_loops in sorted(loops_by_turn.items()):
        findings.extend(
            {"turn": turn_index, "command": hint.normalized[:80], "count": hint.count}
            for hint in turn_loops.near_identical
            if hint.count >= NEAR_REPEAT_MIN
        )
    return findings
def detect_unloaded_mcp(turns) -> Optional[dict[str, Any]]:
    """Session-level: find turns that hand-probed an MCP server that isn't loaded.

    A turn qualifies when at least MCP_PROBE_MIN of its ``Bash`` commands match
    ``_MCP_PROBE_RE`` (curl to a /mcp endpoint, touching .mcp.json, a JSON-RPC
    initialize/tools call) AND no tool whose name starts with ``mcp__`` was used
    in that same turn — i.e. the agent is reaching for a server that is not in
    the session. Only Bash commands are inspected for probes.

    Returns ``{"turns": [i, ...], "probes": n}`` where ``probes`` sums the probe
    counts of the qualifying turns, or ``None`` when no turn qualifies. The fix
    is a RESTART, not more probing (MCP servers load at startup) — cited to
    Anthropic's 'course-correct early / restart rather than re-correct a
    polluted context' guidance. The turns list drives citation chips; the scoped
    narrator names the actual server.
    """
    flagged: list[int] = []
    probe_total = 0

    for turn in turns:
        probe_count = 0
        mcp_tool_used = False
        for call in turn.tools:
            tool = getattr(call, "name", "")
            if tool.startswith("mcp__"):
                mcp_tool_used = True
                continue
            if tool != "Bash":
                continue
            payload = call.input if isinstance(call.input, dict) else {}
            if _MCP_PROBE_RE.search(str(payload.get("command", "") or "")):
                probe_count += 1
        # Skip turns that used a loaded MCP tool or stayed under the probe bar.
        if mcp_tool_used or probe_count < MCP_PROBE_MIN:
            continue
        flagged.append(turn.i)
        probe_total += probe_count

    if not flagged:
        return None
    return {"turns": flagged, "probes": probe_total}