"""
E2B Sandbox Execution Module

Handles code execution in E2B remote sandboxes with caching support.
Accepts pre-wrapped code from sandbox.py to maintain consistency across execution modes.
"""

import ast
import asyncio
import keyword
import time
from typing import Any, Dict, Optional
from urllib.parse import quote

from langfuse import observe, get_client
from loguru import logger

from cuga.config import settings

langfuse = get_client()

# E2B sandbox imports (optional)
try:
    from e2b_code_interpreter import Sandbox

    E2B_AVAILABLE = True
except ImportError:
    E2B_AVAILABLE = False
    Sandbox = None

# Constant thread ID for "single" mode (uses cache with global ID)
GLOBAL_THREAD_ID = "__global__"

# Marker printed by the generated lite-mode script between the user's stdout and
# the repr of the locals returned by __async_main.
_LOCALS_DELIMITER = "!!!===!!!"

# In the tool-name heuristic, the first of these words marks the end of the
# app-name prefix (e.g. "digital_sales_get_my_accounts" -> "digital_sales").
_API_VERBS = frozenset({"get", "post", "put", "delete", "create", "update", "list", "fetch"})

# Used when neither function_call_host nor registry_host is configured.
_DEFAULT_FUNCTION_CALL_URL = "http://localhost:8001"


# ============================================================================
# Sandbox Cache (moved from e2b_sandbox_cache.py)
# ============================================================================


class SandboxCacheEntry:
    """Entry in the sandbox cache containing sandbox instance and metadata."""

    def __init__(self, sandbox: "Sandbox", thread_id: str):
        self.sandbox = sandbox
        self.thread_id = thread_id
        self.created_at = time.time()
        self.last_used = time.time()
        self.use_count = 0

    def mark_used(self):
        """Update last used timestamp and increment use count."""
        self.last_used = time.time()
        self.use_count += 1

    def is_expired(self, ttl_seconds: int = 3600) -> bool:
        """Return True if the entry is older than ``ttl_seconds`` (default 1 hour).

        Age is measured from creation, not from last use.
        """
        age = time.time() - self.created_at
        return age > ttl_seconds

    def is_alive(self) -> bool:
        """Check if sandbox is still alive and responsive.

        This performs a remote ``run_code`` round-trip, so it is comparatively
        expensive; any exception is treated as "dead".
        """
        try:
            # Simple health check - try to execute trivial code
            execution = self.sandbox.run_code("print('alive')")
            return execution.error is None
        except Exception as e:
            logger.debug(f"Sandbox health check failed: {e}")
            return False

    def get_age(self) -> float:
        """Get age in seconds since creation."""
        return time.time() - self.created_at

    def get_idle_time(self) -> float:
        """Get idle time in seconds since last use."""
        return time.time() - self.last_used


