her / engine /core /impact.py
geekwrestler's picture
Squash history (purge pre-scrub demo session blobs)
5f43c7d
"""impact.py — "Actions worth reviewing", risk level, and session outcome.
The Session/Project Report's safety lens. DETERMINISTIC, NO model (Non-negotiable
#1): it scans the Bash commands a session ran for NAMED, high-impact operations
and reports them with the turn(s) they happened in, plus a rolled-up risk level and
a plain outcome read. SUGGEST, never assert (NN#7).
The rules are DATA, not code: they live in `narrator/knowledge/impact-rules.json`
(editable — add a tool by adding a rule, no code change) and cover the common
stack (railway, vercel, netlify, fly, cloudflare, aws, gcloud, azure, kubernetes,
docker, terraform, supabase, heroku, n8n, …). The guiding principle, per the
owner: in Her, ANY modification to a running/deployed service counts — deploy,
restart, scale, exec-in, hosted-var change — not just a literal "deploy". So the
detector flags the OPERATION (e.g. `railway ssh`/`run`, `kubectl exec`, a deploy),
which is robust even when the real change runs from a file the command-text can't
see (`psql -f migrate.sql`).
PRECISION via structural guards applied to EVERY rule (so it doesn't cry wolf):
* read/search heads are skipped — `grep "CREATE ROLE"`, `cat .env | grep`,
`cat > f <<SQL` are looking FOR / writing the text, not doing it.
* `--help` / `--version` / `-h` / a bare `help` subcommand are skipped (probing,
not acting — e.g. `railway up --help`).
* SQL-privilege/data rules REQUIRE a real database client on the command's FIRST
line (psql/mysql/surreal/…), so a python heredoc carrying "CREATE ROLE" as a
string, or a grep for it, never counts.
Operates on the CONTRACT dicts (turn dicts post to_dict), never raw JSONL.
"""
from __future__ import annotations
import json
import os
import re
from typing import Any, Optional
# Locate the editable ruleset relative to this file: the repo root is taken to
# be two directories above the directory that holds this module.
_HERE = os.path.dirname(os.path.abspath(__file__))
_REPO = os.path.dirname(os.path.dirname(_HERE))
_RULES_PATH = os.path.join(_REPO, "narrator", "knowledge", "impact-rules.json")

# Command heads that mean a segment is only READING / SEARCHING for a pattern
# (or writing the text out), not performing the operation it mentions.
_READ_HEADS = (
    "grep", "egrep", "fgrep", "rg", "ag", "ack",
    "sed", "awk",
    "cat", "echo", "printf", "less", "more", "head", "tail",
    "wc", "find", "cut", "sort", "uniq", "diff", "comm", "tr", "column",
    "jq", "yq", "tee",
)
# Anchored at the start of a segment (leading whitespace allowed).
_READ_HEAD_RE = re.compile(rf"^\s*({'|'.join(_READ_HEADS)})\b")

# Probing a command rather than running it: --help, --version, a standalone
# -h flag, or a bare `help` word.
_HELP_RE = re.compile(
    r"("
    r"--help\b"
    r"|--version\b"
    r"|(?:^|\s)-h(?=\s|$)"
    r"|(?:^|\s)help(?=\s|$)"
    r")",
    re.IGNORECASE,
)

# What splits one command string into segments: `&&`, `||`, `|`, `;`, and a
# newline (the latter so heredoc-style commands are split per line).
_SEP_RE = re.compile(r"&&|\|\|?|;|\n")

# A SQL rule only counts as RUN when one of these database clients appears on
# the command's first line.
_DB_EXEC_RE = re.compile(
    r"\b("
    r"psql|mysql|mariadb|surreal(?:\s+sql)?|mongosh|mongo|sqlite3|cockroach|"
    r"createuser|createdb|dropuser|pg_restore|"
    r"railway\s+connect|railway\s+run"
    r")\b",
    re.IGNORECASE,
)

# Tag -> rank used for display ordering and the risk rollup (lower = more
# severe). DEV (local servers/containers) comes last: it is flagged for
# visibility but on its own only ever reaches "Low" risk.
_TAG_ORDER = {
    tag: rank
    for rank, tag in enumerate(("LIVE", "SECURITY", "DATA", "NETWORK", "CONFIG", "DEV"))
}

