File size: 8,481 Bytes
46258b3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"""
E2B Sandbox Executor.

Owns the real execution runtime. Wraps the e2b_code_interpreter SDK so the
rest of the backend never imports e2b directly.

Capabilities (Phase 1):
  - run_python(code): execute Python in a sandbox, stream stdout/stderr
  - run_shell(cmd): execute shell command, stream stdout/stderr
  - write_file(path, contents)
  - read_file(path)
  - close()

A sandbox is created per task and closed at the end (Phase 1: no reuse).
"""

from __future__ import annotations

import asyncio
import logging
import os
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import AsyncIterator, Dict, List, Optional

logger = logging.getLogger(__name__)


# ----------------------------------------------------------------------------
# Event types streamed back to the client
# ----------------------------------------------------------------------------

@dataclass
class ExecEvent:
    """One event produced while working with a sandbox and streamed to the client."""

    # Event kind; the values below are the ones this module's design names.
    type: str          # 'sandbox_started' | 'stdout' | 'stderr' | 'result' | 'error' | 'sandbox_closed'
    # Text payload: an output line, the collected result text, or an error message.
    data: str = ""
    # Optional structured extras (e.g. a traceback or an exit code); a fresh
    # dict per instance thanks to default_factory.
    meta: Dict = field(default_factory=dict)


# ----------------------------------------------------------------------------
# E2B SDK import (lazy so missing dep doesn't crash module import)
# ----------------------------------------------------------------------------

def _get_sandbox_class():
    try:
        from e2b_code_interpreter import Sandbox  # type: ignore
        return Sandbox
    except ImportError as e:
        raise RuntimeError(
            "e2b_code_interpreter not installed. Add `e2b-code-interpreter` to requirements."
        ) from e


# ----------------------------------------------------------------------------
# Executor
# ----------------------------------------------------------------------------