class E2BSandboxCache:
    """
    Cache manager for E2B sandbox instances.

    Maintains one sandbox per thread_id with automatic cleanup of expired/dead sandboxes.
    Implemented as a process-wide singleton (see ``__new__``).
    """

    _instance = None
    _sandboxes: Dict[str, SandboxCacheEntry] = {}
    _ttl_seconds: int = 3600  # 1 hour default
    _template_name: str = "cuga-langchain"

    def __new__(cls):
        """Singleton pattern to ensure one cache instance."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialize the cache (only on the first construction of the singleton)."""
        if not hasattr(self, "_initialized"):
            self._sandboxes = {}
            self._ttl_seconds = 3600
            self._template_name = "cuga-langchain"
            self._initialized = True

    @property
    def template_name(self) -> str:
        """E2B template name used when creating sandboxes."""
        return self._template_name

    def set_ttl(self, seconds: int):
        """Set time-to-live for cached sandboxes."""
        self._ttl_seconds = seconds
        logger.info(f"E2B sandbox TTL set to {seconds} seconds ({seconds / 3600:.1f} hours)")

    def set_template(self, template_name: str):
        """Set E2B template name for sandbox creation."""
        self._template_name = template_name
        logger.info(f"E2B sandbox template set to: {template_name}")

    def get_or_create(self, thread_id: str) -> "Sandbox":
        """
        Get existing sandbox for thread_id or create new one.

        Args:
            thread_id: Unique identifier for the conversation thread

        Returns:
            E2B Sandbox instance

        Raises:
            RuntimeError: If E2B is not available or sandbox creation fails
        """
        if not E2B_AVAILABLE:
            raise RuntimeError("e2b-code-interpreter package not installed")

        # Evict expired entries first. Only the (cheap, local) TTL check is done
        # here: a liveness check is a remote round-trip, so it is performed below
        # for the requested entry only rather than for every cached sandbox.
        self._cleanup_expired(check_alive=False)

        entry = self._sandboxes.get(thread_id)
        if entry is not None:
            if entry.is_expired(self._ttl_seconds):
                logger.info(
                    f"Sandbox for thread {thread_id} expired (age: {entry.get_age():.1f}s), creating new one"
                )
                self._remove_sandbox(thread_id)
            elif not entry.is_alive():
                logger.warning(
                    f"Sandbox for thread {thread_id} is dead (age: {entry.get_age():.1f}s), creating new one"
                )
                self._remove_sandbox(thread_id)
            else:
                # Valid cached sandbox found
                entry.mark_used()
                logger.info(
                    f"Reusing cached sandbox for thread {thread_id} "
                    f"(age: {entry.get_age():.1f}s, uses: {entry.use_count}, "
                    f"idle: {entry.get_idle_time():.1f}s)"
                )
                return entry.sandbox

        # Create new sandbox
        logger.info(
            f"Creating new E2B sandbox for thread {thread_id} with template '{self._template_name}' "
            f"(TTL: {self._ttl_seconds}s)"
        )
        with langfuse.start_as_current_observation(
            as_type="span",
            name="create-e2b-sandbox",
            input={"e2b_sandbox_mode": settings.advanced_features.e2b_sandbox_mode},
        ) as root_span:
            try:
                # The sandbox's own timeout equals the cache TTL, so E2B shuts it
                # down at about the time the cache would consider it expired.
                sandbox = Sandbox.create(self._template_name, timeout=self._ttl_seconds)
                entry = SandboxCacheEntry(sandbox, thread_id)
                entry.mark_used()
                self._sandboxes[thread_id] = entry
                log_str = (
                    f"Successfully created sandbox for thread {thread_id} "
                    f"(total cached: {len(self._sandboxes)})"
                )
                logger.info(log_str)
                root_span.update(output=log_str)
                return sandbox
            except Exception as e:
                log_str = f"Failed to create E2B sandbox for thread {thread_id}: {e}"
                logger.error(log_str)
                root_span.update(output=log_str)
                raise RuntimeError(f"Failed to create E2B sandbox: {e}") from e

    def _remove_sandbox(self, thread_id: str):
        """Kill (best effort) and drop the cached sandbox for ``thread_id``."""
        if thread_id in self._sandboxes:
            entry = self._sandboxes[thread_id]
            try:
                entry.sandbox.kill()
                logger.debug(f"Killed sandbox for thread {thread_id}")
            except Exception as e:
                logger.debug(f"Error killing sandbox for thread {thread_id}: {e}")
            finally:
                # Always drop the entry, even if kill() failed.
                del self._sandboxes[thread_id]

    def _cleanup_expired(self, check_alive: bool = True):
        """Remove expired (and, if ``check_alive``, dead) sandboxes from cache.

        Args:
            check_alive: When True (the historical behavior), also run a remote
                health check on every non-expired entry and evict dead ones.
        """
        threads_to_remove = []
        for thread_id, entry in self._sandboxes.items():
            if entry.is_expired(self._ttl_seconds):
                logger.info(
                    f"Cleaning up expired sandbox for thread {thread_id} "
                    f"(age: {entry.get_age():.1f}s, uses: {entry.use_count})"
                )
                threads_to_remove.append(thread_id)
            elif check_alive and not entry.is_alive():
                logger.warning(
                    f"Cleaning up dead sandbox for thread {thread_id} "
                    f"(age: {entry.get_age():.1f}s, uses: {entry.use_count})"
                )
                threads_to_remove.append(thread_id)

        # Removal happens after iteration so the dict is not mutated mid-loop.
        for thread_id in threads_to_remove:
            self._remove_sandbox(thread_id)

    def remove(self, thread_id: str):
        """Manually remove sandbox for specific thread_id."""
        if thread_id in self._sandboxes:
            logger.info(f"Manually removing sandbox for thread {thread_id}")
            self._remove_sandbox(thread_id)

    def clear_all(self):
        """Clear all cached sandboxes."""
        logger.info(f"Clearing all cached sandboxes ({len(self._sandboxes)} total)")
        threads = list(self._sandboxes.keys())
        for thread_id in threads:
            self._remove_sandbox(thread_id)

    def get_stats(self) -> Dict:
        """Get cache statistics (runs a remote health check per cached sandbox)."""
        stats = {
            "total_sandboxes": len(self._sandboxes),
            "ttl_seconds": self._ttl_seconds,
            "template_name": self._template_name,
            "sandboxes": {},
        }

        for thread_id, entry in self._sandboxes.items():
            stats["sandboxes"][thread_id] = {
                "age_seconds": entry.get_age(),
                "idle_seconds": entry.get_idle_time(),
                "use_count": entry.use_count,
                "is_alive": entry.is_alive(),
            }

        return stats


