File size: 8,039 Bytes
5f43c7d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""clusters.py — tool_cluster signal: flailing on an external CLI with no skill.

NON-NEGOTIABLE #1/#6: pure code, NO model; the advisor stays SILENT unless a
NAMED, FIXABLE pattern fires. This detects exactly one:

  A run of Bash calls that share an external binary (the command's first real
  token, e.g. `railway`, `gh`, `docker`) where the agent FLAILED — it errored
  repeatedly — and NO skill was loaded in the session to give it that context.

That maps to Anthropic's "Use CLI tools / Create skills" guidance: a skill or a
service CLI would have handed the agent the context it instead burned tokens
rediscovering. The fix text is attached from the cited knowledge file
(best_practices.practice_for) — this module never invents advice.

NOT a domain dictionary. The build owner rejected a service→skill mapping (the
maintenance treadmill). What we keep instead is the inverse: `_UBIQUITOUS`, a
small, stable, STRUCTURAL denylist of commands the agent universally knows —
shell builtins, coreutils, core dev runtimes/VCS — for which a skill would never
help. Adding a new service (vercel, supabase, fly…) needs ZERO maintenance here;
it simply isn't on the denylist, so it can fire. The error signal is the proof,
not a curated allow-list.

PER-BINARY skill coverage: a cluster is suppressed only when a skill whose name
matches THAT binary was loaded (e.g. a `railway` skill mutes the railway cluster).
A skill used elsewhere in the session no longer mutes an unrelated CLI that flailed
— that session-wide suppression was hiding real findings (e.g. railway 17x/2-errored
in a session that happened to use some other skill).
"""
from __future__ import annotations

import re
from typing import Any

from engine.core.best_practices import practice_for

# Firing thresholds, kept together so there is a single place to tune them.
# Deliberately conservative — a teaching tool shipped to beginners should stay
# quiet rather than nag. Genuine flailing (many calls, several errors) clears
# both bars easily; a one-off 3-call hiccup does not.
MIN_CALLS = 4
MIN_ERRORS = 2

# Separators between commands on a single shell line: `&&`, `||`, `|`, `;`.
_SEG_RE = re.compile(r"&&|\|\|?|;")

# A leading `VAR=value` environment assignment (skipped when locating the binary).
_ASSIGN_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*=")

# Shape of a plausible command name. Flags (`-c`), redirects (`<`, `>`),
# `$(...)` and similar tokens do not match.
_NAME_RE = re.compile(r"^[A-Za-z][A-Za-z0-9_.+-]*$")

# Tokens that merely wrap the real binary and are stepped over.
_WRAPPERS = {
    "sudo",
    "env",
    "command",
    "time",
    "nohup",
    "exec",
    "builtin",
    "then",
    "do",
    "xargs",
}

# Segment heads that never name an external binary; when a segment starts with
# one of these, the next segment is inspected instead.
_NAV_HEADS = {
    # navigation / environment no-ops (the empty string covers an empty head)
    "", ".", "cd", "export", "popd", "pushd", "set", "source", "unset",
    # shell control keywords
    "break", "case", "continue", "done", "elif", "else", "esac", "fi", "for",
    "function", "if", "in", "return", "select", "until", "while",
}

# Commands every agent already knows, for which a skill would add nothing.
# This is a STRUCTURAL denylist (shell builtins, coreutils, core dev
# runtimes/VCS, generic network fetchers), not a per-service map: a new SERVICE
# CLI never has to be added here.
_UBIQUITOUS = {
    # shell builtins / control
    "alias", "bg", "echo", "eval", "false", "fg", "jobs", "kill", "printf",
    "read", "test", "trap", "true", "type", "umask", "wait", "which",
    # coreutils / text
    # NOTE(review): "t3" is carried over unchanged; it is not a standard
    # coreutils name — confirm it is intentional.
    "awk", "basename", "cat", "chmod", "chown", "comm", "cp", "cut", "date",
    "df", "diff", "dirname", "du", "egrep", "fgrep", "file", "find", "grep",
    "gunzip", "gzip", "head", "less", "ln", "ls", "mkdir", "more", "mv",
    "open", "ps", "pwd", "rm", "rmdir", "sed", "seq", "sleep", "sort", "stat",
    "t3", "tail", "tar", "tee", "top", "touch", "tr", "uniq", "unzip", "wc",
    "yes", "zip",
    # core dev runtimes / package managers / vcs
    "bash", "bun", "bundle", "cargo", "cmake", "deno", "fish", "gem", "git",
    "go", "gradle", "java", "javac", "make", "mvn", "node", "npm", "npx",
    "pip", "pip3", "pnpm", "python", "python3", "ruby", "rustc", "sh", "yarn",
    "zsh",
    # generic network tools (too generic to map to one skill)
    "curl", "dig", "host", "jq", "nc", "ping", "rsync", "scp", "ssh", "wget",
}


def _binary(cmd: str) -> str:
    """Return the basename of the external binary a Bash command invokes, or ''.

    Only the first line of ``cmd`` is inspected, so heredoc bodies and
    multi-line scripts never become 'binaries'. That line is split on ``&&``,
    ``||``, ``|`` and ``;``. Within each segment, leading ``VAR=value``
    assignments and wrappers (``sudo``, ``env``, …) are stepped over; the first
    remaining token, with any directory prefix removed, is the candidate head.
    A head that is a navigation/shell-keyword entry or does not look like a
    command name (flags such as ``-c``, redirects, ``$(...)``) makes the scan
    move on to the next segment (``cd foo && railway up`` -> ``railway``).

    Wrappers are recognised by basename, the same way navigation heads are, so
    a path-qualified wrapper (``/usr/bin/env railway up``) resolves to
    ``railway`` instead of being reported as the binary itself.

    Structural tokenization only — NOT a domain dictionary.

    Args:
        cmd: Raw command string of a Bash tool call.

    Returns:
        The command name without its directory prefix, or ``""`` when no
        segment of the first line yields a usable name.
    """
    first_line = cmd.strip().split("\n", 1)[0]
    for seg in _SEG_RE.split(first_line):
        for tok in seg.split():
            if _ASSIGN_RE.match(tok):
                continue  # leading VAR=value assignment
            head = tok.rsplit("/", 1)[-1]  # /usr/bin/railway -> railway
            if head in _WRAPPERS:
                continue  # sudo / env / … (also when path-qualified)
            if head not in _NAV_HEADS and _NAME_RE.match(head):
                return head
            # First real token is navigation, a keyword, or not a command
            # name: give up on this segment and try the next one.
            break
    return ""


def detect_tool_clusters(turns) -> list[dict[str, Any]]:
    """Return the FLAGGED tool-clusters for a session (silence -> empty list).

    A cluster groups every Bash call whose command resolves (via ``_binary``)
    to the same binary, ignoring binaries listed in ``_UBIQUITOUS``. A cluster
    is flagged when it has at least ``MIN_CALLS`` calls, at least
    ``MIN_ERRORS`` errored calls, and no skill used in the session has a name
    that contains, or is contained in, the binary name (case-insensitive).

    ``turns`` is walked exactly once, so a one-shot iterable such as a
    generator gives the same result as a list.

    Args:
        turns: Iterable of turn objects. Each turn is read for ``.tools`` (an
            iterable of tool calls) and ``.i`` (the value recorded under
            ``turns`` in the result). Each tool call is read for ``.name``,
            ``.input``, ``.errored`` and ``.id``.

    Returns:
        One dict per flagged cluster, ordered by errors (desc), then calls
        (desc), then binary name:
        { binary, calls, errored, turns:[i], toolIds:[id], skillLoaded,
          practice, fix, section, source }  (the last four only when
        ``practice_for("tool_cluster")`` returns a truthy mapping). Clean or
        universally-known binaries are never surfaced.
    """
    # Skill identifiers used anywhere in the session (lowercased), and the
    # per-binary Bash tallies — both gathered in one pass over the session.
    skill_names: set[str] = set()
    agg: dict[str, dict[str, Any]] = {}
    for t in turns:
        for tc in t.tools:
            tool_name = getattr(tc, "name", "")
            if tool_name == "Skill":
                inp = tc.input if isinstance(tc.input, dict) else {}
                nm = str(inp.get("skill") or inp.get("command") or "").strip().lower()
                if nm:
                    skill_names.add(nm)
            elif tool_name == "Bash":
                inp = tc.input if isinstance(tc.input, dict) else {}
                cmd = str(inp.get("command", "") or "")
                b = _binary(cmd)
                if not b or b in _UBIQUITOUS:
                    continue
                row = agg.setdefault(
                    b, {"calls": 0, "errored": 0, "turns": set(), "toolIds": []}
                )
                row["calls"] += 1
                if getattr(tc, "errored", False):
                    row["errored"] += 1
                row["turns"].add(t.i)
                if tc.id:
                    row["toolIds"].append(tc.id)

    def _covered(binary: str) -> bool:
        # Per-binary muting: only a skill whose name overlaps THIS binary
        # counts, never "some skill was used in the session".
        bl = binary.lower()
        return any(bl in sn or sn in bl for sn in skill_names)

    # Looked up once; the same practice fields are attached to every cluster.
    bp = practice_for("tool_cluster")
    clusters: list[dict[str, Any]] = []
    for b, row in agg.items():
        if row["calls"] < MIN_CALLS or row["errored"] < MIN_ERRORS or _covered(b):
            continue
        c: dict[str, Any] = {
            "binary": b,
            "calls": row["calls"],
            "errored": row["errored"],
            "turns": sorted(row["turns"]),
            "toolIds": row["toolIds"],
            "skillLoaded": False,  # a matching skill would have suppressed this cluster
        }
        if bp:
            c.update(
                {
                    "practice": bp.get("practice"),
                    "fix": bp.get("fix"),
                    "section": bp.get("section"),
                    "source": bp.get("source"),
                }
            )
        clusters.append(c)

    # Most-flailing first: by errors, then calls, then name (deterministic).
    clusters.sort(key=lambda c: (-c["errored"], -c["calls"], c["binary"]))
    return clusters