Spaces:
Running on Zero
Running on Zero
File size: 10,117 Bytes
"""impact.py — "Actions worth reviewing", risk level, and session outcome.
The Session/Project Report's safety lens. DETERMINISTIC, NO model (Non-negotiable
#1): it scans the Bash commands a session ran for NAMED, high-impact operations
and reports them with the turn(s) they happened in, plus a rolled-up risk level and
a plain outcome read. SUGGEST, never assert (NN#7).
The rules are DATA, not code: they live in `narrator/knowledge/impact-rules.json`
(editable — add a tool by adding a rule, no code change) and cover the common
stack (railway, vercel, netlify, fly, cloudflare, aws, gcloud, azure, kubernetes,
docker, terraform, supabase, heroku, n8n, …). The guiding principle, per the
owner: in Her, ANY modification to a running/deployed service counts — deploy,
restart, scale, exec-in, hosted-var change — not just a literal "deploy". So the
detector flags the OPERATION (e.g. `railway ssh`/`run`, `kubectl exec`, a deploy),
which is robust even when the real change runs from a file the command-text can't
see (`psql -f migrate.sql`).
PRECISION via structural guards applied to EVERY rule (so it doesn't cry wolf):
* read/search heads are skipped — `grep "CREATE ROLE"`, `cat .env | grep`,
`cat > f <<SQL` are looking FOR / writing the text, not doing it.
* `--help` / `--version` / `-h` / a bare `help` subcommand are skipped (probing,
not acting — e.g. `railway up --help`).
* SQL-privilege/data rules REQUIRE a real database client on the command's FIRST
line (psql/mysql/surreal/…), so a python heredoc carrying "CREATE ROLE" as a
string, or a grep for it, never counts.
Operates on the CONTRACT dicts (turn dicts post to_dict), never raw JSONL.
"""
from __future__ import annotations
import json
import os
import re
from typing import Any, Optional
# Filesystem anchors: this module's directory, the directory two levels above
# it, and the editable ruleset file located under that directory.
_HERE = os.path.dirname(os.path.abspath(__file__))
_REPO = os.path.dirname(os.path.dirname(_HERE))
_RULES_PATH = os.path.join(_REPO, "narrator", "knowledge", "impact-rules.json")

# Command heads that only READ or SEARCH for text: a rule pattern found in a
# segment that starts with one of these is being looked for, not executed.
_READ_HEADS = (
    "grep", "egrep", "fgrep", "rg", "ag", "ack",
    "sed", "awk", "cat", "echo", "printf",
    "less", "more", "head", "tail", "wc",
    "find", "cut", "sort", "uniq",
    "diff", "comm", "tr", "column",
    "jq", "yq", "tee",
)
_READ_HEAD_RE = re.compile(r"^\s*(%s)\b" % "|".join(_READ_HEADS))

# Probing invocations (--help / --version / -h / a bare `help` word): the
# command is being inspected rather than run.
_HELP_RE = re.compile(
    r"(--help\b|--version\b"
    r"|(?:^|\s)-h(?=\s|$)"
    r"|(?:^|\s)help(?=\s|$))",
    re.IGNORECASE,
)

# Boundaries between command segments: `&&`, `|`, `||`, `;`, and newline (the
# newline also splits heredoc-style multi-line commands).
_SEP_RE = re.compile(r"&&|\|\|?|;|\n")

# Database clients: one of these must appear on a command's first line before
# a SQL rule is treated as actually having been run.
_DB_EXEC_RE = re.compile(
    r"\b(psql|mysql|mariadb|surreal(?:\s+sql)?|mongosh|mongo|sqlite3|cockroach"
    r"|createuser|createdb|dropuser|pg_restore"
    r"|railway\s+connect|railway\s+run)\b",
    re.IGNORECASE,
)