# Global cache instance
_sandbox_cache = E2BSandboxCache()
_cache_configured = False


def get_sandbox_cache() -> E2BSandboxCache:
    """Get the global sandbox cache instance, configured with settings."""
    global _cache_configured

    # Configure cache with settings on first access
    if not _cache_configured:
        _sandbox_cache.set_ttl(settings.advanced_features.e2b_sandbox_ttl)
        _cache_configured = True

    return _sandbox_cache


# ============================================================================
# Lite-mode code generation helpers
# ============================================================================


def _is_valid_name(name: str) -> bool:
    """Return True if ``name`` can be used as a Python identifier in generated code."""
    return name.isidentifier() and not keyword.iskeyword(name)


def _split_context_locals(
    context_locals: dict[str, Any], structured_tool_cls: type
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Split context locals into (simple variables, tool callables).

    StructuredTool instances and public callables become tools. Private
    callables are dropped: they are not exposed as tools, and their repr
    (e.g. ``<function _f at 0x...>``) is not valid Python source.
    """
    simple_vars: dict[str, Any] = {}
    tool_funcs: dict[str, Any] = {}
    for name, value in context_locals.items():
        if isinstance(value, structured_tool_cls):
            tool_funcs[name] = value
        elif callable(value):
            if name.startswith("_"):
                logger.debug(f"Skipping private callable '{name}' from context locals")
                continue
            tool_funcs[name] = value
        else:
            simple_vars[name] = value
    return simple_vars, tool_funcs


def _format_simple_vars(simple_vars: dict[str, Any]) -> str:
    """Render ``name = repr(value)`` lines for variables that round-trip as Python source.

    Entries whose name is not an identifier, or whose repr does not parse as
    an expression, are skipped with a warning: a single such line would
    otherwise turn the whole generated script into a SyntaxError.
    """
    lines = []
    for name, value in simple_vars.items():
        if not _is_valid_name(name):
            logger.warning(f"Skipping variable '{name}': not a valid Python identifier")
            continue
        value_repr = repr(value)
        try:
            ast.parse(value_repr, mode="eval")
        except (SyntaxError, ValueError):
            logger.warning(f"Skipping variable '{name}': its repr is not valid Python source")
            continue
        lines.append(f"{name} = {value_repr}")
    return "\n".join(lines)


def _resolve_app_name(tool_name: str, apps_list: Optional[list[str]]) -> str:
    """Infer the registry app name for ``tool_name`` (format ``{app_name}_{api_name}``).

    Prefers the longest entry of ``apps_list`` that equals the tool name or is a
    ``{app}_`` prefix of it; otherwise falls back to a verb-based heuristic.
    """
    if apps_list:
        matches = [
            app for app in apps_list if app and (tool_name == app or tool_name.startswith(app + "_"))
        ]
        if matches:
            # Longest match wins so "digital_sales" beats "digital".
            app_name = max(matches, key=len)
            logger.debug(f"Matched tool {tool_name} to app {app_name} from apps_list")
            return app_name

    parts = tool_name.split("_")
    if len(parts) < 2:
        return tool_name  # Single word tool name

    # Heuristic: the app name is the words before the first API verb.
    # For digital_sales_get_my_accounts_my_accounts_get -> app is "digital_sales"
    app_parts = []
    for part in parts:
        if part.lower() in _API_VERBS:
            break
        app_parts.append(part)
    return "_".join(app_parts) if app_parts else parts[0]  # Fallback to first part


def _extract_param_names(tool_name: str, tool_obj: Any, is_structured_tool: bool) -> list[str]:
    """Return the tool's parameter names from its ``args_schema``, or [] if unknown.

    Supports Pydantic v2 (``model_fields``), Pydantic v1 (``__fields__``) and
    JSON-schema dicts (``properties``).
    """
    kind = "StructuredTool" if is_structured_tool else "function"
    try:
        args_schema = getattr(tool_obj, "args_schema", None)
        logger.debug(f"Checking schema for {kind} {tool_name}: has_args_schema={args_schema is not None}")
        if not args_schema:
            logger.warning(f"No args_schema found for {tool_name}")
            return []

        logger.debug(f"  args_schema type: {type(args_schema)}")
        if isinstance(args_schema, dict):
            param_names = list(args_schema.get("properties", {}).keys())
            logger.info(f"Extracted param_names from JSON schema for {tool_name}: {param_names}")
            return param_names

        logger.debug(f"  has model_fields: {hasattr(args_schema, 'model_fields')}")
        logger.debug(f"  has __fields__: {hasattr(args_schema, '__fields__')}")
        if hasattr(args_schema, "model_fields"):
            param_names = list(args_schema.model_fields.keys())
            logger.info(f"Extracted param_names from model_fields for {tool_name}: {param_names}")
            return param_names
        if hasattr(args_schema, "__fields__"):
            param_names = list(args_schema.__fields__.keys())
            logger.info(f"Extracted param_names from __fields__ for {tool_name}: {param_names}")
            return param_names
    except Exception as e:
        # loguru ignores `exc_info`; logger.exception attaches the traceback.
        logger.exception(f"Could not extract param names for {tool_name}: {e}")
    return []


def _build_tool_stub(
    tool_name: str, tool_obj: Any, apps_list: Optional[list[str]], is_structured_tool: bool
) -> str:
    """Generate an async stub that forwards a tool call to ``call_api`` in the sandbox.

    Returns "" (and logs a warning) for names that cannot be Python identifiers.
    """
    if not _is_valid_name(tool_name):
        logger.warning(f"Skipping tool '{tool_name}': name is not a valid Python identifier")
        return ""

    app_name = _resolve_app_name(tool_name, apps_list)
    param_names = _extract_param_names(tool_name, tool_obj, is_structured_tool)

    if param_names:
        # Accept positional and keyword args; positional args are mapped onto
        # the schema's parameter names in order.
        logger.info(f"Generating stub WITH positional args for {tool_name}, params={param_names}")
        return f"""
async def {tool_name}(*args, **kwargs):
    \"\"\"Stub for {tool_name} - calls via registry API\"\"\"
    # Parameter names: {param_names}
    param_names = {param_names!r}
    all_kwargs = dict(kwargs)
    # Map positional arguments to parameter names
    for i, arg in enumerate(args):
        if i < len(param_names):
            all_kwargs[param_names[i]] = arg
    return await call_api({app_name!r}, {tool_name!r}, all_kwargs)
"""

    # Fallback: no schema info, just pass kwargs
    logger.warning(f"Generating stub WITHOUT positional args for {tool_name} (no param_names)")
    return f"""
async def {tool_name}(**kwargs):
    \"\"\"Stub for {tool_name} - calls via registry API\"\"\"
    return await call_api({app_name!r}, {tool_name!r}, kwargs)
"""


def _resolve_function_call_url() -> str:
    """Return the base URL the sandbox uses to reach the registry (no trailing slash).

    E2B needs a publicly accessible URL. This prefers ``function_call_host``,
    then ``registry_host``, then falls back to localhost with an error log.
    """
    function_call_url = getattr(settings.server_ports, "function_call_host", None) or getattr(
        settings.server_ports, "registry_host", None
    )
    if not function_call_url:
        logger.error(
            "E2B sandbox (lite mode) requires a publicly accessible URL. "
            "Please set 'function_call_host' or 'registry_host' in settings.toml."
        )
        function_call_url = _DEFAULT_FUNCTION_CALL_URL
    return str(function_call_url).rstrip("/")


def _build_call_api_helper(call_url: str) -> str:
    """Generate the in-sandbox ``call_api`` HTTP client that POSTs to ``call_url``."""
    return f"""
# HTTP client for calling registry tools
import asyncio
import json
import urllib.request
import urllib.error

async def call_api(app_name, api_name, args=None):
    \"\"\"Call registry API tool via HTTP.\"\"\"
    if args is None:
        args = {{}}

    url = {call_url!r}
    headers = {{
        "accept": "application/json",
        "Content-Type": "application/json"
    }}
    payload = {{
        "function_name": api_name,
        "app_name": app_name,
        "args": args
    }}

    data = json.dumps(payload).encode('utf-8')
    req = urllib.request.Request(url, data=data, headers=headers, method='POST')

    loop = asyncio.get_running_loop()

    def _sync_call():
        try:
            with urllib.request.urlopen(req, timeout=30) as response:
                response_data = response.read().decode('utf-8')
                try:
                    response_data = json.loads(response_data)
                except ValueError:
                    # Non-JSON responses are returned as plain text.
                    pass
                return response_data
        except urllib.error.HTTPError as e:
            print(e)
            raise Exception(f"HTTP Error: {{e.code}} - {{e.reason}}")
        except urllib.error.URLError as e:
            print(e)
            raise Exception(f"URL Error: {{e.reason}}")

    return await loop.run_in_executor(None, _sync_call)
"""


def _build_lite_script(call_api_helper: str, tools_code: str, variables_code: str, user_code: str) -> str:
    """Assemble the complete lite-mode script.

    The script prints ``_LOCALS_DELIMITER`` followed by the repr of the locals
    returned by ``__async_main`` (which ``user_code`` is expected to define).
    """
    return f"""
{call_api_helper}

# Tool function stubs
{tools_code}

# Variables from previous execution
{variables_code}

{user_code}

# Execute and capture locals
async def main():
    __result_locals = await asyncio.wait_for(__async_main(), timeout=30)
    print({_LOCALS_DELIMITER!r})
    print(__result_locals)

if __name__ == "__main__":
    await main()
"""


def _parse_lite_output(raw_output: str) -> tuple[str, dict[str, Any]]:
    """Split sandbox stdout into (user output, locals dict).

    Without the delimiter the full output is returned with empty locals.
    Otherwise the last line after the delimiter that starts with "{" and
    literal-evaluates to a dict is used as the locals.
    """
    if _LOCALS_DELIMITER not in raw_output:
        # No delimiter found, return full output
        return raw_output, {}

    result, locals_str = raw_output.split(_LOCALS_DELIMITER, 1)
    result_locals: dict[str, Any] = {}
    for line in reversed(locals_str.strip().split("\n")):
        candidate = line.strip()
        if not candidate.startswith("{"):
            continue
        try:
            parsed = ast.literal_eval(candidate)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            continue
        if isinstance(parsed, dict):
            result_locals = parsed
            break
    return result.strip(), result_locals


# ============================================================================
# E2B Execution Functions
# ============================================================================


@observe(as_type="span")
async def execute_in_e2b_sandbox_lite(
    user_code: str,
    context_locals: Optional[dict[str, Any]] = None,
    thread_id: Optional[str] = None,
    apps_list: Optional[list[str]] = None,
    state: Any = None,
) -> tuple[str, dict[str, Any]]:
    """
    Execute code in E2B sandbox with automatic variable/tool serialization for lite mode.

    This high-level function is specific to lite mode. It handles serialization of
    variables and tools from context_locals, creates tool stubs, and combines them
    with the user code before executing in E2B.

    Args:
        user_code: User's wrapped Python code (expected to define ``__async_main``)
        context_locals: Dictionary of variables and tools from previous execution
        thread_id: Thread ID for sandbox caching
        apps_list: Known app names, used to map each tool name to its registry app
            (falls back to a name heuristic when no entry matches)
        state: Optional AgentState instance. If provided, uses state.variables_manager.

    Returns:
        Tuple of (stdout_result, parsed_locals)

    Raises:
        RuntimeError: If E2B is unavailable or any step of preparation/execution fails.

    Note:
        This function is specific to lite mode and handles context serialization.
        For other modes (balanced, etc.), use execute_code_in_e2b() directly with
        pre-formatted code from sandbox.py.
    """
    from cuga.backend.cuga_graph.state.agent_state import VariablesManager

    if not E2B_AVAILABLE:
        raise RuntimeError("e2b-code-interpreter package not installed")

    if context_locals is None:
        context_locals = {}

    try:
        # Serialize variables using VariablesManager
        # Use state's variables_manager if provided, otherwise create new one
        if state is not None and hasattr(state, "variables_manager"):
            var_manager = state.variables_manager
        else:
            var_manager = VariablesManager()
        variables_code = var_manager.get_variables_formatted()

        # Handle both plain callables and StructuredTool objects
        from langchain_core.tools import StructuredTool

        simple_vars, tool_funcs = _split_context_locals(context_locals, StructuredTool)

        vars_code_from_locals = _format_simple_vars(simple_vars)
        if vars_code_from_locals:
            variables_code = (
                variables_code + "\n" + vars_code_from_locals if variables_code else vars_code_from_locals
            )

        # In lite mode tools are called directly by name, e.g.
        # digital_sales_get_my_accounts_my_accounts_get(); each gets an async stub
        # that forwards to call_api with the inferred app and api names.
        tools_code = "".join(
            _build_tool_stub(name, tool, apps_list, is_structured_tool=isinstance(tool, StructuredTool))
            for name, tool in tool_funcs.items()
        )

        function_call_url = _resolve_function_call_url()

        # Get trajectory path for call_api
        from cuga.backend.activity_tracker.tracker import ActivityTracker

        trajectory_path = ActivityTracker().get_current_trajectory_path()
        encoded_path = quote(str(trajectory_path)) if trajectory_path else ""
        call_url = f"{function_call_url}/functions/call?trajectory_path={encoded_path}"

        complete_code = _build_lite_script(
            call_api_helper=_build_call_api_helper(call_url),
            tools_code=tools_code,
            variables_code=variables_code,
            user_code=user_code,
        )

        # Execute using the low-level E2B executor
        raw_output = await execute_code_in_e2b(
            code_content=complete_code,
            thread_id=thread_id,
        )
        return _parse_lite_output(raw_output)

    except Exception as e:
        error_msg = f"E2B sandbox execution failed: {e}"
        logger.error(error_msg)
        raise RuntimeError(error_msg) from e


async def execute_code_in_e2b(
    code_content: str,
    thread_id: Optional[str] = None,
) -> str:
    """Execute pre-wrapped code in E2B remote sandbox (async).

    This is the low-level E2B execution function used by all modes (lite, balanced, etc.).
    It accepts code that has already been wrapped and prepared by the caller.
    It does NOT do any additional wrapping or code generation - it executes the code as-is.

    Sandbox selection follows ``settings.advanced_features.e2b_sandbox_mode``:
    "per-session" (with a thread_id) reuses a cached sandbox per thread,
    "single" reuses one global sandbox, anything else creates an ephemeral
    sandbox that is torn down after the call.

    Args:
        code_content: Complete Python code ready for execution (includes preamble,
            variables, and wrapped user code)
        thread_id: Thread ID for sandbox caching (if None, creates ephemeral sandbox)

    Returns:
        stdout output as string. On an execution error or other failure, an
        error message string is returned instead of raising.

    Raises:
        RuntimeError: If the e2b-code-interpreter package is not installed.
    """
    with langfuse.start_as_current_observation(
        as_type="span",
        name="execute_code_in_e2b",
        input={"e2b_sandbox_mode": settings.advanced_features.e2b_sandbox_mode, "code_content": code_content},
    ) as root_span:
        if not E2B_AVAILABLE:
            raise RuntimeError("e2b-code-interpreter package not installed")

        try:
            logger.debug("Executing code in E2B sandbox")

            # The full script can contain serialized user variables: keep it at DEBUG.
            logger.debug("=" * 80)
            logger.debug("CODE SENT TO E2B SANDBOX:")
            logger.debug("=" * 80)
            logger.debug(code_content)
            logger.debug("=" * 80)

            loop = asyncio.get_running_loop()
            sandbox_mode = settings.advanced_features.e2b_sandbox_mode

            if sandbox_mode == "per-session" and thread_id:
                # Use cached sandbox for this thread
                cache = get_sandbox_cache()
                sandbox = cache.get_or_create(thread_id)
                logger.debug(f"Executing in E2B sandbox {sandbox.sandbox_id} for thread {thread_id}")
                execution = await loop.run_in_executor(None, sandbox.run_code, code_content)
            elif sandbox_mode == "single":
                # Use single global sandbox (via cache with constant thread_id)
                cache = get_sandbox_cache()
                sandbox = cache.get_or_create(GLOBAL_THREAD_ID)
                logger.debug(f"Executing in global E2B sandbox {sandbox.sandbox_id}")
                execution = await loop.run_in_executor(None, sandbox.run_code, code_content)
            else:
                # Create ephemeral sandbox (no caching) - default mode (per-call).
                # Honor the template configured on the cache (set_template).
                ttl = settings.advanced_features.e2b_sandbox_ttl
                template_name = _sandbox_cache.template_name
                # Not bound to a name: binding it as `root_span` would shadow the
                # outer span, so later updates would land on this closed span.
                with langfuse.start_as_current_observation(
                    as_type="span",
                    name="create-e2b-sandbox",
                    input={"e2b_sandbox_mode": sandbox_mode},
                ):
                    logger.debug(f"Creating ephemeral E2B sandbox (per-call mode, timeout: {ttl}s)")
                    with Sandbox.create(template_name, timeout=ttl) as sandbox:
                        execution = await loop.run_in_executor(None, sandbox.run_code, code_content)

            # Log execution details
            logger.debug(f"E2B execution completed - has error: {execution.error is not None}")
            if execution.logs.stdout:
                logger.debug(f"E2B stdout: {execution.logs.stdout}")
            if execution.logs.stderr:
                logger.debug(f"E2B stderr: {execution.logs.stderr}")

            # Check for execution errors
            if execution.error:
                error_msg = f"E2B execution error: {execution.error.name} - {execution.error.value}"
                logger.error(error_msg)
                # Include stderr if available
                if execution.logs.stderr:
                    error_msg += f"\nStderr: {chr(10).join(execution.logs.stderr)}"
                root_span.update(output=error_msg)
                return error_msg

            # Return stdout
            stdout_output = "\n".join(execution.logs.stdout)

            # Surface stderr (warnings, etc.) in the logs; it is not part of the result.
            if execution.logs.stderr:
                stderr_output = "\n".join(execution.logs.stderr)
                if stderr_output:
                    logger.warning(f"E2B stderr output: {stderr_output}")

            if not stdout_output:
                logger.warning("E2B execution completed but produced no stdout output")

            root_span.update(output=stdout_output)
            return stdout_output

        except Exception as e:
            error_msg = f"E2B sandbox execution failed: {e}"
            logger.exception(error_msg)
            root_span.update(output=error_msg)
            return error_msg