her / engine /core /pins.py
geekwrestler's picture
Squash history (purge pre-scrub demo session blobs)
5f43c7d
"""pins.py — deterministic detector for UNPINNED package runners (npx_unpinned).
Pinning is a "generally recommended" practice, not an Anthropic one: running a tool
with a bare runner (`npx remotion`, `pnpm dlx foo`, `bunx bar`, `yarn dlx baz`) instead
of a pinned version (`npx remotion@4.0.0`) means the run is not reproducible — the
registry can serve a newer, possibly-breaking release between sessions. This module
flags that pattern so the recommendation engine can suggest pinning.
It is a SEPARATE, additive signal: it never touches token/turn/loop/reread counts
and does NOT modify clusters.py. It reuses the proven, quote-aware splitter and
token rules from `engine.binaries` (so it agrees with the binary extractor on what
a "runner + package" is), and adds the one thing the extractor strips away: whether
the package token carried an `@version` at all.
Pure code, NO model (Non-negotiable #1); operates on the normalized Turn[]/ToolCall
contract dicts (like `binaries.extract_binaries`), never raw JSONL. Suggest-only.
Conservative thresholds keep it quiet on incidental one-off use — it fires only when
unpinned runners are a real, repeated habit in the session.
"""
from __future__ import annotations
from typing import Any, Optional
from engine.binaries import (
_RUNNERS_1,
_RUNNERS_2,
_RUNNER_FLAGS_VALUE,
_REDIR,
_extract_subs,
_split_segments,
_strip_version,
_tokens,
)
# Session-level firing thresholds, read by `detect_npx_unpinned`.
# The detector fires when EITHER habit is clear: several distinct tools are run
# unpinned, OR one tool is run unpinned again and again. The values are kept
# conservative so a single incidental `npx` (reproducibility rarely matters once)
# stays silent. A session must reach at least one of the two limits below.
_MIN_DISTINCT_PACKAGES = 3 # >= 3 different unpinned packages in the session
_MIN_REPEATS_ONE_PACKAGE = 4 # OR the same unpinned package >= 4 times
def _is_pinned(raw_pkg: str) -> bool:
    """Report whether a raw package token names an explicit, fixed version.

    The token is handed to `_strip_version` (imported from `engine.binaries`).
    If the result equals the input, nothing was removed and the token is
    unpinned. Otherwise the text following the stripped name and its one
    separator character is taken as the version.

    An empty version and the floating dist-tags `latest` / `next` (compared
    case-insensitively) are deliberately reported as NOT pinned: they can
    resolve to a different release from one run to the next, just like a bare
    package name.

    Args:
        raw_pkg: The package token exactly as it appeared on the command line.

    Returns:
        True only when a non-empty version other than `latest`/`next` follows
        the package name.
    """
    bare_name = _strip_version(raw_pkg)
    if bare_name != raw_pkg:
        # Skip the bare name plus the single boundary '@'; the rest is the version.
        suffix = raw_pkg[len(bare_name) + 1:].lower()
        return not (suffix == "" or suffix == "latest" or suffix == "next")
    # Stripping changed nothing, so there was no @version at all.
    return False
