File size: 8,678 Bytes
4ef118d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
"""
Shared helpers for skill-scoped virtual environments and dependency installation.
"""

from __future__ import annotations

import asyncio
import os
import re
import shutil
import sys
from typing import Any


def get_skills_dir() -> str:
    return os.path.join(os.path.dirname(__file__), "..", "..", ".skills")


def get_skill_path(skill_id: str) -> str:
    # 1. Check external (.skills) directory
    ext_path = os.path.join(get_skills_dir(), skill_id)
    if os.path.isdir(ext_path):
        return ext_path
    
    # 2. Check internal (_internal_skills) directory
    # Path relative to source file src/services/skill_runtime.py
    int_skills_dir = os.path.join(os.path.dirname(__file__), "..", "_internal_skills")
    int_path = os.path.normpath(os.path.join(int_skills_dir, skill_id))
    if os.path.isdir(int_path):
        return int_path

    raise FileNotFoundError(f"Skill '{skill_id}' not found")


def get_skill_venv_path(skill_path: str) -> str:
    return os.path.join(skill_path, ".venv")


def get_venv_python_path(venv_path: str) -> str:
    windows_path = os.path.join(venv_path, "Scripts", "python.exe")
    if os.path.exists(windows_path):
        return windows_path
    return os.path.join(venv_path, "bin", "python")


def should_skip_skill_dir(dirname: str) -> bool:
    return dirname in {".venv", "__pycache__"}


def validate_package_name(package_name: str) -> str:
    trimmed = str(package_name or "").strip()
    if not re.fullmatch(r"^[A-Za-z0-9-]+$", trimmed):
        raise ValueError("Invalid package_name. Only letters, numbers, and hyphens are allowed.")
    return trimmed


