File size: 10,117 Bytes
5f43c7d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
"""impact.py — "Actions worth reviewing", risk level, and session outcome.

The Session/Project Report's safety lens. DETERMINISTIC, NO model (Non-negotiable
#1): it scans the Bash commands a session ran for NAMED, high-impact operations
and reports them with the turn(s) they happened in, plus a rolled-up risk level and
a plain outcome read. SUGGEST, never assert (NN#7).

The rules are DATA, not code: they live in `narrator/knowledge/impact-rules.json`
(editable — add a tool by adding a rule, no code change) and cover the common
stack (railway, vercel, netlify, fly, cloudflare, aws, gcloud, azure, kubernetes,
docker, terraform, supabase, heroku, n8n, …). The guiding principle, per the
owner: in Her, ANY modification to a running/deployed service counts — deploy,
restart, scale, exec-in, hosted-var change — not just a literal "deploy". So the
detector flags the OPERATION (e.g. `railway ssh`/`run`, `kubectl exec`, a deploy),
which is robust even when the real change runs from a file the command-text can't
see (`psql -f migrate.sql`).

PRECISION via structural guards applied to EVERY rule (so it doesn't cry wolf):
  * read/search heads are skipped — `grep "CREATE ROLE"`, `cat .env | grep`,
    `cat > f <<SQL` are looking FOR / writing the text, not doing it.
  * `--help` / `--version` / `-h` / a bare `help` subcommand are skipped (probing,
    not acting — e.g. `railway up --help`).
  * SQL-privilege/data rules REQUIRE a real database client on the command's FIRST
    line (psql/mysql/surreal/…), so a python heredoc carrying "CREATE ROLE" as a
    string, or a grep for it, never counts.

Operates on the CONTRACT dicts (turn dicts post to_dict), never raw JSONL.
"""
from __future__ import annotations

import json
import os
import re
from typing import Any, Optional

_HERE = os.path.dirname(os.path.abspath(__file__))
_REPO = os.path.dirname(os.path.dirname(_HERE))
_RULES_PATH = os.path.join(_REPO, "narrator", "knowledge", "impact-rules.json")

# Heads that mean the line is READING/SEARCHING for a pattern, not doing it.
_READ_HEADS = (
    "grep", "egrep", "fgrep", "rg", "ag", "ack", "sed", "awk", "cat", "echo",
    "printf", "less", "more", "head", "tail", "wc", "find", "cut", "sort", "uniq",
    "diff", "comm", "tr", "column", "jq", "yq", "tee",
)
_READ_HEAD_RE = re.compile(r"^\s*(" + "|".join(_READ_HEADS) + r")\b")
# A --help / --version / -h / bare-`help` invocation: probing a command, not running it.
_HELP_RE = re.compile(r"(--help\b|--version\b|(?:^|\s)-h(?=\s|$)|(?:^|\s)help(?=\s|$))", re.IGNORECASE)
# Segment separators (also newline, for heredoc commands).
_SEP_RE = re.compile(r"&&|\|\|?|;|\n")
# A real DB CLIENT must be present (first line) for a SQL rule to count as RUN.
_DB_EXEC_RE = re.compile(
    r"\b(psql|mysql|mariadb|surreal(?:\s+sql)?|mongosh|mongo|sqlite3|cockroach|"
    r"createuser|createdb|dropuser|pg_restore|railway\s+connect|railway\s+run)\b",
    re.IGNORECASE,
)

# Per-tag severity ordering for display + rollup. DEV (local servers/containers)
# is last — it's flagged for visibility but only ever reaches "Low" risk.
_TAG_ORDER = {"LIVE": 0, "SECURITY": 1, "DATA": 2, "NETWORK": 3, "CONFIG": 4, "DEV": 5}

_RULES_CACHE: Optional[tuple[int, list[dict[str, Any]]]] = None


def _load_rules() -> list[dict[str, Any]]:
    """Compile the editable ruleset (cached by mtime; robust to missing/corrupt -> [])."""
    global _RULES_CACHE
    try:
        mtime = os.stat(_RULES_PATH).st_mtime_ns
    except OSError:
        return []
    if _RULES_CACHE and _RULES_CACHE[0] == mtime:
        return _RULES_CACHE[1]
    try:
        with open(_RULES_PATH, encoding="utf-8") as f:
            raw = json.load(f)
    except (OSError, ValueError):
        return []
    out: list[dict[str, Any]] = []
    for r in raw.get("rules", []) if isinstance(raw, dict) else []:
        if not isinstance(r, dict) or not r.get("match") or not r.get("tag"):
            continue
        try:
            rx = re.compile(r["match"], 0 if r.get("caseSensitive") else re.IGNORECASE)
        except re.error:
            continue  # a bad regex in the data file never breaks analysis
        out.append({
            "tag": r["tag"], "title": r.get("title", r["tag"]),
            "detail": r.get("detail", ""), "rx": rx,
            "multiline": bool(r.get("multiline")),
            "requiresDbClient": bool(r.get("requiresDbClient")),
        })
    _RULES_CACHE = (mtime, out)
    return out


def _first_line(cmd: str) -> str:
    return cmd.strip().split("\n", 1)[0]


def _segment(text: str, pos: int) -> str:
    """The command segment (between top-level separators / newlines) containing
    the match at `pos` — used to test read-head and --help context."""
    start = 0
    for m in _SEP_RE.finditer(text[:pos]):
        start = m.end()
    nxt = _SEP_RE.search(text, pos)
    end = nxt.start() if nxt else len(text)
    return text[start:end]


