"""entities.py — deterministic extraction of the named agents-in-the-machine. Uniquely identifies the SKILLS used, SUB-AGENTS invoked, and MCP SERVERS/tools touched in a session, each with turn-level traceability — so a bad skill, a misbehaving sub-agent, or a flaky MCP server can be traced back to exactly where it ran. Pure code, NO model (Non-negotiable #1); operates on the normalized Turn[]/ToolCall contract only, never raw JSONL. SCOPE — this is USAGE, not inventory. A skill is listed ONLY when it was actually INVOKED (a `Skill` tool_use). Skills that the session merely AUTHORED/edited (Write/ Edit on `.claude/skills/**`) or that are AVAILABLE on disk but never run do NOT appear — that's deliberate, not a miss. (Owner decision: the panel answers "what ran", not "what could have run". E.g. a session that wrote three skill files but only invoked `smruti` correctly lists `smruti` alone.) Signals (verified against real ~/.claude sessions): * Skill -> tool_use name == "Skill" (input.skill / input.command) * Sub-agent -> tool_use name in {"Agent","Task"} (input.subagent_type, .description) * Sub-agent -> tool_use name == "Workflow" (a workflow spawns MANY agents; enumerate them from the script's agent() labels — one Workflow tool_use is N sub-agents, which a naive "Task/Agent only" pass misses) * MCP -> tool_use name startswith "mcp__" -> mcp____ """ from __future__ import annotations import re from typing import Any # A workflow script declares each spawned agent with a `label:` (and the run itself # with `meta.name`). Parsing these off the inline script is deterministic (it's a # plain string already in the tool input — no model, still inside the contract). 
_WF_LABEL_RE = re.compile(r"""label:\s*[`'"]([^`'"]+)[`'"]""")
_WF_NAME_RE = re.compile(r"""name:\s*['"]([^'"]+)['"]""")

# Upper bound on the agent labels taken from a single workflow script, so a
# runaway script never floods the inventory.
_WF_MAX_LABELS = 64


def _tool_name(tc: dict[str, Any]) -> str:
    """Return the tool call's ``name`` as a string ('' when missing or falsy)."""
    return str(tc.get("name", "") or "")


def _none_last(value: Any) -> tuple[bool, Any]:
    """Sort key that orders a ``None`` member after every real value.

    A turn without an ``i`` field contributes ``None`` to a row's turn set;
    sorting a set that mixes ``None`` with ints would otherwise raise
    ``TypeError``. A set holds at most one ``None``, so the second tuple
    element is only ever compared between two non-None values.
    """
    return (value is None, value)


def _workflow_agents(inp: dict[str, Any]) -> tuple[str, list[str]]:
    """Parse ``(workflow name, [agent labels])`` from a Workflow tool's inline script.

    Args:
        inp: The Workflow tool_use input; only its ``script`` entry is read.

    Returns:
        The first ``name:`` value found in the script ('' when there is none)
        and the distinct, stripped, non-empty ``label:`` values in first-seen
        order, capped at ``_WF_MAX_LABELS``. Returns ``('', [])`` when the
        script is absent (e.g. a scriptPath re-invoke) — the caller still
        records the workflow run itself so it never silently vanishes.
    """
    script = str(inp.get("script") or "")
    if not script:
        return "", []

    name_match = _WF_NAME_RE.search(script)
    name = name_match.group(1).strip() if name_match else ""

    # dict.fromkeys de-duplicates in O(n) while keeping first-seen order.
    stripped = (label.strip() for label in _WF_LABEL_RE.findall(script))
    labels = list(dict.fromkeys(label for label in stripped if label))
    return name, labels[:_WF_MAX_LABELS]


def _mcp_parts(name: str) -> tuple[str, str]:
    """Split ``mcp__<server>__<tool>`` into ``(server, tool)``.

    Only the first ``__`` after the ``mcp__`` prefix separates the two parts,
    so the tool may itself contain ``__``. The tool is '' when the name has no
    second separator.
    """
    rest = name[len("mcp__"):]
    server, _, tool = rest.partition("__")
    return server, tool


