openclaw-session-amplifier / reviewer /skill_analyzer.py
Ordo
Initial public release
63c75d5
import json
import logging
import os
import re
from pathlib import Path
from config import settings
# Module logger; the name is a fixed string rather than derived from __name__.
log = logging.getLogger("skill_analyzer")

# Filesystem locations. The skills directory and the OpenClaw config can be
# redirected through environment variables; the mcporter config path is fixed.
SKILLS_DIR = Path(
    os.getenv("OPENCLAW_SKILLS_DIR", "/home/node/.openclaw/skills")
).expanduser()
MCPORTER_CONFIG = Path(
    "/home/node/.openclaw/workspace/_shared/mcporter/config/mcporter.json"
)
OPENCLAW_CONFIG = Path(
    os.getenv("OPENCLAW_CONFIG", "/home/node/.openclaw/openclaw.json")
).expanduser()

# Words that _tokens() discards, so they never contribute to a match score.
STOP_WORDS = {
    # common English function words
    "the", "and", "for", "with", "from", "that", "this", "when", "into",
    "your", "you", "are", "can", "use", "using",
    # vocabulary that shows up in nearly every skill description
    "skill", "skills", "agent", "agents", "tool", "tools",
    "task", "tasks", "workflow", "workflows", "process", "openclaw",
    # generic request / instruction wording
    "before", "after", "existing", "new", "review", "checks", "first",
    "instead", "only", "improve", "simple", "appropriate", "repo",
    "updates", "adding", "draft", "recommend", "requested", "evaluate",
}

# Score the best-ranked match must reach before reuse is recommended.
REUSE_SCORE_THRESHOLD = 6
def _list_mcp_skills() -> list[str]:
    """Return the sorted names of skill directories directly under SKILLS_DIR.

    Only immediate child directories that contain a ``SKILL.md`` file are
    listed. When SKILLS_DIR is absent, or is not a directory, the result is an
    empty list.
    """
    # is_dir() rather than exists(): iterdir() raises NotADirectoryError when
    # the configured path points at a regular file.
    if not SKILLS_DIR.is_dir():
        return []
    return sorted(
        child.name
        for child in SKILLS_DIR.iterdir()
        if child.is_dir() and (child / "SKILL.md").exists()
    )
def _tokens(text: str | None) -> set[str]:
if not text:
return set()
return {
tok
for tok in re.findall(r"[a-z][a-z0-9_-]{2,}", text.lower())
if tok not in STOP_WORDS
}
def _frontmatter(text: str) -> dict[str, str]:
    """Parse the top-level ``key: value`` pairs of a ``---`` delimited header.

    This is a deliberately small parser, not a YAML implementation:

    * Text that does not start with ``---`` or has no closing ``---`` yields
      an empty dict.
    * Only unindented lines containing ``:`` define keys; indented lines
      (nested mappings, list items) are ignored.
    * Surrounding whitespace and quote characters are stripped from values.
    * A value that is a YAML block-scalar indicator (``>``, ``|`` and their
      ``-``/``+`` variants) is replaced by the indented lines that follow it:
      joined with spaces for folded (``>``) and newlines for literal (``|``)
      scalars. Blank lines inside the block are dropped.

    Returns:
        Mapping of stripped key to string value.
    """
    if not text.startswith("---"):
        return {}
    parts = text.split("---", 2)
    if len(parts) < 3:
        return {}

    block_indicators = {">", ">-", ">+", "|", "|-", "|+"}
    lines = parts[1].splitlines()
    out: dict[str, str] = {}
    idx = 0
    while idx < len(lines):
        line = lines[idx]
        idx += 1
        if ":" not in line or line.startswith((" ", "\t")):
            continue
        key, value = line.split(":", 1)
        value = value.strip()
        if value not in block_indicators:
            out[key.strip()] = value.strip('"\'')
            continue

        # Block scalar: the real value lives on the following indented lines.
        # Without this the stored value would be the bare ">" / "|" marker.
        chunk: list[str] = []
        while idx < len(lines) and (
            not lines[idx].strip() or lines[idx].startswith((" ", "\t"))
        ):
            chunk.append(lines[idx].strip())
            idx += 1
        joiner = " " if value.startswith(">") else "\n"
        out[key.strip()] = joiner.join(piece for piece in chunk if piece)
    return out
