Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import shutil | |
| import tempfile | |
| import yaml | |
| from fastapi import APIRouter, HTTPException, Request | |
| from pydantic import BaseModel | |
| from pathlib import Path | |
| from urllib.parse import urlparse | |
| from ..services.skill_runtime import ( | |
| ensure_skill_venv, | |
| get_skill_environment_status, | |
| get_skill_path, | |
| get_skill_venv_path, | |
| get_venv_python_path, | |
| install_skill_dependency as install_skill_dependency_runtime, | |
| run_subprocess, | |
| should_skip_skill_dir, | |
| validate_package_name, | |
| ) | |
| router = APIRouter(prefix="/skills", tags=["skills"]) | |
| from ._request_secrets import get_llm_api_key | |
| # Path to the internal skill-creator skill bundled with the backend source | |
| _INTERNAL_SKILLS_DIR = Path(__file__).parent.parent / "_internal_skills" | |
| class SkillInfo(BaseModel): | |
| id: str | |
| name: str | |
| description: str | |
| class SkillCreate(BaseModel): | |
| id: str | |
| name: str | |
| description: str | |
| instructions: str | |
| class SkillUpdate(BaseModel): | |
| name: str | None = None | |
| description: str | None = None | |
| instructions: str | None = None | |
| class SkillFileContent(BaseModel): | |
| path: str | |
| content: str | |
| class SkillEnvironmentStatus(BaseModel): | |
| skill_id: str | |
| venv_exists: bool | |
| python_path: str | None = None | |
| scripts_dir_exists: bool | |
| class SkillDependencyInstall(BaseModel): | |
| package_name: str | |
| class SkillImportGitRequest(BaseModel): | |
| repo_url: str | |
| skill_path: str | None = None | |
| skill_id: str | None = None | |
| ref: str | None = None | |
| def _get_skills_dir() -> str: | |
| return os.path.join(os.path.dirname(__file__), "..", "..", ".skills") | |
| def _get_skill_path(skill_id: str) -> str: | |
| try: | |
| return get_skill_path(skill_id) | |
| except FileNotFoundError: | |
| raise HTTPException(status_code=404, detail="Skill not found") | |
| def _get_skill_venv_path(skill_path: str) -> str: | |
| return get_skill_venv_path(skill_path) | |
| def _get_venv_python_path(venv_path: str) -> str: | |
| return get_venv_python_path(venv_path) | |
| def _should_skip_skill_dir(dirname: str) -> bool: | |
| return should_skip_skill_dir(dirname) | |
| async def _run_subprocess(cmd: list[str], cwd: str) -> tuple[int, str, str]: | |
| return await run_subprocess(cmd, cwd) | |
| async def _ensure_skill_venv(skill_path: str) -> tuple[str, bool]: | |
| try: | |
| return await ensure_skill_venv(skill_path) | |
| except RuntimeError as exc: | |
| raise HTTPException(status_code=500, detail=str(exc)) | |
| def _sanitize_skill_id(raw: str) -> str: | |
| skill_id = (raw or "").strip().lower() | |
| skill_id = re.sub(r"[^a-z0-9-]", "-", skill_id) | |
| skill_id = re.sub(r"-+", "-", skill_id).strip("-") | |
| return skill_id[:64] | |
| def _is_allowed_git_repo_url(repo_url: str) -> bool: | |
| parsed = urlparse((repo_url or "").strip()) | |
| if parsed.scheme in {"http", "https"}: | |
| return bool(parsed.netloc and parsed.path) | |
| if parsed.scheme == "ssh": | |
| return bool(parsed.netloc and parsed.path) | |
| return bool(re.match(r"^[^@\s]+@[^:\s]+:[^\s]+$", repo_url or "")) | |
| def _normalize_git_import_source( | |
| repo_url: str, | |
| skill_path: str | None, | |
| ref: str | None, | |
| ) -> tuple[str, str | None, str | None]: | |
| """ | |
| Allow pasting GitHub tree/blob URLs directly, e.g. | |
| https://github.com/<owner>/<repo>/tree/<ref>/<path> | |
| """ | |
| raw = (repo_url or "").strip() | |
| parsed = urlparse(raw) | |
| host = (parsed.netloc or "").lower() | |
| if host.startswith("www."): | |
| host = host[4:] | |
| if host != "github.com": | |
| return raw, skill_path, ref | |
| segments = [seg for seg in parsed.path.split("/") if seg] | |
| # Expect: owner/repo/(tree|blob)/ref/[path...] | |
| if len(segments) >= 4 and segments[2] in {"tree", "blob"}: | |
| owner, repo = segments[0], segments[1] | |
| extracted_ref = segments[3] | |
| extracted_path = "/".join(segments[4:]) if len(segments) > 4 else "." | |
| normalized_repo = f"https://github.com/{owner}/{repo}.git" | |
| final_ref = (ref or "").strip() or extracted_ref | |
| final_path = (skill_path or "").strip() or extracted_path | |
| return normalized_repo, final_path, final_ref | |
| return raw, skill_path, ref | |
| def _normalize_skill_subpath(skill_path: str | None) -> str: | |
| if not skill_path: | |
| return "." | |
| cleaned = str(skill_path).strip().replace("\\", "/").strip("/") | |
| if not cleaned: | |
| return "." | |
| if ".." in cleaned.split("/"): | |
| raise HTTPException(status_code=400, detail="skill_path cannot contain '..'") | |
| return cleaned | |
| def _parse_and_validate_skill_md(content: str) -> tuple[dict, str]: | |
| """ | |
| Validate SKILL.md against the Agno-compatible format used by this app: | |
| - Must start with YAML frontmatter (`--- ... ---`) | |
| - Frontmatter must contain `name` and `description` | |
| - `name` must match ^[a-z0-9-]+$ | |
| """ | |
| if not content.startswith("---"): | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Invalid SKILL.md: missing YAML frontmatter. Expected leading '---'.", | |
| ) | |
| parts = content.split("---", 2) | |
| if len(parts) < 3: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Invalid SKILL.md: malformed YAML frontmatter block.", | |
| ) | |
| metadata = yaml.safe_load(parts[1]) or {} | |
| if not isinstance(metadata, dict): | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Invalid SKILL.md: frontmatter must be a YAML object.", | |
| ) | |
| name = str(metadata.get("name") or "").strip() | |
| description = str(metadata.get("description") or "").strip() | |
| if not name: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Invalid SKILL.md: frontmatter field 'name' is required.", | |
| ) | |
| if not re.fullmatch(r"^[a-z0-9-]+$", name): | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Invalid SKILL.md: 'name' must be lowercase alphanumeric with hyphens only.", | |
| ) | |
| if not description: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Invalid SKILL.md: frontmatter field 'description' is required.", | |
| ) | |
| instructions = parts[2].lstrip("\n") | |
| if not instructions.strip(): | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Invalid SKILL.md: instructions body cannot be empty.", | |
| ) | |
| return {"name": name, "description": description}, instructions | |
| async def list_skills(): | |
| """Returns a list of all currently locally installed skills by parsing SKILL.md files.""" | |
| skills_dir = _get_skills_dir() | |
| skills = [] | |
| if not os.path.exists(skills_dir): | |
| return skills | |
| for item in os.listdir(skills_dir): | |
| skill_path = os.path.join(skills_dir, item) | |
| if os.path.isdir(skill_path): | |
| md_path = os.path.join(skill_path, "SKILL.md") | |
| if os.path.exists(md_path): | |
| # Parse the YAML frontmatter | |
| try: | |
| with open(md_path, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| if content.startswith("---"): | |
| # Extract the YAML block between the first two '---' markers | |
| parts = content.split("---", 2) | |
| if len(parts) >= 3: | |
| frontmatter = parts[1] | |
| metadata = yaml.safe_load(frontmatter) or {} | |
| skills.append(SkillInfo( | |
| id=item, | |
| name=metadata.get("name") or item, | |
| description=metadata.get("description", "No description available.") | |
| )) | |
| except Exception as e: | |
| # Silently skip malformed skills in listing | |
| print(f"Error parsing skill metadata for {item}: {e}") | |
| pass | |
| # Sort skills alphabetically by name | |
| return sorted(skills, key=lambda s: s.name.lower()) | |
| async def create_skill(skill: SkillCreate): | |
| """Create a new skill in the .skills directory.""" | |
| import re | |
| import shutil | |
| # Validate ID strictly according to Agno rules | |
| if not re.match(r"^[a-z0-9-]+$", skill.id): | |
| from fastapi import HTTPException | |
| raise HTTPException(status_code=400, detail="Skill ID must be lowercase, alphanumeric, and hyphens only.") | |
| skills_dir = _get_skills_dir() | |
| skill_path = os.path.join(skills_dir, skill.id) | |
| md_path = os.path.join(skill_path, "SKILL.md") | |
| if os.path.exists(md_path): | |
| from fastapi import HTTPException | |
| raise HTTPException(status_code=409, detail=f"Skill '{skill.id}' already exists.") | |
| os.makedirs(skill_path, exist_ok=True) | |
| os.makedirs(os.path.join(skill_path, "scripts"), exist_ok=True) | |
| os.makedirs(os.path.join(skill_path, "references"), exist_ok=True) | |
| # Agno requires 'name' to match the directory name and be lowercase/alphanumeric/hyphenated. | |
| md_content = f"---\nname: {skill.id}\ndescription: {skill.description}\n---\n\n{skill.instructions}" | |
| md_path = os.path.join(skill_path, "SKILL.md") | |
| with open(md_path, "w", encoding="utf-8") as f: | |
| f.write(md_content) | |
| return SkillInfo(id=skill.id, name=skill.name or skill.id, description=skill.description) | |
| async def import_skill_from_git(req: SkillImportGitRequest): | |
| """Import a skill from a third-party git repository.""" | |
| repo_url, auto_skill_path, auto_ref = _normalize_git_import_source( | |
| req.repo_url, | |
| req.skill_path, | |
| req.ref, | |
| ) | |
| repo_url = (repo_url or "").strip() | |
| if not repo_url: | |
| raise HTTPException(status_code=400, detail="repo_url is required") | |
| if not _is_allowed_git_repo_url(repo_url): | |
| raise HTTPException(status_code=400, detail="Unsupported repo_url format") | |
| skill_subpath = _normalize_skill_subpath(auto_skill_path) | |
| ref = (auto_ref or "").strip() | |
| if ref and not re.fullmatch(r"[A-Za-z0-9._/\-]+", ref): | |
| raise HTTPException(status_code=400, detail="Invalid ref format") | |
| skills_dir = _get_skills_dir() | |
| os.makedirs(skills_dir, exist_ok=True) | |
| tmp_root = tempfile.mkdtemp(prefix="skill-import-", dir=skills_dir) | |
| try: | |
| clone_cmd = ["git", "clone", "--depth", "1"] | |
| if ref: | |
| clone_cmd.extend(["--branch", ref]) | |
| clone_cmd.extend([repo_url, tmp_root]) | |
| try: | |
| code, _, stderr = await _run_subprocess(clone_cmd, cwd=skills_dir) | |
| except FileNotFoundError: | |
| raise HTTPException(status_code=500, detail="Git is not installed on the server") | |
| if code != 0: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Failed to clone repository: {stderr.strip() or 'unknown error'}", | |
| ) | |
| src_path = os.path.abspath(os.path.join(tmp_root, skill_subpath)) | |
| tmp_root_abs = os.path.abspath(tmp_root) | |
| if not src_path.startswith(tmp_root_abs + os.sep) and src_path != tmp_root_abs: | |
| raise HTTPException(status_code=400, detail="Invalid skill_path") | |
| if not os.path.isdir(src_path): | |
| raise HTTPException(status_code=400, detail="skill_path not found in repository") | |
| skill_md = os.path.join(src_path, "SKILL.md") | |
| if not os.path.isfile(skill_md): | |
| raise HTTPException(status_code=400, detail="SKILL.md not found under skill_path") | |
| parsed_name = "" | |
| try: | |
| with open(skill_md, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| if content.startswith("---"): | |
| parts = content.split("---", 2) | |
| if len(parts) >= 3: | |
| metadata = yaml.safe_load(parts[1]) or {} | |
| parsed_name = str(metadata.get("name") or "").strip() | |
| except Exception: | |
| parsed_name = "" | |
| candidate_id = req.skill_id or parsed_name or os.path.basename(src_path) or "imported-skill" | |
| skill_id = _sanitize_skill_id(candidate_id) | |
| if not skill_id: | |
| raise HTTPException(status_code=400, detail="Could not derive a valid skill_id") | |
| target_path = os.path.join(skills_dir, skill_id) | |
| if os.path.exists(target_path): | |
| raise HTTPException(status_code=409, detail=f"Skill '{skill_id}' already exists.") | |
| def _ignore_copy(_dir: str, names: list[str]) -> set[str]: | |
| ignored = {".git", ".venv", "__pycache__"} | |
| return {n for n in names if n in ignored} | |
| shutil.copytree(src_path, target_path, ignore=_ignore_copy) | |
| imported_md = os.path.join(target_path, "SKILL.md") | |
| if not os.path.isfile(imported_md): | |
| shutil.rmtree(target_path, ignore_errors=True) | |
| raise HTTPException(status_code=400, detail="Imported folder does not contain SKILL.md") | |
| name = skill_id | |
| description = "Imported from git repository." | |
| try: | |
| with open(imported_md, "r", encoding="utf-8") as f: | |
| imported_content = f.read() | |
| metadata, instructions = _parse_and_validate_skill_md(imported_content) | |
| description = metadata["description"] | |
| # Force name to match directory id for Agno compatibility in this app. | |
| normalized_frontmatter = yaml.safe_dump( | |
| {"name": skill_id, "description": description}, | |
| allow_unicode=True, | |
| sort_keys=False, | |
| ).strip() | |
| normalized_content = f"---\n{normalized_frontmatter}\n---\n\n{instructions}" | |
| with open(imported_md, "w", encoding="utf-8") as f: | |
| f.write(normalized_content) | |
| name = skill_id | |
| except Exception: | |
| shutil.rmtree(target_path, ignore_errors=True) | |
| raise | |
| return SkillInfo(id=skill_id, name=name, description=description) | |
| finally: | |
| shutil.rmtree(tmp_root, ignore_errors=True) | |
| async def get_skill(skill_id: str): | |
| """Get full details (including instructions) of a specific skill.""" | |
| skills_dir = _get_skills_dir() | |
| skill_path = os.path.join(skills_dir, skill_id) | |
| md_path = os.path.join(skill_path, "SKILL.md") | |
| if not os.path.exists(md_path): | |
| from fastapi import HTTPException | |
| raise HTTPException(status_code=404, detail="Skill not found") | |
| with open(md_path, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| parts = content.split("---", 2) | |
| instructions = parts[2].strip() if len(parts) >= 3 else content | |
| # Extract metadata to get description | |
| description = "" | |
| if len(parts) >= 3: | |
| try: | |
| metadata = yaml.safe_load(parts[1]) or {} | |
| description = metadata.get("description", "") | |
| except: | |
| pass | |
| # Defaults based on existing | |
| current_name = skill_id | |
| current_desc = "" | |
| if len(parts) >= 3: | |
| try: | |
| metadata = yaml.safe_load(parts[1]) or {} | |
| current_name = metadata.get("name") or skill_id | |
| current_desc = metadata.get("description", "") | |
| except: | |
| pass | |
| return { | |
| "id": skill_id, | |
| "name": current_name, | |
| "description": current_desc, | |
| "instructions": instructions | |
| } | |
| async def list_skill_files(skill_id: str): | |
| """List all files in a skill's directory (excluding SKILL.md).""" | |
| skill_path = _get_skill_path(skill_id) | |
| files = [] | |
| for root, dirnames, filenames in os.walk(skill_path): | |
| dirnames[:] = [d for d in dirnames if not _should_skip_skill_dir(d)] | |
| rel_root = os.path.relpath(root, skill_path) | |
| if rel_root != "." and any(_should_skip_skill_dir(part) for part in rel_root.split(os.sep)): | |
| continue | |
| for filename in filenames: | |
| if filename == "SKILL.md": | |
| continue | |
| abs_path = os.path.join(root, filename) | |
| rel_path = os.path.relpath(abs_path, skill_path) | |
| files.append(rel_path) | |
| return {"files": sorted(files)} | |
| async def get_skill_file(skill_id: str, path: str): | |
| """Get content of a specific file in a skill.""" | |
| skill_path = _get_skill_path(skill_id) | |
| file_path = os.path.abspath(os.path.join(skill_path, path)) | |
| if not file_path.startswith(os.path.abspath(skill_path)): | |
| from fastapi import HTTPException | |
| raise HTTPException(status_code=400, detail="Invalid path") | |
| if not os.path.exists(file_path): | |
| from fastapi import HTTPException | |
| raise HTTPException(status_code=404, detail="File not found") | |
| with open(file_path, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| return {"content": content} | |
| async def update_skill_file(skill_id: str, file_data: SkillFileContent): | |
| """Create or update a file in a skill.""" | |
| skill_path = _get_skill_path(skill_id) | |
| file_path = os.path.abspath(os.path.join(skill_path, file_data.path)) | |
| if not file_path.startswith(os.path.abspath(skill_path)): | |
| from fastapi import HTTPException | |
| raise HTTPException(status_code=400, detail="Invalid path") | |
| os.makedirs(os.path.dirname(file_path), exist_ok=True) | |
| with open(file_path, "w", encoding="utf-8") as f: | |
| f.write(file_data.content) | |
| return {"success": True} | |
| async def delete_skill_file(skill_id: str, path: str): | |
| """Delete a file in a skill.""" | |
| skill_path = _get_skill_path(skill_id) | |
| file_path = os.path.abspath(os.path.join(skill_path, path)) | |
| if not file_path.startswith(os.path.abspath(skill_path)): | |
| from fastapi import HTTPException | |
| raise HTTPException(status_code=400, detail="Invalid path") | |
| if not os.path.exists(file_path): | |
| from fastapi import HTTPException | |
| raise HTTPException(status_code=404, detail="File not found") | |
| os.remove(file_path) | |
| return {"success": True} | |
| async def update_skill(skill_id: str, update: SkillUpdate): | |
| """Update an existing skill.""" | |
| skill_path = _get_skill_path(skill_id) | |
| md_path = os.path.join(skill_path, "SKILL.md") | |
| with open(md_path, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| parts = content.split("---", 2) | |
| # Defaults based on existing | |
| current_name = skill_id | |
| current_desc = "" | |
| current_inst = parts[2].strip() if len(parts) >= 3 else content | |
| if len(parts) >= 3: | |
| try: | |
| metadata = yaml.safe_load(parts[1]) or {} | |
| current_desc = metadata.get("description", "") | |
| except: | |
| pass | |
| new_desc = update.description if update.description is not None else current_desc | |
| new_inst = update.instructions if update.instructions is not None else current_inst | |
| # Force name to match skill_id for Agno compatibility | |
| md_content = f"---\nname: {skill_id}\ndescription: {new_desc}\n---\n\n{new_inst}" | |
| with open(md_path, "w", encoding="utf-8") as f: | |
| f.write(md_content) | |
| return SkillInfo(id=skill_id, name=skill_id, description=new_desc) | |
| async def delete_skill(skill_id: str): | |
| """Delete a skill.""" | |
| import shutil | |
| skill_path = _get_skill_path(skill_id) | |
| shutil.rmtree(skill_path) | |
| return {"success": True} | |
| async def get_skill_environment(skill_id: str): | |
| """Return isolated runtime status for a skill.""" | |
| try: | |
| status = get_skill_environment_status(skill_id) | |
| except FileNotFoundError: | |
| raise HTTPException(status_code=404, detail="Skill not found") | |
| return SkillEnvironmentStatus(**status) | |
| async def install_skill_dependency(skill_id: str, req: SkillDependencyInstall): | |
| """ | |
| Install a single dependency into a skill-scoped virtual environment. | |
| This is intentionally narrower than exposing a raw shell tool: | |
| only one package is accepted, the package name is validated, and | |
| installation is restricted to `.skills/<skill_id>/.venv`. | |
| """ | |
| try: | |
| validate_package_name(req.package_name) | |
| except ValueError as exc: | |
| raise HTTPException(status_code=400, detail=str(exc)) | |
| try: | |
| return await install_skill_dependency_runtime(skill_id, req.package_name) | |
| except FileNotFoundError: | |
| raise HTTPException(status_code=404, detail="Skill not found") | |
| except RuntimeError as exc: | |
| raise HTTPException( | |
| status_code=500, | |
| detail={"message": f"Failed to install dependency '{req.package_name}'", "stderr": str(exc)}, | |
| ) | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Skill Creator: AI-powered skill generation | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| class SkillGenerate(BaseModel): | |
| """Request body for AI-powered skill generation.""" | |
| prompt: str # Natural language description of the desired skill | |
| skill_id: str | None = None # Optional; generated by LLM if omitted | |
| provider: str = "openai" | |
| base_url: str | None = None | |
| model: str | None = None | |
| async def generate_skill(request: Request, req: SkillGenerate): | |
| """ | |
| Spin up a Skill Builder Agent with FileTools + the internal skill-creator skill, | |
| let it autonomously generate SKILL.md / scripts/*.py / references/*.md, | |
| and return the list of created files. | |
| """ | |
| from agno.agent import Agent | |
| from agno.tools.file import FileTools | |
| from agno.skills import LocalSkills | |
| from ..services.agent_registry import _build_model | |
| # ββ 1. Validate / derive skill_id ββββββββββββββββββββββββββββββββββββββββ | |
| skill_id = req.skill_id or "" | |
| if not skill_id: | |
| # Let the LLM pick one; we'll parse it from the generated SKILL.md later. | |
| # For now generate a safe placeholder that the agent will overwrite. | |
| import uuid | |
| skill_id = f"skill-{uuid.uuid4().hex[:8]}" | |
| skill_id = skill_id.lower() | |
| skill_id = re.sub(r"[^a-z0-9-]", "-", skill_id) | |
| skill_id = re.sub(r"-+", "-", skill_id).strip("-")[:64] | |
| if not re.match(r"^[a-z0-9-]+$", skill_id): | |
| raise HTTPException(status_code=400, detail="Invalid skill_id format") | |
| skills_dir = _get_skills_dir() | |
| skill_path = os.path.join(skills_dir, skill_id) | |
| if os.path.exists(os.path.join(skill_path, "SKILL.md")): | |
| raise HTTPException(status_code=409, detail=f"Skill '{skill_id}' already exists") | |
| # ββ 2. Create the target directory βββββββββββββββββββββββββββββββββββββββ | |
| os.makedirs(skill_path, exist_ok=True) | |
| # ββ 3. Build the Skill Builder Agent βββββββββββββββββββββββββββββββββββββ | |
| api_key = get_llm_api_key(request, {}) | |
| if not api_key: | |
| raise HTTPException(status_code=400, detail="Missing required header: x-llm-api-key") | |
| model = _build_model(req.provider, api_key, req.base_url, req.model) | |
| # FileTools: sandboxed to the new skill directory β agent cannot escape it. | |
| # enable_delete_file=False prevents the agent from destroying its own work. | |
| file_tools = FileTools( | |
| base_dir=Path(skill_path), | |
| enable_save_file=True, | |
| enable_read_file=True, | |
| enable_list_files=True, | |
| enable_delete_file=False, | |
| ) | |
| # LocalSkills: load the internal skill-creator skill so the agent has | |
| # Anthropics-quality meta-knowledge on how to write good Agno skills. | |
| internal_skill_dir = str(_INTERNAL_SKILLS_DIR / "skill-creator") | |
| skills = None | |
| if os.path.isdir(internal_skill_dir): | |
| from agno.skills import Skills | |
| skills = Skills(loaders=[LocalSkills(internal_skill_dir)]) | |
| system_prompt = f"""You are a Skill architect for the Agno AI framework. | |
| Create a complete, production-quality Skill package based on the user's description below. | |
| You have FileTools available. Your working directory is the skill root folder. | |
| DO NOT create a new subdirectory for the skill. Write all files strictly to the current directory (`.`). | |
| ONLY write files at these paths: | |
| - SKILL.md (REQUIRED) | |
| - scripts/<filename>.py (optional β MUST include `#!/usr/bin/env python3` at the top) | |
| - scripts/<filename>.sh (optional β MUST include `#!/bin/bash` at the top) | |
| - references/<filename>.md (optional β only if supplementary docs add value) | |
| CRITICAL WORKFLOW - YOU MUST FOLLOW THESE EXACT STEPS IN ORDER: | |
| 1. FIRST, write the initial `SKILL.md` file. | |
| 2. SECOND, write ANY required files in `scripts/` and `references/`. | |
| 3. THIRD, if you created ANY scripts or references in step 2, you MUST read and rewrite `SKILL.md` to add explicit usage instructions for those new files in the "## Setup & Usage Instructions" section. | |
| Your `SKILL.md` MUST strictly follow this exact markdown structure (Frontmatter followed by content): | |
| ```markdown | |
| --- | |
| name: <a-kebab-cased-cool-skill-name> | |
| description: <trigger phrase + purpose; max 200 chars; be explicit about WHEN to use this skill> | |
| --- | |
| # <Skill Title> | |
| ## Setup & Usage Instructions | |
| <Provide explicit instructions here teaching the operating Agent WHEN and HOW to use the files you generated in `references/` or `scripts/`.> | |
| <Example: "Before proceeding, YOU MUST read `references/guidelines.md`" or "Execute `scripts/voice_enhancer.py` when asked to enhance voice."> | |
| <If you didn't create any scripts/references, explain how the base skill itself should be operated.> | |
| <If your scripts depend on third-party Python packages, explicitly instruct the operating Agent to ask the user for approval via interactive_form before installing any missing dependency.> | |
| ## <Other Actionable Instructions...> | |
| <Add detailed character prompt, tasks, rules, or references here> | |
| ``` | |
| **CRITICAL DIRECTIVES**: | |
| 1. DO NOT CREATE a separate usage guide in `references/`. Make sure the actual instructions are physically written INSIDE the `SKILL.md` file itself. | |
| 2. DO NOT literally copy the word "CRITICAL" or any instructions from this system prompt into the final output. Replace the placeholder segments with your ACTUAL content. | |
| 3. Replace `<Describe the Skill...>` or any placeholders with specific, actionable instructions for the Agent. | |
| After creating all files, call list_files to confirm, then stop. | |
| Do NOT create assets/ or any other directories except scripts/ and references/ inside the current root. | |
| USER'S SKILL REQUEST: | |
| {req.prompt}""" | |
| agent = Agent( | |
| model=model, | |
| tools=[file_tools], | |
| skills=skills, | |
| instructions=system_prompt, | |
| markdown=False, | |
| ) | |
| # ββ 4. Run the agent (non-streaming, blocking) ββββββββββββββββββββββββββββ | |
| try: | |
| agent.run(req.prompt) | |
| except Exception as exc: | |
| # Clean up the partially created directory on failure | |
| import shutil as _shutil | |
| _shutil.rmtree(skill_path, ignore_errors=True) | |
| raise HTTPException(status_code=500, detail=f"Skill generation failed: {exc}") | |
| # ββ 5. Post-process & Collect output ββββββββββββββββββββββββββββββββββββββ | |
| import shutil as _shutil | |
| # Check if the agent created files *inside* a new subdirectory under skill_path | |
| # (Sometimes LLMs ignore the prompt and create `skill-108b5faa/my-skill/SKILL.md`) | |
| # If the root SKILL.md is missing, but there is exactly one subdirectory with a SKILL.md, hoist it. | |
| if not os.path.exists(os.path.join(skill_path, "SKILL.md")): | |
| subdirs = [d for d in os.listdir(skill_path) if os.path.isdir(os.path.join(skill_path, d))] | |
| if len(subdirs) == 1: | |
| nested_dir = os.path.join(skill_path, subdirs[0]) | |
| if os.path.exists(os.path.join(nested_dir, "SKILL.md")): | |
| # Hoist all contents of nested_dir to skill_path | |
| for item in os.listdir(nested_dir): | |
| src = os.path.join(nested_dir, item) | |
| dst = os.path.join(skill_path, item) | |
| _shutil.move(src, dst) | |
| _shutil.rmtree(nested_dir, ignore_errors=True) | |
| # Alternatively, the LLM might have escaped and created a directory in the parent '.skills' folder. | |
| # We can fetch the real name defined in SKILL.md and rename if necessary. | |
| real_skill_id = skill_id | |
| md_path = os.path.join(skill_path, "SKILL.md") | |
| # If SKILL.md is still missing, check if it was created in a sibling directory (by looking at recently created logic) | |
| # However, doing this safely is complex. If it completely failed to create it here, we will raise an error. | |
| if not os.path.exists(md_path): | |
| _shutil.rmtree(skill_path, ignore_errors=True) | |
| raise HTTPException( | |
| status_code=500, | |
| detail="Agent did not produce a SKILL.md in the correct directory. Please try again." | |
| ) | |
| # Extrat the actual ID the LLM put in SKILL.md YAML | |
| try: | |
| with open(md_path, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| parts = content.split("---", 2) | |
| if len(parts) >= 3: | |
| metadata = yaml.safe_load(parts[1]) or {} | |
| if "name" in metadata and isinstance(metadata["name"], str): | |
| parsed_name = metadata["name"].strip() | |
| parsed_name = re.sub(r"[^a-z0-9-]", "-", parsed_name.lower()).strip("-") | |
| if parsed_name and parsed_name != skill_id: | |
| target_path = os.path.join(skills_dir, parsed_name) | |
| if not os.path.exists(target_path): | |
| # Rename the directory to the nice name | |
| os.rename(skill_path, target_path) | |
| skill_path = target_path | |
| real_skill_id = parsed_name | |
| # rewrite the name inside SKILL.md to match safety requirements | |
| metadata["name"] = real_skill_id | |
| new_md_content = f"---\n{yaml.dump(metadata, sort_keys=False, allow_unicode=True).strip()}\n---\n{parts[2]}" | |
| with open(os.path.join(skill_path, "SKILL.md"), "w", encoding="utf-8") as f: | |
| f.write(new_md_content) | |
| except Exception as e: | |
| print(f"Warning: could not parse/rename generated skill: {e}") | |
| files_created: list[str] = [] | |
| for root, dirnames, filenames in os.walk(skill_path): | |
| dirnames[:] = [d for d in dirnames if not _should_skip_skill_dir(d)] | |
| rel_root = os.path.relpath(root, skill_path) | |
| if rel_root != "." and any(_should_skip_skill_dir(part) for part in rel_root.split(os.sep)): | |
| continue | |
| for fname in filenames: | |
| abs_path = os.path.join(root, fname) | |
| rel_path = os.path.relpath(abs_path, skill_path) | |
| files_created.append(rel_path) | |
| return {"skill_id": real_skill_id, "files_created": sorted(files_created)} | |