"""Skill catalog search and MCP-to-skill coverage analysis.

The module scans ``SKILLS_DIR`` for ``SKILL.md`` files, scores them against a
free-text query, reads per-agent skill allowlists from ``OPENCLAW_CONFIG``,
and compares the MCP servers registered in ``MCPORTER_CONFIG`` with the skill
directories that exist on disk.
"""

import json
import logging
import os
import re
from pathlib import Path

# Kept as in the original file; `settings` is not referenced in the code below.
from config import settings

log = logging.getLogger("skill_analyzer")

SKILLS_DIR = Path(os.environ.get("OPENCLAW_SKILLS_DIR", "/home/node/.openclaw/skills")).expanduser()
MCPORTER_CONFIG = Path("/home/node/.openclaw/workspace/_shared/mcporter/config/mcporter.json")
OPENCLAW_CONFIG = Path(os.environ.get("OPENCLAW_CONFIG", "/home/node/.openclaw/openclaw.json")).expanduser()

# Tokens dropped by _tokens() before any matching or scoring.
STOP_WORDS = {
    "the", "and", "for", "with", "from", "that", "this", "when", "into", "your",
    "you", "are", "can", "use", "using", "skill", "skills", "agent", "agents",
    "tool", "tools", "task", "tasks", "workflow", "workflows", "process",
    "openclaw", "before", "after", "existing", "new", "review", "checks",
    "first", "instead", "only", "improve", "simple", "appropriate", "repo",
    "updates", "adding", "draft", "recommend", "requested", "evaluate",
}

# Minimum score the best match needs before find_skill_candidates() recommends reuse.
REUSE_SCORE_THRESHOLD = 6

# A token is a lowercase letter followed by at least two of [a-z0-9_-].
_TOKEN_RE = re.compile(r"[a-z][a-z0-9_-]{2,}")

# Directory names (below SKILLS_DIR) whose SKILL.md files are not cataloged.
_EXCLUDED_DIR_NAMES = frozenset({"_archive", ".git", "node_modules", ".cache"})


def _list_mcp_skills() -> list[str]:
    """Return the sorted names of direct subdirectories of SKILLS_DIR that contain a SKILL.md."""
    if not SKILLS_DIR.exists():
        return []
    return sorted(
        entry.name
        for entry in SKILLS_DIR.iterdir()
        if entry.is_dir() and (entry / "SKILL.md").exists()
    )


def _tokens(text: str | None) -> set[str]:
    """Return the set of lowercase tokens in *text*, excluding STOP_WORDS.

    Empty or ``None`` input yields an empty set.
    """
    if not text:
        return set()
    return {tok for tok in _TOKEN_RE.findall(text.lower()) if tok not in STOP_WORDS}


def _frontmatter(text: str) -> dict[str, str]:
    """Parse top-level ``key: value`` pairs from a leading ``---`` delimited block.

    Returns an empty dict when *text* does not start with ``---`` or the block
    is not closed by a second ``---``. Lines without a colon and lines that
    start with a space or tab (nested/continuation lines) are ignored.
    Surrounding whitespace and quote characters are stripped from values.
    """
    if not text.startswith("---"):
        return {}
    parts = text.split("---", 2)
    if len(parts) < 3:
        return {}
    out: dict[str, str] = {}
    for line in parts[1].splitlines():
        if ":" not in line or line.startswith((" ", "\t")):
            continue
        key, value = line.split(":", 1)
        out[key.strip()] = value.strip().strip('"\'')
    return out


def _skill_catalog() -> list[dict]:
    """Build one catalog row per SKILL.md found anywhere under SKILLS_DIR.

    Each row has ``name`` (frontmatter ``name`` or the parent directory name),
    ``path``, ``relativePath`` (parent directory relative to SKILLS_DIR),
    ``description`` (frontmatter value, truncated to 500 characters) and
    ``tokens`` (sorted tokens from name, relative path, description and the
    first 4000 characters of the body). Files that cannot be read are logged
    and skipped.
    """
    if not SKILLS_DIR.exists():
        return []
    rows: list[dict] = []
    for skill_md in sorted(SKILLS_DIR.glob("**/SKILL.md")):
        # Only the path below SKILLS_DIR is checked, so the location of the
        # skills root itself can never exclude every skill.
        if _EXCLUDED_DIR_NAMES.intersection(skill_md.relative_to(SKILLS_DIR).parts):
            continue
        try:
            text = skill_md.read_text(encoding="utf-8", errors="replace")
        except OSError as exc:
            log.warning("Could not read skill file %s: %s", skill_md, exc)
            continue
        fm = _frontmatter(text)
        rel = str(skill_md.parent.relative_to(SKILLS_DIR))
        name = fm.get("name") or skill_md.parent.name
        description = fm.get("description") or ""
        # The body is whatever follows the closing frontmatter delimiter; a
        # file without a complete frontmatter block is treated as all body.
        parts = text.split("---", 2) if text.startswith("---") else []
        body = parts[2] if len(parts) > 2 else text
        rows.append({
            "name": name,
            "path": str(skill_md),
            "relativePath": rel,
            "description": description[:500],
            "tokens": sorted(_tokens(" ".join([name, rel, description, body[:4000]]))),
        })
    return rows