async def run_subprocess(cmd: list[str], cwd: str) -> tuple[int, str, str]:
    process = await asyncio.create_subprocess_exec(
        *cmd,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()
    return process.returncode, stdout.decode("utf-8", errors="replace"), stderr.decode(
        "utf-8", errors="replace"
    )


async def install_with_available_pip(
    *,
    target_python: str,
    packages: list[str],
    cwd: str,
) -> tuple[int, str, str, str]:
    commands: list[tuple[str, list[str]]] = [
        ("venv-pip", [target_python, "-m", "pip", "install", *packages]),
        (
            "host-pip-target-python",
            [sys.executable, "-m", "pip", "--python", target_python, "install", *packages],
        ),
    ]

    uv_path = shutil.which("uv")
    if uv_path:
        commands.append(
            ("uv-pip", [uv_path, "pip", "install", "--python", target_python, *packages])
        )

    attempts: list[str] = []
    for label, cmd in commands:
        code, stdout, stderr = await run_subprocess(cmd, cwd=cwd)
        if code == 0:
            return code, stdout, stderr, label
        details = stderr.strip() or stdout.strip() or f"{label} failed"
        attempts.append(f"{label}: {details}")

    raise RuntimeError(
        "Dependency installation failed for the isolated environment. Attempts: "
        + " | ".join(attempts)
    )


async def run_subprocess_with_timeout(
    cmd: list[str],
    cwd: str,
    timeout_seconds: float = 60.0,
) -> tuple[int, str, str]:
    process = await asyncio.create_subprocess_exec(
        *cmd,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        process.kill()
        await process.communicate()
        raise RuntimeError(f"Script execution timed out after {timeout_seconds:.1f}s")
    return process.returncode, stdout.decode("utf-8", errors="replace"), stderr.decode(
        "utf-8", errors="replace"
    )


async def ensure_skill_venv(skill_path: str) -> tuple[str, bool]:
    venv_path = get_skill_venv_path(skill_path)
    python_path = get_venv_python_path(venv_path)
    if os.path.exists(python_path):
        return python_path, False

    os.makedirs(skill_path, exist_ok=True)
    code, _, stderr = await run_subprocess([sys.executable, "-m", "venv", venv_path], cwd=skill_path)
    if code != 0:
        raise RuntimeError(f"Failed to create isolated environment: {stderr.strip() or 'unknown error'}")

    python_path = get_venv_python_path(venv_path)
    if not os.path.exists(python_path):
        raise RuntimeError("Virtual environment was created without a Python executable")

    return python_path, True


def get_skill_environment_status(skill_id: str) -> dict[str, Any]:
    skill_path = get_skill_path(skill_id)
    venv_path = get_skill_venv_path(skill_path)
    python_path = get_venv_python_path(venv_path)
    python_exists = os.path.exists(python_path)
    return {
        "skill_id": skill_id,
        "venv_exists": python_exists,
        "python_path": python_path if python_exists else None,
        "scripts_dir_exists": os.path.isdir(os.path.join(skill_path, "scripts")),
    }


async def install_skill_dependency(skill_id: str, package_name: str) -> dict[str, Any]:
    skill_path = get_skill_path(skill_id)
    validated_package = validate_package_name(package_name)
    python_path, created = await ensure_skill_venv(skill_path)
    code, stdout, stderr, installer = await install_with_available_pip(
        target_python=python_path,
        packages=[validated_package],
        cwd=skill_path,
    )

    return {
        "success": True,
        "skill_id": skill_id,
        "package_name": validated_package,
        "venv_created": created,
        "python_path": python_path,
        "installer": installer,
        "stdout": stdout.strip(),
    }


def resolve_skill_script_path(skill_id: str, script_path: str) -> tuple[str, str]:
    skill_path = get_skill_path(skill_id)
    normalized_rel = str(script_path or "").strip().replace("\\", "/")
    if not normalized_rel:
        raise ValueError("script_path is required")
    
    scripts_root = os.path.abspath(os.path.join(skill_path, "scripts"))
    
    # Try the original path first
    abs_path = os.path.abspath(os.path.join(skill_path, normalized_rel))
    
    # If not found and doesn't already start with scripts/, try prepending scripts/
    if not os.path.isfile(abs_path) and not normalized_rel.startswith("scripts/"):
        alt_path = os.path.abspath(os.path.join(scripts_root, normalized_rel))
        if os.path.isfile(alt_path):
            abs_path = alt_path

    # Security Check: Must stay inside scripts_root
    if not abs_path.startswith(scripts_root + os.sep) and abs_path != scripts_root:
        raise ValueError(f"Security error: script_path '{normalized_rel}' must stay inside the skill's scripts directory")
        
    if not os.path.isfile(abs_path):
        raise FileNotFoundError(f"Script '{normalized_rel}' not found in skill '{skill_id}' (searched in scripts/ directory)")
        
    return skill_path, abs_path


def build_skill_script_command(skill_id: str, script_path: str, args: list[str] | None = None) -> tuple[str, list[str]]:
    skill_path, abs_path = resolve_skill_script_path(skill_id, script_path)
    ext = os.path.splitext(abs_path)[1].lower()
    cmd_args = [str(a) for a in (args or [])]
    venv_python = get_venv_python_path(get_skill_venv_path(skill_path))

    if ext == ".py":
        python_cmd = venv_python if os.path.exists(venv_python) else sys.executable
        return skill_path, [python_cmd, abs_path, *cmd_args]
    if ext in {".sh", ".bash"}:
        return skill_path, ["bash", abs_path, *cmd_args]

    with open(abs_path, "r", encoding="utf-8", errors="ignore") as handle:
        first_line = handle.readline().strip()

    if "python" in first_line:
        python_cmd = venv_python if os.path.exists(venv_python) else sys.executable
        return skill_path, [python_cmd, abs_path, *cmd_args]
    if "bash" in first_line or "sh" in first_line:
        return skill_path, ["bash", abs_path, *cmd_args]

    return skill_path, [abs_path, *cmd_args]


async def execute_skill_script(
    skill_id: str,
    script_path: str,
    args: list[str] | None = None,
    timeout_seconds: float = 60.0,
) -> dict[str, Any]:
    skill_path, cmd = build_skill_script_command(skill_id, script_path, args)
    code, stdout, stderr = await run_subprocess_with_timeout(cmd, cwd=skill_path, timeout_seconds=timeout_seconds)
    return {
        "success": code == 0,
        "skill_id": skill_id,
        "script_path": script_path,
        "args": [str(a) for a in (args or [])],
        "command": cmd,
        "exit_code": code,
        "stdout": stdout.strip(),
        "stderr": stderr.strip(),
    }