Spaces:
Running
Running
| from __future__ import annotations | |
| import html | |
| import re | |
| import os | |
| import shutil | |
| import subprocess | |
| import asyncio | |
| import sys | |
| from pathlib import Path | |
| import logging | |
| import importlib.util | |
| from dataclasses import dataclass | |
| from itertools import zip_longest | |
| from typing import Iterable | |
# Thresholds the demo's health check must meet before it reports "green".
# Versions are tuples of ints so they can be compared component-wise.
MIN_DENO_VERSION = (2, 0, 0)  # oldest accepted Deno release
MIN_YTDLP_VERSION = (2025, 11, 12)  # oldest accepted yt-dlp release
MIN_FFMPEG_VERSION = (4, 0)  # oldest accepted ffmpeg release
# Name of the environment variable expected to hold the Gemini API key.
GEMINI_ENV_VAR = "GEMINI_API_KEY"
@dataclass
class ToolStatus:
    """Outcome of a single health probe.

    The ``@dataclass`` decorator generates the ``__init__`` that the check
    functions rely on when they build ``ToolStatus(label, ok, detail)``.
    """

    # Human-readable name of the tool or setting that was probed.
    label: str
    # True when the probe passed.
    ok: bool
    # Short explanation shown next to the label (version string, error, ...).
    detail: str
@dataclass
class HealthReport:
    """Aggregated result of all health probes.

    The ``@dataclass`` decorator generates the keyword-friendly ``__init__``
    used as ``HealthReport(ok=..., summary=..., tools=...)``.
    """

    # True only when every individual probe passed.
    ok: bool
    # One-line, human-readable verdict for the whole report.
    summary: str
    # Per-tool results that make up the report.
    tools: list[ToolStatus]
| def _split_version(raw: str) -> Iterable[int]: | |
| return tuple(int(part) for part in re.findall(r"\d+", raw)) | |
def _version_at_least(raw: str, expected: tuple[int, ...]) -> bool:
    """Tell whether the version found in *raw* is at least *expected*.

    Args:
        raw: Free-form version text; its numeric components are extracted
            with ``_split_version``.
        expected: Minimum acceptable version as a tuple of ints.

    Returns:
        ``False`` when *raw* contains no digits at all; otherwise the result
        of a component-wise comparison in which the shorter version is
        padded with zeros (so ``"2"`` is treated like ``2.0.0``).
    """
    found = tuple(_split_version(raw))
    if not found:
        return False

    # Pad both sides to a common length so plain tuple ordering performs the
    # same left-to-right, first-difference-wins comparison.
    width = max(len(found), len(expected))
    padded_found = found + (0,) * (width - len(found))
    padded_expected = tuple(expected) + (0,) * (width - len(expected))
    return padded_found >= padded_expected
def _check_deno() -> ToolStatus:
    """Probe the ``deno`` executable and compare it with ``MIN_DENO_VERSION``.

    Returns:
        A ``ToolStatus`` labelled "Deno". It is not-ok when the binary is not
        on PATH, cannot be executed (including a timeout after 10 seconds),
        exits non-zero, prints no recognisable version, or reports a version
        below the minimum.
    """
    label = "Deno"

    deno_path = shutil.which("deno")
    if not deno_path:
        return ToolStatus(label, False, "`deno` command not found on PATH")

    try:
        proc = subprocess.run(
            [deno_path, "--version"],
            capture_output=True,
            text=True,
            timeout=10,
            check=False,
        )
    except (FileNotFoundError, subprocess.SubprocessError, OSError) as exc:
        return ToolStatus(label, False, f"Unable to execute: {exc}")

    if proc.returncode != 0:
        # Prefer stderr, then stdout, then a generic message.
        message = proc.stderr.strip() or proc.stdout.strip() or "Unknown error"
        return ToolStatus(label, False, message)

    found = re.search(r"deno\s+([0-9.]+)", proc.stdout)
    if found is None:
        return ToolStatus(label, False, "Could not parse version output")

    version = found.group(1)
    if _version_at_least(version, MIN_DENO_VERSION):
        return ToolStatus(label, True, f"v{version} ready for yt-dlp")

    required = ".".join(str(part) for part in MIN_DENO_VERSION)
    return ToolStatus(label, False, f"Found v{version}; need ≥ {required}")
