"""Code execution engine with concurrency control and timeout management."""
from __future__ import annotations
import asyncio
import logging
import os
import shlex
import time
from pathlib import Path
from app.config import Settings, get_settings
from app.models import ExecutionResult
logger = logging.getLogger(__name__)
class CodeExecutor:
    """Manages concurrent Python code execution with resource limits.

    Each execution runs the target file in a subprocess. At most
    ``settings.max_concurrent_executions`` subprocesses run at a time (enforced
    with an ``asyncio.Semaphore``), each one is bounded by
    ``settings.execution_timeout`` seconds, and captured output is truncated to
    ``settings.max_output_size`` characters per stream.
    """

    def __init__(self, settings: Settings | None = None):
        """Create an executor.

        Args:
            settings: Configuration to use; when falsy, ``get_settings()`` is
                called to obtain it.
        """
        self._settings = settings or get_settings()
        self._semaphore = asyncio.Semaphore(self._settings.max_concurrent_executions)
        # Subprocesses currently running, keyed by a per-run identifier, so
        # that cleanup() can terminate them on shutdown.
        self._active_processes: dict[str, asyncio.subprocess.Process] = {}
        # Guards every read/write of _active_processes.
        self._lock = asyncio.Lock()

    @property
    def settings(self) -> Settings:
        """Return the settings object currently in use."""
        return self._settings

    def update_settings(self, settings: Settings) -> None:
        """Swap in new settings and rebuild the concurrency limiter.

        A new semaphore is created from ``settings.max_concurrent_executions``.
        Executions that already entered (or are waiting on) the previous
        semaphore keep using that old object, so the new limit only applies to
        executions started after this call.
        """
        self._settings = settings
        self._semaphore = asyncio.Semaphore(settings.max_concurrent_executions)

    async def execute_file(self, file_path: str) -> ExecutionResult:
        """Execute a Python file with concurrency control and timeout.

        Args:
            file_path: Path of the ``.py`` file to run.

        Returns:
            The result of the run. A failed result with ``return_code=-1`` is
            returned without starting a subprocess when the path does not
            exist or does not have a ``.py`` suffix.
        """
        path = Path(file_path)
        if not path.exists():
            return ExecutionResult(
                success=False,
                stderr=f"File not found: {file_path}",
                return_code=-1,
                file_path=file_path,
            )
        if path.suffix != ".py":
            return ExecutionResult(
                success=False,
                stderr="Only .py files can be executed",
                return_code=-1,
                file_path=file_path,
            )
        # Wait for a free slot before spawning the subprocess.
        async with self._semaphore:
            return await self._run_python(file_path)

    async def execute_code(self, code: str, filename: str | None = None) -> ExecutionResult:
        """Write code to a file and execute it.

        Args:
            code: Source text handed to ``FileManager.create_file``.
            filename: Optional file name, forwarded to ``create_file`` as-is.

        Returns:
            The execution result, or a failed result carrying the file
            manager's message when the file could not be created.
        """
        # Imported here rather than at module level
        # (presumably to avoid a circular import - TODO confirm).
        from app.file_manager import FileManager

        fm = FileManager(self._settings)
        result = fm.create_file(code, filename)
        if not result.success:
            return ExecutionResult(
                success=False,
                stderr=result.message,
                return_code=-1,
            )
        return await self.execute_file(result.file_path)

    async def _run_python(self, file_path: str) -> ExecutionResult:
        """Run a Python file as a subprocess and collect its output.

        The subprocess is started in ``settings.code_storage_dir``. For the
        ``conda`` environment type the command is run through a shell (with
        the file path shell-quoted); otherwise the interpreter is executed
        directly. For ``venv`` / ``venv-uv`` the environment's ``bin``
        directory is prepended to ``PATH`` and ``VIRTUAL_ENV`` is set.

        Args:
            file_path: Path of the file to run.

        Returns:
            An ``ExecutionResult``. Timeouts and unexpected errors are
            reported as failed results with ``return_code=-1`` rather than
            raised.

        Raises:
            asyncio.CancelledError: If the calling task is cancelled. The
                subprocess is killed before the cancellation propagates.
        """
        start_time = time.monotonic()
        # Snapshot the settings so a concurrent update_settings() cannot make
        # the enforced timeout and the reported timeout disagree.
        settings = self._settings
        timeout = settings.execution_timeout
        python_exec = settings.get_python_executable()
        use_shell = settings.env_type.value == "conda"

        process: asyncio.subprocess.Process | None = None
        process_id = f"{file_path}_{time.monotonic()}"
        try:
            env = os.environ.copy()
            # Ensure the virtual environment is activated properly
            if settings.env_type.value in ("venv", "venv-uv"):
                venv = settings.venv_path or settings.uv_venv_path
                if venv:
                    env["VIRTUAL_ENV"] = venv
                    # NOTE(review): 'bin' is the POSIX venv layout; Windows
                    # venvs use 'Scripts' - confirm the supported platforms.
                    env["PATH"] = f"{Path(venv) / 'bin'}{os.pathsep}{env.get('PATH', '')}"

            if use_shell:
                # The interpreter string is passed to the shell verbatim; only
                # the file path is quoted.
                process = await asyncio.create_subprocess_shell(
                    f"{python_exec} {shlex.quote(file_path)}",
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=env,
                    cwd=settings.code_storage_dir,
                )
            else:
                process = await asyncio.create_subprocess_exec(
                    python_exec,
                    file_path,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=env,
                    cwd=settings.code_storage_dir,
                )

            async with self._lock:
                self._active_processes[process_id] = process

            try:
                stdout_bytes, stderr_bytes = await asyncio.wait_for(
                    process.communicate(),
                    timeout=timeout,
                )
            except asyncio.TimeoutError:
                try:
                    process.kill()
                except ProcessLookupError:
                    # The process exited between the timeout and the kill.
                    pass
                await process.wait()
                elapsed = time.monotonic() - start_time
                return ExecutionResult(
                    success=False,
                    stderr=f"Execution timed out after {timeout}s",
                    return_code=-1,
                    file_path=file_path,
                    execution_time=elapsed,
                )

            elapsed = time.monotonic() - start_time
            # Truncation is applied to the decoded text, i.e. in characters.
            stdout = stdout_bytes.decode("utf-8", errors="replace")[: settings.max_output_size]
            stderr = stderr_bytes.decode("utf-8", errors="replace")[: settings.max_output_size]
            return ExecutionResult(
                success=process.returncode == 0,
                stdout=stdout,
                stderr=stderr,
                return_code=process.returncode or 0,
                file_path=file_path,
                execution_time=elapsed,
            )
        except Exception as e:
            elapsed = time.monotonic() - start_time
            logger.exception("Error executing %s", file_path)
            return ExecutionResult(
                success=False,
                stderr=f"Execution error: {str(e)}",
                return_code=-1,
                file_path=file_path,
                execution_time=elapsed,
            )
        finally:
            # Reached with a live process when the task was cancelled (which
            # `except Exception` does not catch) or when communicate() failed.
            # Kill it here so it does not keep running untracked.
            if process is not None and process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    pass
            async with self._lock:
                self._active_processes.pop(process_id, None)

    async def cleanup(self) -> None:
        """Kill all active processes during shutdown.

        The registry is snapshotted and cleared under the lock; the processes
        are then killed and awaited outside it, so runs that are finishing
        concurrently are not blocked while the children are reaped.
        """
        async with self._lock:
            snapshot = list(self._active_processes.items())
            self._active_processes.clear()

        killed = []
        for pid, proc in snapshot:
            try:
                proc.kill()
            except ProcessLookupError:
                # Already gone; nothing to kill or wait for.
                continue
            logger.info("Killed process %s", pid)
            killed.append(proc)

        if killed:
            # Reap the killed children so they do not linger as zombies.
            await asyncio.gather(*(proc.wait() for proc in killed), return_exceptions=True)
# Module-level singleton: the shared CodeExecutor instance. It stays None
# until get_executor() creates it or reset_executor() installs a new one.
_executor: CodeExecutor | None = None
def get_executor() -> CodeExecutor:
    """Return the shared module-level executor, creating it on first use.

    The instance is built with ``CodeExecutor()`` (no explicit settings) and
    stored in the module-level ``_executor`` variable, so later calls return
    the same object.
    """
    global _executor
    if _executor is not None:
        return _executor
    _executor = CodeExecutor()
    return _executor
def reset_executor(settings: Settings | None = None) -> CodeExecutor:
    """Replace the module-level executor with a fresh instance and return it.

    Args:
        settings: Settings for the new executor, passed straight to
            ``CodeExecutor``.

    Returns:
        The newly created executor, which is also stored in ``_executor``.

    Note:
        The previous executor (if any) is simply dropped; this function does
        not call its ``cleanup()``.
    """
    global _executor
    _executor = CodeExecutor(settings)
    return _executor