def extract_entities(turns: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Collect the skills, sub-agents and MCP servers used across ``turns``.

    Args:
        turns: Turn dicts. Each turn's ``i`` is recorded as its turn index and
            its ``tools`` list is scanned; each tool call is read for ``name``
            and (when it is a dict) ``input``.

    Returns:
        ``{"skills": [...], "subAgents": [...], "mcpServers": [...]}``. Every
        row has ``name``, ``count`` and a sorted ``turns`` list (traceback).
        Sub-agent rows also carry ``via`` ("agent", "task" or "workflow"),
        plus ``samples`` (up to 4 distinct descriptions) for Agent/Task calls
        and ``workflow`` (the workflow name, when known) for workflow-spawned
        agents. MCP rows also carry a sorted ``tools`` list. Each list is
        ordered by descending count, then by name.
    """
    skills: dict[str, dict[str, Any]] = {}
    subagents: dict[str, dict[str, Any]] = {}
    mcp: dict[str, dict[str, Any]] = {}

    def bump(table: dict, key: str, ti: Any) -> dict:
        """Count one use of ``key`` in ``table`` at turn ``ti``; return its row."""
        row = table.setdefault(key, {"name": key, "count": 0, "turns": set()})
        row["count"] += 1
        row["turns"].add(ti)
        return row

    for t in turns:
        ti = t.get("i")
        for tc in t.get("tools", []) or []:
            name = _tool_name(tc)
            raw_input = tc.get("input")
            inp = raw_input if isinstance(raw_input, dict) else {}

            if name == "Skill":
                sk = str(inp.get("skill") or inp.get("command") or "skill").strip() or "skill"
                bump(skills, sk, ti)

            elif name in ("Agent", "Task"):
                default_type = "general-purpose" if name == "Agent" else "task"
                st = str(inp.get("subagent_type") or default_type).strip() or "agent"
                row = bump(subagents, st, ti)
                row["via"] = name.lower()
                # Coerce like the other inputs: a non-string description must
                # not crash the whole extraction on .strip().
                desc = str(inp.get("description") or "").strip()
                samples = row.setdefault("samples", [])
                if desc and desc not in samples and len(samples) < 4:
                    samples.append(desc)

            elif name == "Workflow":
                wf_name, agent_labels = _workflow_agents(inp)
                for lab in agent_labels:
                    row = bump(subagents, lab, ti)
                    row["via"] = "workflow"
                    if wf_name:
                        row["workflow"] = wf_name
                if not agent_labels:
                    # A workflow ran but the script was a scriptPath re-invoke
                    # (or had no labels) — record the run itself so it isn't lost.
                    row = bump(subagents, wf_name or "workflow", ti)
                    row["via"] = "workflow"

            elif name.startswith("mcp__"):
                server, tool = _mcp_parts(name)
                server = server or "mcp"
                row = mcp.setdefault(
                    server,
                    {"name": server, "count": 0, "turns": set(), "tools": set()},
                )
                row["count"] += 1
                row["turns"].add(ti)
                if tool:
                    row["tools"].add(tool)

    def finalize(table: dict, set_keys: tuple = ("turns",)) -> list[dict]:
        """Copy each row, turn its set-valued ``set_keys`` into sorted lists, and rank."""
        out = []
        for row in table.values():
            r = dict(row)
            for k in set_keys:
                if isinstance(r.get(k), set):
                    r[k] = sorted(r[k], key=_none_last)
            out.append(r)
        out.sort(key=lambda x: (-x["count"], x["name"]))
        return out

    return {
        "skills": finalize(skills),
        "subAgents": finalize(subagents),
        "mcpServers": finalize(mcp, set_keys=("turns", "tools")),
    }


def entity_totals(entities: dict[str, list]) -> dict[str, int]:
    """Quick counts for a header chip: distinct skills / sub-agents / mcp servers.

    A category that is missing (or ``None``) counts as zero.
    """
    return {
        key: len(entities.get(key) or ())
        for key in ("skills", "subAgents", "mcpServers")
    }