| |
| |
| |
| |
| |
|
|
| """ |
| Container provider abstractions for running environment servers. |
| |
| This module provides a pluggable architecture for different container providers |
| (local Docker, Kubernetes, cloud providers, etc.) to be used with EnvClient. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from abc import ABC, abstractmethod |
| from typing import Any, Dict, Optional, Sequence |
|
|
|
|
class ContainerProvider(ABC):
    """
    Interface that every container backend has to implement.

    A provider owns the lifecycle of a single container and reports the base
    URL under which that container can be reached. Backends this interface is
    meant to cover include:
        - LocalDockerProvider: containers on the local Docker daemon
        - KubernetesProvider: containers in a Kubernetes cluster
        - FargateProvider: containers on AWS Fargate
        - CloudRunProvider: containers on Google Cloud Run

    Example:
        >>> provider = LocalDockerProvider()
        >>> base_url = provider.start_container("echo-env:latest")
        >>> print(base_url)  # http://localhost:8000
        >>> # Use the environment via base_url
        >>> provider.stop_container()
    """

    @abstractmethod
    def start_container(
        self,
        image: str,
        port: Optional[int] = None,
        env_vars: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> str:
        """
        Launch a container from ``image`` and report where to reach it.

        Args:
            image: Container image name (e.g., "echo-env:latest")
            port: Port to expose; ``None`` lets the provider pick one
            env_vars: Environment variables handed to the container
            **kwargs: Options specific to the concrete provider

        Returns:
            Base URL for talking to the container
            (e.g., "http://localhost:8000")

        Raises:
            RuntimeError: If the container cannot be started
        """

    @abstractmethod
    def stop_container(self) -> None:
        """
        Stop and remove the container created by start_container().
        """

    @abstractmethod
    def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None:
        """
        Block until the container accepts requests.

        Implementations typically poll the /health endpoint until it
        answers with HTTP 200.

        Args:
            base_url: Base URL of the container
            timeout_s: Maximum number of seconds to wait

        Raises:
            TimeoutError: If the container is not ready in time
        """
|
|
|
|
class LocalDockerProvider(ContainerProvider):
    """
    Container provider for the local Docker daemon.

    Containers are started through the ``docker`` command-line client on the
    local machine, which makes this provider useful for development and
    testing.

    Example:
        >>> provider = LocalDockerProvider()
        >>> base_url = provider.start_container("echo-env:latest")
        >>> # Container running on http://localhost:<random-port>
        >>> provider.stop_container()
    """

    def __init__(self):
        """
        Initialize the provider and verify that Docker can be invoked.

        Raises:
            RuntimeError: If ``docker version`` fails, times out, or the
                ``docker`` executable cannot be found.
        """
        import subprocess

        self._container_id: Optional[str] = None
        self._container_name: Optional[str] = None

        try:
            subprocess.run(
                ["docker", "version"],
                check=True,
                capture_output=True,
                timeout=5,
            )
        except (
            subprocess.CalledProcessError,
            FileNotFoundError,
            subprocess.TimeoutExpired,
        ) as exc:
            # Chain the cause so the underlying failure stays visible.
            raise RuntimeError(
                "Docker is not available. Please install Docker Desktop or Docker Engine."
            ) from exc

    def start_container(
        self,
        image: str,
        port: Optional[int] = None,
        env_vars: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> str:
        """
        Start a Docker container locally.

        The chosen host port is mapped to port 8000 inside the container.

        Args:
            image: Docker image name
            port: Host port to publish (if None, an available port is chosen)
            env_vars: Environment variables for the container
            **kwargs: Accepted for interface compatibility; this
                implementation does not currently use them.

        Returns:
            Base URL to connect to the container

        Raises:
            RuntimeError: If ``docker run`` exits with a non-zero status
        """
        import subprocess
        import time

        if port is None:
            port = self._find_available_port()

        self._container_name = self._generate_container_name(image)

        cmd = [
            "docker",
            "run",
            "-d",
            "--name",
            self._container_name,
            "-p",
            f"{port}:8000",
        ]

        if env_vars:
            for key, value in env_vars.items():
                cmd.extend(["-e", f"{key}={value}"])

        cmd.append(image)

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            # A failed ``docker run`` (e.g. the host port is already taken)
            # can leave a container behind in "Created" state. No id was
            # recorded, so stop_container() would never remove it; clean it
            # up by name here instead.
            failed_name = self._container_name
            self._container_name = None
            self._run_best_effort(["docker", "rm", "-f", failed_name])
            error_msg = f"Failed to start Docker container.\nCommand: {' '.join(cmd)}\nExit code: {e.returncode}\nStderr: {e.stderr}\nStdout: {e.stdout}"
            raise RuntimeError(error_msg) from e

        # ``docker run -d`` prints the id of the new container on stdout.
        self._container_id = result.stdout.strip()

        # Short fixed pause before returning; call wait_for_ready() for an
        # actual readiness check.
        time.sleep(1)

        base_url = f"http://localhost:{port}"
        return base_url

    def stop_container(self) -> None:
        """
        Stop and remove the Docker container (best effort).

        Does nothing when no container id is recorded. Failures and timeouts
        of the individual ``docker`` commands are swallowed, and the recorded
        container state is always cleared.
        """
        if self._container_id is None:
            return

        container_id = self._container_id
        try:
            # Attempt a graceful stop first. The removal below is forced so
            # the container is still cleaned up when the stop fails or the
            # client times out while Docker is waiting on the process.
            self._run_best_effort(["docker", "stop", container_id])
            self._run_best_effort(["docker", "rm", "-f", container_id])
        finally:
            self._container_id = None
            self._container_name = None

    def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None:
        """
        Wait for the container to be ready by polling its /health endpoint.

        Args:
            base_url: Base URL of the container
            timeout_s: Maximum time to wait

        Raises:
            TimeoutError: If no HTTP 200 response is received in time
        """
        import time

        import requests

        # Tolerate a trailing slash so the URL never contains "//health".
        health_url = f"{base_url.rstrip('/')}/health"

        # Explicitly unset proxies for this request; presumably so that
        # localhost health checks are not routed through a configured proxy.
        proxies = {"http": None, "https": None}

        # Monotonic clock: the deadline is unaffected by wall-clock changes.
        deadline = time.monotonic() + timeout_s

        while time.monotonic() < deadline:
            try:
                response = requests.get(health_url, timeout=2.0, proxies=proxies)
            except requests.RequestException:
                # Connection errors are expected while the server boots.
                pass
            else:
                if response.status_code == 200:
                    return

            time.sleep(0.5)

        raise TimeoutError(
            f"Container at {base_url} did not become ready within {timeout_s}s"
        )

    @staticmethod
    def _run_best_effort(args: Sequence[str], timeout_s: float = 10.0) -> bool:
        """
        Run a cleanup command, reporting success instead of raising.

        Args:
            args: Command line to execute
            timeout_s: Seconds to wait before giving up on the command

        Returns:
            True if the command exited with status 0, False if it failed or
            timed out.
        """
        import subprocess

        try:
            subprocess.run(
                list(args),
                capture_output=True,
                check=True,
                timeout=timeout_s,
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
            return False
        return True

    def _find_available_port(self) -> int:
        """
        Find an available port on localhost.

        Binding to port 0 lets the operating system pick a free port. The
        socket is closed before returning, so another process could claim
        the port before Docker binds it.

        Returns:
            An available port number
        """
        import socket

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("", 0))
            s.listen(1)
            port = s.getsockname()[1]
        return port

    def _generate_container_name(self, image: str) -> str:
        """
        Generate a container name from the image name and a timestamp.

        The registry/namespace path, tag (``:latest``) and digest
        (``@sha256:...``) are dropped, and characters other than letters,
        digits, ``_``, ``.`` and ``-`` are replaced by ``-``.

        Args:
            image: Docker image name

        Returns:
            A name of the form ``<image>-<milliseconds since epoch>``
        """
        import re
        import time

        repository = image.rsplit("/", 1)[-1]
        repository = repository.split("@", 1)[0].split(":", 1)[0]
        clean_image = re.sub(r"[^a-zA-Z0-9_.-]", "-", repository).lstrip("_.-")
        if not clean_image:
            clean_image = "container"
        timestamp = int(time.time() * 1000)
        return f"{clean_image}-{timestamp}"
|
|
|
|
class DockerSwarmProvider(ContainerProvider):
    """
    Container provider that uses Docker Swarm services for local concurrency.

    This provider creates a replicated Swarm service backed by the local Docker
    engine. The built-in load-balancer fans requests across the replicas,
    allowing multiple container instances to run concurrently on the developer
    workstation (mirroring the workflow described in the Docker stack docs).
    """

    def __init__(
        self,
        *,
        auto_init_swarm: bool = True,
        overlay_network: Optional[str] = None,
    ):
        """
        Args:
            auto_init_swarm: Whether to call ``docker swarm init`` when Swarm
                is not active. Otherwise, user must manually initialize Swarm.
            overlay_network: Optional overlay network name for the service.
                When provided, the network is created with
                ``docker network create --driver overlay --attachable`` if it
                does not already exist.

        Raises:
            RuntimeError: If Docker is unavailable, Swarm is inactive and
                cannot be (or may not be) initialized, or the overlay network
                cannot be created.
        """
        self._service_name: Optional[str] = None
        self._service_id: Optional[str] = None
        self._published_port: Optional[int] = None
        self._overlay_network = overlay_network
        self._auto_init_swarm = auto_init_swarm

        self._ensure_docker_available()
        self._ensure_swarm_initialized()
        if self._overlay_network:
            self._ensure_overlay_network(self._overlay_network)

    def start_container(
        self,
        image: str,
        port: Optional[int] = None,
        env_vars: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> str:
        """
        Create a replicated Swarm service for the given image.

        The chosen host port is published to port 8000 of the service.

        Args:
            image: Docker image name
            port: Host port to publish (if None, an available port is chosen)
            env_vars: Environment variables for the service containers

        Supported kwargs:
            replicas (int): Number of container replicas (default: 2);
                values below 1 are raised to 1.
            cpu_limit (float | str): CPU limit passed to ``--limit-cpu``.
            memory_limit (str): Memory limit passed to ``--limit-memory``.
            constraints (Sequence[str]): Placement constraints.
            labels (Dict[str, str]): Service labels.
            command (Sequence[str] | str): Override container command; a
                string is split with ``shlex.split``.

        Returns:
            Base URL of the published service port

        Raises:
            ValueError: If an unsupported keyword argument is passed
            RuntimeError: If ``docker service create`` exits non-zero
        """
        import shlex
        import subprocess
        import time

        allowed_kwargs = {
            "replicas",
            "cpu_limit",
            "memory_limit",
            "constraints",
            "labels",
            "command",
        }
        unknown = set(kwargs) - allowed_kwargs
        if unknown:
            raise ValueError(f"Unsupported kwargs for DockerSwarmProvider: {unknown}")

        replicas = int(kwargs.get("replicas", 2))
        cpu_limit = kwargs.get("cpu_limit")
        memory_limit = kwargs.get("memory_limit")
        constraints: Optional[Sequence[str]] = kwargs.get("constraints")
        labels: Optional[Dict[str, str]] = kwargs.get("labels")
        command_override = kwargs.get("command")

        if port is None:
            port = self._find_available_port()

        self._service_name = self._generate_service_name(image)
        self._published_port = port

        cmd = [
            "docker",
            "service",
            "create",
            "--detach",
            "--name",
            self._service_name,
            "--replicas",
            str(max(1, replicas)),
            "--publish",
            f"{port}:8000",
        ]

        if self._overlay_network:
            cmd.extend(["--network", self._overlay_network])

        if env_vars:
            for key, value in env_vars.items():
                cmd.extend(["--env", f"{key}={value}"])

        if cpu_limit is not None:
            cmd.extend(["--limit-cpu", str(cpu_limit)])

        if memory_limit is not None:
            cmd.extend(["--limit-memory", str(memory_limit)])

        if constraints:
            for constraint in constraints:
                cmd.extend(["--constraint", constraint])

        if labels:
            for key, value in labels.items():
                cmd.extend(["--label", f"{key}={value}"])

        # The image must precede any command override on the command line.
        cmd.append(image)

        if command_override:
            if isinstance(command_override, str):
                cmd.extend(shlex.split(command_override))
            else:
                cmd.extend(command_override)

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=True,
            )
            self._service_id = result.stdout.strip()
        except subprocess.CalledProcessError as e:
            error_msg = (
                "Failed to start Docker Swarm service.\n"
                f"Command: {' '.join(cmd)}\n"
                f"Exit code: {e.returncode}\n"
                f"Stdout: {e.stdout}\n"
                f"Stderr: {e.stderr}"
            )
            raise RuntimeError(error_msg) from e

        # Short fixed pause before returning; call wait_for_ready() for an
        # actual readiness check.
        time.sleep(1.0)

        return f"http://localhost:{port}"

    def stop_container(self) -> None:
        """
        Remove the Swarm service (and keep the Swarm manager running).

        Best effort: a failing or timed-out ``docker service rm`` is
        swallowed, and the recorded service state is always cleared.
        """
        if not self._service_name:
            return

        import subprocess

        try:
            subprocess.run(
                ["docker", "service", "rm", self._service_name],
                capture_output=True,
                check=True,
                timeout=10,
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
            # Teardown must not raise just because the service is already
            # gone or the client did not answer in time.
            pass
        finally:
            self._service_name = None
            self._service_id = None
            self._published_port = None

    def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None:
        """
        Wait for at least one replica to become healthy by polling /health.

        Note: With Swarm's load balancer, requests round-robin across replicas,
        so this only verifies that at least one replica is responding. Some
        replicas may still be starting when this returns.

        Args:
            base_url: Base URL of the service
            timeout_s: Maximum time to wait

        Raises:
            TimeoutError: If no HTTP 200 response is received in time
        """
        import time

        import requests

        # Tolerate a trailing slash so the URL never contains "//health".
        health_url = f"{base_url.rstrip('/')}/health"

        # Explicitly unset proxies for this request; presumably so that
        # localhost health checks are not routed through a configured proxy.
        proxies = {"http": None, "https": None}

        # Monotonic clock: the deadline is unaffected by wall-clock changes.
        deadline = time.monotonic() + timeout_s

        while time.monotonic() < deadline:
            try:
                response = requests.get(health_url, timeout=2.0, proxies=proxies)
            except requests.RequestException:
                # Connection errors are expected while replicas boot.
                pass
            else:
                if response.status_code == 200:
                    return

            time.sleep(0.5)

        raise TimeoutError(
            f"Swarm service at {base_url} did not become ready within {timeout_s}s"
        )

    def _ensure_docker_available(self) -> None:
        """
        Verify that the ``docker`` client can be invoked.

        Raises:
            RuntimeError: If ``docker version`` fails, times out, or the
                executable cannot be found.
        """
        import subprocess

        try:
            subprocess.run(
                ["docker", "version"],
                check=True,
                capture_output=True,
                timeout=5,
            )
        except (
            subprocess.CalledProcessError,
            FileNotFoundError,
            subprocess.TimeoutExpired,
        ) as exc:
            raise RuntimeError(
                "Docker is not available. Please install Docker Desktop or Docker Engine."
            ) from exc

    def _ensure_swarm_initialized(self) -> None:
        """
        Make sure the local node is part of an active Swarm.

        Queries the node state via ``docker info`` and, when it is not
        "active" and ``auto_init_swarm`` is enabled, runs
        ``docker swarm init``.

        Raises:
            RuntimeError: If Swarm is inactive and auto-initialization is
                disabled, or if ``docker swarm init`` fails or times out.
        """
        import subprocess

        try:
            result = subprocess.run(
                ["docker", "info", "--format", "{{.Swarm.LocalNodeState}}"],
                capture_output=True,
                text=True,
                check=True,
                timeout=5,
            )
            state = result.stdout.strip().lower()
            if state == "active":
                return
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
            # The state could not be determined; treat it like "not active".
            state = "unknown"

        if not self._auto_init_swarm:
            raise RuntimeError(
                f"Docker Swarm is not active (state={state}). Enable Swarm manually or pass auto_init_swarm=True."
            )

        try:
            subprocess.run(
                ["docker", "swarm", "init"],
                check=True,
                capture_output=True,
                timeout=10,
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
            raise RuntimeError("Failed to initialize Docker Swarm") from e

    def _ensure_overlay_network(self, network: str) -> None:
        """
        Create the attachable overlay network unless it already exists.

        Args:
            network: Name of the overlay network

        Raises:
            RuntimeError: If ``docker network create`` fails or times out
        """
        import subprocess

        inspect = subprocess.run(
            ["docker", "network", "inspect", network],
            capture_output=True,
            text=True,
            check=False,
        )
        # Exit status 0 means the network is already known to Docker.
        if inspect.returncode == 0:
            return

        try:
            subprocess.run(
                [
                    "docker",
                    "network",
                    "create",
                    "--driver",
                    "overlay",
                    "--attachable",
                    network,
                ],
                check=True,
                capture_output=True,
                timeout=10,
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
            raise RuntimeError(f"Failed to create overlay network '{network}'") from e

    def _find_available_port(self) -> int:
        """
        Find an available port on localhost.

        Binding to port 0 lets the operating system pick a free port. The
        socket is closed before returning, so another process could claim
        the port before Docker publishes it.

        Returns:
            An available port number
        """
        import socket

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("", 0))
            s.listen(1)
            port = s.getsockname()[1]
        return port

    def _generate_service_name(self, image: str) -> str:
        """
        Generate a service name from the image name and a timestamp.

        The registry/namespace path, tag (``:latest``) and digest
        (``@sha256:...``) are dropped, characters other than letters, digits,
        ``_`` and ``-`` are replaced by ``-``, and the result is capped at
        63 characters.

        Args:
            image: Docker image name

        Returns:
            A name of the form ``<image>-swarm-<milliseconds since epoch>``
        """
        import re
        import time

        suffix = f"-swarm-{int(time.time() * 1000)}"

        repository = image.rsplit("/", 1)[-1]
        repository = repository.split("@", 1)[0].split(":", 1)[0]
        clean_image = re.sub(r"[^A-Za-z0-9_-]", "-", repository).strip("-_")
        if not clean_image:
            clean_image = "service"

        # Keep the timestamp suffix intact and shorten the image part instead.
        clean_image = clean_image[: 63 - len(suffix)].rstrip("-_")
        return f"{clean_image}{suffix}"
|
|
|
|
class KubernetesProvider(ContainerProvider):
    """
    Placeholder for a container provider backed by a Kubernetes cluster.

    The intended behavior is to create pods in a Kubernetes cluster and
    expose them via services or port-forwarding. The class body defines no
    methods yet, so the abstract methods of ContainerProvider remain
    unimplemented and the class cannot be instantiated.

    Intended usage once implemented:
        >>> provider = KubernetesProvider(namespace="envtorch-dev")
        >>> base_url = provider.start_container("echo-env:latest")
        >>> # Pod running in k8s, accessible via service or port-forward
        >>> provider.stop_container()
    """
|
|
|
|
class RuntimeProvider(ABC):
    """
    Abstract base class for runtime providers that are not container providers.

    Providers implement this interface to support different runtime platforms:
        - UVProvider: Runs environments via `uv run`

    The provider manages a single runtime lifecycle and provides the base URL
    for connecting to it. Instances can be used as context managers: entering
    calls start() and leaving calls stop().

    Example:
        >>> provider = UVProvider(project_path="/path/to/env")
        >>> base_url = provider.start()
        >>> print(base_url)  # http://localhost:8000
        >>> provider.stop()
    """

    @abstractmethod
    def start(
        self,
        port: Optional[int] = None,
        env_vars: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> str:
        """
        Start the runtime.

        Args:
            port: Port to expose (if None, provider chooses)
            env_vars: Environment variables for the runtime
            **kwargs: Additional runtime options

        Returns:
            Base URL to connect to the runtime
        """

    @abstractmethod
    def stop(self) -> None:
        """
        Stop the runtime.
        """

    @abstractmethod
    def wait_for_ready(self, timeout_s: float = 30.0) -> None:
        """
        Wait for the runtime to be ready to accept requests.

        Args:
            timeout_s: Maximum time to wait
        """

    def __enter__(self) -> "RuntimeProvider":
        """
        Start the runtime with default arguments and return the provider.

        Note that wait_for_ready() is not called here.
        """
        self.start()
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        """
        Stop the runtime when leaving the ``with`` block.

        Returns:
            False, so an exception raised inside the block is not suppressed.
        """
        self.stop()
        return False
|
|