File size: 9,098 Bytes
763ef0d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
"""
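Executor - runs plan actions. Uses the E2B sandbox when E2B_API_KEY is set
(and the E2B SDK can be imported); otherwise falls back to local subprocess
execution, where shell commands are screened only by a small denylist of
dangerous patterns (this is not an allowlist and not a sandbox).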
"""
from __future__ import annotations

import os
import shlex
import subprocess
import logging
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Generator, List, Optional

# Module logger. Uses the fixed name "executor" rather than __name__.
logger = logging.getLogger("executor")

# Read once at import time. The empty-string default is falsy, which is what
# the `if E2B_API_KEY` checks below rely on to mean "no key configured".
E2B_API_KEY = os.getenv("E2B_API_KEY", "")


@dataclass
class ExecutionResult:
    """Outcome of one shell or Python execution, from either engine.

    Only ``ok`` is required; every other field has a neutral default so
    callers can fill in just what they know.
    """

    # True when the execution is considered successful.
    ok: bool
    # Captured standard output (empty string when nothing was captured).
    stdout: str = ""
    # Captured standard error, or an error description on failure.
    stderr: str = ""
    # Process-style exit status; 0 is the default.
    exit_code: int = 0
    # Wall-clock duration of the call in milliseconds.
    duration_ms: float = 0.0
    # Free-form extras; the runners in this module store an "engine" key here.
    # A factory is used so instances never share one dict.
    meta: Dict[str, Any] = field(default_factory=dict)


# ---------------------------------------------------------------------------
# E2B sandbox wrapper (with graceful fallback)
# ---------------------------------------------------------------------------
class E2BSandbox:
    """Thin wrapper around e2b_code_interpreter. Created lazily.

    The SDK is imported in ``__init__`` only when ``E2B_API_KEY`` is set; the
    remote sandbox object itself is created on the first ``run_*`` call.
    ``run_shell``, ``run_python`` and ``close`` all run under one lock, so
    access to the sandbox is serialized.
    """

    def __init__(self) -> None:
        self._sbx = None
        # Always defined, so _ensure() can give a clear error when the SDK
        # is missing instead of failing on an undefined attribute.
        self._Sandbox = None
        self._lock = threading.Lock()
        self._available = False
        if E2B_API_KEY:
            try:
                from e2b_code_interpreter import Sandbox  # type: ignore
            except Exception as e:
                logger.warning("E2B SDK not available: %s", e)
            else:
                self._Sandbox = Sandbox
                self._available = True

    @property
    def available(self) -> bool:
        """True when an API key is set and the SDK import succeeded."""
        return self._available

    def _ensure(self):
        """Return the sandbox, creating it on first use.

        Raises:
            RuntimeError: if the SDK was not loaded (``available`` is False).
        """
        if self._sbx is None:
            if self._Sandbox is None:
                raise RuntimeError("E2B sandbox is not available")
            self._sbx = self._Sandbox(api_key=E2B_API_KEY)
        return self._sbx

    @staticmethod
    def _whole_seconds(timeout: float) -> int:
        """Round ``timeout`` up to whole seconds.

        Plain ``int()`` would truncate a sub-second timeout to 0.
        """
        secs = int(timeout)
        return secs + 1 if secs < timeout else secs

    @staticmethod
    def _log_text(res: Any, stream: str) -> str:
        """Join the ``stream`` ("stdout"/"stderr") log entries of ``res``.

        Accepts ``res.logs`` as either an object with attributes or a dict,
        and entries that are plain strings or objects carrying ``.text``.
        """
        logs = getattr(res, "logs", None)
        if logs is None:
            return ""
        if isinstance(logs, dict):
            entries = logs.get(stream)
        else:
            entries = getattr(logs, stream, None)
        return "\n".join(str(getattr(entry, "text", entry)) for entry in entries or [])

    @staticmethod
    def _error_text(err: Any) -> str:
        """Render an execution error as text.

        Uses ``name``/``value``/``traceback`` attributes when present
        (NOTE(review): this is the shape the e2b docs describe for
        ``Execution.error`` -- confirm against the installed SDK); otherwise
        falls back to ``str(err)``.
        """
        head = ": ".join(
            part
            for part in (
                str(getattr(err, "name", "") or ""),
                str(getattr(err, "value", "") or ""),
            )
            if part
        )
        trace = str(getattr(err, "traceback", "") or "")
        return "\n".join(piece for piece in (head, trace) if piece) or str(err)

    def run_shell(self, cmd: str, timeout: float = 120.0) -> ExecutionResult:
        """Run ``cmd`` as a shell command inside the sandbox.

        Never raises: any failure is reported as an ``ExecutionResult`` with
        ``ok=False``.
        """
        started = time.time()
        with self._lock:
            try:
                sbx = self._ensure()
                # Probe for the newer `commands.run` API up front. Catching
                # AttributeError around the call itself would also swallow
                # AttributeErrors raised *inside* the SDK and silently re-run
                # the command through the legacy path.
                run = getattr(getattr(sbx, "commands", None), "run", None)
                if callable(run):
                    cmd_result = run(cmd, timeout=self._whole_seconds(timeout))
                    stdout = getattr(cmd_result, "stdout", "") or ""
                    stderr = getattr(cmd_result, "stderr", "") or ""
                    exit_code = getattr(cmd_result, "exit_code", 0) or 0
                else:
                    # Fallback for legacy SDK: shell out from Python code.
                    # The snippet prints the command's stderr to stdout too,
                    # so both end up in the stdout log.
                    res = sbx.run_code(f"import subprocess; r=subprocess.run({cmd!r}, shell=True, capture_output=True, text=True, timeout={timeout}); print(r.stdout); print(r.stderr)")
                    stdout = self._log_text(res, "stdout")
                    stderr = self._log_text(res, "stderr")
                    exit_code = 0
                    error = getattr(res, "error", None)
                    if error:
                        # The snippet itself failed (e.g. subprocess timeout).
                        exit_code = 1
                        detail = self._error_text(error)
                        stderr = f"{stderr}\n{detail}" if stderr else detail
                ok = exit_code == 0
                return ExecutionResult(
                    ok=ok, stdout=stdout, stderr=stderr, exit_code=exit_code,
                    duration_ms=(time.time() - started) * 1000,
                    meta={"engine": "e2b"},
                )
            except Exception as e:
                code = getattr(e, "exit_code", None)
                if isinstance(code, int) and not isinstance(code, bool) and code != 0:
                    # The command ran but exited non-zero and the SDK raised
                    # with the result attached; keep the real exit code and
                    # output instead of collapsing it to a generic failure.
                    # NOTE(review): matches e2b's documented
                    # CommandExitException -- confirm for the SDK in use.
                    return ExecutionResult(
                        ok=False,
                        stdout=str(getattr(e, "stdout", "") or ""),
                        stderr=str(getattr(e, "stderr", "") or "") or str(e),
                        exit_code=code,
                        duration_ms=(time.time() - started) * 1000,
                        meta={"engine": "e2b"},
                    )
                logger.exception("E2B run_shell failed")
                return ExecutionResult(
                    ok=False, stderr=str(e), exit_code=1,
                    duration_ms=(time.time() - started) * 1000,
                    meta={"engine": "e2b", "error": True},
                )

    def run_python(self, code: str, timeout: float = 120.0) -> ExecutionResult:
        """Run Python ``code`` inside the sandbox.

        Never raises. ``ok`` is False when the sandbox call fails *or* when
        the returned execution carries an ``error`` (the submitted code
        raised); in the latter case captured logs are still returned.
        """
        started = time.time()
        with self._lock:
            try:
                sbx = self._ensure()
                res = sbx.run_code(code, timeout=self._whole_seconds(timeout))
                stdout = self._log_text(res, "stdout")
                stderr = self._log_text(res, "stderr")
                error = getattr(res, "error", None)
                if error:
                    detail = self._error_text(error)
                    stderr = f"{stderr}\n{detail}" if stderr else detail
                return ExecutionResult(
                    ok=not error,
                    stdout=stdout,
                    stderr=stderr,
                    exit_code=1 if error else 0,
                    duration_ms=(time.time() - started) * 1000,
                    meta={"engine": "e2b"},
                )
            except Exception as e:
                logger.exception("E2B run_python failed")
                return ExecutionResult(
                    ok=False, stderr=str(e), exit_code=1,
                    duration_ms=(time.time() - started) * 1000,
                    meta={"engine": "e2b", "error": True},
                )

    def close(self):
        """Kill the sandbox if one was created. Best-effort; never raises."""
        with self._lock:
            sbx, self._sbx = self._sbx, None
            if sbx is None:
                return
            try:
                sbx.kill()
            except Exception:
                # Still best-effort, but leave a trace instead of hiding it.
                logger.warning("E2B sandbox kill failed", exc_info=True)


# ---------------------------------------------------------------------------
# Local subprocess fallback - shell commands are screened by a small denylist only
# ---------------------------------------------------------------------------
# Substrings that make _local_run_shell reject a command outright.
# Matching is a plain `in` test against the raw command text, so this is a
# denylist of a few known-destructive patterns: trivially different spellings
# (extra spaces, other flags) are NOT caught, and it is not an allowlist.
_DISALLOWED_PATTERNS = [
    "rm -rf /",  # as a substring this also matches `rm -rf` on any absolute path
    ":(){:|:&};:",  # shell fork bomb, only in this exact spacing
    "mkfs",  # matches anywhere in the command, e.g. inside a longer word
    "> /dev/sda",
]


def _local_run_shell(cmd: str, timeout: float = 120.0) -> ExecutionResult:
    """Run ``cmd`` through the local shell and capture its output.

    Commands containing any ``_DISALLOWED_PATTERNS`` substring are rejected
    without being executed. Never raises; failures are reported in the result.

    Args:
        cmd: Shell command line, passed to the shell as-is.
        timeout: Seconds to wait before the command is aborted.

    Returns:
        An ``ExecutionResult`` tagged ``meta["engine"] == "local"`` on every
        path. ``exit_code`` is the command's return code, or 126 when the
        command was rejected, 124 on timeout, 1 on any other failure.
    """
    started = time.time()
    if any(p in cmd for p in _DISALLOWED_PATTERNS):
        # Tag the engine here too, so every result of this function carries
        # it (the other return paths already did).
        return ExecutionResult(ok=False, stderr="Disallowed command", exit_code=126,
                               duration_ms=(time.time() - started) * 1000,
                               meta={"engine": "local"})
    try:
        # SECURITY NOTE: shell=True is deliberate -- callers pass full shell
        # syntax (pipes, redirects, `||`) -- but it means the substring
        # denylist above is the only screening; this is not a sandbox.
        res = subprocess.run(
            cmd, shell=True, capture_output=True, text=True, timeout=timeout,
        )
        return ExecutionResult(
            ok=res.returncode == 0,
            stdout=res.stdout or "",
            stderr=res.stderr or "",
            exit_code=res.returncode,
            duration_ms=(time.time() - started) * 1000,
            meta={"engine": "local"},
        )
    except subprocess.TimeoutExpired as e:
        return ExecutionResult(ok=False, stderr=f"timeout: {e}", exit_code=124,
                               duration_ms=(time.time() - started) * 1000,
                               meta={"engine": "local"})
    except Exception as e:
        return ExecutionResult(ok=False, stderr=str(e), exit_code=1,
                               duration_ms=(time.time() - started) * 1000,
                               meta={"engine": "local"})


def _local_run_python(code: str, timeout: float = 120.0) -> ExecutionResult:
    """Run ``code`` with ``python3 -c`` in a child process.

    The child is a separate process from the caller, but it runs on the
    local machine with no further restrictions applied here. Never raises;
    failures are reported in the result.

    Args:
        code: Python source passed to ``python3 -c``.
        timeout: Seconds to wait before the child is aborted.

    Returns:
        An ``ExecutionResult`` tagged ``meta["engine"] == "local"`` on every
        path. ``exit_code`` is the child's return code, or 124 on timeout,
        1 on any other failure (e.g. ``python3`` not found).
    """
    started = time.time()
    try:
        res = subprocess.run(
            ["python3", "-c", code], capture_output=True, text=True, timeout=timeout,
        )
        return ExecutionResult(
            ok=res.returncode == 0,
            stdout=res.stdout or "",
            stderr=res.stderr or "",
            exit_code=res.returncode,
            duration_ms=(time.time() - started) * 1000,
            meta={"engine": "local"},
        )
    except subprocess.TimeoutExpired as e:
        # Failure paths carry the engine tag too, matching the success path
        # and _local_run_shell.
        return ExecutionResult(ok=False, stderr=f"timeout: {e}", exit_code=124,
                               duration_ms=(time.time() - started) * 1000,
                               meta={"engine": "local"})
    except Exception as e:
        return ExecutionResult(ok=False, stderr=str(e), exit_code=1,
                               duration_ms=(time.time() - started) * 1000,
                               meta={"engine": "local"})


# ---------------------------------------------------------------------------
# Executor singleton
# ---------------------------------------------------------------------------
class Executor:
    """Routes shell/Python work to the E2B sandbox or the local fallback."""

    def __init__(self) -> None:
        # No key configured -> no sandbox wrapper; everything runs locally.
        self.sandbox = None
        if E2B_API_KEY:
            self.sandbox = E2BSandbox()

    def shell(self, cmd: str, timeout: float = 120.0) -> ExecutionResult:
        """Run a shell command in the sandbox when usable, else locally."""
        sandbox = self.sandbox
        if not (sandbox and sandbox.available):
            return _local_run_shell(cmd, timeout=timeout)
        return sandbox.run_shell(cmd, timeout=timeout)

    def python(self, code: str, timeout: float = 120.0) -> ExecutionResult:
        """Run Python code in the sandbox when usable, else locally."""
        sandbox = self.sandbox
        if not (sandbox and sandbox.available):
            return _local_run_python(code, timeout=timeout)
        return sandbox.run_python(code, timeout=timeout)

    def inspect_runtime(self) -> Dict[str, str]:
        """Probe tool versions via ``shell``.

        Returns a label -> text mapping where the text is the probe's stdout
        (or stderr when stdout is empty), stripped and capped at 200 chars.
        """
        probes = {
            "python": "python3 --version",
            "node": "node --version",
            "npm": "npm --version",
            "git": "git --version",
            "playwright": "python3 -c 'import playwright; print(playwright.__version__)' 2>/dev/null || echo 'not installed'",
        }
        report: Dict[str, str] = {}
        for label, probe_cmd in probes.items():
            outcome = self.shell(probe_cmd, timeout=15)
            text = outcome.stdout or outcome.stderr
            report[label] = text.strip()[:200]
        return report

    def close(self):
        """Release the sandbox, if there is one."""
        sandbox = self.sandbox
        if sandbox:
            sandbox.close()


# Process-wide Executor instance; stays None until get_executor() is first called.
_executor: Optional[Executor] = None


def get_executor() -> Executor:
    """Return the shared Executor, building it on the first call."""
    global _executor
    instance = _executor
    if instance is None:
        instance = _executor = Executor()
    return instance