"""binaries.py — deterministic extraction of the REAL binaries run via Bash.

Karen's feature request: a session that shows "74 tool calls, all Bash/npx" is
useless. The forensic value is the binary actually invoked — `npx remotion …` is
*remotion*, `cd repo && npx jest` is *jest*, `railway up` is *railway*. This
module pulls that binary out of the command, so it can be surfaced as a
first-class **entity, separate from tool calls** (tool calls stay = the tool/MCP
that ran, as today).

Pure code, NO model (Non-negotiable #1); operates on the normalized
Turn[]/ToolCall contract (turn dicts, like `entities.extract_entities`), never
raw JSONL.

It reuses the structural tokenizer from `engine.core.clusters` (segment split,
env-assignment / wrapper / navigation skipping, the `_UBIQUITOUS` denylist) and
adds the one thing clusters deliberately doesn't do: **runner-descent**.
clusters treats `npx` as ubiquitous and stops there; here we step *through* the
runner to the package it ran, because that package is the whole point.

What is surfaced (vs skipped):

* runner-descent — `npx|bunx|uvx|pnpm dlx|yarn dlx|npm exec|pipx run|deno run|
  go run|python -m …` → the package/module it runs.
* bare external — `railway`, `docker`, `terraform`, `aws`, `vercel`, … → itself
  (head not in `_UBIQUITOUS`, not a runner), with its first subcommand kept for
  color (`aws s3 …` → aws, sub `s3`).
* skipped — shell builtins / coreutils / core runtimes & VCS that a user
  universally knows (`git`, `npm`, `node`, `make`, `grep`, …): noise, not a
  finding. A runner with nothing to descend to (bare `npx`) is therefore never
  itself a "binary".

The bare-binary metadata (product name, blurb, logo, security) is merged in from
a JSON registry (see `engine.core.binaries_db`); the enricher fills unknowns in
the background. Matching here is deterministic; enrichment is proposed, never
asserted.
"""
from __future__ import annotations

import re
from typing import Any, Optional

# Reuse the proven structural denylist + token rules from clusters (so the two
# agree on what is "universally known" and never need parallel maintenance). We do
# NOT reuse clusters' `_SEG_RE` splitter — it splits on `|`/`;` even inside quotes,
# which is invisible when you only take the FIRST binary (clusters) but produces
# junk when you collect EVERY segment (here). So this module uses its own
# quote-aware splitter + command-substitution unwrapping below.
from engine.core.clusters import (
    _ASSIGN_RE,
    _NAV_HEADS,
    _UBIQUITOUS,
    _WRAPPERS,
)

# Single-token runners: the NEXT non-flag token is the package/binary they run.
_RUNNERS_1 = {"npx", "bunx", "uvx"}

# Two-token runners: `<manager> <verb>` (e.g. `pnpm dlx remotion`, `npm exec foo`).
_RUNNERS_2 = {
    ("npm", "exec"),
    ("pnpm", "dlx"),
    ("pnpm", "exec"),
    ("yarn", "dlx"),
    ("yarn", "exec"),
    ("bun", "x"),
    ("pipx", "run"),
    ("deno", "run"),
    ("go", "run"),
}

# `<interpreter> -m <module>` — module name is the "binary"
# (`python -m pytest` → pytest).
_DASH_M = {"python", "python3", "py", "uv"}

