""" E2B Sandbox Executor. Owns the real execution runtime. Wraps the e2b_code_interpreter SDK so the rest of the backend never imports e2b directly. Capabilities (Phase 1): - run_python(code): execute Python in a sandbox, stream stdout/stderr - run_shell(cmd): execute shell command, stream stdout/stderr - write_file(path, contents) - read_file(path) - close() A sandbox is created per task and closed at the end (Phase 1: no reuse). """ from __future__ import annotations import asyncio import logging import os from contextlib import asynccontextmanager from dataclasses import dataclass, field from typing import AsyncIterator, Dict, List, Optional logger = logging.getLogger(__name__) # ---------------------------------------------------------------------------- # Event types streamed back to the client # ---------------------------------------------------------------------------- @dataclass class ExecEvent: type: str # 'sandbox_started' | 'stdout' | 'stderr' | 'result' | 'error' | 'sandbox_closed' data: str = "" meta: Dict = field(default_factory=dict) # ---------------------------------------------------------------------------- # E2B SDK import (lazy so missing dep doesn't crash module import) # ---------------------------------------------------------------------------- def _get_sandbox_class(): try: from e2b_code_interpreter import Sandbox # type: ignore return Sandbox except ImportError as e: raise RuntimeError( "e2b_code_interpreter not installed. Add `e2b-code-interpreter` to requirements." ) from e # ---------------------------------------------------------------------------- # Executor # ---------------------------------------------------------------------------- class E2BExecutor: """One sandbox = one E2BExecutor instance. The SDK is synchronous; we offload calls to a thread to keep the event loop free. """ def __init__(self, api_key: Optional[str] = None, template: Optional[str] = None, timeout: int = 300) -> None: self.api_key = api_key or os.environ.get("E2B_API_KEY", "") if not self.api_key: raise RuntimeError("E2B_API_KEY is not configured") self.template = template or os.environ.get("E2B_TEMPLATE") # None → default self.timeout = timeout self._sandbox = None # type: ignore self._lock = asyncio.Lock() # ---- lifecycle ---------------------------------------------------------- async def start(self) -> None: if self._sandbox is not None: return Sandbox = _get_sandbox_class() def _create(): kwargs = {"api_key": self.api_key, "timeout": self.timeout} if self.template: return Sandbox(self.template, **kwargs) return Sandbox(**kwargs) self._sandbox = await asyncio.to_thread(_create) logger.info("E2B sandbox started: id=%s", getattr(self._sandbox, "sandbox_id", "?")) async def close(self) -> None: if self._sandbox is None: return sb = self._sandbox self._sandbox = None try: await asyncio.to_thread(sb.kill) except Exception as e: logger.warning("E2B close error (non-fatal): %s", e) @property def sandbox_id(self) -> Optional[str]: return getattr(self._sandbox, "sandbox_id", None) if self._sandbox else None # ---- execution ---------------------------------------------------------- async def run_python(self, code: str) -> AsyncIterator[ExecEvent]: """Run Python code; yield streaming events.""" if self._sandbox is None: await self.start() sb = self._sandbox # Queue bridging the SDK callback thread → asyncio loop loop = asyncio.get_running_loop() queue: asyncio.Queue[ExecEvent] = asyncio.Queue() def on_stdout(msg) -> None: text = getattr(msg, "line", None) or getattr(msg, "text", None) or str(msg) loop.call_soon_threadsafe(queue.put_nowait, ExecEvent("stdout", text)) def on_stderr(msg) -> None: text = getattr(msg, "line", None) or getattr(msg, "text", None) or str(msg) loop.call_soon_threadsafe(queue.put_nowait, ExecEvent("stderr", text)) async def runner(): try: def _exec(): return sb.run_code(code, on_stdout=on_stdout, on_stderr=on_stderr) execution = await asyncio.to_thread(_exec) # Final result result_text = "" if execution is not None: err = getattr(execution, "error", None) if err is not None: loop.call_soon_threadsafe( queue.put_nowait, ExecEvent("error", f"{getattr(err, 'name', 'Error')}: {getattr(err, 'value', err)}", {"traceback": getattr(err, "traceback", "")}), ) results = getattr(execution, "results", []) or [] if results: for r in results: t = getattr(r, "text", None) if t: result_text += t + "\n" loop.call_soon_threadsafe( queue.put_nowait, ExecEvent("result", result_text.strip()), ) except Exception as e: loop.call_soon_threadsafe( queue.put_nowait, ExecEvent("error", str(e)), ) finally: loop.call_soon_threadsafe(queue.put_nowait, ExecEvent("__done__")) task = asyncio.create_task(runner()) try: while True: ev = await queue.get() if ev.type == "__done__": break yield ev finally: if not task.done(): task.cancel() async def run_shell(self, cmd: str) -> AsyncIterator[ExecEvent]: """Run shell command via sandbox.commands.run().""" if self._sandbox is None: await self.start() sb = self._sandbox loop = asyncio.get_running_loop() queue: asyncio.Queue[ExecEvent] = asyncio.Queue() def on_stdout(data) -> None: loop.call_soon_threadsafe(queue.put_nowait, ExecEvent("stdout", str(data))) def on_stderr(data) -> None: loop.call_soon_threadsafe(queue.put_nowait, ExecEvent("stderr", str(data))) async def runner(): try: def _exec(): return sb.commands.run(cmd, on_stdout=on_stdout, on_stderr=on_stderr) result = await asyncio.to_thread(_exec) exit_code = getattr(result, "exit_code", None) loop.call_soon_threadsafe( queue.put_nowait, ExecEvent("result", "", {"exit_code": exit_code}), ) except Exception as e: loop.call_soon_threadsafe(queue.put_nowait, ExecEvent("error", str(e))) finally: loop.call_soon_threadsafe(queue.put_nowait, ExecEvent("__done__")) task = asyncio.create_task(runner()) try: while True: ev = await queue.get() if ev.type == "__done__": break yield ev finally: if not task.done(): task.cancel() async def write_file(self, path: str, contents: str) -> None: if self._sandbox is None: await self.start() sb = self._sandbox await asyncio.to_thread(sb.files.write, path, contents) async def read_file(self, path: str) -> str: if self._sandbox is None: await self.start() sb = self._sandbox return await asyncio.to_thread(sb.files.read, path) # ---------------------------------------------------------------------------- # Convenience context manager # ---------------------------------------------------------------------------- @asynccontextmanager async def sandbox_session(timeout: int = 300): ex = E2BExecutor(timeout=timeout) try: await ex.start() yield ex finally: await ex.close()