File size: 8,248 Bytes
6350667
 
 
 
02ec6d5
6350667
 
02ec6d5
 
 
 
118eb12
6350667
 
 
 
 
0c163b8
6350667
 
6447e9a
f897a90
6350667
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03c7ea3
6350667
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03c7ea3
6350667
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03c7ea3
 
 
6350667
 
6447e9a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f481211
f897a90
f481211
 
 
f897a90
 
 
 
f481211
02ec6d5
 
 
 
 
 
 
0c163b8
 
 
02ec6d5
 
 
 
 
 
 
 
92ae55b
 
f481211
 
5782a2b
118eb12
 
08d3404
5782a2b
08d3404
118eb12
08d3404
5782a2b
08d3404
02ec6d5
 
 
118eb12
92ae55b
5782a2b
02ec6d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f481211
f897a90
 
 
6447e9a
f481211
 
f897a90
6350667
 
 
 
 
 
 
 
 
 
 
 
 
f481211
 
03c7ea3
6350667
03c7ea3
 
 
6350667
 
 
03c7ea3
6350667
 
 
 
 
 
 
 
 
25c4b3f
6350667
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
from __future__ import annotations

import html
import re
import os
import shutil
import subprocess
import asyncio
import sys
from pathlib import Path
import logging
import importlib.util
from dataclasses import dataclass
from itertools import zip_longest
from typing import Iterable


# Version and environment constraints for a "green" health check in the demo.
# Version minimums are tuples of ints, most-significant component first.
MIN_DENO_VERSION = (2, 0, 0)  # lowest Deno release accepted by `_check_deno`
MIN_YTDLP_VERSION = (2025, 11, 12)  # lowest yt-dlp release accepted by `_check_yt_dlp_python`
MIN_FFMPEG_VERSION = (4, 0)  # minimum ffmpeg version, presumably (major, minor)
GEMINI_ENV_VAR = "GEMINI_API_KEY"  # environment variable consulted for the Gemini API key


@dataclass
class ToolStatus:
    """Outcome of one individual health check."""

    # Human-readable name of the checked component (e.g. "Deno", "ffmpeg").
    label: str
    # True when the check passed.
    ok: bool
    # Free-form explanation: version info on success, an error message on failure.
    detail: str


@dataclass
class HealthReport:
    """Aggregate result of all health checks, as built by `run_health_report`."""

    # True only when every entry in `tools` reports ok.
    ok: bool
    # One-line overall message; lists the failing check labels when `ok` is False.
    summary: str
    # Individual check results, in the order the checks were executed.
    tools: list[ToolStatus]


def _split_version(raw: str) -> Iterable[int]:
    return tuple(int(part) for part in re.findall(r"\d+", raw))


def _version_at_least(raw: str, expected: tuple[int, ...]) -> bool:
    """Tell whether the version found in *raw* is at least *expected*.

    The numeric components of *raw* are compared with *expected* from left to
    right; whichever side is shorter is treated as if padded with zeros.
    Returns False when *raw* contains no digits at all.
    """
    found = _split_version(raw)
    if not found:
        return False
    # The first position where the two versions differ decides the outcome.
    for have, need in zip_longest(found, expected, fillvalue=0):
        if have != need:
            return have > need
    # Every component matched, so the versions are equal.
    return True


def _check_deno() -> ToolStatus:
    """Check that a `deno` executable is on PATH, runs, and is new enough.

    Runs ``deno --version`` (10 second timeout) and compares the reported
    version with ``MIN_DENO_VERSION``. Every failure mode is reported as a
    not-ok ``ToolStatus`` rather than raised.
    """
    label = "Deno"

    def failed(detail: str) -> ToolStatus:
        return ToolStatus(label, False, detail)

    deno_path = shutil.which("deno")
    if not deno_path:
        return failed("`deno` command not found on PATH")

    try:
        proc = subprocess.run(
            [deno_path, "--version"],
            capture_output=True,
            text=True,
            timeout=10,
            check=False,
        )
    except (FileNotFoundError, subprocess.SubprocessError, OSError) as exc:
        return failed(f"Unable to execute: {exc}")

    if proc.returncode != 0:
        # Prefer stderr, then stdout, then a generic message.
        return failed(proc.stderr.strip() or proc.stdout.strip() or "Unknown error")

    found = re.search(r"deno\s+([0-9.]+)", proc.stdout)
    if found is None:
        return failed("Could not parse version output")

    version = found.group(1)
    if _version_at_least(version, MIN_DENO_VERSION):
        return ToolStatus(label, True, f"v{version} ready for yt-dlp")

    minimum = ".".join(str(part) for part in MIN_DENO_VERSION)
    return failed(f"Found v{version}; need ≥ {minimum}")