# Universally-known shell builtins + system/coreutils NOT already in
# clusters._UBIQUITOUS that would otherwise leak as fake "binaries" (e.g. `exit 0`,
# `lsof`, `pkill`). Kept LOCAL so clusters' tested behavior is untouched. This is
# the same STRUCTURAL denylist idea as _UBIQUITOUS (stable for years, no service
# ever needs adding) — NOT a domain map.
_EXTRA_SKIP = {
    # shell builtins / keywords
    "exit", "local", "declare", "typeset", "let", "shift", "getopts", "hash",
    "compgen", "complete", "unalias", "disown", "caller", "mapfile", "readarray",
    "times", "ulimit", "enable", "logout", "suspend", "help", "history", "fc",
    "coproc", "readonly",
    # process / system inspection
    "lsof", "pkill", "pgrep", "killall", "timeout", "watch", "nproc", "uptime",
    "vm_stat", "memory_pressure", "iostat", "vmstat", "free", "sysctl", "dmesg",
    "launchctl", "systemctl", "service", "defaults", "pmset", "caffeinate",
    "sw_vers", "system_profiler", "uname", "hostname", "arch", "whoami", "id",
    "groups", "who", "tty", "stty", "printenv", "getconf", "locale",
    # hashing / encoding / text utils
    "md5", "md5sum", "shasum", "sha1sum", "sha256sum", "sha512sum", "cksum",
    "base64", "base32", "hexdump", "xxd", "od", "strings", "nl", "tac", "rev",
    "fold", "fmt", "expand", "unexpand", "pr", "cmp", "look", "paste", "split",
    "csplit", "iconv", "column", "tree", "realpath", "readlink", "mktemp",
    "install", "truncate", "shred", "sync", "mkfifo",
    # mac clipboard / media / pdf system tools
    "pbcopy", "pbpaste", "say", "osascript", "sips", "plutil", "qlmanage",
    # net inspection
    "netstat", "ifconfig", "route", "arp", "traceroute", "nslookup", "telnet",
    "tcpdump", "socat",
    # universally-known package manager / VCS tier (like git/npm in _UBIQUITOUS)
    "brew", "apt", "apt-get", "dnf", "yum", "pacman", "snap", "port",
}

# Python stdlib utility modules: `python -m json.tool` is correct but noise, not a
# product. Real packages (`python -m pytest`/`ruff`/…) still pass through.
_STDLIB_M_SKIP = {
    "json.tool", "py_compile", "venv", "http.server", "site", "ensurepip",
    "compileall", "pip", "this", "antigravity", "smtpd", "cgi", "pdb", "timeit",
}

# Redirect operator tokens to skip when scanning for a command head.
_REDIR = {
    ">", ">>", "<", "<<", "<<<", "2>", "2>>", "&>", "&>>", "1>",
    "2>&1", "1>&2", ">&", "<&", "|&",
}

# npx-style flags to skip before the package; `-p`/`--package` also eats its value.
_RUNNER_FLAGS_VALUE = {"-p", "--package", "-c", "--call"}

# A package spec accepted after a runner: bare name or @scope/name, optional @version
# (stripped). Rejects paths, flags, $(...) and shell noise.
_PKG_RE = re.compile(r"^(@[A-Za-z0-9._-]+/)?[A-Za-z0-9][A-Za-z0-9._-]*(@[^\s]+)?$")

# A bare command name (mirrors clusters._NAME_RE but kept local to avoid coupling).
_NAME_RE = re.compile(r"^[A-Za-z][A-Za-z0-9_.+-]*$")

# A plausible subcommand token for color (`aws s3`, `railway up`, `gh pr`): a bare
# word, NOT a flag, path, filename (no '.'/'/'), var, or redirect.
_SUB_RE = re.compile(r"^[A-Za-z][A-Za-z0-9:_-]*$")

_SUBS_CAP = 6  # never let one binary's subcommand set grow unbounded

# Runners whose operand is a source file / path rather than a registry package.
# A dotted operand (`main.go`, `script.ts`) is a file name and is dropped.
_PATH_RUNNERS = {("deno", "run"), ("go", "run")}


def _strip_version(pkg: str) -> str:
    """`remotion@4.0.0` → `remotion`; `@scope/cli@1` → `@scope/cli`. Keeps scope."""
    if pkg.startswith("@"):
        slash = pkg.find("/")
        if slash != -1:
            # Only an '@' AFTER the scope separator is a version marker.
            at = pkg.find("@", slash)
            return pkg[:at] if at != -1 else pkg
        return pkg
    at = pkg.find("@")
    return pkg[:at] if at > 0 else pkg


def _tokens(seg: str) -> list[str]:
    """Quote-aware word split of one segment.

    Strips matching quotes and keeps a quoted path (incl. internal spaces) as ONE
    token, so `"/Applications/Google Chrome.app/.../Google Chrome"` does not
    fragment into `Google`/`Chrome`. Tolerant of unbalanced quotes (never raises —
    commands in the wild are messy).
    """
    toks: list[str] = []
    buf: list[str] = []
    q: Optional[str] = None
    for c in seg:
        if q:
            if c == q:
                q = None  # closing quote is dropped, token continues
            else:
                buf.append(c)
            continue
        if c in ("'", '"'):
            q = c
            continue
        if c.isspace():
            if buf:
                toks.append("".join(buf))
                buf = []
            continue
        buf.append(c)
    if buf:
        toks.append("".join(buf))
    return toks