# (mtime_ns of the rules file, compiled rules) from the last successful load,
# or None before the first one.
_RULES_CACHE: Optional[tuple[int, list[dict[str, Any]]]] = None
def _load_rules() -> list[dict[str, Any]]:
    """Return the compiled ruleset read from the editable JSON rules file.

    The result is cached against the file's `st_mtime_ns`, so the file is only
    re-parsed after it changes. Loading is best-effort by design:

    * a missing, unreadable or non-JSON file yields `[]` (and is not cached);
    * a top level that is not an object, or a `rules` value that is not a
      list, yields no rules;
    * an individual rule is skipped when it is not an object, when `match` or
      `tag` is missing / empty / not a string, or when `match` is not a valid
      regular expression.

    Each returned rule is a dict with keys `tag`, `title` (defaults to the
    tag), `detail` (defaults to ""), `rx` (compiled pattern, case-insensitive
    unless the rule sets `caseSensitive`), `multiline` and `requiresDbClient`
    (both coerced to bool).
    """
    global _RULES_CACHE
    try:
        mtime = os.stat(_RULES_PATH).st_mtime_ns
    except OSError:
        return []
    if _RULES_CACHE is not None and _RULES_CACHE[0] == mtime:
        return _RULES_CACHE[1]
    try:
        with open(_RULES_PATH, encoding="utf-8") as f:
            raw = json.load(f)
    except (OSError, ValueError):
        return []

    # `"rules": null` (or any non-list) must not blow up the iteration below.
    entries = raw.get("rules") if isinstance(raw, dict) else None
    if not isinstance(entries, list):
        entries = []

    out: list[dict[str, Any]] = []
    for r in entries:
        if not isinstance(r, dict):
            continue
        pattern, tag = r.get("match"), r.get("tag")
        # Both must be non-empty strings: a non-str pattern makes re.compile
        # raise TypeError, and the tag is later used in set / sort keys.
        if not (isinstance(pattern, str) and pattern and isinstance(tag, str) and tag):
            continue
        try:
            rx = re.compile(pattern, 0 if r.get("caseSensitive") else re.IGNORECASE)
        except re.error:
            continue  # a bad regex in the data file never breaks analysis
        title = r.get("title", tag)
        if not isinstance(title, str):
            title = tag  # the title is used in set / sort keys, so keep it a str
        detail = r.get("detail", "")
        if not isinstance(detail, str):
            detail = ""
        out.append({
            "tag": tag,
            "title": title,
            "detail": detail,
            "rx": rx,
            "multiline": bool(r.get("multiline")),
            "requiresDbClient": bool(r.get("requiresDbClient")),
        })
    _RULES_CACHE = (mtime, out)
    return out
def _first_line(cmd: str) -> str:
    """Return the first line of `cmd` once surrounding whitespace is trimmed.

    Only the outer whitespace of the whole command is stripped, so the returned
    line may still end with spaces that preceded the first newline.
    """
    head, _newline, _rest = cmd.strip().partition("\n")
    return head
def _segment(text: str, pos: int) -> str:
    """Return the command segment of `text` that contains offset `pos`.

    A segment is the stretch between two separators matched by `_SEP_RE` (or
    the start / end of `text`). It is what the read-head and --help guards are
    tested against.
    """
    # The segment starts right after the last separator found before `pos`.
    ends_before = [sep.end() for sep in _SEP_RE.finditer(text[:pos])]
    begin = ends_before[-1] if ends_before else 0
    # It stops at the first separator at or after `pos`, if there is one.
    following = _SEP_RE.search(text, pos)
    finish = len(text) if following is None else following.start()
    return text[begin:finish]
def _scan_command(cmd: str) -> list[tuple[str, str, str]]:
    """Return `[(tag, title, detail)]` for one Bash command.

    Each (tag, title) pair is reported at most once per command. An empty
    command yields an empty list.
    """
    if not cmd:
        return []

    invocation = _first_line(cmd)
    # Rules flagged requiresDbClient only count when a database client appears
    # on the invocation line; that does not vary per rule, so test it once.
    has_db_client = _DB_EXEC_RE.search(invocation) is not None

    found: list[tuple[str, str, str]] = []
    emitted: set[tuple[str, str]] = set()
    for rule in _load_rules():
        if rule["requiresDbClient"] and not has_db_client:
            continue  # SQL text with no real db client on the invocation line
        text = cmd if rule["multiline"] else invocation
        # Look at EVERY match, not only the first: a command may echo a label
        # ("echo '=== railway connect ==='; railway connect …"), so an early
        # match can sit in a read-head segment while a later one is real. The
        # rule fires on the first match whose segment passes both guards.
        segments = (_segment(text, hit.start()) for hit in rule["rx"].finditer(text))
        fires = any(
            not (_READ_HEAD_RE.match(seg) or _HELP_RE.search(seg))
            for seg in segments
        )
        if not fires:
            continue
        key = (rule["tag"], rule["title"])
        if key in emitted:
            continue
        emitted.add(key)
        found.append((rule["tag"], rule["title"], rule["detail"]))
    return found