class E2BExecutor:
    """One sandbox = one E2BExecutor instance.

    The SDK is synchronous; every SDK call is offloaded with
    ``asyncio.to_thread`` to keep the event loop free. Output callbacks fired
    on the worker thread are marshalled back onto the loop through an
    ``asyncio.Queue`` and surfaced to the caller as ``ExecEvent`` objects.
    """

    # Private end-of-stream marker placed on the event queue. It is compared by
    # identity, so it can never be confused with a real ExecEvent.
    _DONE = object()

    def __init__(self, api_key: Optional[str] = None, template: Optional[str] = None,
                 timeout: int = 300) -> None:
        """Store configuration; the sandbox itself is created by ``start()``.

        Args:
            api_key: E2B API key. Falls back to the ``E2B_API_KEY`` environment
                variable when empty or omitted.
            template: Sandbox template. Falls back to ``E2B_TEMPLATE``; when
                neither is set the SDK is called without a template.
            timeout: Forwarded to the SDK as the ``timeout`` keyword when the
                sandbox is created.

        Raises:
            RuntimeError: if no API key is available from either source.
        """
        self.api_key = api_key or os.environ.get("E2B_API_KEY", "")
        if not self.api_key:
            raise RuntimeError("E2B_API_KEY is not configured")
        self.template = template or os.environ.get("E2B_TEMPLATE")  # None → default
        self.timeout = timeout
        self._sandbox = None  # type: ignore
        # Serialises sandbox creation so concurrent start() calls share one sandbox.
        self._lock = asyncio.Lock()

    # ---- lifecycle ----------------------------------------------------------

    async def start(self) -> None:
        """Create the sandbox if this executor does not have one yet.

        Safe to call repeatedly and concurrently: creation happens under
        ``self._lock`` and the "already started" check is repeated once the
        lock is held, so callers that were waiting reuse the sandbox created
        by the first caller instead of each creating (and leaking) their own.

        Raises:
            RuntimeError: if the E2B SDK is not installed.
        """
        if self._sandbox is not None:
            return
        async with self._lock:
            # Another coroutine may have finished creating it while we waited.
            if self._sandbox is not None:
                return
            Sandbox = _get_sandbox_class()

            def _create():
                kwargs = {"api_key": self.api_key, "timeout": self.timeout}
                if self.template:
                    return Sandbox(self.template, **kwargs)
                return Sandbox(**kwargs)

            sandbox = await asyncio.to_thread(_create)
            self._sandbox = sandbox
            logger.info("E2B sandbox started: id=%s", getattr(sandbox, "sandbox_id", "?"))

    async def close(self) -> None:
        """Kill the sandbox, if any. Errors from the SDK are logged, not raised."""
        if self._sandbox is None:
            return
        sb = self._sandbox
        # Detach first so the executor reads as closed even if kill() fails.
        self._sandbox = None
        try:
            await asyncio.to_thread(sb.kill)
        except Exception as e:
            logger.warning("E2B close error (non-fatal): %s", e)

    @property
    def sandbox_id(self) -> Optional[str]:
        """The current sandbox's ``sandbox_id`` attribute, or None when not started."""
        return getattr(self._sandbox, "sandbox_id", None) if self._sandbox else None

    # ---- internals ----------------------------------------------------------

    async def _ensure_sandbox(self):
        """Return the live sandbox object, starting one first if necessary."""
        if self._sandbox is None:
            await self.start()
        return self._sandbox

    @staticmethod
    def _message_text(msg) -> str:
        """Extract text from an SDK output message: ``.line``, then ``.text``, else ``str(msg)``."""
        return getattr(msg, "line", None) or getattr(msg, "text", None) or str(msg)

    async def _stream(self, blocking_call, finalize) -> AsyncIterator[ExecEvent]:
        """Run a blocking SDK call in a thread and yield the events it produces.

        Args:
            blocking_call: ``blocking_call(emit)`` is executed on a worker
                thread. ``emit(event)`` may be called from that thread to
                publish an event; it hands the event to the loop thread-safely.
            finalize: ``finalize(outcome)`` receives the return value of
                ``blocking_call`` and returns an iterable of closing events.

        Any exception raised by either callable is reported as a single
        ``'error'`` event carrying ``str(exception)``; the stream then ends.
        """
        loop = asyncio.get_running_loop()
        queue: asyncio.Queue = asyncio.Queue()

        def emit(event) -> None:
            # Callable from any thread; FIFO ordering is preserved by the loop.
            loop.call_soon_threadsafe(queue.put_nowait, event)

        async def runner() -> None:
            try:
                outcome = await asyncio.to_thread(blocking_call, emit)
                for event in finalize(outcome):
                    emit(event)
            except Exception as e:
                emit(ExecEvent("error", str(e)))
            finally:
                emit(self._DONE)

        task = asyncio.create_task(runner())
        try:
            while True:
                ev = await queue.get()
                if ev is self._DONE:
                    break
                yield ev
        finally:
            # Consumer stopped early: stop waiting on the worker. The thread
            # itself cannot be interrupted and simply runs to completion.
            if not task.done():
                task.cancel()

    # ---- execution ----------------------------------------------------------

    async def run_python(self, code: str) -> AsyncIterator[ExecEvent]:
        """Run Python code; yield streaming events.

        Yields ``'stdout'`` / ``'stderr'`` events as the SDK reports output,
        an ``'error'`` event (with ``meta['traceback']``) if the execution
        object carries an error, and finally a ``'result'`` event whose data is
        the newline-joined ``text`` of all results. If the SDK call itself
        raises, a single ``'error'`` event is yielded instead of ``'result'``.
        """
        sb = await self._ensure_sandbox()

        def invoke(emit):
            def on_stdout(msg) -> None:
                emit(ExecEvent("stdout", self._message_text(msg)))

            def on_stderr(msg) -> None:
                emit(ExecEvent("stderr", self._message_text(msg)))

            return sb.run_code(code, on_stdout=on_stdout, on_stderr=on_stderr)

        def finalize(execution):
            texts: List[str] = []
            if execution is not None:
                err = getattr(execution, "error", None)
                if err is not None:
                    yield ExecEvent(
                        "error",
                        f"{getattr(err, 'name', 'Error')}: {getattr(err, 'value', err)}",
                        {"traceback": getattr(err, "traceback", "")},
                    )
                for r in getattr(execution, "results", []) or []:
                    t = getattr(r, "text", None)
                    if t:
                        texts.append(t)
            yield ExecEvent("result", "\n".join(texts).strip())

        stream = self._stream(invoke, finalize)
        try:
            async for ev in stream:
                yield ev
        finally:
            # Make sure the inner generator's cleanup runs promptly.
            await stream.aclose()

    async def run_shell(self, cmd: str) -> AsyncIterator[ExecEvent]:
        """Run shell command via sandbox.commands.run().

        Yields ``'stdout'`` / ``'stderr'`` events (payload is ``str(data)`` of
        whatever the SDK passes to the callback) followed by a ``'result'``
        event with ``meta['exit_code']``. If the SDK call raises, a single
        ``'error'`` event with ``str(exception)`` is yielded instead.
        """
        sb = await self._ensure_sandbox()

        def invoke(emit):
            def on_stdout(data) -> None:
                emit(ExecEvent("stdout", str(data)))

            def on_stderr(data) -> None:
                emit(ExecEvent("stderr", str(data)))

            return sb.commands.run(cmd, on_stdout=on_stdout, on_stderr=on_stderr)

        def finalize(result):
            yield ExecEvent("result", "", {"exit_code": getattr(result, "exit_code", None)})

        stream = self._stream(invoke, finalize)
        try:
            async for ev in stream:
                yield ev
        finally:
            await stream.aclose()

    async def write_file(self, path: str, contents: str) -> None:
        """Write ``contents`` to ``path`` via ``sandbox.files.write`` (starts the sandbox if needed)."""
        sb = await self._ensure_sandbox()
        await asyncio.to_thread(sb.files.write, path, contents)

    async def read_file(self, path: str) -> str:
        """Return what ``sandbox.files.read(path)`` gives back (starts the sandbox if needed)."""
        sb = await self._ensure_sandbox()
        return await asyncio.to_thread(sb.files.read, path)


# ----------------------------------------------------------------------------
# Convenience context manager
# ----------------------------------------------------------------------------

@asynccontextmanager
async def sandbox_session(timeout: int = 300, *, api_key: Optional[str] = None,
                          template: Optional[str] = None):
    """Async context manager yielding a started ``E2BExecutor``.

    The executor is started on entry and always closed on exit, including
    when the body raises or when ``start()`` itself fails.

    Args:
        timeout: Forwarded to ``E2BExecutor``.
        api_key: Optional explicit API key, forwarded to ``E2BExecutor``.
            ``None`` keeps the executor's own fallback behaviour.
        template: Optional sandbox template, forwarded to ``E2BExecutor``.
            ``None`` keeps the executor's own fallback behaviour.

    Yields:
        The started executor.
    """
    ex = E2BExecutor(api_key=api_key, template=template, timeout=timeout)
    try:
        await ex.start()
        yield ex
    finally:
        await ex.close()