isolated-sandbox / sandbox /executor.py
ChefAdorous's picture
Deploy Code Execution Sandbox with FastAPI and Docker
a89f25d
import docker
from docker.errors import DockerException, ContainerError, ImageNotFound, APIError
import time
import logging
from typing import Optional
from sandbox.models import ExecutionRequest, ExecutionResponse, SandboxConfig, Language
from sandbox.language_runners import LanguageRunner
logger = logging.getLogger(__name__)
class SandboxExecutor:
"""
Executes code in isolated Docker containers with security controls.
Features:
- Resource limits (CPU, memory)
- Network isolation
- Automatic cleanup
- Timeout enforcement
- Read-only filesystem
"""
def __init__(self, config: Optional[SandboxConfig] = None):
self.config = config or SandboxConfig()
try:
self.client = docker.from_env()
# Test Docker connection
self.client.ping()
logger.info("Docker client initialized successfully")
except DockerException as e:
logger.error(f"Failed to initialize Docker client: {e}")
raise RuntimeError(
"Docker is not available. Please ensure Docker is running and accessible."
) from e
def _prepare_container_config(
self,
request: ExecutionRequest,
runner_config: dict
) -> dict:
"""Prepare Docker container configuration with security settings"""
# Calculate resource limits
memory_limit = f"{request.memory_limit}m"
cpu_quota = 50000 # 0.5 CPU cores (out of 100000)
container_config = {
"image": runner_config["image"],
"command": runner_config["command"] + [request.code],
"stdin_open": bool(request.stdin),
"detach": True,
"remove": False, # We'll remove manually after getting logs
# Security settings
"network_mode": "none" if not self.config.enable_network else "bridge",
"read_only": self.config.read_only_root,
"security_opt": ["no-new-privileges"],
# Resource limits
"mem_limit": memory_limit,
"memswap_limit": memory_limit, # Disable swap
"cpu_quota": cpu_quota,
"cpu_period": 100000,
# Prevent fork bombs
"pids_limit": 50,
# Working directory
"working_dir": "/tmp",
# User (non-root when possible)
"user": "nobody" if runner_config["image"] != "bash:5.2-alpine" else None,
}
return container_config
def execute(self, request: ExecutionRequest) -> ExecutionResponse:
"""
Execute code in an isolated container.
Args:
request: Execution request with code and parameters
Returns:
ExecutionResponse with stdout, stderr, and execution metadata
"""
start_time = time.time()
container = None
try:
# Get language-specific configuration
runner_config = LanguageRunner.get_runner_config(request.language)
# Prepare container configuration
container_config = self._prepare_container_config(request, runner_config)
# Pull image if not available
try:
self.client.images.get(runner_config["image"])
except ImageNotFound:
logger.info(f"Pulling image {runner_config['image']}...")
self.client.images.pull(runner_config["image"])
# Create and start container
logger.info(f"Executing {request.language} code in container")
container = self.client.containers.create(**container_config)
container.start()
# Provide stdin if specified
if request.stdin:
sock = container.attach_socket(params={'stdin': 1, 'stream': 1})
sock._sock.sendall(request.stdin.encode())
sock.close()
# Wait for container to finish with timeout
try:
result = container.wait(timeout=request.timeout)
exit_code = result.get("StatusCode", -1)
error_msg = None
except Exception as e:
# Timeout or other error
logger.warning(f"Container execution timeout or error: {e}")
container.stop(timeout=1)
exit_code = 124 # Timeout exit code
error_msg = f"Execution timed out after {request.timeout} seconds"
# Get logs (stdout and stderr combined)
try:
logs = container.logs(stdout=True, stderr=True).decode('utf-8', errors='replace')
# Truncate if too large
if len(logs) > self.config.max_output_size:
logs = logs[:self.config.max_output_size] + "\n... (output truncated)"
# Try to separate stdout and stderr (Docker combines them)
stdout = logs
stderr = ""
# If there's an error, try to extract stderr
if exit_code != 0:
stderr = logs
stdout = ""
except Exception as e:
logger.error(f"Failed to get container logs: {e}")
stdout = ""
stderr = str(e)
execution_time = time.time() - start_time
return ExecutionResponse(
stdout=stdout,
stderr=stderr,
exit_code=exit_code,
execution_time=round(execution_time, 3),
error=error_msg
)
except ImageNotFound as e:
logger.error(f"Image not found: {e}")
return ExecutionResponse(
stdout="",
stderr="",
exit_code=-1,
execution_time=time.time() - start_time,
error=f"Language image not available: {runner_config['image']}"
)
except ContainerError as e:
logger.error(f"Container error: {e}")
return ExecutionResponse(
stdout="",
stderr=str(e),
exit_code=e.exit_status,
execution_time=time.time() - start_time,
error="Container execution failed"
)
except APIError as e:
logger.error(f"Docker API error: {e}")
return ExecutionResponse(
stdout="",
stderr="",
exit_code=-1,
execution_time=time.time() - start_time,
error=f"Docker API error: {str(e)}"
)
except Exception as e:
logger.error(f"Unexpected error during execution: {e}", exc_info=True)
return ExecutionResponse(
stdout="",
stderr="",
exit_code=-1,
execution_time=time.time() - start_time,
error=f"Unexpected error: {str(e)}"
)
finally:
# Always cleanup container
if container:
try:
container.remove(force=True)
logger.debug(f"Container {container.id[:12]} removed")
except Exception as e:
logger.warning(f"Failed to remove container: {e}")
def cleanup_all(self):
"""Remove all stopped containers (maintenance task)"""
try:
containers = self.client.containers.list(all=True, filters={"status": "exited"})
for container in containers:
try:
container.remove()
logger.info(f"Cleaned up container {container.id[:12]}")
except Exception as e:
logger.warning(f"Failed to remove container {container.id[:12]}: {e}")
except Exception as e:
logger.error(f"Failed to cleanup containers: {e}")