isolated-sandbox / sandbox /session_manager.py
ChefAdorous's picture
Deploy Code Execution Sandbox with FastAPI and Docker
a89f25d
import docker
from docker.errors import DockerException, NotFound, APIError
import logging
import uuid
from datetime import datetime, timedelta
from typing import Dict, Optional, List
import threading
import time
from sandbox.models import (
SessionResponse, SessionStatus, CreateSessionRequest,
SandboxConfig
)
logger = logging.getLogger(__name__)
class SessionManager:
"""
Manages persistent VM-like container sessions.
Each session is a long-running Docker container with:
- Persistent volume for file storage
- Multi-language development environment
- Dedicated workspace directory
"""
def __init__(self, config: Optional[SandboxConfig] = None):
self.config = config or SandboxConfig()
try:
self.client = docker.from_env()
self.client.ping()
logger.info("Docker client initialized for session management")
except DockerException as e:
logger.error(f"Failed to initialize Docker client: {e}")
raise RuntimeError("Docker is not available") from e
# In-memory session registry (use Redis for production)
self.sessions: Dict[str, SessionResponse] = {}
self._lock = threading.Lock()
# Start cleanup thread
self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
self._cleanup_thread.start()
logger.info("Session cleanup thread started")
def create_session(self, request: CreateSessionRequest) -> SessionResponse:
"""
Create a new persistent session.
Args:
request: Session creation request
Returns:
SessionResponse with session details
"""
session_id = str(uuid.uuid4())
volume_name = f"sandbox-session-{session_id}"
try:
# Create dedicated volume
logger.info(f"Creating volume {volume_name}")
volume = self.client.volumes.create(
name=volume_name,
driver='local',
labels={'session_id': session_id}
)
# Create long-running container
logger.info(f"Creating session container for {session_id}")
container = self.client.containers.create(
image='sandbox-devenv:latest', # Our multi-language image
name=f"sandbox-session-{session_id}",
detach=True,
# Mount volume to workspace
volumes={
volume_name: {'bind': '/workspace', 'mode': 'rw'}
},
# Keep container running
command='tail -f /dev/null',
# Resource limits
mem_limit=f"{self.config.max_memory_mb}m",
memswap_limit=f"{self.config.max_memory_mb}m",
cpu_quota=100000, # 1 CPU core
cpu_period=100000,
# Network isolation (can be disabled for package installation)
network_mode="bridge" if self.config.enable_network else "none",
# Security
read_only=False, # Need write access for development
security_opt=["no-new-privileges"],
# Working directory
working_dir='/workspace',
# Labels for tracking
labels={
'session_id': session_id,
'managed_by': 'sandbox-api'
}
)
# Start the container
container.start()
logger.info(f"Started container {container.id[:12]} for session {session_id}")
# Create session object
now = datetime.utcnow()
session = SessionResponse(
session_id=session_id,
container_id=container.id,
volume_name=volume_name,
status=SessionStatus.READY,
created_at=now,
last_activity=now,
timeout_minutes=request.timeout_minutes,
metadata=request.metadata,
files_count=0
)
# Store in registry
with self._lock:
self.sessions[session_id] = session
logger.info(f"Session {session_id} created successfully")
return session
except Exception as e:
logger.error(f"Failed to create session: {e}", exc_info=True)
# Cleanup on failure
try:
if volume_name:
vol = self.client.volumes.get(volume_name)
vol.remove(force=True)
except:
pass
raise RuntimeError(f"Failed to create session: {str(e)}")
def get_session(self, session_id: str) -> Optional[SessionResponse]:
"""Get session by ID"""
with self._lock:
session = self.sessions.get(session_id)
if session:
# Update status from container
try:
container = self.client.containers.get(session.container_id)
if container.status == 'running':
session.status = SessionStatus.READY
else:
session.status = SessionStatus.ERROR
except:
session.status = SessionStatus.ERROR
return session
def list_sessions(self) -> List[SessionResponse]:
"""List all active sessions"""
with self._lock:
return list(self.sessions.values())
def update_activity(self, session_id: str):
"""Update last activity timestamp for a session"""
with self._lock:
if session_id in self.sessions:
self.sessions[session_id].last_activity = datetime.utcnow()
def destroy_session(self, session_id: str) -> bool:
"""
Destroy a session and cleanup resources.
Args:
session_id: Session to destroy
Returns:
True if destroyed, False if not found
"""
with self._lock:
session = self.sessions.pop(session_id, None)
if not session:
logger.warning(f"Session {session_id} not found")
return False
try:
# Stop and remove container
try:
container = self.client.containers.get(session.container_id)
container.stop(timeout=5)
container.remove(force=True)
logger.info(f"Removed container {session.container_id[:12]}")
except NotFound:
logger.warning(f"Container {session.container_id[:12]} not found")
except Exception as e:
logger.error(f"Error removing container: {e}")
# Remove volume
try:
volume = self.client.volumes.get(session.volume_name)
volume.remove(force=True)
logger.info(f"Removed volume {session.volume_name}")
except NotFound:
logger.warning(f"Volume {session.volume_name} not found")
except Exception as e:
logger.error(f"Error removing volume: {e}")
logger.info(f"Session {session_id} destroyed successfully")
return True
except Exception as e:
logger.error(f"Error destroying session {session_id}: {e}", exc_info=True)
return False
def _cleanup_loop(self):
"""Background thread to cleanup idle sessions"""
while True:
try:
time.sleep(60) # Check every minute
self._cleanup_idle_sessions()
except Exception as e:
logger.error(f"Error in cleanup loop: {e}", exc_info=True)
def _cleanup_idle_sessions(self):
"""Cleanup sessions that have exceeded their timeout"""
now = datetime.utcnow()
sessions_to_destroy = []
with self._lock:
for session_id, session in self.sessions.items():
timeout = timedelta(minutes=session.timeout_minutes)
if (now - session.last_activity) > timeout:
sessions_to_destroy.append(session_id)
logger.info(f"Session {session_id} exceeded timeout, will cleanup")
# Destroy outside lock to avoid deadlock
for session_id in sessions_to_destroy:
self.destroy_session(session_id)
def shutdown(self):
"""Cleanup all sessions on shutdown"""
logger.info("Shutting down session manager, cleaning up all sessions")
session_ids = list(self.sessions.keys())
for session_id in session_ids:
self.destroy_session(session_id)