| import json |
| import logging |
| import os |
| import re |
| from pathlib import Path |
| from config import settings |
|
|
# Module logger; the name is a fixed string rather than derived from __name__.
log = logging.getLogger("skill_analyzer")

# Filesystem locations. The skills directory and the main config file can be
# overridden through environment variables; the mcporter config path is fixed.
SKILLS_DIR = Path(
    os.environ.get("OPENCLAW_SKILLS_DIR", "/home/node/.openclaw/skills")
).expanduser()
MCPORTER_CONFIG = Path("/home/node/.openclaw/workspace/_shared/mcporter/config/mcporter.json")
OPENCLAW_CONFIG = Path(
    os.environ.get("OPENCLAW_CONFIG", "/home/node/.openclaw/openclaw.json")
).expanduser()

# Tokens dropped by _tokens() before any matching takes place.
STOP_WORDS = {
    # common English function words
    "the", "and", "for", "with", "from", "that", "this", "when", "into",
    "your", "you", "are", "can", "use", "using",
    # vocabulary that shows up in nearly every skill description
    "skill", "skills", "agent", "agents", "tool", "tools",
    "task", "tasks", "workflow", "workflows", "process", "openclaw",
    # generic request / instruction wording
    "before", "after", "existing", "new", "review", "checks", "first",
    "instead", "only", "improve", "simple", "appropriate", "repo",
    "updates", "adding", "draft", "recommend", "requested", "evaluate",
}

# Minimum score the best match needs before find_skill_candidates() recommends
# reusing it.
REUSE_SCORE_THRESHOLD = 6
|
|
|
|
def _list_mcp_skills() -> list[str]:
    """List the immediate sub-directories of ``SKILLS_DIR`` that ship a ``SKILL.md``.

    Returns:
        The directory names in sorted order, or an empty list when
        ``SKILLS_DIR`` does not exist.
    """
    if not SKILLS_DIR.exists():
        return []
    names: list[str] = []
    for entry in SKILLS_DIR.iterdir():
        # Only direct children are inspected; nested skill folders are not
        # discovered by this helper.
        if entry.is_dir() and (entry / "SKILL.md").exists():
            names.append(entry.name)
    names.sort()
    return names
|
|
|
|
| def _tokens(text: str | None) -> set[str]: |
| if not text: |
| return set() |
| return { |
| tok |
| for tok in re.findall(r"[a-z][a-z0-9_-]{2,}", text.lower()) |
| if tok not in STOP_WORDS |
| } |
|
|
|
|
| def _frontmatter(text: str) -> dict[str, str]: |
| if not text.startswith("---"): |
| return {} |
| parts = text.split("---", 2) |
| if len(parts) < 3: |
| return {} |
| out: dict[str, str] = {} |
| for line in parts[1].splitlines(): |
| if ":" not in line or line.startswith((" ", "\t")): |
| continue |
| key, value = line.split(":", 1) |
| out[key.strip()] = value.strip().strip('"\'') |
| return out |
|
|
|
|
def _skill_catalog() -> list[dict]:
    """Index every ``SKILL.md`` found anywhere below ``SKILLS_DIR``.

    Files located under an ``_archive``, ``.git``, ``node_modules`` or
    ``.cache`` directory *inside* ``SKILLS_DIR`` are skipped. The check uses
    the path relative to ``SKILLS_DIR`` so that the location of the skills
    directory itself can never exclude the whole catalog.

    Returns:
        One dict per readable skill file, ordered by file path, with keys
        ``name`` (front-matter name, else the folder name), ``path``,
        ``relativePath`` (skill folder relative to ``SKILLS_DIR``),
        ``description`` (at most 500 characters) and ``tokens`` (sorted
        keywords drawn from name, relative path, description and the first
        4000 characters of the body). Empty when ``SKILLS_DIR`` is missing.
    """
    if not SKILLS_DIR.exists():
        return []
    excluded = {"_archive", ".git", "node_modules", ".cache"}
    rows: list[dict] = []
    for skill_md in sorted(SKILLS_DIR.glob("**/SKILL.md")):
        if excluded.intersection(skill_md.relative_to(SKILLS_DIR).parts):
            continue
        try:
            # errors="replace" means undecodable bytes cannot raise, so only
            # filesystem failures are expected here.
            text = skill_md.read_text(encoding="utf-8", errors="replace")
        except OSError as exc:
            log.warning("Could not read %s: %s", skill_md, exc)
            continue
        fm = _frontmatter(text)
        rel = str(skill_md.parent.relative_to(SKILLS_DIR))
        name = fm.get("name") or skill_md.parent.name
        description = fm.get("description") or ""
        # Everything after the second "---" counts as the body; documents
        # without front matter are used whole.
        parts = text.split("---", 2)
        body = parts[2] if text.startswith("---") and len(parts) > 2 else text
        rows.append({
            "name": name,
            "path": str(skill_md),
            "relativePath": rel,
            "description": description[:500],
            "tokens": sorted(_tokens(" ".join([name, rel, description, body[:4000]]))),
        })
    return rows
