"""impact.py — "Actions worth reviewing", risk level, and session outcome. The Session/Project Report's safety lens. DETERMINISTIC, NO model (Non-negotiable #1): it scans the Bash commands a session ran for NAMED, high-impact operations and reports them with the turn(s) they happened in, plus a rolled-up risk level and a plain outcome read. SUGGEST, never assert (NN#7). The rules are DATA, not code: they live in `narrator/knowledge/impact-rules.json` (editable — add a tool by adding a rule, no code change) and cover the common stack (railway, vercel, netlify, fly, cloudflare, aws, gcloud, azure, kubernetes, docker, terraform, supabase, heroku, n8n, …). The guiding principle, per the owner: in Her, ANY modification to a running/deployed service counts — deploy, restart, scale, exec-in, hosted-var change — not just a literal "deploy". So the detector flags the OPERATION (e.g. `railway ssh`/`run`, `kubectl exec`, a deploy), which is robust even when the real change runs from a file the command-text can't see (`psql -f migrate.sql`). PRECISION via structural guards applied to EVERY rule (so it doesn't cry wolf): * read/search heads are skipped — `grep "CREATE ROLE"`, `cat .env | grep`, `cat > f < list[dict[str, Any]]: """Compile the editable ruleset (cached by mtime; robust to missing/corrupt -> []).""" global _RULES_CACHE try: mtime = os.stat(_RULES_PATH).st_mtime_ns except OSError: return [] if _RULES_CACHE and _RULES_CACHE[0] == mtime: return _RULES_CACHE[1] try: with open(_RULES_PATH, encoding="utf-8") as f: raw = json.load(f) except (OSError, ValueError): return [] out: list[dict[str, Any]] = [] for r in raw.get("rules", []) if isinstance(raw, dict) else []: if not isinstance(r, dict) or not r.get("match") or not r.get("tag"): continue try: rx = re.compile(r["match"], 0 if r.get("caseSensitive") else re.IGNORECASE) except re.error: continue # a bad regex in the data file never breaks analysis out.append({ "tag": r["tag"], "title": r.get("title", r["tag"]), "detail": r.get("detail", ""), "rx": rx, "multiline": bool(r.get("multiline")), "requiresDbClient": bool(r.get("requiresDbClient")), }) _RULES_CACHE = (mtime, out) return out def _first_line(cmd: str) -> str: return cmd.strip().split("\n", 1)[0] def _segment(text: str, pos: int) -> str: """The command segment (between top-level separators / newlines) containing the match at `pos` — used to test read-head and --help context.""" start = 0 for m in _SEP_RE.finditer(text[:pos]): start = m.end() nxt = _SEP_RE.search(text, pos) end = nxt.start() if nxt else len(text) return text[start:end] def _scan_command(cmd: str) -> list[tuple[str, str, str]]: """[(tag, title, detail)] for one Bash command (deduped per command).""" if not cmd: return [] first = _first_line(cmd) out: list[tuple[str, str, str]] = [] seen: set[tuple[str, str]] = set() for rule in _load_rules(): hay = cmd if rule["multiline"] else first if rule["requiresDbClient"] and not _DB_EXEC_RE.search(first): continue # SQL text with no real db client on the invocation line # Check EVERY match, not just the first — a command often echoes a label # ("echo '=== railway connect ==='; railway connect …"): the first match is # inside the echo (a read-head segment) while the REAL one is a later # segment. Fire on the first match whose segment passes the guards. for m in rule["rx"].finditer(hay): seg = _segment(hay, m.start()) if _READ_HEAD_RE.match(seg) or _HELP_RE.search(seg): continue # reading/searching for it, or just probing --help key = (rule["tag"], rule["title"]) if key not in seen: seen.add(key) out.append((rule["tag"], rule["title"], rule["detail"])) break return out def detect_impact( turns: list[dict[str, Any]], binaries: Optional[list[dict[str, Any]]] = None, ) -> dict[str, Any]: """Return {riskLevel, riskReason, actions:[…], outcome:{label,detail}}. `actions` items: {tag, title, detail, turns:[i]}. Empty actions + an honest outcome is a valid result (NN#6).""" agg: dict[tuple[str, str], dict[str, Any]] = {} for t in turns: ti = t.get("i") for tc in t.get("tools", []) or []: if (tc.get("name") or "") != "Bash": continue inp = tc.get("input") if isinstance(tc.get("input"), dict) else {} cmd = str(inp.get("command", "") or "") for tag, title, detail in _scan_command(cmd): row = agg.setdefault((tag, title), {"tag": tag, "title": title, "detail": detail, "turns": set()}) row["turns"].add(ti) actions = [] for row in agg.values(): r = dict(row) r["turns"] = sorted(row["turns"]) actions.append(r) actions.sort(key=lambda a: (_TAG_ORDER.get(a["tag"], 9), a["turns"][0] if a["turns"] else 1e9, a["title"])) sensitive_bins = [b for b in (binaries or []) if b.get("security")] level, risk_reason = risk_level(actions, len(sensitive_bins)) return { "riskLevel": level, "riskReason": risk_reason, "actions": actions, "outcome": _outcome(turns), } def risk_level(actions: list[dict[str, Any]], sensitive_count: int = 0) -> tuple[str, str]: """(level, reason) rolled up from a set of actions + count of sensitive tools. Shared by the per-session and whole-project rollups so they agree. Calibrated so a single deploy reads Medium and High needs a genuinely alarming combination; merely USING a sensitive tool only reaches Low.""" n = {tag: sum(1 for a in actions if a.get("tag") == tag) for tag in _TAG_ORDER} live, sec, data, net, cfg, dev = n["LIVE"], n["SECURITY"], n["DATA"], n["NETWORK"], n["CONFIG"], n["DEV"] if (sec and (live or data)) or sec >= 2 or live >= 3 or (data and live): level = "High" elif live or sec or data: level = "Medium" elif net or cfg or dev or sensitive_count: level = "Low" else: level = "None" bits = [f"{n[t]} {t.lower()}" for t in ("LIVE", "SECURITY", "DATA", "NETWORK", "CONFIG", "DEV") if n[t]] if not bits and sensitive_count: bits.append(f"{sensitive_count} sensitive tool{'s' if sensitive_count != 1 else ''} used") return level, (", ".join(bits) if bits else "no high-impact actions detected") def _outcome(turns: list[dict[str, Any]]) -> dict[str, str]: """A deterministic, modest read of how the session went — a rollup of already- computed counts (errored tools, retry-loop guides), never a model judgment.""" total = errored = loops = 0 last_errored = False for t in turns: tl = t.get("tools", []) or [] errored += sum(1 for tc in tl if tc.get("errored")) total += len(tl) g = t.get("guide") if g and g.get("kind") == "loop": loops += 1 if turns: last_errored = any(tc.get("errored") for tc in (turns[-1].get("tools", []) or [])) ratio = (errored / total) if total else 0.0 if loops == 0 and ratio < 0.12 and not last_errored: label = "Succeeded" detail = "No retry loops" + (f" · {errored}/{total} tool calls errored" if errored else " · clean run") elif ratio < 0.30 and not (last_errored and loops): label = "Completed with retries" detail = (f"{loops} retry loop{'s' if loops != 1 else ''}" if loops else f"{errored}/{total} tool calls errored") else: label = "Rough / check the end" detail = f"{errored}/{total} tool calls errored" + (f", {loops} retry loop{'s' if loops != 1 else ''}" if loops else "") return {"label": label, "detail": detail}