Spaces:
Sleeping
Sleeping
| """Providers for launching Hugging Face Spaces via ``uv run``.""" | |
| from __future__ import annotations | |
| import os | |
| import socket | |
| import subprocess | |
| import time | |
| from dataclasses import dataclass, field | |
| from typing import Dict, Optional | |
| import requests | |
| from .providers import ContainerProvider | |
| def _poll_health(health_url: str, timeout_s: float) -> None: | |
| """Poll a health endpoint until it returns HTTP 200 or times out.""" | |
| deadline = time.time() + timeout_s | |
| while time.time() < deadline: | |
| try: | |
| response = requests.get(health_url, timeout=2.0) | |
| if response.status_code == 200: | |
| return | |
| except requests.RequestException: | |
| pass | |
| time.sleep(0.5) | |
| raise TimeoutError( | |
| f"Server did not become ready within {timeout_s:.1f} seconds" | |
| ) | |
| def _create_uv_command( | |
| space_id: str, | |
| host: str, | |
| port: int, | |
| reload: bool, | |
| project_url: Optional[str] = None, | |
| ) -> list[str]: | |
| command = [ | |
| "uv", | |
| "run", | |
| "--project", | |
| project_url or f"git+https://huggingface.co/spaces/{space_id}", | |
| "--", | |
| "server", | |
| "--host", | |
| host, | |
| "--port", | |
| str(port), | |
| ] | |
| if reload: | |
| command.append("--reload") | |
| return command | |
@dataclass
class UVProvider(ContainerProvider):
    """ContainerProvider implementation backed by ``uv run``.

    The provider launches the Space's server as a local ``uv run``
    subprocess and exposes it through the ``ContainerProvider`` methods
    (``start_container`` / ``wait_for_ready`` / ``stop_container``). It can
    also be used as a context manager, which starts the server, waits for
    its health endpoint and stops the process on exit.
    """

    # Hugging Face Space identifier; used to derive the default project URL.
    space_id: str
    # Address the server is told to bind to.
    host: str = "0.0.0.0"
    # Port to bind; when None a free port is picked at start time.
    port: Optional[int] = None
    # Forward ``--reload`` to the server command.
    reload: bool = False
    # Explicit ``--project`` value overriding the Space git URL.
    project_url: Optional[str] = None
    # Host used in the client-facing base URL; derived from ``host`` if None.
    connect_host: Optional[str] = None
    # Extra environment variables layered on top of ``os.environ``.
    extra_env: Optional[Dict[str, str]] = None
    # Readiness timeout used by ``__enter__`` and ``wait_for_ready_default``.
    context_timeout_s: float = 60.0

    # Runtime state, excluded from the generated ``__init__``.
    _process: subprocess.Popen | None = field(init=False, default=None)
    _base_url: str | None = field(init=False, default=None)

    def start_container(
        self,
        image: str,
        port: Optional[int] = None,
        env_vars: Optional[Dict[str, str]] = None,
        **_: Dict[str, str],
    ) -> str:
        """Launch the ``uv run`` subprocess and return its base URL.

        Args:
            image: Space identifier; when non-empty it replaces
                ``self.space_id``.
            port: Port to bind. Falls back to ``self.port`` and then to a
                freshly discovered free port.
            env_vars: Environment overrides applied after ``extra_env``.
            **_: Additional keyword arguments are accepted and ignored.

        Returns:
            The ``http://host:port`` URL clients should connect to.

        Raises:
            RuntimeError: If a process is already running, ``uv`` is not
                installed, or the subprocess cannot be launched.
        """
        if self._process is not None and self._process.poll() is None:
            raise RuntimeError("UVProvider is already running")

        self.space_id = image or self.space_id
        bind_port = port or self.port or self._find_free_port()
        command = _create_uv_command(
            self.space_id,
            self.host,
            bind_port,
            self.reload,
            project_url=self.project_url,
        )

        # Precedence (lowest to highest): os.environ, extra_env, env_vars.
        env = os.environ.copy()
        if self.extra_env:
            env.update(self.extra_env)
        if env_vars:
            env.update(env_vars)

        try:
            self._process = subprocess.Popen(command, env=env)
        except FileNotFoundError as exc:
            raise RuntimeError(
                "`uv` executable not found. Install uv from "
                "https://github.com/astral-sh/uv and ensure it is on PATH."
            ) from exc
        except OSError as exc:
            raise RuntimeError(f"Failed to launch `uv run`: {exc}") from exc

        # A wildcard bind address is not connectable; use loopback instead.
        client_host = self.connect_host or (
            "127.0.0.1" if self.host in {"0.0.0.0", "::"} else self.host
        )
        self._base_url = f"http://{client_host}:{bind_port}"
        self.port = bind_port
        return self._base_url

    def _raise_if_exited(self) -> None:
        """Raise ``RuntimeError`` if the tracked subprocess has terminated."""
        if self._process and self._process.poll() is not None:
            code = self._process.returncode
            raise RuntimeError(
                f"uv process exited prematurely with code {code}"
            )

    def wait_for_ready(self, base_url: str, timeout_s: float = 60.0) -> None:
        """Block until ``{base_url}/health`` returns HTTP 200.

        The health endpoint is polled in short slices so that a subprocess
        that dies during startup is reported promptly instead of after the
        full timeout.

        Args:
            base_url: Base URL returned by ``start_container``.
            timeout_s: Maximum number of seconds to wait.

        Raises:
            RuntimeError: If the ``uv`` process exits before becoming ready.
            TimeoutError: If the server is not ready within ``timeout_s``.
        """
        health_url = f"{base_url}/health"
        deadline = time.monotonic() + timeout_s
        while True:
            self._raise_if_exited()
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(
                    f"Server did not become ready within {timeout_s:.1f} seconds"
                )
            try:
                # Short slice: re-check process liveness about once a second.
                _poll_health(health_url, min(1.0, remaining))
            except TimeoutError:
                continue
            return

    def stop_container(self) -> None:
        """Terminate the subprocess (if any) and clear the runtime state.

        The process is asked to terminate first and is killed if it has not
        exited within 10 seconds.
        """
        if self._process is None:
            return

        if self._process.poll() is None:
            self._process.terminate()
            try:
                self._process.wait(timeout=10.0)
            except subprocess.TimeoutExpired:
                self._process.kill()
                self._process.wait(timeout=5.0)

        self._process = None
        self._base_url = None

    def start(self) -> str:
        """Start the server using the configured Space and port."""
        return self.start_container(self.space_id, port=self.port)

    def stop(self) -> None:
        """Stop the server; alias for ``stop_container``."""
        self.stop_container()

    def wait_for_ready_default(self, timeout_s: float | None = None) -> None:
        """Wait for readiness on the URL recorded by the last start.

        Args:
            timeout_s: Seconds to wait; ``None`` means
                ``self.context_timeout_s``.

        Raises:
            RuntimeError: If the provider has not been started.
        """
        if self._base_url is None:
            raise RuntimeError("UVProvider has not been started")
        # Compare against None so an explicit 0 is honoured, not replaced.
        if timeout_s is None:
            timeout_s = self.context_timeout_s
        self.wait_for_ready(self._base_url, timeout_s)

    def close(self) -> None:
        """Stop the server; alias for ``stop_container``."""
        self.stop_container()

    def __enter__(self) -> "UVProvider":
        """Start the server (if not already started) and wait until ready."""
        if self._base_url is None:
            base_url = self.start_container(self.space_id, port=self.port)
            try:
                self.wait_for_ready(base_url, timeout_s=self.context_timeout_s)
            except BaseException:
                # __exit__ is not called when __enter__ raises, so the
                # subprocess must be cleaned up here to avoid leaking it.
                self.stop_container()
                raise
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Stop the server when leaving the ``with`` block."""
        self.stop_container()

    def _find_free_port(self) -> int:
        """Return a port number the OS reports as free.

        The probing socket is closed before returning, so another process
        could still take the port before the server binds to it.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind(("", 0))  # port 0 lets the OS choose a free port
            sock.listen(1)
            return sock.getsockname()[1]

    # NOTE(review): possibly intended to be a ``@property``; kept as a plain
    # method to preserve the visible interface. Confirm against callers.
    def base_url(self) -> str:
        """Return the base URL of the running server.

        Raises:
            RuntimeError: If the provider has not been started.
        """
        if self._base_url is None:
            raise RuntimeError("UVProvider has not been started")
        return self._base_url