def _package_after(toks: list[str], start: int) -> str:
    """First real package token at/after `start`, skipping runner flags and
    redirects. Returns '' if none (including when the first non-flag token is
    not a valid package spec)."""
    i = start
    while i < len(toks):
        tok = toks[i]
        if tok in _RUNNER_FLAGS_VALUE:
            i += 2  # flag + its value
            continue
        if tok in _REDIR or tok.startswith("-"):
            i += 1  # redirect / bare flag (-y, --yes, --no-install, …)
            continue
        # The next non-flag token is the package — or nothing usable at all.
        if _PKG_RE.match(tok):
            return _strip_version(tok)
        return ""
    return ""


def _segment_binary(seg: str) -> Optional[tuple[str, str, str]]:
    """(binary, via, sub) for ONE command segment, or None.

    `via` ∈ {'direct','npx','bunx','uvx','npm exec','pnpm dlx',…,'python -m'}.
    `sub` is the first subcommand token for a bare CLI (color), else ''.

    Skips leading env-assignments, wrappers and redirect operators, then either
    descends a runner or returns the bare external binary. A navigation head
    (`_NAV_HEADS`) or a denylisted head (`_UBIQUITOUS` / `_EXTRA_SKIP`) yields
    None. Relies on `_ASSIGN_RE`, `_WRAPPERS`, `_NAV_HEADS`, `_UBIQUITOUS`
    imported from `engine.core.clusters` at file level.
    """
    toks = _tokens(seg)
    i = 0
    while i < len(toks) and (
        _ASSIGN_RE.match(toks[i]) or toks[i] in _WRAPPERS or toks[i] in _REDIR
    ):
        i += 1
    if i >= len(toks):
        return None
    raw = toks[i]
    # Device paths / glued redirects (`2>/dev/null`) → nothing here. Exact
    # redirect operators were already consumed by the loop above.
    if raw.startswith("/dev/") or any(ch in raw for ch in "<>"):
        return None
    head = raw.rsplit("/", 1)[-1]  # /usr/local/bin/railway → railway
    if head in _NAV_HEADS:
        return None  # cd/export/source/control-keyword segment — nothing here

    # --- runner-descent: the package is the binary ------------------------- #
    if head in _RUNNERS_1:
        pkg = _package_after(toks, i + 1)
        return (pkg, head, "") if pkg else None
    if i + 1 < len(toks) and (head, toks[i + 1]) in _RUNNERS_2:
        pair = (head, toks[i + 1])
        via = f"{head} {toks[i + 1]}"
        pkg = _package_after(toks, i + 2)
        # `go run ./cmd/...` is already rejected by _PKG_RE (path). A dotted
        # operand such as `deno run script.ts` / `go run main.go` passes
        # _PKG_RE but is still a file, not a package: drop it too.
        if pkg and pair in _PATH_RUNNERS and "." in pkg:
            return None
        return (pkg, via, "") if pkg else None
    if head in _DASH_M and i + 1 < len(toks) and toks[i + 1] == "-m":
        mod = _package_after(toks, i + 2)
        if not mod or mod in _STDLIB_M_SKIP:
            return None
        return (mod, f"{head} -m", "")

    # --- bare external CLI (not a runner, not universally-known) ------------ #
    # A '.' in a bare head means it's a filename (foo.stderr, bar.py), not a CLI —
    # real binaries use '-'/'_' (pg_restore, llama-server, pdftoppm), never '.'.
    if (
        head in _UBIQUITOUS
        or head in _EXTRA_SKIP
        or "." in head
        or not _NAME_RE.match(head)
    ):
        return None
    sub = ""
    if i + 1 < len(toks) and _SUB_RE.match(toks[i + 1]):
        sub = toks[i + 1]
    return (head, "direct", sub)


