"""Code execution engine with concurrency control and timeout management.""" from __future__ import annotations import asyncio import logging import os import shlex import time from pathlib import Path from app.config import Settings, get_settings from app.models import ExecutionResult logger = logging.getLogger(__name__) class CodeExecutor: """Manages concurrent Python code execution with resource limits.""" def __init__(self, settings: Settings | None = None): self._settings = settings or get_settings() self._semaphore = asyncio.Semaphore(self._settings.max_concurrent_executions) self._active_processes: dict[str, asyncio.subprocess.Process] = {} self._lock = asyncio.Lock() @property def settings(self) -> Settings: return self._settings def update_settings(self, settings: Settings) -> None: self._settings = settings self._semaphore = asyncio.Semaphore(settings.max_concurrent_executions) async def execute_file(self, file_path: str) -> ExecutionResult: """Execute a Python file with concurrency control and timeout.""" path = Path(file_path) if not path.exists(): return ExecutionResult( success=False, stderr=f"File not found: {file_path}", return_code=-1, file_path=file_path, ) if not path.suffix == ".py": return ExecutionResult( success=False, stderr="Only .py files can be executed", return_code=-1, file_path=file_path, ) async with self._semaphore: return await self._run_python(file_path) async def execute_code(self, code: str, filename: str | None = None) -> ExecutionResult: """Write code to a file and execute it.""" from app.file_manager import FileManager fm = FileManager(self._settings) result = fm.create_file(code, filename) if not result.success: return ExecutionResult( success=False, stderr=result.message, return_code=-1, ) return await self.execute_file(result.file_path) async def _run_python(self, file_path: str) -> ExecutionResult: """Run a Python file as a subprocess.""" start_time = time.monotonic() python_exec = self._settings.get_python_executable() # Build command if self._settings.env_type.value == "conda": cmd = f"{python_exec} {shlex.quote(file_path)}" use_shell = True else: cmd_parts = [python_exec, file_path] cmd = cmd_parts use_shell = False process = None process_id = f"{file_path}_{time.monotonic()}" try: env = os.environ.copy() # Ensure the virtual environment is activated properly if self._settings.env_type.value in ("venv", "venv-uv"): venv = self._settings.venv_path or self._settings.uv_venv_path if venv: env["VIRTUAL_ENV"] = venv env["PATH"] = f"{Path(venv) / 'bin'}:{env.get('PATH', '')}" if use_shell: process = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env, cwd=self._settings.code_storage_dir, ) else: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env, cwd=self._settings.code_storage_dir, ) async with self._lock: self._active_processes[process_id] = process try: stdout_bytes, stderr_bytes = await asyncio.wait_for( process.communicate(), timeout=self._settings.execution_timeout, ) except asyncio.TimeoutError: process.kill() await process.wait() elapsed = time.monotonic() - start_time return ExecutionResult( success=False, stderr=f"Execution timed out after {self._settings.execution_timeout}s", return_code=-1, file_path=file_path, execution_time=elapsed, ) elapsed = time.monotonic() - start_time stdout = stdout_bytes.decode("utf-8", errors="replace")[: self._settings.max_output_size] stderr = stderr_bytes.decode("utf-8", errors="replace")[: self._settings.max_output_size] return ExecutionResult( success=process.returncode == 0, stdout=stdout, stderr=stderr, return_code=process.returncode or 0, file_path=file_path, execution_time=elapsed, ) except Exception as e: elapsed = time.monotonic() - start_time logger.exception("Error executing %s", file_path) return ExecutionResult( success=False, stderr=f"Execution error: {str(e)}", return_code=-1, file_path=file_path, execution_time=elapsed, ) finally: async with self._lock: self._active_processes.pop(process_id, None) async def cleanup(self) -> None: """Kill all active processes during shutdown.""" async with self._lock: for pid, proc in self._active_processes.items(): try: proc.kill() logger.info("Killed process %s", pid) except ProcessLookupError: pass self._active_processes.clear() # Module-level singleton _executor: CodeExecutor | None = None def get_executor() -> CodeExecutor: global _executor if _executor is None: _executor = CodeExecutor() return _executor def reset_executor(settings: Settings | None = None) -> CodeExecutor: global _executor _executor = CodeExecutor(settings) return _executor