def find_skill_candidates(query: str, agent_id: str | None = None, limit: int = 8) -> dict:
    """Rank cataloged skills against *query* and recommend reuse or a new draft.

    A skill's score is the number of query tokens found in its catalog tokens,
    plus 3 for every query token in its name and 2 for every query token in
    its (truncated) description. Skills with no overlap are dropped. Matches
    are ordered by descending score, then name, and cut to *limit* entries.

    The best remaining match is recommended for reuse only when its score is
    at least REUSE_SCORE_THRESHOLD. When the agent's filter carries a skill
    list, ``allowlistAction`` says whether that match is already on it
    (by name or relative path).

    Returns a dict with ``query``, ``agentId``, ``decision``,
    ``allowlistAction``, ``allowlistValue``, ``agentSkillFilter`` and
    ``matches``.
    """
    query_tokens = _tokens(query)
    matches = []
    for skill in _skill_catalog():
        overlap = sorted(query_tokens & set(skill["tokens"]))
        if not overlap:
            continue
        score = len(overlap)
        score += 3 * len(query_tokens & _tokens(skill["name"]))
        score += 2 * len(query_tokens & _tokens(skill["description"]))
        matches.append({
            "name": skill["name"],
            "path": skill["path"],
            "relativePath": skill["relativePath"],
            "description": skill["description"],
            "score": score,
            "matchedTerms": overlap[:12],
        })
    matches = sorted(matches, key=lambda row: (-row["score"], row["name"]))[:limit]
    top = matches[0] if matches and matches[0]["score"] >= REUSE_SCORE_THRESHOLD else None

    agent_filter = load_agent_skill_filter(agent_id)
    allowed = agent_filter.get("skills")
    if top and isinstance(allowed, list):
        present = top["name"] in allowed or top["relativePath"] in allowed
        decision = "reuse-existing-skill"
        allowlist_action = "none" if present else "add-existing-skill-to-agent-allowlist"
    elif top:
        decision = "reuse-existing-skill"
        allowlist_action = "none-unrestricted-or-unknown"
    else:
        decision = "draft-new-skill-for-review"
        allowlist_action = "n/a"
    return {
        "query": query,
        "agentId": agent_id,
        "decision": decision,
        "allowlistAction": allowlist_action,
        "allowlistValue": top["name"] if top else None,
        "agentSkillFilter": agent_filter,
        "matches": matches,
    }


def _agent_sections(cfg: object) -> tuple[dict, list[dict]]:
    """Extract ``agents.defaults`` and the dict entries of ``agents.list`` from *cfg*.

    Any section that is missing or has an unexpected JSON type is treated as
    empty, so callers never call ``.get`` on a non-dict value.
    """
    agents = cfg.get("agents") if isinstance(cfg, dict) else None
    if not isinstance(agents, dict):
        return {}, []
    defaults = agents.get("defaults")
    entries = agents.get("list")
    return (
        defaults if isinstance(defaults, dict) else {},
        [entry for entry in entries if isinstance(entry, dict)] if isinstance(entries, list) else [],
    )


def _resolve_skill_mode(agent: dict, defaults: dict) -> tuple[str, object]:
    """Return ``(mode, skills)`` for one agent entry.

    An agent's own ``skills`` key wins over ``defaults``; with neither key the
    agent is ``unrestricted`` and ``skills`` is ``None``. A present but falsy
    ``skills`` value is normalized to an empty list.
    """
    if "skills" in agent:
        return "explicit-agent-allowlist", agent.get("skills") or []
    if "skills" in defaults:
        return "inherits-default-allowlist", defaults.get("skills") or []
    return "unrestricted", None


