"""pins.py — deterministic detector for UNPINNED package runners (npx_unpinned).

A "generally recommended" practice, not an Anthropic one: running a tool with a
bare runner (`npx remotion`, `pnpm dlx foo`, `bunx bar`, `yarn dlx baz`) instead
of a pinned version (`npx remotion@4.0.0`) means the run is not reproducible —
the registry can serve a newer, possibly-breaking release between sessions.
This module flags that pattern so the recommendation engine can suggest pinning.

It is a SEPARATE, additive signal: it never touches token/turn/loop/reread
counts and does NOT modify clusters.py. It reuses the proven, quote-aware
splitter and token rules from `engine.binaries` (so it agrees with the binary
extractor on what a "runner + package" is), and adds the one thing the
extractor strips away: whether the package token carried an `@version` at all.

Pure code, NO model (Non-negotiable #1); operates on the normalized
Turn[]/ToolCall contract dicts (like `binaries.extract_binaries`), never raw
JSONL. Suggest-only. Conservative thresholds keep it quiet on incidental
one-off use — it fires only when unpinned runners are a real, repeated habit
in the session.
"""

from __future__ import annotations

from typing import Any, Optional

from engine.binaries import (
    _RUNNERS_1,
    _RUNNERS_2,
    _RUNNER_FLAGS_VALUE,
    _REDIR,
    _extract_subs,
    _split_segments,
    _strip_version,
    _tokens,
)

# Fire when EITHER habit is clear: several distinct tools run unpinned, OR one
# tool is run unpinned again and again. Conservative so a single incidental
# `npx` (reproducibility rarely matters once) stays silent.
_MIN_DISTINCT_PACKAGES = 3  # >= 3 different unpinned packages in the session
_MIN_REPEATS_ONE_PACKAGE = 4  # OR the same unpinned package >= 4 times


def _is_pinned(raw_pkg: str) -> bool:
    """Return True when `raw_pkg` carries an explicit, non-floating @version.

    The version boundary is delegated to `_strip_version`: if stripping leaves
    the token unchanged, there was no `@version` suffix and the package is
    unpinned. Otherwise the text after the boundary '@' is the version.

    An empty version and the `latest` / `next` dist-tags (compared
    case-insensitively) are deliberately treated as UNPINNED: they float just
    like a bare name.
    """
    name = _strip_version(raw_pkg)
    # `len(name) + 1` steps over the '@' that separates name from version.
    return name != raw_pkg and raw_pkg[len(name) + 1:].lower() not in (
        "",
        "latest",
        "next",
    )


def _unpinned_after(toks: list[str], start: int) -> Optional[str]:
    """Return the bare name of the first package at/after `start` if unpinned.

    Scans `toks` from index `start`, skipping runner flags and redirects, and
    treats the first remaining token as the package spec. The RAW token is
    checked for an @version before the version is stripped away.

    Returns None when:
      * no package token is found,
      * the token looks like a path (contains '/' but is not an @scope spec),
      * the stripped name is empty or starts with neither a letter nor '@',
      * the package is already pinned.
    """
    pos = start
    limit = len(toks)
    while pos < limit:
        candidate = toks[pos]
        if candidate in _RUNNER_FLAGS_VALUE:
            pos += 2  # value-taking flag: skip the flag and its value
        elif candidate in _REDIR or candidate.startswith("-"):
            pos += 1  # redirect token or bare flag (-y, --yes, ...)
        else:
            break  # first non-flag token: this is the package spec
    else:
        # Ran off the end without meeting a package token.
        return None

    # `./script` or `dir/x` is a local path, not a published package; only an
    # `@scope/name` spec may legitimately contain a slash.
    if "/" in candidate and not candidate.startswith("@"):
        return None

    name = _strip_version(candidate)
    if not name:
        return None
    if not (name[0].isalpha() or name.startswith("@")):
        return None
    if _is_pinned(candidate):
        return None
    return name


def _segment_unpinned(seg: str) -> Optional[str]:
    """Return the bare package a runner executes UNPINNED in one segment.

    `seg` is a single command segment. Returns None when the segment's head is
    not a known runner, or when the runner's package is pinned or absent.
    """
    words = _tokens(seg)

    # Find the first plain token: skip leading redirect tokens and tokens that
    # have an '=' before any '/' (NAME=value style assignments).
    # NOTE(review): the earlier comment here also mentioned skipping
    # "wrappers"; no wrapper-specific handling is visible in this function —
    # confirm whether that is intended.
    for idx, word in enumerate(words):
        if word in _REDIR or "=" in word.partition("/")[0]:
            continue
        break
    else:
        # Empty segment, or nothing but redirects / assignments.
        return None

    # Compare on the basename so a runner given by path still matches.
    runner = word.rpartition("/")[2]
    if runner in _RUNNERS_1:
        return _unpinned_after(words, idx + 1)

    nxt = idx + 1
    if nxt < len(words) and (runner, words[nxt]) in _RUNNERS_2:
        # Two-word runner: the package follows the sub-command.
        return _unpinned_after(words, idx + 2)
    return None


def _command_unpinned(cmd: str) -> set[str]:
    """Return every bare package name run unpinned by ONE Bash command.

    Only the first line of `cmd` (after stripping surrounding whitespace) is
    inspected. Substitutions returned by `_extract_subs` are analysed
    recursively and merged into the result. A falsy `cmd` yields an empty set.
    """
    if not cmd:
        return set()

    line = cmd.strip().partition("\n")[0]
    outer, nested = _extract_subs(line)

    found = {
        name
        for name in map(_segment_unpinned, _split_segments(outer))
        if name
    }
    for sub in nested:
        found.update(_command_unpinned(sub))
    return found


def detect_npx_unpinned(turns: list[dict[str, Any]]) -> Optional[dict[str, Any]]:
    """Session-level check: are unpinned package runners a real habit here?

    `turns` are the CONTRACT dicts (post `to_dict`), like
    `binaries.extract_binaries`. Only tool calls named "Bash" are examined;
    a non-dict `input` is treated as having no command.

    Returns None (silence) below threshold, else:
        { packages:[name,…], turns:[i,…], distinct:int, total:int }
    where `total` counts each unpinned invocation (one per package per Bash
    call) and `turns` lists the sorted turn indices (the turn's "i" value,
    omitted when it is None) in which any unpinned package was seen.

    Thresholds: >= 3 distinct unpinned packages, OR the same package >= 4 times.
    """
    tally: dict[str, int] = {}
    seen_turns: set[Any] = set()

    for turn in turns:
        turn_index = turn.get("i")
        for call in turn.get("tools", []) or []:
            if (call.get("name") or "") != "Bash":
                continue
            payload = call.get("input")
            if not isinstance(payload, dict):
                payload = {}
            command = str(payload.get("command", "") or "")
            for name in _command_unpinned(command):
                tally[name] = tally.get(name, 0) + 1
                seen_turns.add(turn_index)

    distinct = len(tally)
    busiest = max(tally.values(), default=0)
    # An empty tally gives distinct == 0 and busiest == 0, so it is silenced by
    # the same threshold test as any other below-threshold session.
    if distinct < _MIN_DISTINCT_PACKAGES and busiest < _MIN_REPEATS_ONE_PACKAGE:
        return None

    return {
        "packages": sorted(tally),
        "turns": sorted(i for i in seen_turns if i is not None),
        "distinct": distinct,
        "total": sum(tally.values()),
    }