sarveshpatel commited on
Commit
6b64e5c
·
verified ·
1 Parent(s): d54d7d2

Create app/executor.py

Browse files
Files changed (1) hide show
  1. app/executor.py +189 -0
app/executor.py ADDED
@@ -0,0 +1,189 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Code execution engine with concurrency control and timeout management."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import asyncio
6
+ import logging
7
+ import os
8
+ import shlex
9
+ import time
10
+ from pathlib import Path
11
+
12
+ from app.config import Settings, get_settings
13
+ from app.models import ExecutionResult
14
+
15
+ logger = logging.getLogger(__name__)
16
+
17
+
18
+ class CodeExecutor:
19
+ """Manages concurrent Python code execution with resource limits."""
20
+
21
+ def __init__(self, settings: Settings | None = None):
22
+ self._settings = settings or get_settings()
23
+ self._semaphore = asyncio.Semaphore(self._settings.max_concurrent_executions)
24
+ self._active_processes: dict[str, asyncio.subprocess.Process] = {}
25
+ self._lock = asyncio.Lock()
26
+
27
+ @property
28
+ def settings(self) -> Settings:
29
+ return self._settings
30
+
31
+ def update_settings(self, settings: Settings) -> None:
32
+ self._settings = settings
33
+ self._semaphore = asyncio.Semaphore(settings.max_concurrent_executions)
34
+
35
+ async def execute_file(self, file_path: str) -> ExecutionResult:
36
+ """Execute a Python file with concurrency control and timeout."""
37
+ path = Path(file_path)
38
+ if not path.exists():
39
+ return ExecutionResult(
40
+ success=False,
41
+ stderr=f"File not found: {file_path}",
42
+ return_code=-1,
43
+ file_path=file_path,
44
+ )
45
+
46
+ if not path.suffix == ".py":
47
+ return ExecutionResult(
48
+ success=False,
49
+ stderr="Only .py files can be executed",
50
+ return_code=-1,
51
+ file_path=file_path,
52
+ )
53
+
54
+ async with self._semaphore:
55
+ return await self._run_python(file_path)
56
+
57
+ async def execute_code(self, code: str, filename: str | None = None) -> ExecutionResult:
58
+ """Write code to a file and execute it."""
59
+ from app.file_manager import FileManager
60
+
61
+ fm = FileManager(self._settings)
62
+ result = fm.create_file(code, filename)
63
+ if not result.success:
64
+ return ExecutionResult(
65
+ success=False,
66
+ stderr=result.message,
67
+ return_code=-1,
68
+ )
69
+
70
+ return await self.execute_file(result.file_path)
71
+
72
+ async def _run_python(self, file_path: str) -> ExecutionResult:
73
+ """Run a Python file as a subprocess."""
74
+ start_time = time.monotonic()
75
+ python_exec = self._settings.get_python_executable()
76
+
77
+ # Build command
78
+ if self._settings.env_type.value == "conda":
79
+ cmd = f"{python_exec} {shlex.quote(file_path)}"
80
+ use_shell = True
81
+ else:
82
+ cmd_parts = [python_exec, file_path]
83
+ cmd = cmd_parts
84
+ use_shell = False
85
+
86
+ process = None
87
+ process_id = f"{file_path}_{time.monotonic()}"
88
+
89
+ try:
90
+ env = os.environ.copy()
91
+ # Ensure the virtual environment is activated properly
92
+ if self._settings.env_type.value in ("venv", "venv-uv"):
93
+ venv = self._settings.venv_path or self._settings.uv_venv_path
94
+ if venv:
95
+ env["VIRTUAL_ENV"] = venv
96
+ env["PATH"] = f"{Path(venv) / 'bin'}:{env.get('PATH', '')}"
97
+
98
+ if use_shell:
99
+ process = await asyncio.create_subprocess_shell(
100
+ cmd,
101
+ stdout=asyncio.subprocess.PIPE,
102
+ stderr=asyncio.subprocess.PIPE,
103
+ env=env,
104
+ cwd=self._settings.code_storage_dir,
105
+ )
106
+ else:
107
+ process = await asyncio.create_subprocess_exec(
108
+ *cmd,
109
+ stdout=asyncio.subprocess.PIPE,
110
+ stderr=asyncio.subprocess.PIPE,
111
+ env=env,
112
+ cwd=self._settings.code_storage_dir,
113
+ )
114
+
115
+ async with self._lock:
116
+ self._active_processes[process_id] = process
117
+
118
+ try:
119
+ stdout_bytes, stderr_bytes = await asyncio.wait_for(
120
+ process.communicate(),
121
+ timeout=self._settings.execution_timeout,
122
+ )
123
+ except asyncio.TimeoutError:
124
+ process.kill()
125
+ await process.wait()
126
+ elapsed = time.monotonic() - start_time
127
+ return ExecutionResult(
128
+ success=False,
129
+ stderr=f"Execution timed out after {self._settings.execution_timeout}s",
130
+ return_code=-1,
131
+ file_path=file_path,
132
+ execution_time=elapsed,
133
+ )
134
+
135
+ elapsed = time.monotonic() - start_time
136
+
137
+ stdout = stdout_bytes.decode("utf-8", errors="replace")[: self._settings.max_output_size]
138
+ stderr = stderr_bytes.decode("utf-8", errors="replace")[: self._settings.max_output_size]
139
+
140
+ return ExecutionResult(
141
+ success=process.returncode == 0,
142
+ stdout=stdout,
143
+ stderr=stderr,
144
+ return_code=process.returncode or 0,
145
+ file_path=file_path,
146
+ execution_time=elapsed,
147
+ )
148
+
149
+ except Exception as e:
150
+ elapsed = time.monotonic() - start_time
151
+ logger.exception("Error executing %s", file_path)
152
+ return ExecutionResult(
153
+ success=False,
154
+ stderr=f"Execution error: {str(e)}",
155
+ return_code=-1,
156
+ file_path=file_path,
157
+ execution_time=elapsed,
158
+ )
159
+ finally:
160
+ async with self._lock:
161
+ self._active_processes.pop(process_id, None)
162
+
163
+ async def cleanup(self) -> None:
164
+ """Kill all active processes during shutdown."""
165
+ async with self._lock:
166
+ for pid, proc in self._active_processes.items():
167
+ try:
168
+ proc.kill()
169
+ logger.info("Killed process %s", pid)
170
+ except ProcessLookupError:
171
+ pass
172
+ self._active_processes.clear()
173
+
174
+
175
+ # Module-level singleton
176
+ _executor: CodeExecutor | None = None
177
+
178
+
179
+ def get_executor() -> CodeExecutor:
180
+ global _executor
181
+ if _executor is None:
182
+ _executor = CodeExecutor()
183
+ return _executor
184
+
185
+
186
+ def reset_executor(settings: Settings | None = None) -> CodeExecutor:
187
+ global _executor
188
+ _executor = CodeExecutor(settings)
189
+ return _executor