# Tag -> rank used for display ordering and the risk rollup (lower rank sorts
# first). DEV (local servers/containers) is ranked last: it is surfaced for
# visibility but only ever reaches "Low" risk.
_TAG_ORDER = {
    tag: rank
    for rank, tag in enumerate(("LIVE", "SECURITY", "DATA", "NETWORK", "CONFIG", "DEV"))
}

# (mtime_ns, compiled rules) of the last successful ruleset load, or None.
_RULES_CACHE: Optional[tuple[int, list[dict[str, Any]]]] = None
def _load_rules() -> list[dict[str, Any]]:
    """Load and compile the editable ruleset from `_RULES_PATH`.

    The compiled list is cached in `_RULES_CACHE`, keyed by the file's
    `st_mtime_ns`, so the file is only re-parsed after it changes.

    Returns:
        A list of rule dicts with keys `tag`, `title`, `detail`, `rx`
        (compiled pattern), `multiline` and `requiresDbClient`. An empty list
        is returned when the file is missing, unreadable or not valid JSON
        (that failure result is not cached).

    Malformed entries are skipped one by one rather than aborting the load:
    a non-dict entry, a missing / empty / non-string `match` or `tag`, or a
    pattern that does not compile. A non-string `title` falls back to the tag
    and a non-string `detail` to "", so downstream code can hash and sort
    them safely.
    """
    global _RULES_CACHE
    try:
        mtime = os.stat(_RULES_PATH).st_mtime_ns
    except OSError:
        return []
    if _RULES_CACHE is not None and _RULES_CACHE[0] == mtime:
        return _RULES_CACHE[1]
    try:
        with open(_RULES_PATH, encoding="utf-8") as f:
            raw = json.load(f)
    except (OSError, ValueError):
        return []

    # Valid JSON can still have the wrong shape (e.g. `"rules": null`);
    # anything that is not a list of entries is treated as "no rules".
    entries = raw.get("rules") if isinstance(raw, dict) else None
    if not isinstance(entries, list):
        entries = []

    out: list[dict[str, Any]] = []
    for r in entries:
        if not isinstance(r, dict):
            continue
        pattern = r.get("match")
        tag = r.get("tag")
        # Both must be non-empty strings: re.compile raises TypeError (not
        # re.error) for a non-string pattern, and the tag is later used as
        # part of a set/dict key, so it must be hashable.
        if not (isinstance(pattern, str) and pattern):
            continue
        if not (isinstance(tag, str) and tag):
            continue
        try:
            rx = re.compile(pattern, 0 if r.get("caseSensitive") else re.IGNORECASE)
        except re.error:
            continue  # a bad regex in the data file never breaks analysis
        title = r.get("title", tag)
        if not isinstance(title, str):
            title = tag
        detail = r.get("detail", "")
        if not isinstance(detail, str):
            detail = ""
        out.append({
            "tag": tag,
            "title": title,
            "detail": detail,
            "rx": rx,
            "multiline": bool(r.get("multiline")),
            "requiresDbClient": bool(r.get("requiresDbClient")),
        })
    _RULES_CACHE = (mtime, out)
    return out
def _first_line(cmd: str) -> str:
    """Return the text of `cmd` up to its first newline, after stripping
    leading and trailing whitespace from the whole command."""
    head, _newline, _rest = cmd.strip().partition("\n")
    return head
def _segment(text: str, pos: int) -> str:
    """Return the slice of `text` that surrounds offset `pos`, bounded by the
    nearest `_SEP_RE` separator before it and the nearest one at or after it.

    The slice runs from the end of the last separator found in `text[:pos]`
    (or the start of `text`) up to the start of the first separator found from
    `pos` onward (or the end of `text`). Callers use it to test the read-head
    and --help guards against the segment a rule match sits in.
    """
    # finditer yields matches left to right, so the final one is the closest.
    preceding_ends = [sep.end() for sep in _SEP_RE.finditer(text[:pos])]
    begin = preceding_ends[-1] if preceding_ends else 0
    following = _SEP_RE.search(text, pos)
    stop = len(text) if following is None else following.start()
    return text[begin:stop]