def _check_yt_dlp_python() -> ToolStatus:
    """Check that the yt-dlp Python package is importable, usable and recent.

    The check passes only when all of the following hold: the package imports,
    a ``YoutubeDL`` instance can be constructed, the installed version is at
    least ``MIN_YTDLP_VERSION``, and the ``yt_dlp_ejs`` module can be located.
    """
    label = "yt-dlp"

    try:
        import yt_dlp  # noqa: F401 - imported only to prove the package loads
        from yt_dlp import YoutubeDL
        from yt_dlp.version import __version__ as installed
    except Exception as exc:
        return ToolStatus(label, False, f"Import failed: {exc}")

    # Construct a throwaway instance to make sure the library bootstraps.
    try:
        with YoutubeDL(params={"skip_download": True, "quiet": True}) as ydl:
            ydl.params.get("skip_download")
    except Exception as exc:
        return ToolStatus(label, False, f"YoutubeDL bootstrap failed: {exc}")

    if not _version_at_least(installed, MIN_YTDLP_VERSION):
        minimum = ".".join(str(part) for part in MIN_YTDLP_VERSION)
        return ToolStatus(label, False, f"Detected {installed}; require ≥ {minimum}")

    if importlib.util.find_spec("yt_dlp_ejs") is None:
        return ToolStatus(label, False, "yt_dlp_ejs missing (JS sites will fail)")
    return ToolStatus(label, True, f"v{installed}")


def _check_ffmpeg() -> ToolStatus:
    """Check that `ffmpeg` is on PATH, runs, and meets ``MIN_FFMPEG_VERSION``.

    Runs ``ffmpeg -version`` (5 second timeout). On success the detail is the
    first line of the command's output. When that line carries a numeric
    release version (``ffmpeg version 6.1.1 ...``, optionally prefixed with
    ``n``), it is compared against ``MIN_FFMPEG_VERSION`` and an older release
    is reported as not ok. Banners without a parseable numeric version (for
    example snapshot builds) are accepted as-is, so the check stays
    best-effort. Every failure mode is returned as a not-ok ``ToolStatus``
    rather than raised.
    """
    label = "ffmpeg"
    binary = shutil.which("ffmpeg")
    if not binary:
        return ToolStatus(label, False, "`ffmpeg` binary not found on PATH")
    try:
        completed = subprocess.run(
            [binary, "-version"],
            capture_output=True,
            text=True,
            check=False,
            timeout=5,
        )
    except Exception as exc:
        # Deliberately broad: a health check must never crash the UI.
        return ToolStatus(label, False, f"failed to exec: {exc}")
    if completed.returncode != 0:
        # Prefer stderr, then stdout (as the Deno check does), then a generic message.
        detail = (
            completed.stderr.strip()
            or completed.stdout.strip()
            or "ffmpeg returned error"
        )
        return ToolStatus(label, False, detail)

    banner = (completed.stdout or "").splitlines()
    first_line = banner[0] if banner else "ffmpeg present"

    # Enforce the minimum only when the banner exposes a numeric version.
    found = re.search(r"ffmpeg version n?(\d+(?:\.\d+)*)", first_line)
    if found and not _version_at_least(found.group(1), MIN_FFMPEG_VERSION):
        minimum = ".".join(map(str, MIN_FFMPEG_VERSION))
        return ToolStatus(label, False, f"Found v{found.group(1)}; need ≥ {minimum}")
    return ToolStatus(label, True, first_line)


def _check_gemini_env(gemini_api_key: str | None = None) -> ToolStatus:
    """Report whether a Gemini API key is available.

    A non-empty *gemini_api_key* argument takes precedence; otherwise the
    process environment variable named by ``GEMINI_ENV_VAR`` is consulted.
    The key value itself never appears in the returned status.
    """
    available = bool(gemini_api_key or os.environ.get(GEMINI_ENV_VAR))
    state = "is set" if available else "is not set"
    return ToolStatus("Gemini API key", available, f"{GEMINI_ENV_VAR} {state}")