def detect_impact(
    turns: list[dict[str, Any]],
    binaries: Optional[list[dict[str, Any]]] = None,
) -> dict[str, Any]:
    """Return `{riskLevel, riskReason, actions: […], outcome: {label, detail}}`.

    Every tool call named "Bash" in `turns` has its `input.command` scanned;
    hits are merged per (tag, title) into `actions` items of the form
    `{tag, title, detail, turns: [i, …]}`, where `turns` is the sorted list of
    turn indices (`t["i"]`) in which the action was seen. A turn that carries
    no index still contributes the action, but adds nothing to `turns`.

    `actions` is ordered by tag severity, then first turn, then title. Entries
    of `binaries` with a truthy `security` field are counted as sensitive tools
    for the risk rollup. Empty actions plus an honest outcome is a valid
    result (NN#6).
    """
    agg: dict[tuple[str, str], dict[str, Any]] = {}
    for t in turns:
        ti = t.get("i")
        for tc in t.get("tools", []) or []:
            if (tc.get("name") or "") != "Bash":
                continue
            inp = tc.get("input")
            if not isinstance(inp, dict):
                inp = {}
            cmd = str(inp.get("command", "") or "")
            for tag, title, detail in _scan_command(cmd):
                row = agg.setdefault(
                    (tag, title),
                    {"tag": tag, "title": title, "detail": detail, "turns": set()},
                )
                # Skip a missing index: a None mixed in with int indices makes
                # sorted() below raise TypeError, and [None] is not a useful
                # turn list anyway. The sort key already copes with no turns.
                if ti is not None:
                    row["turns"].add(ti)

    actions = [{**row, "turns": sorted(row["turns"])} for row in agg.values()]
    actions.sort(
        key=lambda a: (
            _TAG_ORDER.get(a["tag"], 9),
            a["turns"][0] if a["turns"] else 1e9,
            a["title"],
        )
    )

    sensitive_bins = [b for b in (binaries or []) if b.get("security")]
    level, risk_reason = risk_level(actions, len(sensitive_bins))
    return {
        "riskLevel": level,
        "riskReason": risk_reason,
        "actions": actions,
        "outcome": _outcome(turns),
    }
def risk_level(actions: list[dict[str, Any]], sensitive_count: int = 0) -> tuple[str, str]:
    """Roll a set of actions plus a sensitive-tool count up into `(level, reason)`.

    Shared by the per-session and whole-project rollups so the two agree.
    `level` is one of "High", "Medium", "Low" or "None":

    * High   — SECURITY together with LIVE or DATA, two or more SECURITY,
               three or more LIVE, or DATA together with LIVE;
    * Medium — any LIVE, SECURITY or DATA action;
    * Low    — only NETWORK / CONFIG / DEV actions, or only sensitive tools used;
    * None   — nothing of the above.

    `reason` lists the non-zero per-tag counts, falls back to the sensitive-tool
    count when there are none, and otherwise says nothing was detected.
    """
    tags_present = [a.get("tag") for a in actions]
    n = {tag: tags_present.count(tag) for tag in _TAG_ORDER}
    live, sec, data = n["LIVE"], n["SECURITY"], n["DATA"]
    net, cfg, dev = n["NETWORK"], n["CONFIG"], n["DEV"]

    alarming = sec >= 2 or live >= 3 or (sec and (live or data)) or (live and data)
    if alarming:
        level = "High"
    elif live or sec or data:
        level = "Medium"
    elif net or cfg or dev or sensitive_count:
        level = "Low"
    else:
        level = "None"

    bits = []
    for tag in ("LIVE", "SECURITY", "DATA", "NETWORK", "CONFIG", "DEV"):
        if n[tag]:
            bits.append(f"{n[tag]} {tag.lower()}")
    # Sensitive tools are only mentioned when no tagged action was found.
    if sensitive_count and not bits:
        plural = "" if sensitive_count == 1 else "s"
        bits.append(f"{sensitive_count} sensitive tool{plural} used")

    reason = ", ".join(bits) if bits else "no high-impact actions detected"
    return level, reason
def _outcome(turns: list[dict[str, Any]]) -> dict[str, str]:
    """Give a deterministic, modest read of how the session went.

    It is purely a rollup of counts already present on the turns — tool calls
    flagged `errored` and turns whose `guide` has kind "loop" — never a model
    judgment. Returns `{"label": …, "detail": …}`.
    """
    tool_lists = [t.get("tools", []) or [] for t in turns]
    total = sum(len(calls) for calls in tool_lists)
    errored = sum(1 for calls in tool_lists for call in calls if call.get("errored"))
    loops = sum(1 for t in turns if (t.get("guide") or {}).get("kind") == "loop")
    # Did anything in the final turn error? (False for an empty session.)
    last_errored = bool(tool_lists) and any(call.get("errored") for call in tool_lists[-1])
    ratio = errored / total if total else 0.0

    errors_text = f"{errored}/{total} tool calls errored"
    loops_text = f"{loops} retry loop{'s' if loops != 1 else ''}"

    if loops == 0 and ratio < 0.12 and not last_errored:
        suffix = f" · {errors_text}" if errored else " · clean run"
        return {"label": "Succeeded", "detail": "No retry loops" + suffix}

    if ratio < 0.30 and (not last_errored or not loops):
        return {
            "label": "Completed with retries",
            "detail": loops_text if loops else errors_text,
        }

    return {
        "label": "Rough / check the end",
        "detail": errors_text + (f", {loops_text}" if loops else ""),
    }