def _scan_command(cmd: str) -> list[tuple[str, str, str]]:
    """Return `(tag, title, detail)` for every rule that fires on one Bash
    command, with at most one entry per `(tag, title)` pair.

    A rule fires when at least one of its matches sits in a segment that is
    neither a read/search invocation (`_READ_HEAD_RE`) nor a help/version
    probe (`_HELP_RE`). Rules flagged `requiresDbClient` are skipped outright
    unless `_DB_EXEC_RE` finds a database client on the command's first line.
    Rules flagged `multiline` are matched against the whole command; all
    others only against its first line.
    """
    if not cmd:
        return []
    invocation = _first_line(cmd)
    findings: list[tuple[str, str, str]] = []
    reported: set[tuple[str, str]] = set()
    for rule in _load_rules():
        if rule["requiresDbClient"] and _DB_EXEC_RE.search(invocation) is None:
            continue  # SQL text with no real db client on the invocation line
        haystack = cmd if rule["multiline"] else invocation
        # Every match is examined, not just the first: a command may echo a
        # label containing the pattern before the real invocation appears in
        # a later segment. `any` stops at the first match that passes both
        # guards.
        segments = (_segment(haystack, hit.start()) for hit in rule["rx"].finditer(haystack))
        acted = any(
            not (_READ_HEAD_RE.match(seg) or _HELP_RE.search(seg))
            for seg in segments
        )
        if not acted:
            continue
        ident = (rule["tag"], rule["title"])
        if ident in reported:
            continue
        reported.add(ident)
        findings.append((rule["tag"], rule["title"], rule["detail"]))
    return findings
def detect_impact(
    turns: list[dict[str, Any]],
    binaries: Optional[list[dict[str, Any]]] = None,
) -> dict[str, Any]:
    """Build the impact summary for a list of turn dicts.

    Every tool call named "Bash" has its `input["command"]` scanned with
    `_scan_command`; hits are grouped by `(tag, title)` and each group records
    the `i` value of every turn it occurred in.

    Args:
        turns: turn dicts; each may carry `i`, `tools` and `guide`.
        binaries: optional dicts; those with a truthy `security` value are
            counted as sensitive tools for the risk rollup.

    Returns:
        `{riskLevel, riskReason, actions, outcome}` where each action is
        `{tag, title, detail, turns}` with `turns` sorted ascending. Actions
        are ordered by tag rank (`_TAG_ORDER`), then first turn, then title.
        An empty `actions` list is a valid result.
    """
    grouped: dict[tuple[str, str], dict[str, Any]] = {}
    for turn in turns:
        turn_index = turn.get("i")
        for call in turn.get("tools", []) or []:
            if (call.get("name") or "") != "Bash":
                continue
            payload = call.get("input")
            if not isinstance(payload, dict):
                payload = {}
            command = str(payload.get("command", "") or "")
            for tag, title, detail in _scan_command(command):
                key = (tag, title)
                if key not in grouped:
                    # The detail of the first occurrence is the one kept.
                    grouped[key] = {"tag": tag, "title": title, "detail": detail, "turns": set()}
                grouped[key]["turns"].add(turn_index)

    def _display_rank(action: dict[str, Any]) -> tuple[Any, Any, Any]:
        # Tags missing from _TAG_ORDER sort after all known ones.
        first_turn = action["turns"][0] if action["turns"] else 1e9
        return (_TAG_ORDER.get(action["tag"], 9), first_turn, action["title"])

    actions = sorted(
        ({**row, "turns": sorted(row["turns"])} for row in grouped.values()),
        key=_display_rank,
    )
    sensitive_count = sum(1 for b in (binaries or []) if b.get("security"))
    level, risk_reason = risk_level(actions, sensitive_count)
    return {
        "riskLevel": level,
        "riskReason": risk_reason,
        "actions": actions,
        "outcome": _outcome(turns),
    }
