# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Container provider abstractions for running environment servers.

This module provides a pluggable architecture for different container providers
(local Docker, Kubernetes, cloud providers, etc.) to be used with HTTPEnvClient.
"""

from __future__ import annotations

import logging
import socket
import subprocess
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)

# Memory limit (in GB) applied when the caller does not pass ``memory_gb``.
_DEFAULT_MEMORY_GB = 16

# When the requested port equals this value no PORT variable is injected.
# NOTE(review): presumably this is the port the images listen on by default --
# confirm against the environment images.
_DEFAULT_CONTAINER_PORT = 8000


class ContainerProvider(ABC):
    """
    Abstract base class for container providers.

    Providers implement this interface to support different container platforms:
    - LocalDockerProvider: Runs containers on local Docker daemon
    - KubernetesProvider: Runs containers in Kubernetes cluster
    - FargateProvider: Runs containers on AWS Fargate
    - CloudRunProvider: Runs containers on Google Cloud Run

    The provider manages a single container lifecycle and provides the
    base URL for connecting to it.

    Example:
        >>> provider = LocalDockerProvider()
        >>> base_url = provider.start_container("echo-env:latest")
        >>> print(base_url)  # http://localhost:8000
        >>> # Use the environment via base_url
        >>> provider.stop_container()
    """

    @abstractmethod
    def start_container(
        self,
        image: str,
        port: Optional[int] = None,
        env_vars: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> str:
        """
        Start a container from the specified image.

        Args:
            image: Container image name (e.g., "echo-env:latest")
            port: Port to expose (if None, provider chooses)
            env_vars: Environment variables to pass to container
            **kwargs: Provider-specific options

        Returns:
            Base URL to connect to the container (e.g., "http://localhost:8000")

        Raises:
            RuntimeError: If container fails to start
        """

    @abstractmethod
    def stop_container(self) -> None:
        """
        Stop and remove the running container.

        This cleans up the container that was started by start_container().
        """

    @abstractmethod
    def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None:
        """
        Wait for the container to be ready to accept requests.

        This typically polls the /health endpoint until it returns 200.

        Args:
            base_url: Base URL of the container
            timeout_s: Maximum time to wait

        Raises:
            TimeoutError: If container doesn't become ready in time
        """


class LocalDockerProvider(ContainerProvider):
    """
    Container provider for local Docker daemon.

    This provider runs containers on the local machine using Docker.
    Useful for development and testing.

    Example:
        >>> provider = LocalDockerProvider()
        >>> base_url = provider.start_container("echo-env:latest")
        >>> # Container running on http://127.0.0.1:<port>
        >>> provider.stop_container()
    """

    def __init__(self):
        """
        Initialize the local Docker provider.

        Raises:
            RuntimeError: If ``docker version`` cannot be run successfully
                within 5 seconds (Docker missing, failing or unresponsive).
        """
        self._container_id: Optional[str] = None
        self._container_name: Optional[str] = None

        # Check if Docker is available
        try:
            subprocess.run(
                ["docker", "version"],
                check=True,
                capture_output=True,
                timeout=5,
            )
        except (
            subprocess.CalledProcessError,
            FileNotFoundError,
            subprocess.TimeoutExpired,
        ) as e:
            # Chain the cause so the underlying failure stays visible.
            raise RuntimeError(
                "Docker is not available. Please install Docker Desktop or Docker Engine."
            ) from e

    def start_container(
        self,
        image: str,
        port: Optional[int] = None,
        env_vars: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> str:
        """
        Start a Docker container locally.

        Args:
            image: Docker image name
            port: Port to expose (if None, finds available port)
            env_vars: Environment variables for the container
            **kwargs: Additional Docker run options
                - memory_gb: Memory limit in GB (default: 16GB)
                - command_override: List of command args to override container CMD

        Returns:
            Base URL to connect to the container

        Raises:
            RuntimeError: If ``docker run`` exits with a non-zero status
        """
        # Find available port if not specified
        if port is None:
            port = self._find_available_port()

        # Use default memory limit if not specified
        memory_gb = kwargs.get("memory_gb", _DEFAULT_MEMORY_GB)

        # Generate container name
        self._container_name = self._generate_container_name(image)

        # Build docker run command
        # Use host networking for better performance and consistency with podman
        # NOTE: Do NOT use --rm initially - if container fails to start, we need logs
        cmd = [
            "docker", "run",
            "-d",  # Detached
            "--name", self._container_name,
            "--network", "host",  # Use host network
            "--memory", f"{memory_gb}g",  # Limit container memory
            "--memory-swap", f"{memory_gb}g",  # Prevent swap usage (set equal to --memory)
            "--oom-kill-disable=false",  # Allow OOM killer (exit gracefully)
        ]

        # Add environment variables
        if env_vars:
            for key, value in env_vars.items():
                cmd.extend(["-e", f"{key}={value}"])

        # Pass custom port via environment variable instead of overriding command
        # This allows the container to use its proper entrypoint/CMD
        if port != _DEFAULT_CONTAINER_PORT:
            cmd.extend(["-e", f"PORT={port}"])

        # Add image
        cmd.append(image)

        # Add command override if provided (explicit override by user).
        # A missing, None or empty override leaves the image's CMD untouched.
        command_override = kwargs.get("command_override")
        if command_override:
            cmd.extend(command_override)

        # Run container
        try:
            logger.debug("Starting container with command: %s", " ".join(cmd))
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            self._container_id = result.stdout.strip()
            logger.debug("Container started with ID: %s", self._container_id)
        except subprocess.CalledProcessError as e:
            error_msg = (
                f"Failed to start Docker container.\n"
                f"Command: {' '.join(cmd)}\n"
                f"Exit code: {e.returncode}\n"
                f"Stderr: {e.stderr}\n"
                f"Stdout: {e.stdout}"
            )
            raise RuntimeError(error_msg) from e

        # Wait a moment for container to start
        time.sleep(1)

        return f"http://127.0.0.1:{port}"

    def stop_container(self) -> None:
        """
        Stop and remove the Docker container.

        Cleanup is best-effort: a container that is already stopped or removed
        is not an error, and a slow ``docker stop`` does not prevent removal.
        The provider's container state is always cleared, even if a docker
        command raises. Does nothing when no container has been started.
        """
        if self._container_id is None:
            return

        container_id = self._container_id
        try:
            # Stop container. Docker itself waits up to 10 s (its default grace
            # period) before killing, so the subprocess timeout must be larger.
            try:
                subprocess.run(
                    ["docker", "stop", container_id],
                    capture_output=True,
                    check=True,
                    timeout=30,
                )
            except subprocess.CalledProcessError:
                # Container might already be stopped/removed
                pass
            except subprocess.TimeoutExpired:
                logger.warning(
                    "Timed out stopping container %s; forcing removal", container_id
                )

            # Remove container. This runs even when the stop step failed, and
            # -f removes the container even if it is still running, so a failed
            # stop never leaks the container.
            try:
                subprocess.run(
                    ["docker", "rm", "-f", container_id],
                    capture_output=True,
                    check=True,
                    timeout=10,
                )
            except subprocess.CalledProcessError:
                # Container might already be removed
                pass
            except subprocess.TimeoutExpired:
                logger.warning("Timed out removing container %s", container_id)
        finally:
            self._container_id = None
            self._container_name = None

    def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None:
        """
        Wait for container to be ready by polling /health endpoint.

        Polls ``{base_url}/health`` every 0.5 s until it answers with HTTP 200.

        Args:
            base_url: Base URL of the container
            timeout_s: Maximum time to wait

        Raises:
            TimeoutError: If container doesn't become ready. The message
                includes container diagnostics (when a container was started
                by this provider) and the last polling error.
        """
        # Imported here, as in the original code, so this third-party package
        # is only needed when polling is actually used.
        import requests

        health_url = f"{base_url}/health"
        last_error = None

        # A monotonic clock keeps the timeout correct across wall-clock changes.
        deadline = time.monotonic() + timeout_s
        while time.monotonic() < deadline:
            try:
                response = requests.get(health_url, timeout=2.0)
            except requests.RequestException as e:
                last_error = str(e)
            else:
                if response.status_code == 200:
                    return
                # Keep the status so a failing health check is reported too.
                last_error = f"HTTP {response.status_code} from {health_url}"

            time.sleep(0.5)

        # If we timeout, provide diagnostic information
        error_msg = f"Container at {base_url} did not become ready within {timeout_s}s"
        if self._container_id:
            error_msg += self._describe_container_state(base_url)
        if last_error:
            error_msg += f"\n\nLast connection error: {last_error}"

        raise TimeoutError(error_msg)

    def _describe_container_state(self, base_url: str) -> str:
        """
        Build the diagnostic suffix for a readiness-timeout message.

        Never raises: any failure while querying Docker is reported in the
        returned text instead, so it cannot mask the TimeoutError being built.

        Args:
            base_url: Base URL of the container (used to report the port)

        Returns:
            Text to append to the error message (may be empty)
        """
        try:
            # First check if container exists
            inspect_result = subprocess.run(
                ["docker", "inspect", self._container_id],
                capture_output=True,
                text=True,
                timeout=5,
            )

            if inspect_result.returncode != 0:
                # start_container() does not pass --rm, so a missing container
                # means it was removed by something else.
                return (
                    "\n\nContainer no longer exists (it was removed after it was started)."
                    "\nThis typically means:"
                    "\n  1. The container image has an error in its startup script"
                    "\n  2. Required dependencies are missing in the container"
                    f"\n  3. Port {base_url.split(':')[-1]} might be in use by another process"
                    "\n  4. Container command/entrypoint is misconfigured"
                    "\nTry running the container manually to debug:"
                    "\n  docker run -it --rm <image>"
                )

            # Container exists, try to get logs
            logs_result = subprocess.run(
                ["docker", "logs", "--tail", "50", self._container_id],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if logs_result.stdout or logs_result.stderr:
                return (
                    "\n\nContainer logs (last 50 lines):\n"
                    f"{logs_result.stdout}\n{logs_result.stderr}"
                )
            return ""
        except subprocess.TimeoutExpired:
            return "\n\nTimeout while trying to inspect container"
        except Exception as e:
            # Deliberately broad: diagnostics are best-effort only.
            return f"\n\nFailed to get container diagnostics: {e}"

    def _find_available_port(self) -> int:
        """
        Find an available port on localhost.

        The port is released before returning, so another process could take
        it before the container binds to it.

        Returns:
            An available port number
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Port 0 asks the OS to pick any free port.
            s.bind(("", 0))
            s.listen(1)
            return s.getsockname()[1]

    def _generate_container_name(self, image: str) -> str:
        """
        Generate a unique container name based on image name and timestamp.

        The registry/repository path, the tag and any ``@digest`` suffix are
        dropped, because characters such as ``/``, ``:`` and ``@`` are not
        valid in Docker container names.

        Args:
            image: Docker image name

        Returns:
            A container name of the form ``<image-name>-<milliseconds>``
        """
        # "registry:5000/team/echo-env@sha256:abc" -> "echo-env"
        last_component = image.split("/")[-1]
        clean_image = last_component.split("@")[0].split(":")[0] or "container"
        timestamp = int(time.time() * 1000)
        return f"{clean_image}-{timestamp}"


class KubernetesProvider(ContainerProvider):
    """
    Container provider for Kubernetes clusters.

    This provider creates pods in a Kubernetes cluster and exposes them
    via services or port-forwarding.

    Not implemented yet: the abstract methods are not overridden, so this
    class cannot be instantiated.

    Example:
        >>> provider = KubernetesProvider(namespace="envtorch-dev")
        >>> base_url = provider.start_container("echo-env:latest")
        >>> # Pod running in k8s, accessible via service or port-forward
        >>> provider.stop_container()
    """