Spaces:
Running on Zero
Running on Zero
| """impact.py — "Actions worth reviewing", risk level, and session outcome. | |
| The Session/Project Report's safety lens. DETERMINISTIC, NO model (Non-negotiable | |
| #1): it scans the Bash commands a session ran for NAMED, high-impact operations | |
| and reports them with the turn(s) they happened in, plus a rolled-up risk level and | |
| a plain outcome read. SUGGEST, never assert (NN#7). | |
| The rules are DATA, not code: they live in `narrator/knowledge/impact-rules.json` | |
| (editable — add a tool by adding a rule, no code change) and cover the common | |
| stack (railway, vercel, netlify, fly, cloudflare, aws, gcloud, azure, kubernetes, | |
| docker, terraform, supabase, heroku, n8n, …). The guiding principle, per the | |
| owner: in Her, ANY modification to a running/deployed service counts — deploy, | |
| restart, scale, exec-in, hosted-var change — not just a literal "deploy". So the | |
| detector flags the OPERATION (e.g. `railway ssh`/`run`, `kubectl exec`, a deploy), | |
| which is robust even when the real change runs from a file the command-text can't | |
| see (`psql -f migrate.sql`). | |
| PRECISION via structural guards applied to EVERY rule (so it doesn't cry wolf): | |
| * read/search heads are skipped — `grep "CREATE ROLE"`, `cat .env | grep`, | |
| `cat > f <<SQL` are looking FOR / writing the text, not doing it. | |
| * `--help` / `--version` / `-h` / a bare `help` subcommand are skipped (probing, | |
| not acting — e.g. `railway up --help`). | |
| * SQL-privilege/data rules REQUIRE a real database client on the command's FIRST | |
| line (psql/mysql/surreal/…), so a python heredoc carrying "CREATE ROLE" as a | |
| string, or a grep for it, never counts. | |
| Operates on the CONTRACT dicts (turn dicts post to_dict), never raw JSONL. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import re | |
| from typing import Any, Optional | |
# Location of the editable ruleset: two directories above the directory that
# holds this module, then narrator/knowledge/impact-rules.json.
_HERE = os.path.dirname(os.path.abspath(__file__))
_REPO = os.path.dirname(os.path.dirname(_HERE))
_RULES_PATH = os.path.join(_REPO, "narrator", "knowledge", "impact-rules.json")

# Heads that mean the line is READING/SEARCHING for a pattern, not doing it.
_READ_HEADS = (
    "grep", "egrep", "fgrep", "rg", "ag", "ack", "sed", "awk", "cat", "echo",
    "printf", "less", "more", "head", "tail", "wc", "find", "cut", "sort", "uniq",
    "diff", "comm", "tr", "column", "jq", "yq", "tee",
)
_READ_HEAD_RE = re.compile(r"^\s*(" + "|".join(_READ_HEADS) + r")\b")

# A --help / --version / -h / bare-`help` invocation: probing a command, not running it.
# `-h` only counts as help when nothing but whitespace follows it in the segment,
# or when the next token is another option (`-h --json`) or a redirection
# (`-h 2>&1`). A `-h` that takes a VALUE is a real flag, not a probe: `psql -h host`,
# `mysql -h host`, and (because the match is case-insensitive) `curl -H 'Header: …'`
# must still be scanned.
_HELP_RE = re.compile(
    r"(--help\b|--version\b"
    r"|(?:^|\s)-h(?=\s*$|\s+(?:-|\d*[<>]|&>))"
    r"|(?:^|\s)help(?=\s|$))",
    re.IGNORECASE,
)

# Segment separators (also newline, for heredoc commands).
_SEP_RE = re.compile(r"&&|\|\|?|;|\n")

# A real DB CLIENT must be present (first line) for a SQL rule to count as RUN.
_DB_EXEC_RE = re.compile(
    r"\b(psql|mysql|mariadb|surreal(?:\s+sql)?|mongosh|mongo|sqlite3|cockroach|"
    r"createuser|createdb|dropuser|pg_restore|railway\s+connect|railway\s+run)\b",
    re.IGNORECASE,
)

# Per-tag severity ordering for display + rollup. DEV (local servers/containers)
# is last — it's flagged for visibility but only ever reaches "Low" risk.
_TAG_ORDER = {"LIVE": 0, "SECURITY": 1, "DATA": 2, "NETWORK": 3, "CONFIG": 4, "DEV": 5}

# (rules-file mtime in ns, compiled rules) from the last successful load, or None.
_RULES_CACHE: Optional[tuple[int, list[dict[str, Any]]]] = None
def _load_rules() -> list[dict[str, Any]]:
    """Compile the editable ruleset (cached by mtime; robust to missing/corrupt -> []).

    Reads the JSON file at `_RULES_PATH`, expecting an object with a "rules"
    list. Each usable entry becomes a dict with keys `tag`, `title`, `detail`,
    `rx` (compiled pattern; case-insensitive unless "caseSensitive" is truthy),
    `multiline` and `requiresDbClient`.

    Entries are skipped, never raised on, when they are not objects, when
    "match" or "tag" is missing, empty, or not a string, or when the pattern
    does not compile. A "rules" value that is not a list yields no rules.

    Returns:
        The compiled rules. The same list object is returned on later calls
        while the file's mtime is unchanged. An unreadable or unparsable file
        returns [] and leaves the cache untouched.
    """
    global _RULES_CACHE
    try:
        mtime = os.stat(_RULES_PATH).st_mtime_ns
    except OSError:
        return []
    if _RULES_CACHE and _RULES_CACHE[0] == mtime:
        return _RULES_CACHE[1]
    try:
        with open(_RULES_PATH, encoding="utf-8") as f:
            raw = json.load(f)
    except (OSError, ValueError):
        return []

    entries = raw.get("rules") if isinstance(raw, dict) else None
    if not isinstance(entries, list):
        entries = []  # e.g. "rules": null — treat as an empty ruleset

    out: list[dict[str, Any]] = []
    for r in entries:
        if not isinstance(r, dict):
            continue
        pattern, tag = r.get("match"), r.get("tag")
        # Both must be non-empty strings: the pattern goes to re.compile and the
        # tag is used by callers as part of a dict key and a sort key.
        if not (isinstance(pattern, str) and pattern and isinstance(tag, str) and tag):
            continue
        try:
            rx = re.compile(pattern, 0 if r.get("caseSensitive") else re.IGNORECASE)
        except re.error:
            continue  # a bad regex in the data file never breaks analysis
        title = r.get("title", tag)
        detail = r.get("detail", "")
        out.append({
            "tag": tag,
            # Non-string title/detail (null, number, list) fall back to safe text.
            "title": title if isinstance(title, str) else tag,
            "detail": detail if isinstance(detail, str) else "",
            "rx": rx,
            "multiline": bool(r.get("multiline")),
            "requiresDbClient": bool(r.get("requiresDbClient")),
        })
    _RULES_CACHE = (mtime, out)
    return out
def _first_line(cmd: str) -> str:
    """Return the first line of `cmd` once surrounding whitespace is trimmed.

    Only the command as a whole is stripped; trailing spaces on the first line
    itself are kept when more lines follow.
    """
    head, _newline, _rest = cmd.strip().partition("\n")
    return head
def _segment(text: str, pos: int) -> str:
    """Return the command segment of `text` that contains offset `pos`.

    A segment is the stretch between two `_SEP_RE` separators (or the start /
    end of `text`). Callers use it to test read-head and --help context for a
    rule match starting at `pos`.
    """
    # Left edge: just past the last separator found entirely before `pos`.
    earlier_ends = [sep.end() for sep in _SEP_RE.finditer(text[:pos])]
    left = earlier_ends[-1] if earlier_ends else 0
    # Right edge: the first separator at or after `pos`, else end of text.
    following = _SEP_RE.search(text, pos)
    right = len(text) if following is None else following.start()
    return text[left:right]
def _scan_command(cmd: str) -> list[tuple[str, str, str]]:
    """[(tag, title, detail)] for one Bash command (deduped per command).

    Each rule from `_load_rules()` is tried against the whole command when the
    rule is `multiline`, otherwise against the first line only. Rules marked
    `requiresDbClient` are skipped unless `_DB_EXEC_RE` matches the first line.
    A rule fires if ANY of its matches sits in a segment that is neither a
    read/search head nor a help probe. A (tag, title) pair is reported once.
    """
    if not cmd:
        return []

    head = _first_line(cmd)
    found: list[tuple[str, str, str]] = []
    emitted: set[tuple[str, str]] = set()

    for rule in _load_rules():
        if rule["requiresDbClient"] and not _DB_EXEC_RE.search(head):
            # SQL text with no real db client on the invocation line.
            continue
        text = cmd if rule["multiline"] else head

        # Look at EVERY match, not only the first: a command often echoes a
        # label ("echo '=== railway connect ==='; railway connect …"), so an
        # early match can sit in a guarded segment while a later one is real.
        # The generator stops at the first segment that passes both guards.
        segments = (_segment(text, hit.start()) for hit in rule["rx"].finditer(text))
        fires = any(
            not (_READ_HEAD_RE.match(seg) or _HELP_RE.search(seg))
            for seg in segments
        )
        if not fires:
            continue

        ident = (rule["tag"], rule["title"])
        if ident in emitted:
            continue
        emitted.add(ident)
        found.append((rule["tag"], rule["title"], rule["detail"]))
    return found
def detect_impact(
    turns: list[dict[str, Any]],
    binaries: Optional[list[dict[str, Any]]] = None,
) -> dict[str, Any]:
    """Return {riskLevel, riskReason, actions:[…], outcome:{label,detail}}.

    Scans the `command` of every tool call named "Bash" in `turns` with
    `_scan_command` and groups the hits by (tag, title).

    `actions` items: {tag, title, detail, turns:[i]}, ordered by tag severity
    (`_TAG_ORDER`), then first turn, then title. Empty actions + an honest
    outcome is a valid result (NN#6).

    Args:
        turns: turn dicts; each may carry `i` (turn index) and `tools`.
        binaries: optional tool records; those with a truthy `security` key
            are counted and passed to `risk_level` as the sensitive-tool count.
    """
    agg: dict[tuple[str, str], dict[str, Any]] = {}
    for t in turns:
        ti = t.get("i")
        for tc in t.get("tools", []) or []:
            if (tc.get("name") or "") != "Bash":
                continue
            raw_input = tc.get("input")
            inp = raw_input if isinstance(raw_input, dict) else {}
            cmd = str(inp.get("command", "") or "")
            for tag, title, detail in _scan_command(cmd):
                row = agg.setdefault(
                    (tag, title),
                    {"tag": tag, "title": title, "detail": detail, "turns": set()},
                )
                # A turn without an index still reports its action, but None
                # must stay out of the set: sorting None alongside ints raises
                # TypeError and would abort the whole report.
                if ti is not None:
                    row["turns"].add(ti)

    actions = [{**row, "turns": sorted(row["turns"])} for row in agg.values()]
    # Actions with no known turn sort after those with one (1e9 sentinel).
    actions.sort(
        key=lambda a: (
            _TAG_ORDER.get(a["tag"], 9),
            a["turns"][0] if a["turns"] else 1e9,
            a["title"],
        )
    )

    sensitive_bins = [b for b in (binaries or []) if b.get("security")]
    level, risk_reason = risk_level(actions, len(sensitive_bins))
    return {
        "riskLevel": level,
        "riskReason": risk_reason,
        "actions": actions,
        "outcome": _outcome(turns),
    }
def risk_level(actions: list[dict[str, Any]], sensitive_count: int = 0) -> tuple[str, str]:
    """(level, reason) rolled up from a set of actions + count of sensitive tools.

    Shared by the per-session and whole-project rollups so they agree.

    Levels, from the per-tag action counts:
      * "High"   — security together with live or data, two or more security,
                   three or more live, or data together with live.
      * "Medium" — any live, security or data action.
      * "Low"    — only network/config/dev actions, or a sensitive tool was used.
      * "None"   — nothing of the above.

    The reason lists the non-zero tag counts in severity order. If there are
    none, it mentions the sensitive-tool count, and otherwise says that no
    high-impact actions were detected.
    """
    seen_tags = [action.get("tag") for action in actions]
    n = {tag: seen_tags.count(tag) for tag in _TAG_ORDER}
    live, sec, data = n["LIVE"], n["SECURITY"], n["DATA"]
    net, cfg, dev = n["NETWORK"], n["CONFIG"], n["DEV"]

    if sec >= 2 or live >= 3 or (sec and (live or data)) or (live and data):
        level = "High"
    elif live or sec or data:
        level = "Medium"
    elif net or cfg or dev or sensitive_count:
        level = "Low"
    else:
        level = "None"

    bits = []
    for tag in ("LIVE", "SECURITY", "DATA", "NETWORK", "CONFIG", "DEV"):
        if n[tag]:
            bits.append(f"{n[tag]} {tag.lower()}")
    if not bits and sensitive_count:
        plural = "s" if sensitive_count != 1 else ""
        bits.append(f"{sensitive_count} sensitive tool{plural} used")

    reason = ", ".join(bits) if bits else "no high-impact actions detected"
    return level, reason
def _outcome(turns: list[dict[str, Any]]) -> dict[str, str]:
    """Summarise how the session went as {"label", "detail"}.

    Purely a rollup of values already on the turn dicts, never a model
    judgment: tool calls flagged `errored`, turns whose `guide` has kind
    "loop", and whether the final turn contains an errored tool call.
    """
    tool_lists = [t.get("tools", []) or [] for t in turns]
    total = sum(len(calls) for calls in tool_lists)
    errored = sum(1 for calls in tool_lists for tc in calls if tc.get("errored"))
    loops = sum(1 for t in turns if (t.get("guide") or {}).get("kind") == "loop")
    last_errored = any(tc.get("errored") for tc in tool_lists[-1]) if tool_lists else False
    ratio = (errored / total) if total else 0.0

    error_note = f"{errored}/{total} tool calls errored"
    loop_note = f"{loops} retry loop{'s' if loops != 1 else ''}"

    # Clean: no loops, few errors, and the session did not end on an error.
    if not loops and not last_errored and ratio < 0.12:
        tail = f" · {error_note}" if errored else " · clean run"
        return {"label": "Succeeded", "detail": "No retry loops" + tail}

    # Bumpy but fine: moderate error share, and not both looping and ending on an error.
    if ratio < 0.30 and not (last_errored and loops):
        return {
            "label": "Completed with retries",
            "detail": loop_note if loops else error_note,
        }

    # Everything else deserves a look at how the session ended.
    return {
        "label": "Rough / check the end",
        "detail": error_note + (f", {loop_note}" if loops else ""),
    }