def _check_yt_dlp_python() -> ToolStatus:
    """Verify that the ``yt_dlp`` Python package is importable and recent enough.

    The checks run in this order: the package imports, a ``YoutubeDL``
    instance can be created and closed, the package version meets
    ``MIN_YTDLP_VERSION``, and the ``yt_dlp_ejs`` module can be located.

    Returns:
        A ``ToolStatus`` labelled "yt-dlp" describing the first failing step,
        or an ok status carrying the detected version.
    """
    label = "yt-dlp"

    try:
        import yt_dlp
        from yt_dlp import YoutubeDL
        from yt_dlp.version import __version__ as version_str
    except Exception as exc:
        return ToolStatus(label, False, f"Import failed: {exc}")

    # Constructing the downloader is the smoke test; the lookup only touches
    # the instance so the context manager body is not empty.
    try:
        with YoutubeDL(params={"skip_download": True, "quiet": True}) as ydl:
            ydl.params.get("skip_download")
    except Exception as exc:
        return ToolStatus(label, False, f"YoutubeDL bootstrap failed: {exc}")

    if not _version_at_least(version_str, MIN_YTDLP_VERSION):
        minimum = ".".join(str(part) for part in MIN_YTDLP_VERSION)
        return ToolStatus(label, False, f"Detected {version_str}; require ≥ {minimum}")

    ejs_spec = importlib.util.find_spec("yt_dlp_ejs")
    if ejs_spec is None:
        return ToolStatus(label, False, "yt_dlp_ejs missing (JS sites will fail)")
    return ToolStatus(label, True, f"v{version_str}")
def _check_ffmpeg() -> ToolStatus:
    """Probe the ``ffmpeg`` executable and enforce ``MIN_FFMPEG_VERSION``.

    Returns:
        A ``ToolStatus`` labelled "ffmpeg". It is not-ok when the binary is
        not on PATH, cannot be executed within 5 seconds, exits non-zero, or
        reports a dotted release number below ``MIN_FFMPEG_VERSION``. On
        success the detail is the first line of ``ffmpeg -version``.
    """
    label = "ffmpeg"
    binary = shutil.which("ffmpeg")
    if not binary:
        return ToolStatus(label, False, "`ffmpeg` binary not found on PATH")

    try:
        completed = subprocess.run(
            [binary, "-version"],
            capture_output=True,
            text=True,
            check=False,
            timeout=5,
        )
    except Exception as exc:
        # Deliberately broad: any failure to run the binary is reported as a
        # failed check rather than crashing the whole report.
        return ToolStatus(label, False, f"failed to exec: {exc}")

    if completed.returncode != 0:
        return ToolStatus(label, False, completed.stderr.strip() or "ffmpeg returned error")

    lines = (completed.stdout or "").splitlines()
    # Fall back to a generic detail when there is no output or the first
    # line is blank, so the report never shows an empty detail.
    first_line = (lines[0].strip() if lines else "") or "ffmpeg present"

    # Only dotted release numbers (e.g. "4.4.2", "n6.1") are compared.
    # Snapshot builds whose banner carries no dotted release number cannot be
    # ordered against the minimum and are accepted as before.
    match = re.search(r"ffmpeg version n?(\d+(?:\.\d+)+)", first_line)
    if match and not _version_at_least(match.group(1), MIN_FFMPEG_VERSION):
        minimum = ".".join(map(str, MIN_FFMPEG_VERSION))
        return ToolStatus(label, False, f"Found v{match.group(1)}; need ≥ {minimum}")

    return ToolStatus(label, True, first_line)
def _check_gemini_env(gemini_api_key: str | None = None) -> ToolStatus:
    """Report whether a Gemini API key is available.

    Args:
        gemini_api_key: Key supplied explicitly (for example from the UI).
            When it is empty or ``None`` the ``GEMINI_API_KEY`` environment
            variable is consulted instead.

    Returns:
        A ``ToolStatus`` labelled "Gemini API key"; the detail never contains
        the key itself, only whether one was found.
    """
    label = "Gemini API key"
    # An explicit key wins; the process environment is only the fallback.
    candidate = gemini_api_key if gemini_api_key else os.environ.get(GEMINI_ENV_VAR)
    found = bool(candidate)
    verdict = "is set" if found else "is not set"
    return ToolStatus(label, found, f"{GEMINI_ENV_VAR} {verdict}")
