codeMcp / app /executor.py
sarveshpatel's picture
Create app/executor.py
6b64e5c verified
"""Code execution engine with concurrency control and timeout management."""
from __future__ import annotations
import asyncio
import logging
import os
import shlex
import time
from pathlib import Path
from app.config import Settings, get_settings
from app.models import ExecutionResult
logger = logging.getLogger(__name__)
class CodeExecutor:
"""Manages concurrent Python code execution with resource limits."""
def __init__(self, settings: Settings | None = None):
self._settings = settings or get_settings()
self._semaphore = asyncio.Semaphore(self._settings.max_concurrent_executions)
self._active_processes: dict[str, asyncio.subprocess.Process] = {}
self._lock = asyncio.Lock()
@property
def settings(self) -> Settings:
return self._settings
def update_settings(self, settings: Settings) -> None:
self._settings = settings
self._semaphore = asyncio.Semaphore(settings.max_concurrent_executions)
async def execute_file(self, file_path: str) -> ExecutionResult:
"""Execute a Python file with concurrency control and timeout."""
path = Path(file_path)
if not path.exists():
return ExecutionResult(
success=False,
stderr=f"File not found: {file_path}",
return_code=-1,
file_path=file_path,
)
if not path.suffix == ".py":
return ExecutionResult(
success=False,
stderr="Only .py files can be executed",
return_code=-1,
file_path=file_path,
)
async with self._semaphore:
return await self._run_python(file_path)
async def execute_code(self, code: str, filename: str | None = None) -> ExecutionResult:
"""Write code to a file and execute it."""
from app.file_manager import FileManager
fm = FileManager(self._settings)
result = fm.create_file(code, filename)
if not result.success:
return ExecutionResult(
success=False,
stderr=result.message,
return_code=-1,
)
return await self.execute_file(result.file_path)
async def _run_python(self, file_path: str) -> ExecutionResult:
"""Run a Python file as a subprocess."""
start_time = time.monotonic()
python_exec = self._settings.get_python_executable()
# Build command
if self._settings.env_type.value == "conda":
cmd = f"{python_exec} {shlex.quote(file_path)}"
use_shell = True
else:
cmd_parts = [python_exec, file_path]
cmd = cmd_parts
use_shell = False
process = None
process_id = f"{file_path}_{time.monotonic()}"
try:
env = os.environ.copy()
# Ensure the virtual environment is activated properly
if self._settings.env_type.value in ("venv", "venv-uv"):
venv = self._settings.venv_path or self._settings.uv_venv_path
if venv:
env["VIRTUAL_ENV"] = venv
env["PATH"] = f"{Path(venv) / 'bin'}:{env.get('PATH', '')}"
if use_shell:
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
cwd=self._settings.code_storage_dir,
)
else:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
cwd=self._settings.code_storage_dir,
)
async with self._lock:
self._active_processes[process_id] = process
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
process.communicate(),
timeout=self._settings.execution_timeout,
)
except asyncio.TimeoutError:
process.kill()
await process.wait()
elapsed = time.monotonic() - start_time
return ExecutionResult(
success=False,
stderr=f"Execution timed out after {self._settings.execution_timeout}s",
return_code=-1,
file_path=file_path,
execution_time=elapsed,
)
elapsed = time.monotonic() - start_time
stdout = stdout_bytes.decode("utf-8", errors="replace")[: self._settings.max_output_size]
stderr = stderr_bytes.decode("utf-8", errors="replace")[: self._settings.max_output_size]
return ExecutionResult(
success=process.returncode == 0,
stdout=stdout,
stderr=stderr,
return_code=process.returncode or 0,
file_path=file_path,
execution_time=elapsed,
)
except Exception as e:
elapsed = time.monotonic() - start_time
logger.exception("Error executing %s", file_path)
return ExecutionResult(
success=False,
stderr=f"Execution error: {str(e)}",
return_code=-1,
file_path=file_path,
execution_time=elapsed,
)
finally:
async with self._lock:
self._active_processes.pop(process_id, None)
async def cleanup(self) -> None:
"""Kill all active processes during shutdown."""
async with self._lock:
for pid, proc in self._active_processes.items():
try:
proc.kill()
logger.info("Killed process %s", pid)
except ProcessLookupError:
pass
self._active_processes.clear()
# Module-level singleton
_executor: CodeExecutor | None = None
def get_executor() -> CodeExecutor:
global _executor
if _executor is None:
_executor = CodeExecutor()
return _executor
def reset_executor(settings: Settings | None = None) -> CodeExecutor:
global _executor
_executor = CodeExecutor(settings)
return _executor