def _skill_catalog() -> list[dict]:
    """Build one catalog row for every ``SKILL.md`` found under SKILLS_DIR.

    The search is recursive. A file is skipped when any directory between
    SKILLS_DIR and the file is named ``_archive``, ``.git``, ``node_modules``
    or ``.cache``. Unreadable files are skipped with a warning.

    Returns:
        A list of dicts, ordered by file path, each with the keys:
        ``name`` (frontmatter ``name`` or the containing directory name),
        ``path`` (path of the SKILL.md file), ``relativePath`` (containing
        directory relative to SKILLS_DIR), ``description`` (frontmatter
        description, capped at 500 characters) and ``tokens`` (sorted tokens
        drawn from the name, relative path, full description and the first
        4000 characters of the body).
    """
    if not SKILLS_DIR.exists():
        return []

    excluded_dirs = {"_archive", ".git", "node_modules", ".cache"}
    rows: list[dict] = []
    for skill_md in sorted(SKILLS_DIR.glob("**/SKILL.md")):
        rel_dir = skill_md.parent.relative_to(SKILLS_DIR)
        # Test only the components below SKILLS_DIR: checking the absolute
        # path would drop every skill whenever SKILLS_DIR itself sits under a
        # directory with one of the excluded names.
        if excluded_dirs.intersection(rel_dir.parts):
            continue
        try:
            text = skill_md.read_text(encoding="utf-8", errors="replace")
        except OSError as exc:
            log.warning("Could not read skill file %s: %s", skill_md, exc)
            continue

        fm = _frontmatter(text)
        rel = str(rel_dir)
        name = fm.get("name") or skill_md.parent.name
        description = fm.get("description") or ""
        # Body is whatever follows the closing frontmatter delimiter; files
        # without a complete frontmatter block are treated as all body.
        segments = text.split("---", 2)
        body = segments[2] if text.startswith("---") and len(segments) > 2 else text
        rows.append({
            "name": name,
            "path": str(skill_md),
            "relativePath": rel,
            "description": description[:500],
            "tokens": sorted(_tokens(" ".join([name, rel, description, body[:4000]]))),
        })
    return rows
def find_skill_candidates(query: str, agent_id: str | None = None, limit: int = 8) -> dict:
    """Rank catalogued skills against *query* and recommend reuse or a new draft.

    Each skill sharing at least one token with the query scores one point per
    shared token, plus three per query token found in the skill name and two
    per query token found in its description. Matches are ordered by
    descending score, then name, and truncated to *limit*.

    Reuse is recommended when the best match reaches REUSE_SCORE_THRESHOLD.
    If the agent's skill filter carries a list, the allowlist action reports
    whether the best match (by name or relative path) is already on it.

    Args:
        query: Free-text description of the capability being looked for.
        agent_id: Agent whose skill filter is consulted; passed to
            load_agent_skill_filter.
        limit: Maximum number of matches returned.

    Returns:
        Dict with ``query``, ``agentId``, ``decision``, ``allowlistAction``,
        ``allowlistValue``, ``agentSkillFilter`` and ``matches``.
    """
    wanted = _tokens(query)

    scored = []
    for entry in _skill_catalog():
        shared = sorted(wanted.intersection(entry["tokens"]))
        if not shared:
            continue
        name_hits = len(wanted & _tokens(entry["name"]))
        description_hits = len(wanted & _tokens(entry["description"]))
        scored.append({
            "name": entry["name"],
            "path": entry["path"],
            "relativePath": entry["relativePath"],
            "description": entry["description"],
            "score": len(shared) + 3 * name_hits + 2 * description_hits,
            "matchedTerms": shared[:12],
        })

    scored.sort(key=lambda row: (-row["score"], row["name"]))
    matches = scored[:limit]

    best = None
    if matches and matches[0]["score"] >= REUSE_SCORE_THRESHOLD:
        best = matches[0]

    agent_filter = load_agent_skill_filter(agent_id)
    allowed = agent_filter.get("skills")

    if best is None:
        decision = "draft-new-skill-for-review"
        allowlist_action = "n/a"
    elif isinstance(allowed, list):
        decision = "reuse-existing-skill"
        if best["name"] in allowed or best["relativePath"] in allowed:
            allowlist_action = "none"
        else:
            allowlist_action = "add-existing-skill-to-agent-allowlist"
    else:
        decision = "reuse-existing-skill"
        allowlist_action = "none-unrestricted-or-unknown"

    return {
        "query": query,
        "agentId": agent_id,
        "decision": decision,
        "allowlistAction": allowlist_action,
        "allowlistValue": None if best is None else best["name"],
        "agentSkillFilter": agent_filter,
        "matches": matches,
    }