def _unpinned_after(toks: list[str], start: int) -> Optional[str]:
"""The first real package token at/after `start` IF it is unpinned, stripped to
its bare name; None if there is no package or it is already pinned.
Mirrors `binaries._package_after` (skip runner flags + redirects) but inspects
the RAW token for an @version before stripping it.
"""
i = start
while i < len(toks):
tok = toks[i]
if tok in _RUNNER_FLAGS_VALUE:
i += 2 # flag + its value
continue
if tok in _REDIR or tok.startswith("-"):
i += 1 # redirect / bare flag (-y, --yes, …)
continue
# the next non-flag token is the package spec
if "/" in tok and not tok.startswith("@"):
return None # a path (./script, dir/x), not a published package
bare = _strip_version(tok)
if not bare or not (bare[0].isalpha() or bare.startswith("@")):
return None
return None if _is_pinned(tok) else bare
return None
def _segment_unpinned(seg: str) -> Optional[str]:
    """Find the package a runner executes unpinned within ONE command segment.

    The segment is tokenised with `_tokens`. Leading tokens are skipped while
    they are either in `_REDIR` or have an "=" in the text before their first
    "/" (the shape of an env assignment such as `FOO=bar`; looking only before
    the first slash keeps a path with "=" in a later component from being
    skipped). The first remaining token is the command head, and only its final
    path component is compared, so `/usr/bin/npx` is treated like `npx`.

    Args:
        seg: A single command segment.

    Returns:
        Whatever `_unpinned_after` reports for the tokens following the runner
        when the head is a one-word runner (`_RUNNERS_1`) or the head plus the
        next token form a two-word runner (`_RUNNERS_2`); None when the segment
        has no head token or the head is not a runner.
    """
    words = _tokens(seg)

    head_at = None
    for idx, word in enumerate(words):
        if word in _REDIR:
            continue
        if "=" in word.split("/")[0]:
            continue
        head_at = idx
        break
    if head_at is None:
        # Nothing but redirects / assignments (or no tokens at all).
        return None

    runner = words[head_at].rsplit("/", 1)[-1]
    if runner in _RUNNERS_1:
        return _unpinned_after(words, head_at + 1)

    next_at = head_at + 1
    if next_at < len(words) and (runner, words[next_at]) in _RUNNERS_2:
        # Two-word runner: the package spec starts after both words.
        return _unpinned_after(words, head_at + 2)
    return None
def _command_unpinned(cmd: str) -> set[str]:
"""All bare package names run unpinned by ONE Bash command (first line only)."""
out: set[str] = set()
if not cmd:
return out
first_line = cmd.strip().split("\n", 1)[0]
outer, inners = _extract_subs(first_line)
for seg in _split_segments(outer):
pkg = _segment_unpinned(seg)
if pkg:
out.add(pkg)
for inner in inners:
out |= _command_unpinned(inner)
return out
def detect_npx_unpinned(turns: list[dict[str, Any]]) -> Optional[dict[str, Any]]:
    """Decide, for a whole session, whether unpinned runners are a real habit.

    Each turn dict is read for its index under "i" and its tool calls under
    "tools" (a missing or falsy value means no calls). Only calls whose "name"
    is exactly "Bash" are inspected; the command text is taken from
    `input["command"]`, and a non-dict "input" is treated as having no command.
    Every package reported by `_command_unpinned` for a call adds one to that
    package's count, so a package is counted once per Bash call even if the
    call invokes it several times.

    Args:
        turns: Turn dicts in the shape described above.

    Returns:
        None when no unpinned package was seen, or when the session has fewer
        than `_MIN_DISTINCT_PACKAGES` distinct unpinned packages AND no single
        package reaches `_MIN_REPEATS_ONE_PACKAGE` occurrences. Otherwise a
        dict with:
            packages: sorted list of unpinned package names,
            turns:    sorted turn indices that ran one (None indices dropped),
            distinct: number of different unpinned packages,
            total:    sum of all per-package counts.
    """
    tally: dict[str, int] = {}
    seen_turns: set = set()

    for turn in turns:
        turn_index = turn.get("i")
        for call in turn.get("tools", []) or []:
            if (call.get("name") or "") != "Bash":
                continue
            payload = call.get("input")
            if not isinstance(payload, dict):
                payload = {}
            command = str(payload.get("command", "") or "")
            hits = _command_unpinned(command)
            for name in hits:
                tally[name] = tally.get(name, 0) + 1
            if hits:
                # One entry per turn is enough; the index may be None and is
                # filtered out when the result is built.
                seen_turns.add(turn_index)

    if not tally:
        return None

    distinct = len(tally)
    enough_variety = distinct >= _MIN_DISTINCT_PACKAGES
    enough_repeats = max(tally.values()) >= _MIN_REPEATS_ONE_PACKAGE
    if not (enough_variety or enough_repeats):
        return None

    return {
        "packages": sorted(tally),
        "turns": sorted(idx for idx in seen_turns if idx is not None),
        "distinct": distinct,
        "total": sum(tally.values()),
    }