"""hygiene.py — extra FIXABLE signals beyond loops/rereads/clusters. NON-NEGOTIABLE #1/#6: pure code, NO model; each detector fires only on a NAMED, FIXABLE pattern and maps to a cited Anthropic best practice. The LLM later writes the scoped advice; this module only decides whether the pattern occurred. Three detectors (the "safe, high-value" set): * read_burst — many DISTINCT files read in ONE turn -> use a subagent * unverified_edit — the session edited code but ran no test/build/lint at all * near_repeat — a near-identical command re-run several times in a turn All thresholds live here, in one place, conservative on purpose. """ from __future__ import annotations import re from typing import Any, Optional # A single turn that reads this many DISTINCT files is doing unscoped exploration # that belongs in a subagent (Anthropic: "the infinite exploration"). READ_BURST_MIN = 12 # A session that makes this many edits with ZERO verification commands anywhere is # the "trust-then-verify gap". Session-level (one finding), not per-turn. EDIT_MIN = 5 # A near-identical (normalized) command re-run this many times in one turn is worth # a redirect. Higher than loops.py's >=2 hint bar, to stay quiet on benign iteration. NEAR_REPEAT_MIN = 4 # A turn that MANUALLY probes an MCP server (curl to a /mcp endpoint, reads # .mcp.json, or sends a JSON-RPC initialize) while using NO loaded mcp__* tool is # reaching for a server that isn't in the session. MCP servers are discovered at # STARTUP, so probing can't make one appear — it just burns round-trips (the exact # pattern that started this build: the smruti MCP wasn't loaded, the agent curled # its /mcp endpoint, and the human eventually had to exit and restart). Fire when a # turn clears this many probe commands. Conservative: a single config peek won't trip. MCP_PROBE_MIN = 3 _MCP_PROBE_RE = re.compile( r"https?://[^\s'\"]*?/mcp(?:[/?\s\"']|$)" # curl/http to an /mcp endpoint r"|\.mcp\.json" # reading the MCP server config r"|[\"']?jsonrpc[\"']?\s*[:=]" # a JSON-RPC payload r"|method[\"']?\s*[:=]\s*[\"'](?:initialize|tools/list|tools/call)", re.IGNORECASE, ) # Commands that count as "verifying the work" (test / build / lint / typecheck). _VERIFY_RE = re.compile( r"\b(pytest|unittest|nose2|jest|vitest|mocha|ava|rspec|phpunit|" r"go\s+test|cargo\s+test|cargo\s+build|go\s+build|dotnet\s+test|ctest|tox|" r"npm\s+(run\s+)?(test|build|lint|typecheck)|yarn\s+(test|build|lint)|" r"pnpm\s+(test|build|lint)|gradle|mvn\s+(test|verify|package)|\./gradlew|" r"make(\s|$)|tsc(\s|$)|eslint|ruff|mypy|flake8|pylint|pyright|" r"(bash|sh|\./)\S*test\S*\.(sh|py|js|ts))\b", re.IGNORECASE, ) def _read_files(turn) -> set: files = set() for tc in turn.tools: if getattr(tc, "name", "") == "Read": inp = tc.input if isinstance(tc.input, dict) else {} fp = inp.get("file_path") if fp: files.add(fp) return files def detect_read_bursts(turns) -> list[dict[str, Any]]: """Turns that read >= READ_BURST_MIN distinct files (unscoped exploration).""" out = [] for t in turns: n = len(_read_files(t)) if n >= READ_BURST_MIN: out.append({"turn": t.i, "files": n}) return out def detect_unverified_edits(turns) -> Optional[dict[str, Any]]: """Session-level: >= EDIT_MIN edits AND no verification command anywhere -> one finding (or None). The turns list (capped) is for citation chips.""" edit_turns: list[int] = [] n_edits = 0 verified = False for t in turns: te = 0 for tc in t.tools: name = getattr(tc, "name", "") if name in ("Edit", "Write"): te += 1 elif name == "Bash": inp = tc.input if isinstance(tc.input, dict) else {} if _VERIFY_RE.search(str(inp.get("command", "") or "")): verified = True if te: n_edits += te edit_turns.append(t.i) if n_edits >= EDIT_MIN and not verified: return {"turns": edit_turns[:6], "edits": n_edits} return None def detect_near_repeats(loops_by_turn) -> list[dict[str, Any]]: """Near-identical (normalized) commands re-run >= NEAR_REPEAT_MIN times in a turn. Consumes loops.py's already-computed `near_identical` hints (never the real-loop set, which is advised separately).""" out = [] for ti, tl in sorted(loops_by_turn.items()): for ni in tl.near_identical: if ni.count >= NEAR_REPEAT_MIN: out.append({"turn": ti, "command": ni.normalized[:80], "count": ni.count}) return out def detect_unloaded_mcp(turns) -> Optional[dict[str, Any]]: """Session-level: turns where the agent MANUALLY probed an MCP server (curl a /mcp endpoint, read .mcp.json, JSON-RPC initialize) and used NO loaded mcp__* tool in that turn — i.e. reaching for a server that isn't in the session. Returns {turns:[i], probes:n} or None. The fix is a RESTART, not more probing (MCP servers load at startup) — cited to Anthropic's 'course-correct early / restart rather than re-correct a polluted context' guidance. The turns list drives citation chips; the scoped narrator names the actual server.""" probe_turns: list[int] = [] total = 0 for t in turns: probes = 0 used_mcp = False for tc in t.tools: name = getattr(tc, "name", "") if name.startswith("mcp__"): used_mcp = True elif name == "Bash": inp = tc.input if isinstance(tc.input, dict) else {} if _MCP_PROBE_RE.search(str(inp.get("command", "") or "")): probes += 1 if probes >= MCP_PROBE_MIN and not used_mcp: probe_turns.append(t.i) total += probes if probe_turns: return {"turns": probe_turns, "probes": total} return None