def load_agent_skill_filter(agent_id: str | None) -> dict:
if not agent_id:
return {"agentId": None, "mode": "not-evaluated", "skills": None}
try:
cfg = json.loads(OPENCLAW_CONFIG.read_text(encoding="utf-8"))
except Exception as exc:
return {"agentId": agent_id, "mode": "config-unavailable", "error": str(exc), "skills": None}
defaults = (cfg.get("agents") or {}).get("defaults") or {}
for agent in (cfg.get("agents") or {}).get("list") or []:
if agent.get("id") == agent_id:
if "skills" in agent:
return {"agentId": agent_id, "mode": "explicit-agent-allowlist", "skills": agent.get("skills") or []}
if "skills" in defaults:
return {"agentId": agent_id, "mode": "inherits-default-allowlist", "skills": defaults.get("skills") or []}
return {"agentId": agent_id, "mode": "unrestricted", "skills": None}
return {"agentId": agent_id, "mode": "agent-not-found", "skills": None}
def analyze_agent_skill_filters() -> dict:
    """Summarise the skill-allowlist mode of every agent in the OpenClaw config.

    Returns:
        On success, a dict with ``defaultSkillsSet`` (whether
        ``agents.defaults`` has a ``skills`` key) and ``agents``, a list of
        rows with ``agentId``, ``mode`` (``explicit-agent-allowlist``,
        ``inherits-default-allowlist`` or ``unrestricted``) and ``skillCount``
        (length of the applicable list, or ``None`` when there is none).
        When the config cannot be read or parsed, or its root is not a JSON
        object, a dict with ``error`` and an empty ``agents`` list.
    """
    try:
        cfg = json.loads(OPENCLAW_CONFIG.read_text(encoding="utf-8"))
    except Exception as exc:
        return {"error": str(exc), "agents": []}
    # json.loads may return any JSON type; only an object can hold "agents".
    if not isinstance(cfg, dict):
        return {"error": "config root is not a JSON object", "agents": []}

    # Treat wrongly-typed sections as absent instead of raising AttributeError.
    agents_cfg = cfg.get("agents")
    if not isinstance(agents_cfg, dict):
        agents_cfg = {}
    defaults = agents_cfg.get("defaults")
    if not isinstance(defaults, dict):
        defaults = {}
    entries = agents_cfg.get("list")
    if not isinstance(entries, list):
        entries = []

    rows = []
    for agent in entries:
        if not isinstance(agent, dict):
            continue  # malformed entry: nothing to report on
        if "skills" in agent:
            mode = "explicit-agent-allowlist"
            skills = agent.get("skills") or []
        elif "skills" in defaults:
            mode = "inherits-default-allowlist"
            skills = defaults.get("skills") or []
        else:
            mode = "unrestricted"
            skills = None
        rows.append({
            "agentId": agent.get("id"),
            "mode": mode,
            "skillCount": len(skills) if isinstance(skills, list) else None,
        })
    return {"defaultSkillsSet": "skills" in defaults, "agents": rows}