def _split_segments(line: str) -> list[str]:
    """Quote-aware split on top-level `&&` `||` `|` `;` `&` (and newline).

    Separators inside single/double quotes or `$(…)` are NOT split (so a `|`
    inside a grep regex never becomes a pipe). Backtick spans are not tracked
    here; `extract_command_binaries` blanks them via `_extract_subs` first.

    An `&` that belongs to a redirection (`2>&1`, `>&2`, `&>`, `&>>`) is kept
    inside the segment: splitting there would turn the redirect target
    (`cmd &> log` → `log`) into a fake command head.
    """
    segs: list[str] = []
    buf: list[str] = []
    i, n = 0, len(line)
    quote: Optional[str] = None
    depth = 0  # nesting level inside $(…)
    while i < n:
        c = line[i]
        if quote:
            buf.append(c)
            if c == quote:
                quote = None
            i += 1
            continue
        if c in ("'", '"'):
            quote = c
            buf.append(c)
            i += 1
            continue
        if depth > 0:
            buf.append(c)
            if c == "(":
                depth += 1
            elif c == ")":
                depth -= 1
            i += 1
            continue
        if c == "$" and i + 1 < n and line[i + 1] == "(":
            depth = 1
            buf.append("$(")
            i += 2
            continue
        if line.startswith("&&", i) or line.startswith("||", i):
            segs.append("".join(buf))
            buf = []
            i += 2
            continue
        if c == "&" and (
            (i > 0 and line[i - 1] in "<>") or (i + 1 < n and line[i + 1] == ">")
        ):
            # Part of a redirection operator, not a background separator.
            buf.append(c)
            i += 1
            continue
        if c in (";", "|", "&", "\n"):
            segs.append("".join(buf))
            buf = []
            i += 1
            continue
        buf.append(c)
        i += 1
    if buf:
        segs.append("".join(buf))
    return segs


def _extract_subs(s: str) -> tuple[str, list[str]]:
    """Pull `$(…)` and backtick command-substitution bodies out of `s`.

    Returns (outer_with_subs_blanked, [inner_command, …]) so the OUTER parse sees
    `PUBURL= ` (a bare assignment → no binary) and the INNER `railway variables …`
    is analyzed on its own. One level here; the caller's recursion handles nesting.

    Quoting follows the shell: only SINGLE quotes suppress substitution. Inside
    double quotes `$(…)` and backticks are still executed, so `X="$(railway …)"`
    is unwrapped. A backslash outside single quotes escapes the next character,
    so an escaped backtick or `\\$(` is left alone.
    """
    inners: list[str] = []
    out: list[str] = []
    i, n = 0, len(s)
    quote: Optional[str] = None
    while i < n:
        c = s[i]
        if quote == "'":
            # Everything is literal until the closing single quote.
            out.append(c)
            if c == "'":
                quote = None
            i += 1
            continue
        if c == "\\" and i + 1 < n:
            out.append(s[i:i + 2])  # escaped char is copied verbatim
            i += 2
            continue
        if c == '"':
            quote = None if quote else '"'
            out.append(c)
            i += 1
            continue
        if c == "'" and quote is None:
            quote = "'"
            out.append(c)
            i += 1
            continue
        if c == "$" and i + 1 < n and s[i + 1] == "(":
            depth, j = 1, i + 2
            inner: list[str] = []
            while j < n and depth > 0:
                cj = s[j]
                if cj == "(":
                    depth += 1
                elif cj == ")":
                    depth -= 1
                if depth > 0:
                    inner.append(cj)
                j += 1
            inners.append("".join(inner))
            out.append(" ")
            i = j
            continue
        if c == "`":
            j = i + 1
            inner = []
            while j < n and s[j] != "`":
                inner.append(s[j])
                j += 1
            inners.append("".join(inner))
            out.append(" ")
            i = j + 1
            continue
        out.append(c)
        i += 1
    return "".join(out), inners


