# openhands-backend / executor.py
# PYAE1994's picture
# Phase 1 backend deploy
# 46258b3 verified
"""
E2B Sandbox Executor.
Owns the real execution runtime. Wraps the e2b_code_interpreter SDK so the
rest of the backend never imports e2b directly.
Capabilities (Phase 1):
- run_python(code): execute Python in a sandbox, stream stdout/stderr
- run_shell(cmd): execute shell command, stream stdout/stderr
- write_file(path, contents)
- read_file(path)
- close()
A sandbox is created per task and closed at the end (Phase 1: no reuse).
"""
from __future__ import annotations
import asyncio
import logging
import os
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import AsyncIterator, Dict, List, Optional
logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------
# Event types streamed back to the client
# ----------------------------------------------------------------------------
@dataclass
class ExecEvent:
    """One event streamed back to the client while a sandbox task runs."""

    # Event kind. Values named by this module's contract:
    # 'sandbox_started' | 'stdout' | 'stderr' | 'result' | 'error' | 'sandbox_closed'
    type: str
    # Text payload (an output line, a result string, or an error message).
    data: str = ""
    # Optional structured details; the executor stores keys such as
    # "traceback" and "exit_code" here. Each instance gets its own dict.
    meta: Dict = field(default_factory=dict)
# ----------------------------------------------------------------------------
# E2B SDK import (lazy so missing dep doesn't crash module import)
# ----------------------------------------------------------------------------
def _get_sandbox_class():
try:
from e2b_code_interpreter import Sandbox # type: ignore
return Sandbox
except ImportError as e:
raise RuntimeError(
"e2b_code_interpreter not installed. Add `e2b-code-interpreter` to requirements."
) from e
# ----------------------------------------------------------------------------
# Executor
# ----------------------------------------------------------------------------
class E2BExecutor:
    """Own one E2B sandbox and expose async helpers to run code in it.

    One sandbox = one E2BExecutor instance. The SDK is synchronous, so every
    SDK call is offloaded with ``asyncio.to_thread`` to keep the event loop
    free. ``run_python`` and ``run_shell`` are async generators that yield
    ``ExecEvent`` objects: 'stdout' / 'stderr' while the call is running,
    followed by 'error' and/or 'result' once it returns.
    """

    # Internal end-of-stream marker; events of this type are never yielded.
    _DONE_TYPE = "__done__"

    def __init__(self, api_key: Optional[str] = None, template: Optional[str] = None,
                 timeout: int = 300) -> None:
        """Store configuration; no sandbox is created until ``start()``.

        Args:
            api_key: E2B API key; falls back to the ``E2B_API_KEY`` env var.
            template: Sandbox template; falls back to ``E2B_TEMPLATE``.
                ``None`` lets the SDK use its default template.
            timeout: Forwarded to the SDK as the sandbox ``timeout``
                (presumably seconds -- confirm against the SDK docs).

        Raises:
            RuntimeError: If no API key is passed or configured.
        """
        self.api_key = api_key or os.environ.get("E2B_API_KEY", "")
        if not self.api_key:
            raise RuntimeError("E2B_API_KEY is not configured")
        self.template = template or os.environ.get("E2B_TEMPLATE")  # None → default
        self.timeout = timeout
        self._sandbox = None  # type: ignore
        # Serializes sandbox creation/teardown (see start() and close()).
        self._lock = asyncio.Lock()

    # ---- lifecycle ----------------------------------------------------------
    async def start(self) -> None:
        """Create the sandbox if it does not exist yet (idempotent).

        Creation is serialized with ``self._lock`` so that concurrent callers
        racing on a fresh executor cannot each create a sandbox and leak one.
        """
        if self._sandbox is not None:
            return
        async with self._lock:
            # Re-check: another coroutine may have finished creating the
            # sandbox while this one was waiting for the lock.
            if self._sandbox is not None:
                return
            Sandbox = _get_sandbox_class()

            def _create():
                kwargs = {"api_key": self.api_key, "timeout": self.timeout}
                if self.template:
                    return Sandbox(self.template, **kwargs)
                return Sandbox(**kwargs)

            sandbox = await asyncio.to_thread(_create)
            self._sandbox = sandbox
        logger.info("E2B sandbox started: id=%s", getattr(sandbox, "sandbox_id", "?"))

    async def close(self) -> None:
        """Kill the sandbox, if any. Safe to call repeatedly.

        The sandbox reference is detached under ``self._lock`` so a
        ``close()`` issued while ``start()`` is still creating the sandbox
        waits for it and then kills it instead of returning early and
        leaking it. Errors from the SDK ``kill`` call are logged, not raised.
        """
        async with self._lock:
            sb = self._sandbox
            self._sandbox = None
        if sb is None:
            return
        try:
            await asyncio.to_thread(sb.kill)
        except Exception as e:  # best-effort teardown
            logger.warning("E2B close error (non-fatal): %s", e)

    @property
    def sandbox_id(self) -> Optional[str]:
        """The SDK's ``sandbox_id`` for the live sandbox, or ``None``."""
        return getattr(self._sandbox, "sandbox_id", None)

    async def _ensure_sandbox(self):
        """Start the sandbox on first use and return the current handle."""
        if self._sandbox is None:
            await self.start()
        return self._sandbox

    # ---- execution ----------------------------------------------------------
    @staticmethod
    def _message_text(msg) -> str:
        """Extract text from an SDK output message (``line``, ``text``, or ``str``)."""
        return getattr(msg, "line", None) or getattr(msg, "text", None) or str(msg)

    async def _stream(self, blocking_call, final_events) -> AsyncIterator["ExecEvent"]:
        """Run a blocking SDK call in a thread and yield its events as they arrive.

        Args:
            blocking_call: ``callable(emit)`` executed in a worker thread.
                ``emit(event)`` is thread-safe and queues an ``ExecEvent``.
                Its return value is handed to ``final_events``.
            final_events: ``callable(outcome)`` returning an iterable of the
                closing ``ExecEvent`` objects for that outcome.

        Any exception from either callable is reported as a single 'error'
        event rather than raised to the consumer.
        """
        # Queue bridging the SDK callback thread → asyncio loop
        loop = asyncio.get_running_loop()
        queue: asyncio.Queue[ExecEvent] = asyncio.Queue()

        def emit(event) -> None:
            # Always go through call_soon_threadsafe, even from the loop
            # thread, so events keep the order in which they were emitted.
            loop.call_soon_threadsafe(queue.put_nowait, event)

        async def runner():
            try:
                outcome = await asyncio.to_thread(blocking_call, emit)
                for event in final_events(outcome):
                    emit(event)
            except Exception as e:
                emit(ExecEvent("error", str(e)))
            finally:
                emit(ExecEvent(self._DONE_TYPE))

        task = asyncio.create_task(runner())
        try:
            while True:
                ev = await queue.get()
                if ev.type == self._DONE_TYPE:
                    break
                yield ev
        finally:
            # Consumer stopped early or was cancelled: stop waiting on the call.
            if not task.done():
                task.cancel()

    async def run_python(self, code: str) -> AsyncIterator["ExecEvent"]:
        """Run Python code; yield streaming events.

        Yields 'stdout' / 'stderr' events while the code runs, then an
        'error' event (with ``meta["traceback"]``) if the execution reports
        an error, then one 'result' event whose data is the newline-joined
        text of the execution results (empty when there are none).
        """
        sb = await self._ensure_sandbox()

        def _exec(emit):
            def on_stdout(msg) -> None:
                emit(ExecEvent("stdout", self._message_text(msg)))

            def on_stderr(msg) -> None:
                emit(ExecEvent("stderr", self._message_text(msg)))

            return sb.run_code(code, on_stdout=on_stdout, on_stderr=on_stderr)

        def _final_events(execution):
            texts = []
            if execution is not None:
                err = getattr(execution, "error", None)
                if err is not None:
                    yield ExecEvent(
                        "error",
                        f"{getattr(err, 'name', 'Error')}: {getattr(err, 'value', err)}",
                        {"traceback": getattr(err, "traceback", "")},
                    )
                for r in getattr(execution, "results", []) or []:
                    text = getattr(r, "text", None)
                    if text:
                        texts.append(text)
            # Final result
            yield ExecEvent("result", "\n".join(texts).strip())

        stream = self._stream(_exec, _final_events)
        try:
            async for event in stream:
                yield event
        finally:
            # Close the inner generator promptly so its runner task is
            # cancelled as soon as the consumer stops iterating.
            await stream.aclose()

    async def run_shell(self, cmd: str) -> AsyncIterator["ExecEvent"]:
        """Run shell command via sandbox.commands.run().

        Yields 'stdout' / 'stderr' events while the command runs, then one
        'result' event with empty data and ``meta["exit_code"]``.
        """
        sb = await self._ensure_sandbox()

        def _exec(emit):
            def on_stdout(data) -> None:
                emit(ExecEvent("stdout", str(data)))

            def on_stderr(data) -> None:
                emit(ExecEvent("stderr", str(data)))

            return sb.commands.run(cmd, on_stdout=on_stdout, on_stderr=on_stderr)

        def _final_events(result):
            yield ExecEvent("result", "", {"exit_code": getattr(result, "exit_code", None)})

        stream = self._stream(_exec, _final_events)
        try:
            async for event in stream:
                yield event
        finally:
            # Close the inner generator promptly so its runner task is
            # cancelled as soon as the consumer stops iterating.
            await stream.aclose()

    async def write_file(self, path: str, contents: str) -> None:
        """Write ``contents`` to ``path`` inside the sandbox (starting it if needed)."""
        sb = await self._ensure_sandbox()
        await asyncio.to_thread(sb.files.write, path, contents)

    async def read_file(self, path: str) -> str:
        """Return the contents of ``path`` inside the sandbox (starting it if needed)."""
        sb = await self._ensure_sandbox()
        return await asyncio.to_thread(sb.files.read, path)
# ----------------------------------------------------------------------------
# Convenience context manager
# ----------------------------------------------------------------------------
@asynccontextmanager
async def sandbox_session(timeout: int = 300):
    """Async context manager yielding a started ``E2BExecutor``.

    The executor is built with the given ``timeout`` and its ``close()`` is
    always awaited on exit, including when startup or the body raises.
    """
    executor = E2BExecutor(timeout=timeout)
    try:
        await executor.start()
        yield executor
    finally:
        await executor.close()