|
|
|
|
def find_skill_candidates(query: str, agent_id: str | None = None, limit: int = 8) -> dict:
    """Rank catalogued skills against ``query`` and recommend reuse or a new draft.

    Scoring per skill: one point for every query token found in the skill's
    token list, plus three for every query token that occurs in the skill
    name and two for every query token that occurs in its description.
    Skills sharing no token with the query are left out.

    The reuse decision is taken on the best-ranked skill *before* the list is
    truncated, so ``limit`` only controls how many matches are reported. A
    ``limit`` below zero is treated as zero.

    Args:
        query: Free-text description of the capability being looked for.
        agent_id: Optional agent whose skill allowlist should be consulted.
        limit: Maximum number of matches to return.

    Returns:
        Dict with ``query``, ``agentId``, ``decision``
        (``reuse-existing-skill`` or ``draft-new-skill-for-review``),
        ``allowlistAction``, ``allowlistValue`` (name of the skill to reuse,
        else ``None``), ``agentSkillFilter`` and ``matches``.
    """
    query_tokens = _tokens(query)
    ranked: list[dict] = []
    # Without query tokens nothing can overlap, so the catalog scan (which
    # reads every skill file) is skipped.
    catalog = _skill_catalog() if query_tokens else []
    for skill in catalog:
        overlap = sorted(query_tokens.intersection(skill["tokens"]))
        if not overlap:
            continue
        score = len(overlap)
        score += 3 * len(query_tokens & _tokens(skill["name"]))
        score += 2 * len(query_tokens & _tokens(skill["description"]))
        ranked.append({
            "name": skill["name"],
            "path": skill["path"],
            "relativePath": skill["relativePath"],
            "description": skill["description"],
            "score": score,
            "matchedTerms": overlap[:12],
        })
    # Highest score first; ties are broken alphabetically by name.
    ranked.sort(key=lambda row: (-row["score"], row["name"]))
    top = ranked[0] if ranked and ranked[0]["score"] >= REUSE_SCORE_THRESHOLD else None
    matches = ranked[:max(limit, 0)]

    agent_filter = load_agent_skill_filter(agent_id)
    allowed = agent_filter.get("skills")
    if top and isinstance(allowed, list):
        # An allowlist is in force: the skill may be listed by name or by its
        # path relative to the skills directory.
        present = top["name"] in allowed or top["relativePath"] in allowed
        decision = "reuse-existing-skill"
        allowlist_action = "none" if present else "add-existing-skill-to-agent-allowlist"
    elif top:
        decision = "reuse-existing-skill"
        allowlist_action = "none-unrestricted-or-unknown"
    else:
        decision = "draft-new-skill-for-review"
        allowlist_action = "n/a"
    return {
        "query": query,
        "agentId": agent_id,
        "decision": decision,
        "allowlistAction": allowlist_action,
        "allowlistValue": top["name"] if top else None,
        "agentSkillFilter": agent_filter,
        "matches": matches,
    }
