Spaces:
Sleeping
Sleeping
| """Code execution engine with concurrency control and timeout management.""" | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| import os | |
| import shlex | |
| import time | |
| from pathlib import Path | |
| from app.config import Settings, get_settings | |
| from app.models import ExecutionResult | |
| logger = logging.getLogger(__name__) | |
class CodeExecutor:
    """Manages concurrent Python code execution with resource limits.

    Each execution runs the configured Python interpreter in a subprocess.
    A semaphore bounds how many subprocesses run at once, every run is
    subject to ``settings.execution_timeout``, and captured output is
    truncated to ``settings.max_output_size`` characters.  Running
    processes are tracked so that :meth:`cleanup` can kill them.
    """

    def __init__(self, settings: Settings | None = None):
        """Create an executor.

        Args:
            settings: Configuration to use.  When omitted (or falsy) the
                result of ``get_settings()`` is used instead.
        """
        self._settings = settings or get_settings()
        self._semaphore = asyncio.Semaphore(self._settings.max_concurrent_executions)
        # Maps an internal process id to its running subprocess.
        self._active_processes: dict[str, asyncio.subprocess.Process] = {}
        # Guards every access to ``_active_processes``.
        self._lock = asyncio.Lock()

    def settings(self) -> Settings:
        """Return the settings object currently in use."""
        return self._settings

    def update_settings(self, settings: Settings) -> None:
        """Replace the settings and rebuild the concurrency semaphore.

        Executions that already hold or are waiting on the previous
        semaphore keep using it; only executions started afterwards are
        bounded by the new ``max_concurrent_executions``.
        """
        self._settings = settings
        self._semaphore = asyncio.Semaphore(settings.max_concurrent_executions)

    async def execute_file(self, file_path: str) -> ExecutionResult:
        """Execute a Python file with concurrency control and timeout.

        Args:
            file_path: Path of the ``.py`` file to run.

        Returns:
            The result of the run.  A missing file or a non-``.py`` suffix
            yields a failed result with ``return_code=-1`` without starting
            a subprocess.
        """
        # NOTE(review): existence is checked relative to the current working
        # directory, while the subprocess runs with cwd=code_storage_dir, so
        # a relative path may resolve differently -- confirm callers pass
        # absolute paths.
        path = Path(file_path)
        if not path.exists():
            return ExecutionResult(
                success=False,
                stderr=f"File not found: {file_path}",
                return_code=-1,
                file_path=file_path,
            )
        if path.suffix != ".py":
            return ExecutionResult(
                success=False,
                stderr="Only .py files can be executed",
                return_code=-1,
                file_path=file_path,
            )
        async with self._semaphore:
            return await self._run_python(file_path)

    async def execute_code(self, code: str, filename: str | None = None) -> ExecutionResult:
        """Write code to a file and execute it.

        Args:
            code: Python source to store and run.
            filename: Optional file name forwarded to ``FileManager.create_file``.

        Returns:
            The execution result, or a failed result carrying the file
            manager's message when the file could not be created.
        """
        # Imported here rather than at module level (presumably to avoid a
        # circular import -- confirm before moving it).
        from app.file_manager import FileManager

        fm = FileManager(self._settings)
        result = fm.create_file(code, filename)
        if not result.success:
            return ExecutionResult(
                success=False,
                stderr=result.message,
                return_code=-1,
            )
        return await self.execute_file(result.file_path)

    def _build_env(self) -> dict[str, str]:
        """Return the environment for the child process.

        Starts from a copy of ``os.environ``.  For ``venv`` / ``venv-uv``
        environments with a configured path, ``VIRTUAL_ENV`` is set and the
        environment's executable directory is put first on ``PATH``.
        """
        env = os.environ.copy()
        if self._settings.env_type.value in ("venv", "venv-uv"):
            venv = self._settings.venv_path or self._settings.uv_venv_path
            if venv:
                # Virtual environments keep executables in "Scripts" on
                # Windows and "bin" elsewhere.
                bin_dir = Path(venv) / ("Scripts" if os.name == "nt" else "bin")
                env["VIRTUAL_ENV"] = venv
                inherited = env.get("PATH", "")
                # Avoid a trailing separator: an empty PATH entry is treated
                # as the current directory on POSIX.
                env["PATH"] = (
                    f"{bin_dir}{os.pathsep}{inherited}" if inherited else str(bin_dir)
                )
        return env

    async def _run_python(self, file_path: str) -> ExecutionResult:
        """Run a Python file as a subprocess.

        Args:
            file_path: Path of the file handed to the interpreter.

        Returns:
            A result holding the decoded (and truncated) stdout/stderr, the
            return code and the elapsed wall time.  A timeout or any other
            exception is reported as a failed result with ``return_code=-1``.
        """
        start_time = time.monotonic()
        # Snapshot so one run uses a single, consistent configuration even if
        # update_settings() is called while it is awaiting.
        settings = self._settings
        python_exec = settings.get_python_executable()
        # Conda interpreters are launched through the shell; everything else
        # is exec'd directly with an argument list.
        use_shell = settings.env_type.value == "conda"

        process: asyncio.subprocess.Process | None = None
        process_id = f"{file_path}_{time.monotonic()}"
        try:
            env = self._build_env()
            if use_shell:
                # Only the file path is quoted; python_exec is trusted
                # configuration and may itself contain several words.
                process = await asyncio.create_subprocess_shell(
                    f"{python_exec} {shlex.quote(file_path)}",
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=env,
                    cwd=settings.code_storage_dir,
                )
            else:
                process = await asyncio.create_subprocess_exec(
                    python_exec,
                    file_path,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=env,
                    cwd=settings.code_storage_dir,
                )

            async with self._lock:
                self._active_processes[process_id] = process

            try:
                stdout_bytes, stderr_bytes = await asyncio.wait_for(
                    process.communicate(),
                    timeout=settings.execution_timeout,
                )
            except asyncio.TimeoutError:
                try:
                    process.kill()
                except ProcessLookupError:
                    # The process exited on its own just as the timeout
                    # fired; still report the run as timed out.
                    pass
                await process.wait()
                elapsed = time.monotonic() - start_time
                return ExecutionResult(
                    success=False,
                    stderr=f"Execution timed out after {settings.execution_timeout}s",
                    return_code=-1,
                    file_path=file_path,
                    execution_time=elapsed,
                )

            elapsed = time.monotonic() - start_time
            stdout = stdout_bytes.decode("utf-8", errors="replace")[: settings.max_output_size]
            stderr = stderr_bytes.decode("utf-8", errors="replace")[: settings.max_output_size]
            return ExecutionResult(
                success=process.returncode == 0,
                stdout=stdout,
                stderr=stderr,
                return_code=process.returncode or 0,
                file_path=file_path,
                execution_time=elapsed,
            )
        except Exception as e:
            elapsed = time.monotonic() - start_time
            logger.exception("Error executing %s", file_path)
            return ExecutionResult(
                success=False,
                stderr=f"Execution error: {str(e)}",
                return_code=-1,
                file_path=file_path,
                execution_time=elapsed,
            )
        finally:
            # Reached with a live child only on abnormal exits (task
            # cancellation or an error before communicate() finished).  The
            # entry is about to leave the registry, so cleanup() could no
            # longer reach it: kill it here instead of orphaning it.
            if process is not None and process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    pass
            async with self._lock:
                self._active_processes.pop(process_id, None)

    async def cleanup(self) -> None:
        """Kill all active processes during shutdown.

        Processes that have already exited are skipped.  The registry is
        emptied afterwards; the killed processes are not awaited here.
        """
        async with self._lock:
            for pid, proc in self._active_processes.items():
                try:
                    proc.kill()
                    logger.info("Killed process %s", pid)
                except ProcessLookupError:
                    pass
            self._active_processes.clear()
| # Module-level singleton | |
| _executor: CodeExecutor | None = None | |
def get_executor() -> CodeExecutor:
    """Return the module-wide executor, creating it on first use.

    The instance is built with ``CodeExecutor()`` (default settings) the
    first time this is called and reused by every later call.
    """
    global _executor
    if _executor is not None:
        return _executor
    _executor = CodeExecutor()
    return _executor
def reset_executor(settings: Settings | None = None) -> CodeExecutor:
    """Replace the module-wide executor with a freshly built one.

    Args:
        settings: Passed straight to ``CodeExecutor``; ``None`` lets the
            executor pick its own settings.

    Returns:
        The new executor, which is also what ``get_executor()`` returns
        from now on.
    """
    global _executor
    replacement = CodeExecutor(settings)
    _executor = replacement
    return replacement