"""
Executor - runs plan actions.

Uses an E2B sandbox when E2B_API_KEY is set (and the SDK can be imported),
otherwise falls back to local subprocess execution guarded by a small
denylist of obviously destructive commands.
"""
from __future__ import annotations

import os
import shlex
import subprocess
import logging
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Generator, List, Optional

logger = logging.getLogger("executor")

E2B_API_KEY = os.getenv("E2B_API_KEY", "")


@dataclass
class ExecutionResult:
    """Outcome of one shell or Python execution, whichever engine ran it."""

    ok: bool
    stdout: str = ""
    stderr: str = ""
    exit_code: int = 0
    duration_ms: float = 0.0
    # Free-form details; this module sets "engine" ("e2b" or "local") and,
    # for sandbox failures, "error": True.
    meta: Dict[str, Any] = field(default_factory=dict)


def _elapsed_ms(started: float) -> float:
    """Return milliseconds elapsed since *started* (a time.monotonic() value)."""
    return (time.monotonic() - started) * 1000


def _log_lines(execution: Any, stream: str) -> List[str]:
    """Return the *stream* ("stdout"/"stderr") log entries of *execution* as strings.

    Accepts ``logs`` exposed either as a mapping or as an object with
    attributes, and entries that are plain strings or objects with ``.text``.
    """
    logs = getattr(execution, "logs", None)
    if logs is None:
        return []
    if isinstance(logs, dict):
        entries = logs.get(stream)
    else:
        entries = getattr(logs, stream, None)
    return [str(getattr(entry, "text", entry)) for entry in entries or []]


def _format_execution_error(error: Any) -> str:
    """Render an execution error object (name/value/traceback, when present)."""
    name = getattr(error, "name", None)
    value = getattr(error, "value", None)
    summary = f"{name}: {value}" if name else str(error)
    traceback_text = getattr(error, "traceback", None)
    return f"{summary}\n{traceback_text}" if traceback_text else summary


# ---------------------------------------------------------------------------
# E2B sandbox wrapper (with graceful fallback)
# ---------------------------------------------------------------------------

# Printed by the legacy run_shell snippet so the real exit code can be
# recovered from the captured stdout.
_LEGACY_EXIT_MARKER = "__EXECUTOR_EXIT_CODE__="