|
|
|
|
| def load_agent_skill_filter(agent_id: str | None) -> dict: |
| if not agent_id: |
| return {"agentId": None, "mode": "not-evaluated", "skills": None} |
| try: |
| cfg = json.loads(OPENCLAW_CONFIG.read_text(encoding="utf-8")) |
| except Exception as exc: |
| return {"agentId": agent_id, "mode": "config-unavailable", "error": str(exc), "skills": None} |
| defaults = (cfg.get("agents") or {}).get("defaults") or {} |
| for agent in (cfg.get("agents") or {}).get("list") or []: |
| if agent.get("id") == agent_id: |
| if "skills" in agent: |
| return {"agentId": agent_id, "mode": "explicit-agent-allowlist", "skills": agent.get("skills") or []} |
| if "skills" in defaults: |
| return {"agentId": agent_id, "mode": "inherits-default-allowlist", "skills": defaults.get("skills") or []} |
| return {"agentId": agent_id, "mode": "unrestricted", "skills": None} |
| return {"agentId": agent_id, "mode": "agent-not-found", "skills": None} |
|
|
|
|
def analyze_agent_skill_filters() -> dict:
    """Summarise the skill-filter mode of every agent in ``OPENCLAW_CONFIG``.

    For each entry of ``agents.list`` the mode is
    ``explicit-agent-allowlist`` when the agent has its own ``skills`` key,
    ``inherits-default-allowlist`` when only ``agents.defaults`` has one, and
    ``unrestricted`` otherwise. Entries that are not JSON objects are
    skipped, and non-object sections are treated as empty, so a malformed
    config cannot raise.

    Returns:
        ``{"defaultSkillsSet": bool, "agents": [...]}`` where each agent row
        holds ``agentId``, ``mode`` and ``skillCount`` (``None`` unless the
        effective ``skills`` value is a list). When the config cannot be
        read or parsed, or its root is not an object, the result is
        ``{"error": <message>, "agents": []}``.
    """
    try:
        cfg = json.loads(OPENCLAW_CONFIG.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both invalid JSON and undecodable bytes.
        return {"error": str(exc), "agents": []}
    if not isinstance(cfg, dict):
        return {"error": "config root is not a JSON object", "agents": []}

    agents_cfg = cfg.get("agents")
    if not isinstance(agents_cfg, dict):
        agents_cfg = {}
    defaults = agents_cfg.get("defaults")
    if not isinstance(defaults, dict):
        defaults = {}
    agent_list = agents_cfg.get("list")
    if not isinstance(agent_list, list):
        agent_list = []

    rows = []
    for agent in agent_list:
        if not isinstance(agent, dict):
            continue
        if "skills" in agent:
            mode = "explicit-agent-allowlist"
            skills = agent.get("skills") or []
        elif "skills" in defaults:
            mode = "inherits-default-allowlist"
            skills = defaults.get("skills") or []
        else:
            mode = "unrestricted"
            skills = None
        rows.append({
            "agentId": agent.get("id"),
            "mode": mode,
            "skillCount": len(skills) if isinstance(skills, list) else None,
        })
    return {"defaultSkillsSet": "skills" in defaults, "agents": rows}
|
|
|
|
def _list_registered_mcps() -> list[dict]:
    """Read the registered MCP servers from the shared mcporter config.

    The file is decoded as UTF-8 explicitly, matching the other config reads
    in this module instead of depending on the platform default encoding.

    Returns:
        One ``{"name", "transport", "enabled"}`` dict per entry of
        ``mcpServers`` whose value is an object. ``transport`` is ``"http"``
        when the entry's ``type`` equals ``"http"`` and ``"stdio"``
        otherwise; ``enabled`` is always ``True``. An empty list is returned
        when the file is missing, unreadable, not valid JSON, or does not
        have the expected shape (read/parse problems are logged as
        warnings).
    """
    if not MCPORTER_CONFIG.exists():
        return []
    try:
        data = json.loads(MCPORTER_CONFIG.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both invalid JSON and undecodable bytes.
        log.warning("Could not read mcporter config: %s", exc)
        return []
    if not isinstance(data, dict):
        log.warning("Could not read mcporter config: %s", "top-level value is not a JSON object")
        return []
    servers = data.get("mcpServers", {})
    if not isinstance(servers, dict):
        return []
    return [
        {
            "name": name,
            "transport": "http" if cfg.get("type") == "http" else "stdio",
            "enabled": True,
        }
        for name, cfg in servers.items()
        if isinstance(cfg, dict)
    ]
|
|
|
|
| def _skill_aliases(name: str) -> set[str]: |
| aliases = {name, name.replace("_", "-"), name.replace("-", "_")} |
| if name.endswith("-mcp"): |
| aliases.add(name[:-4]) |
| aliases.add(name[:-4].replace("-", "_")) |
| if name.endswith("_mcp"): |
| aliases.add(name[:-4].replace("_", "-")) |
| aliases.add(name[:-4]) |
| return aliases |
|
|
|
|
def analyze_skill_coverage() -> dict:
    """Compare registered MCP servers against the available skill folders.

    An MCP counts as covered when any of its aliases (see
    ``_skill_aliases``) equals the name of a skill folder. Aliases are
    computed once per MCP and pooled, so finding skills without a matching
    MCP is a single set difference rather than a skill-by-MCP scan.

    Returns:
        Dict with ``registered_mcps``, ``mcps_with_skill_surface``,
        ``mcps_missing_skill_surface``, ``skills_without_registered_mcp``
        (all name lists sorted), ``skill_catalog_count``,
        ``agent_skill_filters`` and ``mcps_missing_skill``, which carries the
        same list as ``mcps_missing_skill_surface``.
    """
    registered_mcps = _list_registered_mcps()
    skill_names = set(_list_mcp_skills())

    known_aliases: set[str] = set()  # union of the aliases of every MCP
    mcps_with_skill = []
    mcps_missing_skill = []
    for mcp in registered_mcps:
        name = mcp["name"]
        aliases = _skill_aliases(name)
        known_aliases |= aliases
        if aliases & skill_names:
            mcps_with_skill.append(name)
        else:
            mcps_missing_skill.append(name)

    skills_without_registered_mcp = sorted(skill_names - known_aliases)

    missing_sorted = sorted(mcps_missing_skill)
    return {
        "registered_mcps": registered_mcps,
        "mcps_with_skill_surface": sorted(mcps_with_skill),
        "mcps_missing_skill_surface": missing_sorted,
        "skills_without_registered_mcp": skills_without_registered_mcp,
        "skill_catalog_count": len(_skill_catalog()),
        "agent_skill_filters": analyze_agent_skill_filters(),
        # Same list under a second key; presumably kept for older consumers —
        # confirm before removing.
        "mcps_missing_skill": missing_sorted,
    }
|
|