def _check_mcp_health(gemini_api_key: str | None = None) -> ToolStatus:
    """Spawn the MCP server over stdio and call its ``health`` tool.

    The server is started as ``<python> -m aileen3_mcp.server`` with the
    repository's ``mcp/src`` directory prepended to ``PYTHONPATH``. When
    *gemini_api_key* is given it is exported to the child process under
    ``GEMINI_ENV_VAR``. Import errors, probe failures and not-ok responses
    are all reported as a not-ok ``ToolStatus``.
    """
    label = "MCP server"
    try:
        from fastmcp import Client  # type: ignore
        from fastmcp.client.transports import StdioTransport  # type: ignore
    except Exception as exc:  # pragma: no cover - defensive
        return ToolStatus(label, False, f"fastmcp missing: {exc}")

    # The repo root is taken to be the parent of this file's directory.
    # Keep this in sync with the Dockerfile so imports resolve both locally
    # and inside the Hugging Face Space.
    repo_root = Path(__file__).resolve().parents[1]
    mcp_src = repo_root / "mcp" / "src"
    inherited_path = os.environ.get("PYTHONPATH", "")
    if inherited_path:
        py_path = f"{mcp_src}{os.pathsep}{inherited_path}"
    else:
        py_path = str(mcp_src)

    child_env = {**os.environ, "PYTHONPATH": py_path}
    if gemini_api_key:
        child_env[GEMINI_ENV_VAR] = gemini_api_key

    server_entry = ["-m", "aileen3_mcp.server"]

    logging.warning(
        "MCP probe spawning server: cmd=%s args=%s PYTHONPATH=%s cwd=%s",
        sys.executable,
        server_entry,
        py_path,
        repo_root,
    )

    transport = StdioTransport(
        command=sys.executable,
        args=server_entry,
        env=child_env,
        cwd=str(repo_root),
    )

    def interpret(result) -> ToolStatus:
        # Unwrap the tool result: use the first truthy of `.data` and
        # `.structured_content`, else the result object itself.
        payload = result
        for attr in ("data", "structured_content"):
            candidate = getattr(result, attr, None)
            if candidate:
                payload = candidate
                break

        if isinstance(payload, dict):
            healthy = bool(payload.get("ok", False))
            detail = str(payload.get("detail", "no detail"))
        else:
            healthy, detail = False, str(payload)

        if not healthy:
            logging.warning("MCP health probe reported not-ok: %s", detail)
        return ToolStatus(label, healthy, detail)

    async def probe() -> ToolStatus:
        try:
            async with Client(transport) as client:
                result = await client.call_tool("health", {})
        except Exception as exc:
            logging.warning("MCP health probe failed: %s", exc)
            return ToolStatus(label, False, f"Probe failed: {exc}")
        return interpret(result)

    return asyncio.run(probe())


def run_health_report(gemini_api_key: str | None = None) -> HealthReport:
    """Run every health check and fold the results into one report.

    *gemini_api_key* is forwarded to the Gemini key check and the MCP probe.
    The report is ok only when every individual check is ok; otherwise the
    summary lists the labels of the failing checks.
    """
    statuses = [
        _check_deno(),
        _check_yt_dlp_python(),
        _check_ffmpeg(),
        _check_gemini_env(gemini_api_key),
        _check_mcp_health(gemini_api_key),
    ]
    failing = [status.label for status in statuses if not status.ok]
    if failing:
        summary = f"Needs attention: {', '.join(failing)}."
    else:
        summary = "All systems look good—ready for notebook-style experimentation."
    return HealthReport(ok=not failing, summary=summary, tools=statuses)


def _sanitize(text: str) -> str:
    return html.escape(text, quote=False)


def render_health_notice(gemini_api_key: str | None = None) -> str:
    """Run the health checks and render the outcome as an HTML snippet.

    The snippet is a ``health-box`` div containing the overall summary and
    one list item per check. Labels, details and the summary are HTML-escaped
    before being embedded.
    """
    report = run_health_report(gemini_api_key)
    if report.ok:
        state_class, icon = "health-success", "✅"
    else:
        state_class, icon = "health-fail", "🧯"

    items = []
    for status in report.tools:
        mark, verdict = ("✅", "Ready") if status.ok else ("❌", "Needs attention")
        # The detail is appended only when the check supplied one.
        suffix = " — " + _sanitize(status.detail) if status.detail else ""
        items.append(
            f"<li>{mark} {_sanitize(status.label)}: <strong>{verdict}</strong>{suffix}</li>"
        )

    pieces = [
        f"<div class='health-box {state_class}'>",
        f"<div class='health-head'>{icon} {_sanitize(report.summary)}</div>",
        f"<ul>{''.join(items)}</ul>",
        "</div>",
    ]
    return "".join(pieces)


def render_placeholder_notice() -> str:
    """Return the static HTML shown before any health check has been run."""
    prompt = "🔍 Let's run a quick health check to verify everything is set up correctly."
    return (
        "<div class='health-box health-placeholder'>"
        f"<div class='health-head'>{prompt}</div>"
        "</div>"
    )