her / engine /binaries.py
geekwrestler's picture
Squash history (purge pre-scrub demo session blobs)
5f43c7d
"""binaries.py — deterministic extraction of the REAL binaries run via Bash.
Karen's feature request: a session that shows "74 tool calls, all Bash/npx" is
useless. The forensic value is the binary actually invoked — `npx remotion …` is
*remotion*, `cd repo && npx jest` is *jest*, `railway up` is *railway*. This module
pulls that binary out of the command, so it can be surfaced as a first-class
**entity, separate from tool calls** (tool calls stay = the tool/MCP that ran, as
today). Pure code, NO model (Non-negotiable #1); operates on the normalized
Turn[]/ToolCall contract (turn dicts, like `entities.extract_entities`), never raw
JSONL.
It reuses the structural tokenizer from `engine.core.clusters` (segment split,
env-assignment / wrapper / navigation skipping, the `_UBIQUITOUS` denylist) and
adds the one thing clusters deliberately doesn't do: **runner-descent**. clusters
treats `npx` as ubiquitous and stops there; here we step *through* the runner to
the package it ran, because that package is the whole point.
What is surfaced (vs skipped):
* runner-descent — `npx|bunx|uvx|pnpm dlx|yarn dlx|npm exec|pipx run|deno run|
go run|python -m …` → the package/module it runs.
* bare external — `railway`, `docker`, `terraform`, `aws`, `vercel`, … → itself
(head not in `_UBIQUITOUS`, not a runner), with its first
subcommand kept for color (`aws s3 …` → aws, sub `s3`).
* skipped — shell builtins / coreutils / core runtimes & VCS that a user
universally knows (`git`, `npm`, `node`, `make`, `grep`, …):
noise, not a finding. A runner with nothing to descend to
(bare `npx`) is therefore never itself a "binary".
The bare-binary metadata (product name, blurb, logo, security) is merged in from a
JSON registry (see `engine.core.binaries_db`); the enricher fills unknowns in the
background. Matching here is deterministic; enrichment is proposed, never asserted.
"""
from __future__ import annotations
import re
from typing import Any, Optional
# Reuse the proven structural denylist + token rules from clusters (so the two
# agree on what is "universally known" and never need parallel maintenance). We do
# NOT reuse clusters' `_SEG_RE` splitter — it splits on `|`/`;` even inside quotes,
# which is invisible when you only take the FIRST binary (clusters) but produces
# junk when you collect EVERY segment (here). So this module uses its own
# quote-aware splitter + command-substitution unwrapping below.
from engine.core.clusters import (
_ASSIGN_RE,
_NAV_HEADS,
_UBIQUITOUS,
_WRAPPERS,
)
# Single-token runners: the NEXT non-flag token is the package/binary they run.
_RUNNERS_1 = {"npx", "bunx", "uvx"}
# Two-token runners: `<a> <b> <pkg>` (e.g. `pnpm dlx remotion`, `npm exec foo`).
_RUNNERS_2 = {
("npm", "exec"), ("pnpm", "dlx"), ("pnpm", "exec"), ("yarn", "dlx"),
("yarn", "exec"), ("bun", "x"), ("pipx", "run"), ("deno", "run"),
("go", "run"),
}
# `<interp> -m <module>` — module name is the "binary" (`python -m pytest` → pytest).
_DASH_M = {"python", "python3", "py", "uv"}
# Universally-known shell builtins + system/coreutils NOT already in
# clusters._UBIQUITOUS that would otherwise leak as fake "binaries" (e.g. `exit 0`,
# `lsof`, `pkill`). Kept LOCAL so clusters' tested behavior is untouched. This is
# the same STRUCTURAL denylist idea as _UBIQUITOUS (stable for years, no service
# ever needs adding) — NOT a domain map.
_EXTRA_SKIP = {
# shell builtins / keywords
"exit", "local", "declare", "typeset", "let", "shift", "getopts", "hash",
"compgen", "complete", "unalias", "disown", "caller", "mapfile", "readarray",
"times", "ulimit", "enable", "logout", "suspend", "help", "history", "fc",
"coproc", "readonly",
# process / system inspection
"lsof", "pkill", "pgrep", "killall", "timeout", "watch", "nproc", "uptime",
"vm_stat", "memory_pressure", "iostat", "vmstat", "free", "sysctl", "dmesg",
"launchctl", "systemctl", "service", "defaults", "pmset", "caffeinate",
"sw_vers", "system_profiler", "uname", "hostname", "arch", "whoami", "id",
"groups", "who", "tty", "stty", "printenv", "getconf", "locale",
# hashing / encoding / text utils
"md5", "md5sum", "shasum", "sha1sum", "sha256sum", "sha512sum", "cksum",
"base64", "base32", "hexdump", "xxd", "od", "strings", "nl", "tac", "rev",
"fold", "fmt", "expand", "unexpand", "pr", "cmp", "look", "paste", "split",
"csplit", "iconv", "column", "tree", "realpath", "readlink", "mktemp",
"install", "truncate", "shred", "sync", "mkfifo",
# mac clipboard / media / pdf system tools
"pbcopy", "pbpaste", "say", "osascript", "sips", "plutil", "qlmanage",
# net inspection
"netstat", "ifconfig", "route", "arp", "traceroute", "nslookup", "telnet",
"tcpdump", "socat",
# universally-known package manager / VCS tier (like git/npm in _UBIQUITOUS)
"brew", "apt", "apt-get", "dnf", "yum", "pacman", "snap", "port",
}
# Python stdlib utility modules: `python -m json.tool` is correct but noise, not a
# product. Real packages (`python -m pytest`/`ruff`/…) still pass through.
_STDLIB_M_SKIP = {
"json.tool", "py_compile", "venv", "http.server", "site", "ensurepip",
"compileall", "pip", "this", "antigravity", "smtpd", "cgi", "pdb", "timeit",
}
# Redirect operator tokens to skip when scanning for a command head.
_REDIR = {">", ">>", "<", "<<", "<<<", "2>", "2>>", "&>", "&>>", "1>", "2>&1",
"1>&2", ">&", "<&", "|&"}
# npx-style flags to skip before the package; `-p`/`--package` also eats its value.
_RUNNER_FLAGS_VALUE = {"-p", "--package", "-c", "--call"}
# A package spec accepted after a runner: bare name or @scope/name, optional @version
# (stripped). Rejects paths, flags, $(...) and shell noise.
_PKG_RE = re.compile(r"^(@[A-Za-z0-9._-]+/)?[A-Za-z0-9][A-Za-z0-9._-]*(@[^\s]+)?$")
# A bare command name (mirrors clusters._NAME_RE but kept local to avoid coupling).
_NAME_RE = re.compile(r"^[A-Za-z][A-Za-z0-9_.+-]*$")
# A plausible subcommand token for color (`aws s3`, `railway up`, `gh pr`): a bare
# word, NOT a flag, path, filename (no '.'/'/'), var, or redirect.
_SUB_RE = re.compile(r"^[A-Za-z][A-Za-z0-9:_-]*$")
_SUBS_CAP = 6 # never let one binary's subcommand set grow unbounded
def _strip_version(pkg: str) -> str:
"""`remotion@4.0.0` → `remotion`; `@scope/cli@1` → `@scope/cli`. Keeps scope."""
if pkg.startswith("@"):
slash = pkg.find("/")
if slash != -1:
at = pkg.find("@", slash)
return pkg[:at] if at != -1 else pkg
return pkg
at = pkg.find("@")
return pkg[:at] if at > 0 else pkg
def _tokens(seg: str) -> list[str]:
"""Quote-aware word split of one segment: strips matching quotes and keeps a
quoted path (incl. internal spaces) as ONE token, so `"/Applications/Google
Chrome.app/.../Google Chrome"` does not fragment into `Google`/`Chrome`.
Tolerant of unbalanced quotes (never raises — commands in the wild are messy)."""
toks: list[str] = []
buf: list[str] = []
q: Optional[str] = None
for c in seg:
if q:
if c == q:
q = None
else:
buf.append(c)
continue
if c in ("'", '"'):
q = c
continue
if c.isspace():
if buf:
toks.append("".join(buf)); buf = []
continue
buf.append(c)
if buf:
toks.append("".join(buf))
return toks
def _package_after(toks: list[str], start: int) -> str:
"""First real package token at/after `start`, skipping runner flags and
redirects. '' if none."""
i = start
while i < len(toks):
tok = toks[i]
if tok in _RUNNER_FLAGS_VALUE:
i += 2 # flag + its value
continue
if tok in _REDIR or tok.startswith("-"):
i += 1 # redirect / bare flag (-y, --yes, --no-install, …)
continue
# the next non-flag token is the package
if _PKG_RE.match(tok):
return _strip_version(tok)
return ""
return ""
def _segment_binary(seg: str) -> Optional[tuple[str, str, str]]:
"""(binary, via, sub) for ONE command segment, or None.
`via` ∈ {'direct','npx','bunx','uvx','npm exec','pnpm dlx',…,'python -m'}.
`sub` is the first subcommand token for a bare CLI (color), else ''.
Skips leading env-assignments, wrappers (sudo/env/…) and navigation heads
(cd/export/…) exactly like clusters, then either descends a runner or returns
the bare external binary. Ubiquitous heads (git/npm/node/make/grep/…) → None.
"""
toks = _tokens(seg)
i = 0
while i < len(toks) and (
_ASSIGN_RE.match(toks[i]) or toks[i] in _WRAPPERS or toks[i] in _REDIR
):
i += 1
if i >= len(toks):
return None
raw = toks[i]
# device paths / redirects / non-command heads → nothing here
if raw.startswith("/dev/") or raw in _REDIR or any(ch in raw for ch in "<>"):
return None
head = raw.rsplit("/", 1)[-1] # /usr/local/bin/railway → railway
if head in _NAV_HEADS:
return None # cd/export/source/control-keyword segment — nothing here
# --- runner-descent: the package is the binary ------------------------- #
if head in _RUNNERS_1:
pkg = _package_after(toks, i + 1)
return (pkg, head, "") if pkg else None
if i + 1 < len(toks) and (head, toks[i + 1]) in _RUNNERS_2:
via = f"{head} {toks[i + 1]}"
pkg = _package_after(toks, i + 2)
# `go run ./cmd/...` / `deno run script.ts` → a path, not a package: drop.
return (pkg, via, "") if pkg else None
if head in _DASH_M and i + 1 < len(toks) and toks[i + 1] == "-m":
mod = _package_after(toks, i + 2)
if not mod or mod in _STDLIB_M_SKIP:
return None
return (mod, f"{head} -m", "")
# --- bare external CLI (not a runner, not universally-known) ------------ #
# A '.' in a bare head means it's a filename (foo.stderr, bar.py), not a CLI —
# real binaries use '-'/'_' (pg_restore, llama-server, pdftoppm), never '.'.
if head in _UBIQUITOUS or head in _EXTRA_SKIP or "." in head or not _NAME_RE.match(head):
return None
sub = ""
if i + 1 < len(toks) and _SUB_RE.match(toks[i + 1]):
sub = toks[i + 1]
return (head, "direct", sub)
def _split_segments(line: str) -> list[str]:
"""Quote-aware split on top-level `&&` `||` `|` `;` `&` — separators inside
single/double quotes or `$(…)`/backticks are NOT split (so a `|` inside a
grep regex never becomes a pipe)."""
segs: list[str] = []
buf: list[str] = []
i, n = 0, len(line)
quote: Optional[str] = None
depth = 0 # inside $(…) / `…`
while i < n:
c = line[i]
if quote:
buf.append(c)
if c == quote:
quote = None
i += 1
continue
if c in ("'", '"'):
quote = c; buf.append(c); i += 1; continue
if depth > 0:
buf.append(c)
if c == "(":
depth += 1
elif c == ")":
depth -= 1
i += 1
continue
if c == "$" and i + 1 < n and line[i + 1] == "(":
depth = 1; buf.append("$("); i += 2; continue
if line.startswith("&&", i) or line.startswith("||", i):
segs.append("".join(buf)); buf = []; i += 2; continue
if c in (";", "|", "&", "\n"):
segs.append("".join(buf)); buf = []; i += 1; continue
buf.append(c); i += 1
if buf:
segs.append("".join(buf))
return segs
def _extract_subs(s: str) -> tuple[str, list[str]]:
"""Pull `$(…)` and backtick command-substitution bodies out of `s`.
Returns (outer_with_subs_blanked, [inner_command, …]) so the OUTER parse sees
`PUBURL= ` (a bare assignment → no binary) and the INNER `railway variables …`
is analyzed on its own. Quote-aware; one level here, recursion handles nesting.
"""
inners: list[str] = []
out: list[str] = []
i, n = 0, len(s)
quote: Optional[str] = None
while i < n:
c = s[i]
if quote:
out.append(c)
if c == quote:
quote = None
i += 1
continue
if c in ("'", '"'):
quote = c; out.append(c); i += 1; continue
if c == "$" and i + 1 < n and s[i + 1] == "(":
depth, j, inner = 1, i + 2, []
while j < n and depth > 0:
cj = s[j]
if cj == "(":
depth += 1
elif cj == ")":
depth -= 1
if depth > 0:
inner.append(cj)
j += 1
inners.append("".join(inner)); out.append(" "); i = j; continue
if c == "`":
j, inner = i + 1, []
while j < n and s[j] != "`":
inner.append(s[j]); j += 1
inners.append("".join(inner)); out.append(" "); i = j + 1; continue
out.append(c); i += 1
return "".join(out), inners
def extract_command_binaries(cmd: str, _depth: int = 0) -> dict[str, dict[str, Any]]:
"""All interesting binaries invoked by ONE Bash command, deduped.
Returns {binary: {'via': str, 'subs': set[str]}}. First LINE only (heredoc
bodies / multiline scripts never become binaries — same rule as clusters);
every quote-aware `&&`/`||`/`|`/`;` segment is inspected, so `railway up &&
docker build` yields BOTH, and command substitutions (`X=$(railway …)`) are
unwrapped and their inner command analyzed too.
"""
out: dict[str, dict[str, Any]] = {}
if not cmd or _depth > 4:
return out
first_line = cmd.strip().split("\n", 1)[0]
outer, inners = _extract_subs(first_line)
def _add(name: str, via: str, sub: str) -> None:
row = out.setdefault(name, {"via": via, "subs": set()})
if sub and len(row["subs"]) < _SUBS_CAP:
row["subs"].add(sub)
for seg in _split_segments(outer):
hit = _segment_binary(seg)
if hit:
_add(*hit)
for inner in inners: # recurse into $(…)/`…` bodies
for name, meta in extract_command_binaries(inner, _depth + 1).items():
_add(name, meta["via"], "")
out[name]["subs"].update(meta["subs"])
return out
def extract_binaries(
turns: list[dict[str, Any]],
db: Optional[dict[str, dict[str, Any]]] = None,
) -> list[dict[str, Any]]:
"""Per-session binary inventory, sorted by descending use, each traceable to
the turns it ran in — and merged with registry metadata when known.
`turns` are the CONTRACT dicts (post `to_dict`), exactly like the input to
`entities.extract_entities`. `count` = number of Bash tool_calls that invoked
the binary (deduped within a call). Each row:
{ binary, name, count, turns:[i], via, subs:[…],
identified, product?, blurb?, homepage?, logo?, security?, source?, updated? }
Unknown binaries still appear (identified:false) with the bare name — the
enricher upgrades them later. `db` is the merged curated+learned registry.
"""
agg: dict[str, dict[str, Any]] = {}
for t in turns:
ti = t.get("i")
for tc in t.get("tools", []) or []:
if (tc.get("name") or "") != "Bash":
continue
inp = tc.get("input") if isinstance(tc.get("input"), dict) else {}
cmd = str(inp.get("command", "") or "")
for name, meta in extract_command_binaries(cmd).items():
row = agg.setdefault(
name,
{"name": name, "count": 0, "turns": set(), "via": meta["via"], "subs": set()},
)
row["count"] += 1
row["turns"].add(ti)
row["subs"].update(meta["subs"])
db = db or {}
out: list[dict[str, Any]] = []
for name, row in agg.items():
r: dict[str, Any] = {
"binary": name,
"name": name,
"count": row["count"],
"turns": sorted(row["turns"]),
"via": row["via"],
"subs": sorted(row["subs"]),
}
meta = db.get(name) or db.get(name.lower())
if isinstance(meta, dict) and (meta.get("product") or meta.get("blurb")):
r["identified"] = True
for k in ("product", "blurb", "homepage", "logo", "security", "source", "updated"):
if meta.get(k) is not None:
r[k] = meta[k]
else:
r["identified"] = False
out.append(r)
out.sort(key=lambda x: (-x["count"], x["name"]))
return out
def unknown_binary_names(binaries: list[dict[str, Any]]) -> list[dict[str, str]]:
"""The not-yet-identified binaries, as {name, via} — the enricher's work queue.
Bare data only (name + how it was invoked); never any command text."""
return [
{"name": b["binary"], "via": b.get("via", "direct")}
for b in binaries
if not b.get("identified")
]