# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. """ Container provider abstractions for running environment servers. This module provides a pluggable architecture for different container providers (local Docker, Kubernetes, cloud providers, etc.) to be used with EnvClient. """ from __future__ import annotations from abc import ABC, abstractmethod from typing import Any, Dict, Optional, Sequence class ContainerProvider(ABC): """ Abstract base class for container providers. Providers implement this interface to support different container platforms: - LocalDockerProvider: Runs containers on local Docker daemon - KubernetesProvider: Runs containers in Kubernetes cluster - FargateProvider: Runs containers on AWS Fargate - CloudRunProvider: Runs containers on Google Cloud Run The provider manages a single container lifecycle and provides the base URL for connecting to it. Example: >>> provider = LocalDockerProvider() >>> base_url = provider.start_container("echo-env:latest") >>> print(base_url) # http://localhost:8000 >>> # Use the environment via base_url >>> provider.stop_container() """ @abstractmethod def start_container( self, image: str, port: Optional[int] = None, env_vars: Optional[Dict[str, str]] = None, **kwargs: Any, ) -> str: """ Start a container from the specified image. Args: image: Container image name (e.g., "echo-env:latest") port: Port to expose (if None, provider chooses) env_vars: Environment variables to pass to container **kwargs: Provider-specific options Returns: Base URL to connect to the container (e.g., "http://localhost:8000") Raises: RuntimeError: If container fails to start """ pass @abstractmethod def stop_container(self) -> None: """ Stop and remove the running container. This cleans up the container that was started by start_container(). """ pass @abstractmethod def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None: """ Wait for the container to be ready to accept requests. This typically polls the /health endpoint until it returns 200. Args: base_url: Base URL of the container timeout_s: Maximum time to wait Raises: TimeoutError: If container doesn't become ready in time """ pass class LocalDockerProvider(ContainerProvider): """ Container provider for local Docker daemon. This provider runs containers on the local machine using Docker. Useful for development and testing. Example: >>> provider = LocalDockerProvider() >>> base_url = provider.start_container("echo-env:latest") >>> # Container running on http://localhost: >>> provider.stop_container() """ def __init__(self): """Initialize the local Docker provider.""" self._container_id: Optional[str] = None self._container_name: Optional[str] = None # Check if Docker is available import subprocess try: subprocess.run( ["docker", "version"], check=True, capture_output=True, timeout=5, ) except ( subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired, ): raise RuntimeError( "Docker is not available. Please install Docker Desktop or Docker Engine." ) def start_container( self, image: str, port: Optional[int] = None, env_vars: Optional[Dict[str, str]] = None, **kwargs: Any, ) -> str: """ Start a Docker container locally. Args: image: Docker image name port: Port to expose (if None, finds available port) env_vars: Environment variables for the container **kwargs: Additional Docker run options Returns: Base URL to connect to the container """ import subprocess import time # Find available port if not specified if port is None: port = self._find_available_port() # Generate container name self._container_name = self._generate_container_name(image) # Build docker run command cmd = [ "docker", "run", "-d", # Detached "--name", self._container_name, "-p", f"{port}:8000", # Map port ] # Add environment variables if env_vars: for key, value in env_vars.items(): cmd.extend(["-e", f"{key}={value}"]) # Add image cmd.append(image) # Run container try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) self._container_id = result.stdout.strip() except subprocess.CalledProcessError as e: error_msg = f"Failed to start Docker container.\nCommand: {' '.join(cmd)}\nExit code: {e.returncode}\nStderr: {e.stderr}\nStdout: {e.stdout}" raise RuntimeError(error_msg) from e # Wait a moment for container to start time.sleep(1) base_url = f"http://localhost:{port}" return base_url def stop_container(self) -> None: """ Stop and remove the Docker container. """ if self._container_id is None: return import subprocess try: # Stop container subprocess.run( ["docker", "stop", self._container_id], capture_output=True, check=True, timeout=10, ) # Remove container subprocess.run( ["docker", "rm", self._container_id], capture_output=True, check=True, timeout=10, ) except subprocess.CalledProcessError: # Container might already be stopped/removed pass finally: self._container_id = None self._container_name = None def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None: """ Wait for container to be ready by polling /health endpoint. Args: base_url: Base URL of the container timeout_s: Maximum time to wait Raises: TimeoutError: If container doesn't become ready """ import time import requests start_time = time.time() health_url = f"{base_url}/health" # Bypass proxy for localhost to avoid proxy issues proxies = {"http": None, "https": None} while time.time() - start_time < timeout_s: try: response = requests.get(health_url, timeout=2.0, proxies=proxies) if response.status_code == 200: return except requests.RequestException: pass time.sleep(0.5) raise TimeoutError( f"Container at {base_url} did not become ready within {timeout_s}s" ) def _find_available_port(self) -> int: """ Find an available port on localhost. Returns: An available port number """ import socket with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("", 0)) s.listen(1) port = s.getsockname()[1] return port def _generate_container_name(self, image: str) -> str: """ Generate a unique container name based on image name and timestamp. Args: image: Docker image name Returns: A unique container name """ import time clean_image = image.split("/")[-1].split(":")[0] timestamp = int(time.time() * 1000) return f"{clean_image}-{timestamp}" class DockerSwarmProvider(ContainerProvider): """ Container provider that uses Docker Swarm services for local concurrency. This provider creates a replicated Swarm service backed by the local Docker engine. The built-in load-balancer fans requests across the replicas, allowing multiple container instances to run concurrently on the developer workstation (mirroring the workflow described in the Docker stack docs). """ def __init__( self, *, auto_init_swarm: bool = True, overlay_network: Optional[str] = None, ): """ Args: auto_init_swarm: Whether to call ``docker swarm init`` when Swarm is not active. Otherwise, user must manually initialize Swarm. overlay_network: Optional overlay network name for the service. When provided, the network is created with ``docker network create --driver overlay --attachable`` if it does not already exist. """ self._service_name: Optional[str] = None self._service_id: Optional[str] = None self._published_port: Optional[int] = None self._overlay_network = overlay_network self._auto_init_swarm = auto_init_swarm self._ensure_docker_available() self._ensure_swarm_initialized() if self._overlay_network: self._ensure_overlay_network(self._overlay_network) def start_container( self, image: str, port: Optional[int] = None, env_vars: Optional[Dict[str, str]] = None, **kwargs: Any, ) -> str: """ Start (or scale) a Swarm service for the given image. Supported kwargs: replicas (int): Number of container replicas (default: 2). cpu_limit (float | str): CPU limit passed to ``--limit-cpu``. memory_limit (str): Memory limit passed to ``--limit-memory``. constraints (Sequence[str]): Placement constraints. labels (Dict[str, str]): Service labels. command (Sequence[str] | str): Override container command. """ import shlex import subprocess import time allowed_kwargs = { "replicas", "cpu_limit", "memory_limit", "constraints", "labels", "command", } unknown = set(kwargs) - allowed_kwargs if unknown: raise ValueError(f"Unsupported kwargs for DockerSwarmProvider: {unknown}") replicas = int(kwargs.get("replicas", 2)) cpu_limit = kwargs.get("cpu_limit") memory_limit = kwargs.get("memory_limit") constraints: Optional[Sequence[str]] = kwargs.get("constraints") labels: Optional[Dict[str, str]] = kwargs.get("labels") command_override = kwargs.get("command") if port is None: port = self._find_available_port() self._service_name = self._generate_service_name(image) self._published_port = port cmd = [ "docker", "service", "create", "--detach", "--name", self._service_name, "--replicas", str(max(1, replicas)), "--publish", f"{port}:8000", ] if self._overlay_network: cmd.extend(["--network", self._overlay_network]) if env_vars: for key, value in env_vars.items(): cmd.extend(["--env", f"{key}={value}"]) if cpu_limit is not None: cmd.extend(["--limit-cpu", str(cpu_limit)]) if memory_limit is not None: cmd.extend(["--limit-memory", str(memory_limit)]) if constraints: for constraint in constraints: cmd.extend(["--constraint", constraint]) if labels: for key, value in labels.items(): cmd.extend(["--label", f"{key}={value}"]) cmd.append(image) if command_override: if isinstance(command_override, str): cmd.extend(shlex.split(command_override)) else: cmd.extend(command_override) try: result = subprocess.run( cmd, capture_output=True, text=True, check=True, ) self._service_id = result.stdout.strip() except subprocess.CalledProcessError as e: error_msg = ( "Failed to start Docker Swarm service.\n" f"Command: {' '.join(cmd)}\n" f"Exit code: {e.returncode}\n" f"Stdout: {e.stdout}\n" f"Stderr: {e.stderr}" ) raise RuntimeError(error_msg) from e # Give Swarm a brief moment to schedule the tasks. time.sleep(1.0) return f"http://localhost:{port}" def stop_container(self) -> None: """ Remove the Swarm service (and keep the Swarm manager running). """ if not self._service_name: return import subprocess try: subprocess.run( ["docker", "service", "rm", self._service_name], capture_output=True, check=True, timeout=10, ) except subprocess.CalledProcessError: # Service may already be gone; ignore. pass finally: self._service_name = None self._service_id = None self._published_port = None def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None: """ Wait for at least one replica to become healthy by polling /health. Note: With Swarm's load balancer, requests round-robin across replicas, so this only verifies that at least one replica is responding. Some replicas may still be starting when this returns. """ import time import requests deadline = time.time() + timeout_s health_url = f"{base_url}/health" # Bypass proxy for localhost to avoid proxy issues proxies = {"http": None, "https": None} while time.time() < deadline: try: response = requests.get(health_url, timeout=2.0, proxies=proxies) if response.status_code == 200: return except requests.RequestException: pass time.sleep(0.5) raise TimeoutError( f"Swarm service at {base_url} did not become ready within {timeout_s}s" ) def _ensure_docker_available(self) -> None: import subprocess try: subprocess.run( ["docker", "version"], check=True, capture_output=True, timeout=5, ) except ( subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired, ) as exc: raise RuntimeError( "Docker is not available. Please install Docker Desktop or Docker Engine." ) from exc def _ensure_swarm_initialized(self) -> None: import subprocess try: result = subprocess.run( ["docker", "info", "--format", "{{.Swarm.LocalNodeState}}"], capture_output=True, text=True, check=True, timeout=5, ) state = result.stdout.strip().lower() if state == "active": return except subprocess.CalledProcessError: state = "unknown" if not self._auto_init_swarm: raise RuntimeError( f"Docker Swarm is not active (state={state}). Enable Swarm manually or pass auto_init_swarm=True." ) try: subprocess.run( ["docker", "swarm", "init"], check=True, capture_output=True, timeout=10, ) except subprocess.CalledProcessError as e: raise RuntimeError("Failed to initialize Docker Swarm") from e def _ensure_overlay_network(self, network: str) -> None: import subprocess inspect = subprocess.run( ["docker", "network", "inspect", network], capture_output=True, text=True, check=False, ) if inspect.returncode == 0: return try: subprocess.run( [ "docker", "network", "create", "--driver", "overlay", "--attachable", network, ], check=True, capture_output=True, timeout=10, ) except subprocess.CalledProcessError as e: raise RuntimeError(f"Failed to create overlay network '{network}'") from e def _find_available_port(self) -> int: import socket with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("", 0)) s.listen(1) port = s.getsockname()[1] return port def _generate_service_name(self, image: str) -> str: import time clean_image = image.split("/")[-1].split(":")[0] timestamp = int(time.time() * 1000) return f"{clean_image}-swarm-{timestamp}" class KubernetesProvider(ContainerProvider): """ Container provider for Kubernetes clusters. This provider creates pods in a Kubernetes cluster and exposes them via services or port-forwarding. Example: >>> provider = KubernetesProvider(namespace="envtorch-dev") >>> base_url = provider.start_container("echo-env:latest") >>> # Pod running in k8s, accessible via service or port-forward >>> provider.stop_container() """ pass class RuntimeProvider(ABC): """ Abstract base class for runtime providers that are not container providers. Providers implement this interface to support different runtime platforms: - UVProvider: Runs environments via `uv run` The provider manages a single runtime lifecycle and provides the base URL for connecting to it. Example: >>> provider = UVProvider(project_path="/path/to/env") >>> base_url = provider.start() >>> print(base_url) # http://localhost:8000 >>> provider.stop() """ @abstractmethod def start( self, port: Optional[int] = None, env_vars: Optional[Dict[str, str]] = None, **kwargs: Any, ) -> str: """ Start a runtime from the specified image. Args: image: Runtime image name port: Port to expose (if None, provider chooses) env_vars: Environment variables for the runtime **kwargs: Additional runtime options """ @abstractmethod def stop(self) -> None: """ Stop the runtime. """ pass @abstractmethod def wait_for_ready(self, timeout_s: float = 30.0) -> None: """ Wait for the runtime to be ready to accept requests. """ pass def __enter__(self) -> "RuntimeProvider": """ Enter the runtime provider. """ self.start() return self def __exit__(self, exc_type, exc, tb) -> None: """ Exit the runtime provider. """ self.stop() return False