import docker
from docker.errors import DockerException, NotFound, APIError
import logging
import uuid
from datetime import datetime, timedelta
from typing import Dict, Optional, List
import threading
import time

from sandbox.models import (
    SessionResponse,
    SessionStatus,
    CreateSessionRequest,
    SandboxConfig
)

logger = logging.getLogger(__name__)


class SessionManager:
    """
    Manages persistent VM-like container sessions.

    Each session is a long-running Docker container with:
    - Persistent volume for file storage
    - Multi-language development environment
    - Dedicated workspace directory

    Sessions are tracked in an in-memory registry guarded by a lock, and a
    daemon thread periodically destroys sessions whose idle time exceeds
    their ``timeout_minutes``.
    """

    # Image used for every session container (multi-language dev image).
    SESSION_IMAGE = 'sandbox-devenv:latest'
    # cpu_quota == cpu_period limits the container to one CPU core.
    CPU_PERIOD = 100000
    CPU_QUOTA = 100000
    # Seconds granted to a container to stop before it is force-removed.
    STOP_TIMEOUT_SECONDS = 5
    # Seconds between two idle-session sweeps of the cleanup thread.
    CLEANUP_INTERVAL_SECONDS = 60

    def __init__(self, config: Optional[SandboxConfig] = None):
        """
        Connect to Docker and start the background cleanup thread.

        Args:
            config: Sandbox configuration; a default ``SandboxConfig()`` is
                used when omitted.

        Raises:
            RuntimeError: If the Docker client cannot be created or pinged.
        """
        self.config = config or SandboxConfig()

        try:
            self.client = docker.from_env()
            self.client.ping()
            logger.info("Docker client initialized for session management")
        except DockerException as e:
            logger.error("Failed to initialize Docker client: %s", e)
            raise RuntimeError("Docker is not available") from e

        # In-memory session registry (use Redis for production)
        self.sessions: Dict[str, SessionResponse] = {}
        self._lock = threading.Lock()

        # Start cleanup thread; daemon=True so it never blocks interpreter exit
        self._cleanup_thread = threading.Thread(
            target=self._cleanup_loop, daemon=True
        )
        self._cleanup_thread.start()
        logger.info("Session cleanup thread started")

    def create_session(self, request: CreateSessionRequest) -> SessionResponse:
        """
        Create a new persistent session.

        A dedicated volume and a long-running container are created, the
        container is started, and the session is added to the registry.
        If any step fails, the container and volume created so far are
        removed again on a best-effort basis.

        Args:
            request: Session creation request

        Returns:
            SessionResponse with session details

        Raises:
            RuntimeError: If the session could not be created; the original
                error is chained as ``__cause__``.
        """
        session_id = str(uuid.uuid4())
        volume_name = f"sandbox-session-{session_id}"
        container = None

        try:
            # Create dedicated volume
            logger.info("Creating volume %s", volume_name)
            self.client.volumes.create(
                name=volume_name,
                driver='local',
                labels={'session_id': session_id}
            )

            # Create long-running container
            logger.info("Creating session container for %s", session_id)
            container = self.client.containers.create(
                image=self.SESSION_IMAGE,
                name=f"sandbox-session-{session_id}",
                detach=True,
                # Mount volume to workspace
                volumes={
                    volume_name: {'bind': '/workspace', 'mode': 'rw'}
                },
                # Keep container running
                command='tail -f /dev/null',
                # Resource limits; memswap_limit equal to mem_limit leaves
                # no additional swap on top of the memory limit
                mem_limit=f"{self.config.max_memory_mb}m",
                memswap_limit=f"{self.config.max_memory_mb}m",
                cpu_quota=self.CPU_QUOTA,
                cpu_period=self.CPU_PERIOD,
                # Network isolation (can be disabled for package installation)
                network_mode="bridge" if self.config.enable_network else "none",
                # Security
                read_only=False,  # Need write access for development
                security_opt=["no-new-privileges"],
                # Working directory
                working_dir='/workspace',
                # Labels for tracking
                labels={
                    'session_id': session_id,
                    'managed_by': 'sandbox-api'
                }
            )

            # Start the container
            container.start()
            logger.info(
                "Started container %s for session %s",
                container.id[:12], session_id
            )

            # Create session object.
            # NOTE(review): timestamps are naive UTC, matching the other
            # methods of this class; datetime.utcnow() is deprecated since
            # Python 3.12 - migrate all call sites together if changed.
            now = datetime.utcnow()
            session = SessionResponse(
                session_id=session_id,
                container_id=container.id,
                volume_name=volume_name,
                status=SessionStatus.READY,
                created_at=now,
                last_activity=now,
                timeout_minutes=request.timeout_minutes,
                metadata=request.metadata,
                files_count=0
            )

            # Store in registry
            with self._lock:
                self.sessions[session_id] = session

            logger.info("Session %s created successfully", session_id)
            return session

        except Exception as e:
            logger.error("Failed to create session: %s", e, exc_info=True)

            # Cleanup on failure. The container must go first: a volume
            # that is still attached to a container cannot be removed.
            if container is not None:
                try:
                    container.remove(force=True)
                except Exception as cleanup_error:
                    logger.warning(
                        "Failed to remove container of failed session %s: %s",
                        session_id, cleanup_error
                    )
            # Looked up by name so a volume is removed even when the create
            # call itself reported the failure; a missing volume is expected.
            self._remove_volume(volume_name, warn_if_missing=False)

            raise RuntimeError(f"Failed to create session: {str(e)}") from e

    def get_session(self, session_id: str) -> Optional[SessionResponse]:
        """
        Get session by ID, refreshing its status from the container.

        The status is set to READY when the container reports 'running'
        and to ERROR otherwise (including when the container lookup fails).

        Args:
            session_id: Session to look up

        Returns:
            The session, or None if the ID is not in the registry
        """
        with self._lock:
            session = self.sessions.get(session_id)

        if session is None:
            return None

        # Query Docker outside the lock so a slow daemon round-trip does not
        # block every other registry operation.
        try:
            container = self.client.containers.get(session.container_id)
            is_running = container.status == 'running'
        except Exception as e:
            logger.debug(
                "Could not read container state for session %s: %s",
                session_id, e
            )
            is_running = False

        session.status = (
            SessionStatus.READY if is_running else SessionStatus.ERROR
        )
        return session

    def list_sessions(self) -> List[SessionResponse]:
        """List all active sessions (a snapshot list of the registry values)"""
        with self._lock:
            return list(self.sessions.values())

    def update_activity(self, session_id: str):
        """Update last activity timestamp for a session; unknown IDs are ignored"""
        with self._lock:
            session = self.sessions.get(session_id)
            if session is not None:
                session.last_activity = datetime.utcnow()

    def destroy_session(self, session_id: str) -> bool:
        """
        Destroy a session and cleanup resources.

        The session is removed from the registry first; its container and
        volume are then removed on a best-effort basis. Failures during
        resource removal are logged and do not change the return value.

        Args:
            session_id: Session to destroy

        Returns:
            True if destroyed, False if not found
        """
        with self._lock:
            session = self.sessions.pop(session_id, None)

        if session is None:
            logger.warning("Session %s not found", session_id)
            return False

        # Container first, then volume: the volume is mounted by the container.
        container_removed = self._remove_container(session.container_id)
        volume_removed = self._remove_volume(session.volume_name)

        if container_removed and volume_removed:
            logger.info("Session %s destroyed successfully", session_id)
        else:
            logger.warning(
                "Session %s removed from registry, but some resources "
                "could not be cleaned up",
                session_id
            )
        return True

    def _remove_container(self, container_id: str) -> bool:
        """Stop and force-remove a container; return False only if removal failed."""
        short_id = container_id[:12]
        try:
            container = self.client.containers.get(container_id)
            try:
                container.stop(timeout=self.STOP_TIMEOUT_SECONDS)
            except NotFound:
                raise
            except Exception as e:
                # A failed graceful stop must not prevent the forced removal.
                logger.warning(
                    "Graceful stop of container %s failed, forcing removal: %s",
                    short_id, e
                )
            container.remove(force=True)
        except NotFound:
            logger.warning("Container %s not found", short_id)
            return True
        except Exception as e:
            logger.error("Error removing container: %s", e)
            return False

        logger.info("Removed container %s", short_id)
        return True

    def _remove_volume(self, volume_name: str, warn_if_missing: bool = True) -> bool:
        """Force-remove a volume by name; return False only if removal failed."""
        try:
            volume = self.client.volumes.get(volume_name)
            volume.remove(force=True)
        except NotFound:
            if warn_if_missing:
                logger.warning("Volume %s not found", volume_name)
            return True
        except Exception as e:
            logger.error("Error removing volume: %s", e)
            return False

        logger.info("Removed volume %s", volume_name)
        return True

    def _cleanup_loop(self):
        """Background thread to cleanup idle sessions"""
        while True:
            try:
                time.sleep(self.CLEANUP_INTERVAL_SECONDS)
                self._cleanup_idle_sessions()
            except Exception as e:
                # Keep the loop alive: one failed sweep must not end cleanup.
                logger.error("Error in cleanup loop: %s", e, exc_info=True)

    def _cleanup_idle_sessions(self):
        """Cleanup sessions that have exceeded their timeout"""
        now = datetime.utcnow()

        with self._lock:
            expired_ids = [
                session_id
                for session_id, session in self.sessions.items()
                if (now - session.last_activity)
                > timedelta(minutes=session.timeout_minutes)
            ]

        # Destroy outside lock to avoid deadlock (destroy_session takes the
        # same non-reentrant lock)
        for session_id in expired_ids:
            logger.info("Session %s exceeded timeout, will cleanup", session_id)
            self.destroy_session(session_id)

    def shutdown(self):
        """Cleanup all sessions on shutdown"""
        logger.info("Shutting down session manager, cleaning up all sessions")

        # Snapshot the IDs under the lock; the cleanup thread may be mutating
        # the registry concurrently.
        with self._lock:
            session_ids = list(self.sessions)

        for session_id in session_ids:
            self.destroy_session(session_id)