def _scan_command(cmd: str) -> list[tuple[str, str, str]]:
    """[(tag, title, detail)] for one Bash command (deduped per command)."""
    if not cmd:
        return []
    first = _first_line(cmd)
    out: list[tuple[str, str, str]] = []
    seen: set[tuple[str, str]] = set()
    for rule in _load_rules():
        hay = cmd if rule["multiline"] else first
        if rule["requiresDbClient"] and not _DB_EXEC_RE.search(first):
            continue  # SQL text with no real db client on the invocation line
        # Check EVERY match, not just the first — a command often echoes a label
        # ("echo '=== railway connect ==='; railway connect …"): the first match is
        # inside the echo (a read-head segment) while the REAL one is a later
        # segment. Fire on the first match whose segment passes the guards.
        for m in rule["rx"].finditer(hay):
            seg = _segment(hay, m.start())
            if _READ_HEAD_RE.match(seg) or _HELP_RE.search(seg):
                continue  # reading/searching for it, or just probing --help
            key = (rule["tag"], rule["title"])
            if key not in seen:
                seen.add(key)
                out.append((rule["tag"], rule["title"], rule["detail"]))
            break
    return out


def detect_impact(
    turns: list[dict[str, Any]],
    binaries: Optional[list[dict[str, Any]]] = None,
) -> dict[str, Any]:
    """Return {riskLevel, riskReason, actions:[…], outcome:{label,detail}}.

    `actions` items: {tag, title, detail, turns:[i]}. Empty actions + an honest
    outcome is a valid result (NN#6)."""
    agg: dict[tuple[str, str], dict[str, Any]] = {}
    for t in turns:
        ti = t.get("i")
        for tc in t.get("tools", []) or []:
            if (tc.get("name") or "") != "Bash":
                continue
            inp = tc.get("input") if isinstance(tc.get("input"), dict) else {}
            cmd = str(inp.get("command", "") or "")
            for tag, title, detail in _scan_command(cmd):
                row = agg.setdefault((tag, title), {"tag": tag, "title": title, "detail": detail, "turns": set()})
                row["turns"].add(ti)

    actions = []
    for row in agg.values():
        r = dict(row)
        r["turns"] = sorted(row["turns"])
        actions.append(r)
    actions.sort(key=lambda a: (_TAG_ORDER.get(a["tag"], 9), a["turns"][0] if a["turns"] else 1e9, a["title"]))

    sensitive_bins = [b for b in (binaries or []) if b.get("security")]
    level, risk_reason = risk_level(actions, len(sensitive_bins))
    return {
        "riskLevel": level,
        "riskReason": risk_reason,
        "actions": actions,
        "outcome": _outcome(turns),
    }


def risk_level(actions: list[dict[str, Any]], sensitive_count: int = 0) -> tuple[str, str]:
    """(level, reason) rolled up from a set of actions + count of sensitive tools.
    Shared by the per-session and whole-project rollups so they agree. Calibrated so
    a single deploy reads Medium and High needs a genuinely alarming combination;
    merely USING a sensitive tool only reaches Low."""
    n = {tag: sum(1 for a in actions if a.get("tag") == tag) for tag in _TAG_ORDER}
    live, sec, data, net, cfg, dev = n["LIVE"], n["SECURITY"], n["DATA"], n["NETWORK"], n["CONFIG"], n["DEV"]
    if (sec and (live or data)) or sec >= 2 or live >= 3 or (data and live):
        level = "High"
    elif live or sec or data:
        level = "Medium"
    elif net or cfg or dev or sensitive_count:
        level = "Low"
    else:
        level = "None"
    bits = [f"{n[t]} {t.lower()}" for t in ("LIVE", "SECURITY", "DATA", "NETWORK", "CONFIG", "DEV") if n[t]]
    if not bits and sensitive_count:
        bits.append(f"{sensitive_count} sensitive tool{'s' if sensitive_count != 1 else ''} used")
    return level, (", ".join(bits) if bits else "no high-impact actions detected")


def _outcome(turns: list[dict[str, Any]]) -> dict[str, str]:
    """A deterministic, modest read of how the session went — a rollup of already-
    computed counts (errored tools, retry-loop guides), never a model judgment."""
    total = errored = loops = 0
    last_errored = False
    for t in turns:
        tl = t.get("tools", []) or []
        errored += sum(1 for tc in tl if tc.get("errored"))
        total += len(tl)
        g = t.get("guide")
        if g and g.get("kind") == "loop":
            loops += 1
    if turns:
        last_errored = any(tc.get("errored") for tc in (turns[-1].get("tools", []) or []))
    ratio = (errored / total) if total else 0.0

    if loops == 0 and ratio < 0.12 and not last_errored:
        label = "Succeeded"
        detail = "No retry loops" + (f" · {errored}/{total} tool calls errored" if errored else " · clean run")
    elif ratio < 0.30 and not (last_errored and loops):
        label = "Completed with retries"
        detail = (f"{loops} retry loop{'s' if loops != 1 else ''}" if loops else f"{errored}/{total} tool calls errored")
    else:
        label = "Rough / check the end"
        detail = f"{errored}/{total} tool calls errored" + (f", {loops} retry loop{'s' if loops != 1 else ''}" if loops else "")
    return {"label": label, "detail": detail}