def risk_level(actions: list[dict[str, Any]], sensitive_count: int = 0) -> tuple[str, str]:
    """Roll a set of actions plus a sensitive-tool count up into (level, reason).

    Shared by the per-session and whole-project rollups so they agree.

    Levels:
        * "High"   — SECURITY together with LIVE or DATA, two or more SECURITY
          actions, three or more LIVE actions, or DATA together with LIVE.
        * "Medium" — any LIVE, SECURITY or DATA action otherwise.
        * "Low"    — any other tagged action (NETWORK, CONFIG, DEV, or a tag
          outside `_TAG_ORDER` added through the editable ruleset), or a
          non-zero `sensitive_count` on its own.
        * "None"   — nothing at all.

    Args:
        actions: dicts carrying a `tag`; entries without a non-empty string
            tag are ignored.
        sensitive_count: number of sensitive tools used.

    Returns:
        `(level, reason)`. The reason lists per-tag counts ("2 live, 1 data"),
        known tags first in `_TAG_ORDER` order and unknown tags after them
        alphabetically. With no tagged actions it reports the sensitive-tool
        count, or "no high-impact actions detected".
    """
    counts: dict[str, int] = {}
    for action in actions:
        tag = action.get("tag")
        if isinstance(tag, str) and tag:
            counts[tag] = counts.get(tag, 0) + 1

    live = counts.get("LIVE", 0)
    sec = counts.get("SECURITY", 0)
    data = counts.get("DATA", 0)
    if (sec and (live or data)) or sec >= 2 or live >= 3 or (data and live):
        level = "High"
    elif live or sec or data:
        level = "Medium"
    elif counts or sensitive_count:
        # Anything tagged that is not LIVE/SECURITY/DATA — including tags the
        # ruleset introduced that this module does not know — is still an
        # action worth surfacing, so it must not read as "None".
        level = "Low"
    else:
        level = "None"

    unknown_rank = len(_TAG_ORDER)
    ordered = sorted(counts, key=lambda t: (_TAG_ORDER.get(t, unknown_rank), t))
    bits = [f"{counts[t]} {t.lower()}" for t in ordered]
    if not bits and sensitive_count:
        bits.append(f"{sensitive_count} sensitive tool{'s' if sensitive_count != 1 else ''} used")
    return level, (", ".join(bits) if bits else "no high-impact actions detected")
def _outcome(turns: list[dict[str, Any]]) -> dict[str, str]:
    """Summarise how the session went as `{label, detail}`.

    Purely a rollup of values already present on the turns: the number of
    tool calls, how many have a truthy `errored`, how many turns carry a
    `guide` whose `kind` is "loop", and whether any tool call in the final
    turn errored.

    Labels:
        * "Succeeded" — no loops, under 12% of tool calls errored, and the
          last turn had no errored call.
        * "Completed with retries" — under 30% errored, and not both an
          errored last turn and at least one loop.
        * "Rough / check the end" — everything else.
    """
    tool_lists = [turn.get("tools", []) or [] for turn in turns]
    total = sum(len(calls) for calls in tool_lists)
    errored = sum(1 for calls in tool_lists for call in calls if call.get("errored"))
    loops = sum(1 for turn in turns if (turn.get("guide") or {}).get("kind") == "loop")
    last_errored = bool(tool_lists) and any(call.get("errored") for call in tool_lists[-1])
    ratio = errored / total if total else 0.0

    error_text = f"{errored}/{total} tool calls errored"
    loop_text = f"{loops} retry loop{'s' if loops != 1 else ''}"

    if loops == 0 and ratio < 0.12 and not last_errored:
        tail = f" · {error_text}" if errored else " · clean run"
        return {"label": "Succeeded", "detail": "No retry loops" + tail}
    if ratio < 0.30 and (not last_errored or not loops):
        return {"label": "Completed with retries", "detail": loop_text if loops else error_text}
    detail = error_text + (f", {loop_text}" if loops else "")
    return {"label": "Rough / check the end", "detail": detail}
|