class E2BSandbox:
    """Thin wrapper around e2b_code_interpreter. Created lazily."""

    def __init__(self) -> None:
        self._sbx: Any = None
        self._Sandbox: Any = None
        self._lock = threading.Lock()
        self._available = False
        if E2B_API_KEY:
            try:
                from e2b_code_interpreter import Sandbox  # type: ignore

                self._Sandbox = Sandbox
                self._available = True
            except Exception as e:
                logger.warning("E2B SDK not available: %s", e)
                self._available = False

    @property
    def available(self) -> bool:
        """True when an API key is configured and the SDK was imported."""
        return self._available

    def _ensure(self) -> Any:
        """Create the sandbox on first use and return it (call with the lock held)."""
        if self._sbx is None:
            self._sbx = self._Sandbox(api_key=E2B_API_KEY)
        return self._sbx

    @staticmethod
    def _run_shell_legacy(sbx: Any, cmd: str, timeout: float) -> tuple:
        """Run *cmd* through ``run_code`` on SDKs without ``commands.run``.

        Returns ``(stdout, stderr, exit_code)``. The snippet forwards the
        child's stderr to stderr and prints an exit-code marker so that
        failures are not reported as success.
        """
        snippet = (
            "import subprocess, sys\n"
            f"r = subprocess.run({cmd!r}, shell=True, capture_output=True, "
            f"text=True, timeout={timeout})\n"
            "sys.stdout.write(r.stdout)\n"
            "sys.stderr.write(r.stderr)\n"
            f"print('\\n{_LEGACY_EXIT_MARKER}' + str(r.returncode))\n"
        )
        res = sbx.run_code(snippet)
        stdout = "\n".join(_log_lines(res, "stdout"))
        stderr = "\n".join(_log_lines(res, "stderr"))
        error = getattr(res, "error", None)

        head, marker, tail = stdout.rpartition(_LEGACY_EXIT_MARKER)
        if marker:
            try:
                exit_code = int(tail.strip())
            except ValueError:
                exit_code = 1
            stdout = head.rstrip("\n")
        else:
            # No marker: the snippet did not finish (e.g. it raised in the
            # sandbox); only treat that as a failure when an error is reported.
            exit_code = 1 if error is not None else 0

        if error is not None:
            details = _format_execution_error(error)
            stderr = f"{stderr}\n{details}" if stderr else details
        return stdout, stderr, exit_code

    def run_shell(self, cmd: str, timeout: float = 120.0) -> ExecutionResult:
        """Run a shell command in the sandbox.

        Never raises: any failure is returned as a result with ``ok=False``
        and ``meta["error"] = True``.
        """
        started = time.monotonic()
        with self._lock:
            try:
                sbx = self._ensure()
                # Newer e2b SDKs use sbx.commands.run; detect it up front so an
                # AttributeError raised *inside* the call is not mistaken for
                # a missing API.
                commands = getattr(sbx, "commands", None)
                if commands is not None and hasattr(commands, "run"):
                    outcome = commands.run(cmd, timeout=int(timeout))
                    stdout = getattr(outcome, "stdout", "") or ""
                    stderr = getattr(outcome, "stderr", "") or ""
                    exit_code = getattr(outcome, "exit_code", 0) or 0
                else:
                    # Fallback for legacy SDK
                    stdout, stderr, exit_code = self._run_shell_legacy(
                        sbx, cmd, timeout
                    )
                return ExecutionResult(
                    ok=exit_code == 0,
                    stdout=stdout,
                    stderr=stderr,
                    exit_code=exit_code,
                    duration_ms=_elapsed_ms(started),
                    meta={"engine": "e2b"},
                )
            except Exception as e:
                logger.exception("E2B run_shell failed")
                # If the exception carries the command's exit code / output
                # (some SDK versions raise on non-zero exit), keep them.
                code = getattr(e, "exit_code", None)
                return ExecutionResult(
                    ok=False,
                    stdout=str(getattr(e, "stdout", "") or ""),
                    stderr=str(getattr(e, "stderr", "") or "") or str(e),
                    exit_code=code if isinstance(code, int) and code != 0 else 1,
                    duration_ms=_elapsed_ms(started),
                    meta={"engine": "e2b", "error": True},
                )

    def run_python(self, code: str, timeout: float = 120.0) -> ExecutionResult:
        """Run Python source in the sandbox.

        Never raises. The result is ``ok=False`` with ``exit_code=1`` when the
        call fails or when the returned execution object reports an ``error``.
        """
        started = time.monotonic()
        with self._lock:
            try:
                sbx = self._ensure()
                res = sbx.run_code(code, timeout=int(timeout))
                stdout_logs = _log_lines(res, "stdout")
                stderr_logs = _log_lines(res, "stderr")
                error = getattr(res, "error", None)
                if error is not None:
                    stderr_logs.append(_format_execution_error(error))
                succeeded = error is None
                return ExecutionResult(
                    ok=succeeded,
                    stdout="\n".join(stdout_logs),
                    stderr="\n".join(stderr_logs),
                    exit_code=0 if succeeded else 1,
                    duration_ms=_elapsed_ms(started),
                    meta={"engine": "e2b"},
                )
            except Exception as e:
                logger.exception("E2B run_python failed")
                return ExecutionResult(
                    ok=False,
                    stderr=str(e),
                    exit_code=1,
                    duration_ms=_elapsed_ms(started),
                    meta={"engine": "e2b", "error": True},
                )

    def close(self) -> None:
        """Kill the sandbox if one was created; best-effort, never raises."""
        with self._lock:
            try:
                if self._sbx is not None:
                    self._sbx.kill()
            except Exception:
                logger.warning("Failed to kill E2B sandbox", exc_info=True)
            finally:
                # Drop the handle even if kill() failed so a later call
                # creates a fresh sandbox.
                self._sbx = None


# ---------------------------------------------------------------------------
# Local subprocess fallback - LIMITED commands only
# ---------------------------------------------------------------------------
_DISALLOWED_PATTERNS = [
    "rm -rf /",
    ":(){:|:&};:",
    "mkfs",
    "> /dev/sda",
]


