ai-developer-agent / backend /executor.py
AI Developer Agent
AI Developer Agent v1.0 backend
763ef0d
"""
Executor - runs plan actions. Uses E2B sandbox when E2B_API_KEY is set,
otherwise falls back to local subprocess execution (with strict allowlist).
"""
from __future__ import annotations
import os
import shlex
import subprocess
import logging
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Generator, List, Optional
logger = logging.getLogger("executor")
E2B_API_KEY = os.getenv("E2B_API_KEY", "")
@dataclass
class ExecutionResult:
ok: bool
stdout: str = ""
stderr: str = ""
exit_code: int = 0
duration_ms: float = 0.0
meta: Dict[str, Any] = field(default_factory=dict)
# ---------------------------------------------------------------------------
# E2B sandbox wrapper (with graceful fallback)
# ---------------------------------------------------------------------------
class E2BSandbox:
"""Thin wrapper around e2b_code_interpreter. Created lazily."""
def __init__(self) -> None:
self._sbx = None
self._lock = threading.Lock()
self._available = False
if E2B_API_KEY:
try:
from e2b_code_interpreter import Sandbox # type: ignore
self._Sandbox = Sandbox
self._available = True
except Exception as e:
logger.warning("E2B SDK not available: %s", e)
self._available = False
@property
def available(self) -> bool:
return self._available
def _ensure(self):
if self._sbx is None:
self._sbx = self._Sandbox(api_key=E2B_API_KEY)
return self._sbx
def run_shell(self, cmd: str, timeout: float = 120.0) -> ExecutionResult:
started = time.time()
with self._lock:
try:
sbx = self._ensure()
# Newer e2b SDKs use sbx.commands.run
try:
cmd_result = sbx.commands.run(cmd, timeout=int(timeout))
stdout = getattr(cmd_result, "stdout", "") or ""
stderr = getattr(cmd_result, "stderr", "") or ""
exit_code = getattr(cmd_result, "exit_code", 0) or 0
except AttributeError:
# Fallback for legacy SDK
res = sbx.run_code(f"import subprocess; r=subprocess.run({cmd!r}, shell=True, capture_output=True, text=True, timeout={timeout}); print(r.stdout); print(r.stderr)")
stdout = "\n".join([str(getattr(r, "text", "")) for r in getattr(res, "logs", {}).get("stdout", []) or []])
stderr = ""
exit_code = 0
ok = exit_code == 0
return ExecutionResult(
ok=ok, stdout=stdout, stderr=stderr, exit_code=exit_code,
duration_ms=(time.time() - started) * 1000,
meta={"engine": "e2b"},
)
except Exception as e:
logger.exception("E2B run_shell failed")
return ExecutionResult(
ok=False, stderr=str(e), exit_code=1,
duration_ms=(time.time() - started) * 1000,
meta={"engine": "e2b", "error": True},
)
def run_python(self, code: str, timeout: float = 120.0) -> ExecutionResult:
started = time.time()
with self._lock:
try:
sbx = self._ensure()
try:
res = sbx.run_code(code, timeout=int(timeout))
stdout_logs = []
stderr_logs = []
if hasattr(res, "logs"):
for entry in getattr(res.logs, "stdout", []) or []:
stdout_logs.append(str(entry))
for entry in getattr(res.logs, "stderr", []) or []:
stderr_logs.append(str(entry))
return ExecutionResult(
ok=True,
stdout="\n".join(stdout_logs),
stderr="\n".join(stderr_logs),
exit_code=0,
duration_ms=(time.time() - started) * 1000,
meta={"engine": "e2b"},
)
except Exception as e:
return ExecutionResult(
ok=False, stderr=str(e), exit_code=1,
duration_ms=(time.time() - started) * 1000,
meta={"engine": "e2b", "error": True},
)
except Exception as e:
return ExecutionResult(
ok=False, stderr=str(e), exit_code=1,
duration_ms=(time.time() - started) * 1000,
meta={"engine": "e2b", "error": True},
)
def close(self):
with self._lock:
try:
if self._sbx is not None:
self._sbx.kill()
except Exception:
pass
self._sbx = None
# ---------------------------------------------------------------------------
# Local subprocess fallback - LIMITED commands only
# ---------------------------------------------------------------------------
_DISALLOWED_PATTERNS = [
"rm -rf /",
":(){:|:&};:",
"mkfs",
"> /dev/sda",
]
def _local_run_shell(cmd: str, timeout: float = 120.0) -> ExecutionResult:
started = time.time()
if any(p in cmd for p in _DISALLOWED_PATTERNS):
return ExecutionResult(ok=False, stderr="Disallowed command", exit_code=126,
duration_ms=(time.time() - started) * 1000)
try:
res = subprocess.run(
cmd, shell=True, capture_output=True, text=True, timeout=timeout,
)
return ExecutionResult(
ok=res.returncode == 0,
stdout=res.stdout or "",
stderr=res.stderr or "",
exit_code=res.returncode,
duration_ms=(time.time() - started) * 1000,
meta={"engine": "local"},
)
except subprocess.TimeoutExpired as e:
return ExecutionResult(ok=False, stderr=f"timeout: {e}", exit_code=124,
duration_ms=(time.time() - started) * 1000,
meta={"engine": "local"})
except Exception as e:
return ExecutionResult(ok=False, stderr=str(e), exit_code=1,
duration_ms=(time.time() - started) * 1000,
meta={"engine": "local"})
def _local_run_python(code: str, timeout: float = 120.0) -> ExecutionResult:
# Run python in subprocess for isolation
started = time.time()
try:
res = subprocess.run(
["python3", "-c", code], capture_output=True, text=True, timeout=timeout,
)
return ExecutionResult(
ok=res.returncode == 0,
stdout=res.stdout or "",
stderr=res.stderr or "",
exit_code=res.returncode,
duration_ms=(time.time() - started) * 1000,
meta={"engine": "local"},
)
except subprocess.TimeoutExpired as e:
return ExecutionResult(ok=False, stderr=f"timeout: {e}", exit_code=124,
duration_ms=(time.time() - started) * 1000)
except Exception as e:
return ExecutionResult(ok=False, stderr=str(e), exit_code=1,
duration_ms=(time.time() - started) * 1000)
# ---------------------------------------------------------------------------
# Executor singleton
# ---------------------------------------------------------------------------
class Executor:
def __init__(self) -> None:
self.sandbox = E2BSandbox() if E2B_API_KEY else None
def shell(self, cmd: str, timeout: float = 120.0) -> ExecutionResult:
if self.sandbox and self.sandbox.available:
return self.sandbox.run_shell(cmd, timeout=timeout)
return _local_run_shell(cmd, timeout=timeout)
def python(self, code: str, timeout: float = 120.0) -> ExecutionResult:
if self.sandbox and self.sandbox.available:
return self.sandbox.run_python(code, timeout=timeout)
return _local_run_python(code, timeout=timeout)
def inspect_runtime(self) -> Dict[str, str]:
info: Dict[str, str] = {}
for label, cmd in [
("python", "python3 --version"),
("node", "node --version"),
("npm", "npm --version"),
("git", "git --version"),
("playwright", "python3 -c 'import playwright; print(playwright.__version__)' 2>/dev/null || echo 'not installed'"),
]:
r = self.shell(cmd, timeout=15)
info[label] = (r.stdout or r.stderr).strip()[:200]
return info
def close(self):
if self.sandbox:
self.sandbox.close()
_executor: Optional[Executor] = None
def get_executor() -> Executor:
global _executor
if _executor is None:
_executor = Executor()
return _executor