def load_agent_skill_filter(agent_id: str | None) -> dict:
    """Return the skill filter that applies to *agent_id* according to OPENCLAW_CONFIG.

    The result always has ``agentId``, ``mode`` and ``skills``. ``mode`` is one of
    ``not-evaluated`` (no agent id given), ``config-unavailable`` (the file could
    not be read or parsed; an ``error`` string is included),
    ``explicit-agent-allowlist``, ``inherits-default-allowlist``, ``unrestricted``
    or ``agent-not-found``. ``skills`` is ``None`` unless an allowlist applies.
    """
    if not agent_id:
        return {"agentId": None, "mode": "not-evaluated", "skills": None}
    try:
        cfg = json.loads(OPENCLAW_CONFIG.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both JSON decode errors and invalid UTF-8.
        return {"agentId": agent_id, "mode": "config-unavailable", "error": str(exc), "skills": None}
    defaults, entries = _agent_sections(cfg)
    for agent in entries:
        if agent.get("id") == agent_id:
            mode, skills = _resolve_skill_mode(agent, defaults)
            return {"agentId": agent_id, "mode": mode, "skills": skills}
    return {"agentId": agent_id, "mode": "agent-not-found", "skills": None}


def analyze_agent_skill_filters() -> dict:
    """Summarize the skill-filter mode of every agent listed in OPENCLAW_CONFIG.

    Returns ``{"defaultSkillsSet": bool, "agents": [...]}`` where each agent row
    has ``agentId``, ``mode`` and ``skillCount`` (``None`` when the resolved
    skills value is not a list). If the config cannot be read or parsed the
    result is ``{"error": str, "agents": []}``.
    """
    try:
        cfg = json.loads(OPENCLAW_CONFIG.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        return {"error": str(exc), "agents": []}
    defaults, entries = _agent_sections(cfg)
    rows = []
    for agent in entries:
        mode, skills = _resolve_skill_mode(agent, defaults)
        rows.append({
            "agentId": agent.get("id"),
            "mode": mode,
            "skillCount": len(skills) if isinstance(skills, list) else None,
        })
    return {"defaultSkillsSet": "skills" in defaults, "agents": rows}


def _list_registered_mcps() -> list[dict]:
    """Read registered MCP servers from shared mcporter config.

    Returns one ``{"name", "transport", "enabled"}`` dict per object-valued
    entry under ``mcpServers``. ``transport`` is ``"http"`` when the entry's
    ``type`` is ``"http"`` and ``"stdio"`` otherwise; ``enabled`` is always
    ``True``. A missing, unreadable or malformed config yields an empty list.
    """
    if not MCPORTER_CONFIG.exists():
        return []
    try:
        # Explicit UTF-8, matching how the other config and skill files are read.
        data = json.loads(MCPORTER_CONFIG.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        log.warning("Could not read mcporter config: %s", exc)
        return []
    if not isinstance(data, dict):
        log.warning("Could not read mcporter config: %s", "top-level JSON value is not an object")
        return []
    servers = data.get("mcpServers", {})
    if not isinstance(servers, dict):
        return []
    return [
        {
            "name": name,
            "transport": "http" if cfg.get("type") == "http" else "stdio",
            "enabled": True,
        }
        for name, cfg in servers.items()
        if isinstance(cfg, dict)
    ]


def _skill_aliases(name: str) -> set[str]:
    """Return the skill directory names considered equivalent to MCP server *name*.

    Includes the name itself, its ``-``/``_`` swapped spellings and, when the
    name ends in ``-mcp`` or ``_mcp``, the same name without that suffix in
    both separator styles.
    """
    aliases = {name, name.replace("_", "-"), name.replace("-", "_")}
    if name.endswith("-mcp"):
        aliases.add(name[:-4])
        aliases.add(name[:-4].replace("-", "_"))
    if name.endswith("_mcp"):
        aliases.add(name[:-4].replace("_", "-"))
        aliases.add(name[:-4])
    return aliases


def analyze_skill_coverage() -> dict:
    """Compare registered MCPs vs skill surfaces.

    An MCP has a skill surface when any of its aliases is the name of a skill
    directory; a skill is reported under ``skills_without_registered_mcp`` when
    it matches no alias of any registered MCP. The result also carries the
    catalog size and the per-agent skill filter summary.
    """
    registered_mcps = _list_registered_mcps()
    skill_names = set(_list_mcp_skills())

    # Aliases are computed once per MCP and reused for both directions of the comparison.
    aliases_by_mcp = {mcp["name"]: _skill_aliases(mcp["name"]) for mcp in registered_mcps}
    all_aliases: set[str] = set().union(*aliases_by_mcp.values())

    mcps_with_skill = sorted(name for name, aliases in aliases_by_mcp.items() if aliases & skill_names)
    missing_sorted = sorted(name for name, aliases in aliases_by_mcp.items() if not aliases & skill_names)
    skills_without_registered_mcp = sorted(skill_names - all_aliases)

    return {
        "registered_mcps": registered_mcps,
        "mcps_with_skill_surface": mcps_with_skill,
        "mcps_missing_skill_surface": missing_sorted,
        "skills_without_registered_mcp": skills_without_registered_mcp,
        "skill_catalog_count": len(_skill_catalog()),
        "agent_skill_filters": analyze_agent_skill_filters(),
        # Deprecated alias — use mcps_missing_skill_surface instead
        "mcps_missing_skill": missing_sorted,
    }