def _is_disallowed(cmd: str) -> bool:
    """Return True if *cmd* contains a denylisted pattern, ignoring whitespace.

    Whitespace is removed from both sides before matching so that variants
    such as ``rm  -rf  /`` or ``:(){ :|:& };:`` are caught too.
    NOTE: this is plain substring matching, so it is deliberately
    conservative (``rm -rf /tmp/x`` is rejected as well) and it is not a
    security boundary.
    """
    compact_cmd = "".join(cmd.split())
    for pattern in _DISALLOWED_PATTERNS:
        compact_pattern = "".join(pattern.split())
        if compact_pattern and compact_pattern in compact_cmd:
            return True
    return False


def _run_local(command: Any, *, shell: bool, timeout: float) -> ExecutionResult:
    """Run *command* with subprocess.run and convert the outcome to a result.

    Exit code 124 marks a timeout, 1 any other failure to run the process.
    """
    started = time.monotonic()
    try:
        proc = subprocess.run(
            command,
            shell=shell,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as e:
        return ExecutionResult(
            ok=False,
            stderr=f"timeout: {e}",
            exit_code=124,
            duration_ms=_elapsed_ms(started),
            meta={"engine": "local"},
        )
    except Exception as e:
        return ExecutionResult(
            ok=False,
            stderr=str(e),
            exit_code=1,
            duration_ms=_elapsed_ms(started),
            meta={"engine": "local"},
        )
    return ExecutionResult(
        ok=proc.returncode == 0,
        stdout=proc.stdout or "",
        stderr=proc.stderr or "",
        exit_code=proc.returncode,
        duration_ms=_elapsed_ms(started),
        meta={"engine": "local"},
    )


def _local_run_shell(cmd: str, timeout: float = 120.0) -> ExecutionResult:
    """Run *cmd* through the local shell unless it matches the denylist.

    Denylisted commands are not executed and yield exit code 126.
    """
    started = time.monotonic()
    if _is_disallowed(cmd):
        return ExecutionResult(
            ok=False,
            stderr="Disallowed command",
            exit_code=126,
            duration_ms=_elapsed_ms(started),
            meta={"engine": "local"},
        )
    # shell=True is intentional: callers pass full shell command lines
    # (pipes, redirections, ``||``).
    return _run_local(cmd, shell=True, timeout=timeout)


def _local_run_python(code: str, timeout: float = 120.0) -> ExecutionResult:
    """Run *code* with ``python3 -c`` in a subprocess for isolation."""
    return _run_local(["python3", "-c", code], shell=False, timeout=timeout)


# ---------------------------------------------------------------------------
# Executor singleton
# ---------------------------------------------------------------------------
class Executor:
    """Facade that routes work to the E2B sandbox when usable, else locally."""

    def __init__(self) -> None:
        self.sandbox = E2BSandbox() if E2B_API_KEY else None

    def shell(self, cmd: str, timeout: float = 120.0) -> ExecutionResult:
        """Run a shell command in the sandbox if available, otherwise locally."""
        if self.sandbox and self.sandbox.available:
            return self.sandbox.run_shell(cmd, timeout=timeout)
        return _local_run_shell(cmd, timeout=timeout)

    def python(self, code: str, timeout: float = 120.0) -> ExecutionResult:
        """Run Python source in the sandbox if available, otherwise locally."""
        if self.sandbox and self.sandbox.available:
            return self.sandbox.run_python(code, timeout=timeout)
        return _local_run_python(code, timeout=timeout)

    def inspect_runtime(self) -> Dict[str, str]:
        """Return tool label -> version text (first 200 chars of its output)."""
        probes = [
            ("python", "python3 --version"),
            ("node", "node --version"),
            ("npm", "npm --version"),
            ("git", "git --version"),
            ("playwright", "python3 -c 'import playwright; print(playwright.__version__)' 2>/dev/null || echo 'not installed'"),
        ]
        info: Dict[str, str] = {}
        for label, cmd in probes:
            r = self.shell(cmd, timeout=15)
            info[label] = (r.stdout or r.stderr).strip()[:200]
        return info

    def close(self) -> None:
        """Release the sandbox, if one was configured."""
        if self.sandbox:
            self.sandbox.close()


_executor: Optional[Executor] = None


def get_executor() -> Executor:
    """Return the process-wide Executor, creating it on first call."""
    global _executor
    if _executor is None:
        _executor = Executor()
    return _executor