her / engine /core /hygiene.py
geekwrestler's picture
Squash history (purge pre-scrub demo session blobs)
5f43c7d
"""hygiene.py — extra FIXABLE signals beyond loops/rereads/clusters.
NON-NEGOTIABLE #1/#6: pure code, NO model; each detector fires only on a NAMED,
FIXABLE pattern and maps to a cited Anthropic best practice. The LLM later writes
the scoped advice; this module only decides whether the pattern occurred.
Three detectors (the "safe, high-value" set):
* read_burst — many DISTINCT files read in ONE turn -> use a subagent
* unverified_edit — the session edited code but ran no test/build/lint at all
* near_repeat — a near-identical command re-run several times in a turn
All thresholds live here, in one place, conservative on purpose.
"""
from __future__ import annotations
import re
from typing import Any, Optional
# A single turn that reads this many DISTINCT files is doing unscoped exploration
# that belongs in a subagent (Anthropic: "the infinite exploration").
READ_BURST_MIN = 12
# A session that makes this many edits with ZERO verification commands anywhere is
# the "trust-then-verify gap". Session-level (one finding), not per-turn.
EDIT_MIN = 5
# A near-identical (normalized) command re-run this many times in one turn is worth
# a redirect. Higher than loops.py's >=2 hint bar, to stay quiet on benign iteration.
NEAR_REPEAT_MIN = 4
# A turn that MANUALLY probes an MCP server (curl to a /mcp endpoint, reads
# .mcp.json, or sends a JSON-RPC initialize) while using NO loaded mcp__* tool is
# reaching for a server that isn't in the session. MCP servers are discovered at
# STARTUP, so probing can't make one appear — it just burns round-trips (the exact
# pattern that started this build: the smruti MCP wasn't loaded, the agent curled
# its /mcp endpoint, and the human eventually had to exit and restart). Fire when a
# turn clears this many probe commands. Conservative: a single config peek won't trip.
MCP_PROBE_MIN = 3
_MCP_PROBE_RE = re.compile(
r"https?://[^\s'\"]*?/mcp(?:[/?\s\"']|$)" # curl/http to an /mcp endpoint
r"|\.mcp\.json" # reading the MCP server config
r"|[\"']?jsonrpc[\"']?\s*[:=]" # a JSON-RPC payload
r"|method[\"']?\s*[:=]\s*[\"'](?:initialize|tools/list|tools/call)",
re.IGNORECASE,
)
# Commands that count as "verifying the work" (test / build / lint / typecheck).
_VERIFY_RE = re.compile(
r"\b(pytest|unittest|nose2|jest|vitest|mocha|ava|rspec|phpunit|"
r"go\s+test|cargo\s+test|cargo\s+build|go\s+build|dotnet\s+test|ctest|tox|"
r"npm\s+(run\s+)?(test|build|lint|typecheck)|yarn\s+(test|build|lint)|"
r"pnpm\s+(test|build|lint)|gradle|mvn\s+(test|verify|package)|\./gradlew|"
r"make(\s|$)|tsc(\s|$)|eslint|ruff|mypy|flake8|pylint|pyright|"
r"(bash|sh|\./)\S*test\S*\.(sh|py|js|ts))\b",
re.IGNORECASE,
)
def _read_files(turn) -> set:
files = set()
for tc in turn.tools:
if getattr(tc, "name", "") == "Read":
inp = tc.input if isinstance(tc.input, dict) else {}
fp = inp.get("file_path")
if fp:
files.add(fp)
return files
def detect_read_bursts(turns) -> list[dict[str, Any]]:
"""Turns that read >= READ_BURST_MIN distinct files (unscoped exploration)."""
out = []
for t in turns:
n = len(_read_files(t))
if n >= READ_BURST_MIN:
out.append({"turn": t.i, "files": n})
return out
def detect_unverified_edits(turns) -> Optional[dict[str, Any]]:
"""Session-level: >= EDIT_MIN edits AND no verification command anywhere -> one
finding (or None). The turns list (capped) is for citation chips."""
edit_turns: list[int] = []
n_edits = 0
verified = False
for t in turns:
te = 0
for tc in t.tools:
name = getattr(tc, "name", "")
if name in ("Edit", "Write"):
te += 1
elif name == "Bash":
inp = tc.input if isinstance(tc.input, dict) else {}
if _VERIFY_RE.search(str(inp.get("command", "") or "")):
verified = True
if te:
n_edits += te
edit_turns.append(t.i)
if n_edits >= EDIT_MIN and not verified:
return {"turns": edit_turns[:6], "edits": n_edits}
return None
def detect_near_repeats(loops_by_turn) -> list[dict[str, Any]]:
"""Near-identical (normalized) commands re-run >= NEAR_REPEAT_MIN times in a
turn. Consumes loops.py's already-computed `near_identical` hints (never the
real-loop set, which is advised separately)."""
out = []
for ti, tl in sorted(loops_by_turn.items()):
for ni in tl.near_identical:
if ni.count >= NEAR_REPEAT_MIN:
out.append({"turn": ti, "command": ni.normalized[:80], "count": ni.count})
return out
def detect_unloaded_mcp(turns) -> Optional[dict[str, Any]]:
"""Session-level: turns where the agent MANUALLY probed an MCP server (curl a
/mcp endpoint, read .mcp.json, JSON-RPC initialize) and used NO loaded mcp__*
tool in that turn — i.e. reaching for a server that isn't in the session.
Returns {turns:[i], probes:n} or None. The fix is a RESTART, not more probing
(MCP servers load at startup) — cited to Anthropic's 'course-correct early /
restart rather than re-correct a polluted context' guidance. The turns list
drives citation chips; the scoped narrator names the actual server."""
probe_turns: list[int] = []
total = 0
for t in turns:
probes = 0
used_mcp = False
for tc in t.tools:
name = getattr(tc, "name", "")
if name.startswith("mcp__"):
used_mcp = True
elif name == "Bash":
inp = tc.input if isinstance(tc.input, dict) else {}
if _MCP_PROBE_RE.search(str(inp.get("command", "") or "")):
probes += 1
if probes >= MCP_PROBE_MIN and not used_mcp:
probe_turns.append(t.i)
total += probes
if probe_turns:
return {"turns": probe_turns, "probes": total}
return None