def _list_registered_mcps() -> list[dict]:
    """Read the registered MCP servers from the shared mcporter config.

    Returns:
        One dict per object-valued entry under ``mcpServers``, with ``name``,
        ``transport`` (``"http"`` when the entry's ``type`` is ``"http"``,
        otherwise ``"stdio"``) and ``enabled`` (always ``True``). The list is
        empty when the file is missing, unreadable, not valid JSON, or has no
        usable ``mcpServers`` mapping.
    """
    if not MCPORTER_CONFIG.exists():
        return []
    try:
        # Explicit UTF-8, matching how the OpenClaw config is read elsewhere
        # in this module, so parsing does not depend on the locale encoding.
        data = json.loads(MCPORTER_CONFIG.read_text(encoding="utf-8"))
    except Exception as exc:
        log.warning("Could not read mcporter config: %s", exc)
        return []

    if not isinstance(data, dict):
        log.warning("Could not read mcporter config: %s", "top-level JSON value is not an object")
        return []
    servers = data.get("mcpServers", {})
    if not isinstance(servers, dict):
        return []
    return [
        {
            "name": name,
            "transport": "http" if cfg.get("type") == "http" else "stdio",
            "enabled": True,
        }
        for name, cfg in servers.items()
        if isinstance(cfg, dict)
    ]
def _skill_aliases(name: str) -> set[str]:
    """Return the spellings under which an MCP called *name* may appear as a skill.

    Always included: the name itself, the name with underscores turned into
    hyphens, and the name with hyphens turned into underscores. A trailing
    ``-mcp`` or ``_mcp`` additionally contributes the name without that suffix
    plus one separator-swapped form of it.
    """
    variants = {name, name.replace("_", "-"), name.replace("-", "_")}
    stem = name[:-4]  # only meaningful when one of the suffix checks passes
    if name.endswith("-mcp"):
        variants |= {stem, stem.replace("-", "_")}
    if name.endswith("_mcp"):
        variants |= {stem.replace("_", "-"), stem}
    return variants
def analyze_skill_coverage() -> dict:
    """Compare registered MCP servers against the skill directories on disk.

    An MCP counts as covered when any alias of its name (see _skill_aliases)
    equals the name of a skill directory. A skill counts as orphaned when its
    name is not an alias of any registered MCP.

    Returns:
        Dict with ``registered_mcps``, ``mcps_with_skill_surface``,
        ``mcps_missing_skill_surface``, ``skills_without_registered_mcp``
        (all three sorted), ``skill_catalog_count``, ``agent_skill_filters``
        and the deprecated alias ``mcps_missing_skill``.
    """
    registered_mcps = _list_registered_mcps()
    skill_names = set(_list_mcp_skills())

    mcps_with_skill = []
    mcps_missing_skill = []
    # Union of every registered MCP's aliases, built in the same pass so the
    # orphaned-skill check below is one set difference instead of recomputing
    # alias sets for each (skill, MCP) pair.
    claimed_names: set[str] = set()
    for mcp in registered_mcps:
        name = mcp["name"]
        aliases = _skill_aliases(name)
        claimed_names |= aliases
        if aliases & skill_names:
            mcps_with_skill.append(name)
        else:
            mcps_missing_skill.append(name)

    skills_without_registered_mcp = sorted(skill_names - claimed_names)
    missing_sorted = sorted(mcps_missing_skill)
    return {
        "registered_mcps": registered_mcps,
        "mcps_with_skill_surface": sorted(mcps_with_skill),
        "mcps_missing_skill_surface": missing_sorted,
        "skills_without_registered_mcp": skills_without_registered_mcp,
        "skill_catalog_count": len(_skill_catalog()),
        "agent_skill_filters": analyze_agent_skill_filters(),
        # Deprecated alias — use mcps_missing_skill_surface instead
        "mcps_missing_skill": missing_sorted,
    }