File size: 6,495 Bytes
6b64e5c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
"""Code execution engine with concurrency control and timeout management."""

from __future__ import annotations

import asyncio
import logging
import os
import shlex
import time
from pathlib import Path

from app.config import Settings, get_settings
from app.models import ExecutionResult

logger = logging.getLogger(__name__)


class CodeExecutor:
    """Manages concurrent Python code execution with resource limits."""

    def __init__(self, settings: Settings | None = None):
        self._settings = settings or get_settings()
        self._semaphore = asyncio.Semaphore(self._settings.max_concurrent_executions)
        self._active_processes: dict[str, asyncio.subprocess.Process] = {}
        self._lock = asyncio.Lock()

    @property
    def settings(self) -> Settings:
        return self._settings

    def update_settings(self, settings: Settings) -> None:
        self._settings = settings
        self._semaphore = asyncio.Semaphore(settings.max_concurrent_executions)

    async def execute_file(self, file_path: str) -> ExecutionResult:
        """Execute a Python file with concurrency control and timeout."""
        path = Path(file_path)
        if not path.exists():
            return ExecutionResult(
                success=False,
                stderr=f"File not found: {file_path}",
                return_code=-1,
                file_path=file_path,
            )

        if not path.suffix == ".py":
            return ExecutionResult(
                success=False,
                stderr="Only .py files can be executed",
                return_code=-1,
                file_path=file_path,
            )

        async with self._semaphore:
            return await self._run_python(file_path)

    async def execute_code(self, code: str, filename: str | None = None) -> ExecutionResult:
        """Write code to a file and execute it."""
        from app.file_manager import FileManager

        fm = FileManager(self._settings)
        result = fm.create_file(code, filename)
        if not result.success:
            return ExecutionResult(
                success=False,
                stderr=result.message,
                return_code=-1,
            )

        return await self.execute_file(result.file_path)

    async def _run_python(self, file_path: str) -> ExecutionResult:
        """Run a Python file as a subprocess."""
        start_time = time.monotonic()
        python_exec = self._settings.get_python_executable()

        # Build command
        if self._settings.env_type.value == "conda":
            cmd = f"{python_exec} {shlex.quote(file_path)}"
            use_shell = True
        else:
            cmd_parts = [python_exec, file_path]
            cmd = cmd_parts
            use_shell = False

        process = None
        process_id = f"{file_path}_{time.monotonic()}"

        try:
            env = os.environ.copy()
            # Ensure the virtual environment is activated properly
            if self._settings.env_type.value in ("venv", "venv-uv"):
                venv = self._settings.venv_path or self._settings.uv_venv_path
                if venv:
                    env["VIRTUAL_ENV"] = venv
                    env["PATH"] = f"{Path(venv) / 'bin'}:{env.get('PATH', '')}"

            if use_shell:
                process = await asyncio.create_subprocess_shell(
                    cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=env,
                    cwd=self._settings.code_storage_dir,
                )
            else:
                process = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=env,
                    cwd=self._settings.code_storage_dir,
                )

            async with self._lock:
                self._active_processes[process_id] = process

            try:
                stdout_bytes, stderr_bytes = await asyncio.wait_for(
                    process.communicate(),
                    timeout=self._settings.execution_timeout,
                )
            except asyncio.TimeoutError:
                process.kill()
                await process.wait()
                elapsed = time.monotonic() - start_time
                return ExecutionResult(
                    success=False,
                    stderr=f"Execution timed out after {self._settings.execution_timeout}s",
                    return_code=-1,
                    file_path=file_path,
                    execution_time=elapsed,
                )

            elapsed = time.monotonic() - start_time

            stdout = stdout_bytes.decode("utf-8", errors="replace")[: self._settings.max_output_size]
            stderr = stderr_bytes.decode("utf-8", errors="replace")[: self._settings.max_output_size]

            return ExecutionResult(
                success=process.returncode == 0,
                stdout=stdout,
                stderr=stderr,
                return_code=process.returncode or 0,
                file_path=file_path,
                execution_time=elapsed,
            )

        except Exception as e:
            elapsed = time.monotonic() - start_time
            logger.exception("Error executing %s", file_path)
            return ExecutionResult(
                success=False,
                stderr=f"Execution error: {str(e)}",
                return_code=-1,
                file_path=file_path,
                execution_time=elapsed,
            )
        finally:
            async with self._lock:
                self._active_processes.pop(process_id, None)

    async def cleanup(self) -> None:
        """Kill all active processes during shutdown."""
        async with self._lock:
            for pid, proc in self._active_processes.items():
                try:
                    proc.kill()
                    logger.info("Killed process %s", pid)
                except ProcessLookupError:
                    pass
            self._active_processes.clear()


# Module-level singleton
_executor: CodeExecutor | None = None


def get_executor() -> CodeExecutor:
    global _executor
    if _executor is None:
        _executor = CodeExecutor()
    return _executor


def reset_executor(settings: Settings | None = None) -> CodeExecutor:
    global _executor
    _executor = CodeExecutor(settings)
    return _executor