def extract_command_binaries(cmd: str, _depth: int = 0) -> dict[str, dict[str, Any]]:
    """All interesting binaries invoked by ONE Bash command, deduped.

    Returns {binary: {'via': str, 'subs': set[str]}}. First LINE only (heredoc
    bodies / multiline scripts never become binaries); every quote-aware
    `&&`/`||`/`|`/`;` segment is inspected, so `railway up && docker build`
    yields BOTH, and command substitutions (`X=$(railway …)`) are unwrapped and
    their inner command analyzed too. `_depth` bounds the substitution recursion;
    callers leave it at its default. Each binary keeps at most `_SUBS_CAP`
    subcommands, including those merged from substitutions.
    """
    out: dict[str, dict[str, Any]] = {}
    if not cmd or _depth > 4:
        return out
    first_line = cmd.strip().split("\n", 1)[0]
    outer, inners = _extract_subs(first_line)

    def _add(name: str, via: str, subs: Any) -> None:
        # First sighting fixes `via`; subcommands accumulate up to the cap.
        row = out.setdefault(name, {"via": via, "subs": set()})
        for sub in subs:
            if sub and len(row["subs"]) < _SUBS_CAP:
                row["subs"].add(sub)

    for seg in _split_segments(outer):
        hit = _segment_binary(seg)
        if hit:
            name, via, sub = hit
            _add(name, via, (sub,))
    for inner in inners:  # recurse into $(…)/`…` bodies
        for name, meta in extract_command_binaries(inner, _depth + 1).items():
            # sorted() keeps the retained subset deterministic when capped.
            _add(name, meta["via"], sorted(meta["subs"]))
    return out


def extract_binaries(
    turns: list[dict[str, Any]],
    db: Optional[dict[str, dict[str, Any]]] = None,
) -> list[dict[str, Any]]:
    """Per-session binary inventory, sorted by descending use, each traceable to
    the turns it ran in — and merged with registry metadata when known.

    `turns` are the CONTRACT dicts (post `to_dict`), exactly like the input to
    `entities.extract_entities`. `count` = number of Bash tool_calls that invoked
    the binary (deduped within a call). Each row:

        { binary, name, count, turns:[i], via, subs:[…], identified,
          product?, blurb?, homepage?, logo?, security?, source?, updated? }

    Unknown binaries still appear (identified:false) with the bare name — the
    enricher upgrades them later. `db` is the merged curated+learned registry.
    A turn without an `i` index still counts toward `count` but contributes no
    entry to `turns`.
    """
    agg: dict[str, dict[str, Any]] = {}
    for t in turns:
        ti = t.get("i")
        for tc in t.get("tools", []) or []:
            if not isinstance(tc, dict) or (tc.get("name") or "") != "Bash":
                continue
            inp = tc.get("input") if isinstance(tc.get("input"), dict) else {}
            cmd = str(inp.get("command", "") or "")
            for name, meta in extract_command_binaries(cmd).items():
                row = agg.setdefault(
                    name,
                    {
                        "name": name,
                        "count": 0,
                        "turns": set(),
                        "via": meta["via"],
                        "subs": set(),
                    },
                )
                row["count"] += 1
                if ti is not None:
                    # None mixed with ints would make sorted() below raise.
                    row["turns"].add(ti)
                row["subs"].update(meta["subs"])

    db = db or {}
    out: list[dict[str, Any]] = []
    for name, row in agg.items():
        r: dict[str, Any] = {
            "binary": name,
            "name": name,
            "count": row["count"],
            "turns": sorted(row["turns"]),
            "via": row["via"],
            "subs": sorted(row["subs"]),
        }
        meta = db.get(name) or db.get(name.lower())
        # A registry entry only counts as an identification when it names the
        # product or describes it.
        if isinstance(meta, dict) and (meta.get("product") or meta.get("blurb")):
            r["identified"] = True
            for k in (
                "product", "blurb", "homepage", "logo", "security", "source", "updated",
            ):
                if meta.get(k) is not None:
                    r[k] = meta[k]
        else:
            r["identified"] = False
        out.append(r)
    out.sort(key=lambda x: (-x["count"], x["name"]))
    return out


def unknown_binary_names(binaries: list[dict[str, Any]]) -> list[dict[str, str]]:
    """The not-yet-identified binaries, as {name, via} — the enricher's work queue.

    Bare data only (name + how it was invoked); never any command text."""
    return [
        {"name": b["binary"], "via": b.get("via", "direct")}
        for b in binaries
        if not b.get("identified")
    ]