def _check_mcp_health(
    gemini_api_key: str | None = None,
    *,
    timeout: float = 30.0,
) -> ToolStatus:
    """Spawn the MCP server over stdio and call its ``health`` tool.

    Args:
        gemini_api_key: Optional key exported to the child process as
            ``GEMINI_API_KEY``. When omitted the child sees whatever the
            current environment already contains.
        timeout: Upper bound in seconds for the whole probe (server start-up,
            the ``health`` call and teardown). Without it a server that never
            answers would block the health report indefinitely.

    Returns:
        A ``ToolStatus`` labelled "MCP server". It is not-ok when ``fastmcp``
        cannot be imported, the probe raises or times out, or the tool's
        payload is not a dict with a truthy ``"ok"`` entry.
    """
    label = "MCP server"
    try:
        from fastmcp import Client  # type: ignore
        from fastmcp.client.transports import StdioTransport  # type: ignore
    except Exception as exc:  # pragma: no cover - defensive
        return ToolStatus(label, False, f"fastmcp missing: {exc}")

    # When running inside the Hugging Face Space, the repo root is the
    # working directory; keep this logic in sync with the Dockerfile so
    # that imports work both locally and in production.
    repo_root = Path(__file__).resolve().parents[1]
    mcp_src = repo_root / "mcp" / "src"

    # Prepend the server sources to any PYTHONPATH that is already set.
    existing_py_path = os.environ.get("PYTHONPATH", "")
    py_path = (
        f"{mcp_src}{os.pathsep}{existing_py_path}"
        if existing_py_path
        else str(mcp_src)
    )

    env = os.environ.copy()
    env["PYTHONPATH"] = py_path
    if gemini_api_key:
        env[GEMINI_ENV_VAR] = gemini_api_key

    server_entry = ["-m", "aileen3_mcp.server"]
    # The environment is intentionally not logged: it may hold the API key.
    logging.warning(
        "MCP probe spawning server: cmd=%s args=%s PYTHONPATH=%s cwd=%s",
        sys.executable,
        server_entry,
        py_path,
        repo_root,
    )
    transport = StdioTransport(
        command=sys.executable,
        args=server_entry,
        env=env,
        cwd=str(repo_root),
    )

    async def probe() -> ToolStatus:
        try:
            async with Client(transport) as client:
                result = await client.call_tool("health", {})
        except Exception as exc:
            logging.warning("MCP health probe failed: %s", exc)
            return ToolStatus(label, False, f"Probe failed: {exc}")

        # FastMCP returns a CallToolResult; unwrap to the underlying payload if present.
        payload = getattr(result, "data", None) or getattr(result, "structured_content", None) or result
        if isinstance(payload, dict):
            ok = bool(payload.get("ok", False))
            detail = str(payload.get("detail", "no detail"))
        else:
            ok = False
            detail = str(payload)
        if not ok:
            logging.warning("MCP health probe reported not-ok: %s", detail)
        return ToolStatus(label, ok, detail)

    async def probe_with_deadline() -> ToolStatus:
        # wait_for cancels the probe on expiry; cancellation is not an
        # ``Exception`` subclass, so it passes through probe()'s handler and
        # unwinds the ``async with Client`` block.
        try:
            return await asyncio.wait_for(probe(), timeout=timeout)
        except asyncio.TimeoutError:
            logging.warning("MCP health probe timed out after %ss", timeout)
            return ToolStatus(label, False, f"Probe timed out after {timeout:g}s")

    return asyncio.run(probe_with_deadline())
def run_health_report(gemini_api_key: str | None = None) -> HealthReport:
    """Run every health probe and fold the results into one report.

    Args:
        gemini_api_key: Optional key forwarded to the Gemini and MCP probes.

    Returns:
        A ``HealthReport`` whose ``ok`` flag is true only when all probes
        passed, and whose summary lists the labels of the failing ones.
    """
    tool_statuses = [
        _check_deno(),
        _check_yt_dlp_python(),
        _check_ffmpeg(),
        _check_gemini_env(gemini_api_key),
        _check_mcp_health(gemini_api_key),
    ]

    failing_labels = [status.label for status in tool_statuses if not status.ok]
    if failing_labels:
        summary = f"Needs attention: {', '.join(failing_labels)}."
    else:
        summary = "All systems look good—ready for notebook-style experimentation."

    return HealthReport(ok=not failing_labels, summary=summary, tools=tool_statuses)
| def _sanitize(text: str) -> str: | |
| return html.escape(text, quote=False) | |
def render_health_notice(gemini_api_key: str | None = None) -> str:
    """Run the health report and render it as an HTML snippet.

    Args:
        gemini_api_key: Optional key forwarded to ``run_health_report``.

    Returns:
        A ``<div class='health-box ...'>`` containing the summary headline
        and one ``<li>`` per probe. Labels, details and the summary are
        escaped with ``_sanitize`` before being embedded.
    """
    report = run_health_report(gemini_api_key)

    if report.ok:
        state_class, icon = "health-success", "✅"
    else:
        state_class, icon = "health-fail", "🧯"

    rows = []
    for status in report.tools:
        mark, verdict = ("✅", "Ready") if status.ok else ("❌", "Needs attention")
        # The detail is optional; the dash separator appears only with it.
        detail = f" — {_sanitize(status.detail)}" if status.detail else ""
        rows.append(
            f"<li>{mark} {_sanitize(status.label)}: <strong>{verdict}</strong>{detail}</li>"
        )
    bullet_rows = "".join(rows)

    return (
        f"<div class='health-box {state_class}'>"
        f"<div class='health-head'>{icon} {_sanitize(report.summary)}</div>"
        f"<ul>{bullet_rows}</ul>"
        "</div>"
    )
def render_placeholder_notice() -> str:
    """Return the static HTML shown before any health check has been run."""
    headline = "🔍 Let's run a quick health check to verify everything is set up correctly."
    return (
        "<div class='health-box health-placeholder'>"
        f"<div class='health-head'>{headline}</div>"
        "</div>"
    )