"""Code agent backend - handles code execution with E2B"""
import json
import logging
import os
import re
from typing import List, Dict, Optional

from e2b_code_interpreter import Sandbox

from .tools import execute_code, upload_files, download_files
from .image import resize_image_for_vlm

logger = logging.getLogger(__name__)

TOOLS = [execute_code, upload_files, download_files]
MAX_TURNS = 40  # Hard cap on agent loop turns to avoid runaway tool loops


def parse_execution_result(execution, max_output_length=4000):
    """Parse an E2B execution result into a text summary for LLM feedback.

    Args:
        execution: E2B execution object (has .results, .logs, .error)
        max_output_length: Combined output longer than this is middle-truncated

    Returns:
        A (possibly truncated) string describing results, stdout/stderr and errors.
    """
    output = []

    def truncate_if_needed(text):
        # Keep the head and tail halves; drop the middle with a marker so the
        # LLM still sees both the start and the end of long output.
        if len(text) > max_output_length:
            half = max_output_length // 2
            return (text[:half]
                    + f"\n\n[... truncated {len(text) - max_output_length} of {len(text)} chars ...]\n\n"
                    + text[-half:])
        return text

    # Check for images/plots
    has_images = any(result.png or result.jpeg or result.svg for result in execution.results)
    if has_images:
        output.append("[Plot/Image generated]")

    if execution.results:
        # Filter out Figure text representations (matplotlib reprs like "<Figure ...>")
        text_results = [result.text for result in execution.results
                        if result.text and not result.text.startswith('<Figure')]
        if text_results:
            output.append("\n".join(text_results))

    # NOTE(review): the remainder of this function was unreadable in the
    # reviewed copy; stdout/stderr/error handling reconstructed from the E2B
    # code-interpreter API — confirm against the original.
    if execution.logs and execution.logs.stdout:
        output.append("".join(execution.logs.stdout))
    if execution.logs and execution.logs.stderr:
        output.append("".join(execution.logs.stderr))
    if execution.error:
        output.append(f"Error: {execution.error.name}: {execution.error.value}")

    return truncate_if_needed("\n".join(output))


def format_thinking_cell(content):
    """Build a 'thinking' stream update for the UI.

    NOTE(review): this definition was lost to garbling in the reviewed copy;
    reconstructed from its call sites — confirm field names against the UI.
    """
    return {"type": "thinking", "content": content}


def format_code_cell(code, output, has_error, images=None):
    """Build a 'code' stream update (code + execution output) for the UI.

    NOTE(review): this definition was lost to garbling in the reviewed copy;
    reconstructed from its call sites (called with and without images) —
    confirm field names against the UI.
    """
    return {
        "type": "code",
        "code": code,
        "output": output,
        "has_error": has_error,
        "images": images or [],
    }


def _strip_dot_slash(path: str) -> str:
    """Remove a single leading './' from *path*, if present.

    BUG FIX: the original used str.lstrip('./'), which strips any run of '.'
    and '/' characters — so '../x' became 'x' and '.hidden' became 'hidden'.
    """
    return path[2:] if path.startswith('./') else path


def _inside_root(real_path: str, real_root: str) -> bool:
    """True if *real_path* is *real_root* or a descendant of it.

    BUG FIX: a bare startswith() accepted sibling directories that share the
    prefix (e.g. '/ws2' passed for root '/ws'); anchor the comparison at a
    path-component boundary instead.
    """
    return real_path == real_root or real_path.startswith(real_root + os.sep)


def upload_files_to_sandbox(sbx: Sandbox, paths: List[str], files_root: str) -> str:
    """
    Upload multiple files to the sandbox.

    Args:
        sbx: E2B sandbox instance
        paths: List of relative file paths
        files_root: Root directory to resolve relative paths

    Returns:
        String describing what was uploaded or errors encountered
    """
    results = []
    real_root = os.path.realpath(files_root)
    for rel_path in paths:
        # Normalize the path (remove ./ prefix if present)
        rel_path = _strip_dot_slash(rel_path)
        local_path = os.path.join(files_root, rel_path)

        # Security check: ensure path doesn't escape files_root
        real_local = os.path.realpath(local_path)
        if not _inside_root(real_local, real_root):
            results.append(f"Error: {rel_path} - path outside workspace")
            continue
        if not os.path.exists(local_path):
            results.append(f"Error: {rel_path} - file not found")
            continue
        if not os.path.isfile(local_path):
            results.append(f"Error: {rel_path} - not a file")
            continue
        try:
            # Get just the filename for the sandbox path (uploads are flattened
            # into /home/user regardless of the local directory layout)
            filename = os.path.basename(rel_path)
            sandbox_path = f"/home/user/{filename}"
            with open(local_path, "rb") as f:
                sbx.files.write(sandbox_path, f)
            results.append(f"Uploaded: {rel_path} -> {sandbox_path}")
        except Exception as e:
            results.append(f"Error uploading {rel_path}: {str(e)}")
    return "\n".join(results)


def download_files_from_sandbox(sbx: Sandbox, files: List[Dict], files_root: str) -> str:
    """
    Download multiple files from the sandbox to the local workspace.

    Args:
        sbx: E2B sandbox instance
        files: List of dicts with 'sandbox_path' and 'local_path' keys
        files_root: Root directory to resolve relative paths

    Returns:
        String describing what was downloaded or errors encountered
    """
    results = []
    real_root = os.path.realpath(files_root)
    for file_spec in files:
        sandbox_path = file_spec.get('sandbox_path', '')
        local_rel_path = file_spec.get('local_path', '')
        if not sandbox_path or not local_rel_path:
            results.append("Error: Missing sandbox_path or local_path")
            continue

        # Normalize the local path (remove ./ prefix if present)
        local_rel_path = _strip_dot_slash(local_rel_path)
        local_path = os.path.join(files_root, local_rel_path)

        # Security check: ensure path doesn't escape files_root. The parent
        # directory may not exist yet, so walk up to the nearest existing
        # ancestor before resolving symlinks.
        test_path = local_path
        while not os.path.exists(os.path.dirname(test_path)):
            parent = os.path.dirname(test_path)
            # Guard against an infinite loop when dirname stops shrinking
            # (e.g. a relative path bottoming out at '')
            if not parent or parent == test_path:
                break
            test_path = parent
        real_local = os.path.realpath(test_path)
        if not _inside_root(real_local, real_root):
            results.append(f"Error: {local_rel_path} - path outside workspace")
            continue

        try:
            # Read file content from sandbox (use bytes for binary files)
            content = sbx.files.read(sandbox_path, format='bytes')
            # Create parent directories if needed
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            # Write to local file
            with open(local_path, 'wb') as f:
                f.write(content)
            results.append(f"Downloaded: {sandbox_path} -> {local_rel_path}")
        except Exception as e:
            results.append(f"Error downloading {sandbox_path}: {str(e)}")
    return "\n".join(results)


def _append_tool_exchange(messages: List[Dict], content: str, tool_call, output) -> None:
    """Append the assistant tool-call message and its tool response to history.

    *output* may be a plain string or a multimodal content list (text +
    image_url parts) — it is passed through unchanged as the tool content.
    """
    messages.append({
        "role": "assistant",
        "content": content,
        "tool_calls": [{
            "id": tool_call.id,
            "type": "function",
            "function": {
                "name": tool_call.function.name,
                "arguments": tool_call.function.arguments,
            }
        }]
    })
    messages.append({
        "role": "tool",
        "tool_call_id": tool_call.id,
        "content": output,
    })


def stream_code_execution(client, model: str, messages: List[Dict], sbx: Sandbox,
                          files_root: str = None, extra_params: Optional[Dict] = None,
                          abort_event=None, multimodal: bool = False, tab_id: str = "0",
                          figure_store: Optional[Dict[str, dict]] = None):
    """
    Stream code execution results

    Yields:
        dict: Updates with type 'thinking', 'code', or 'done'

    Args:
        client: OpenAI-compatible client
        model: Model name to use
        messages: Conversation messages
        sbx: E2B sandbox instance
        files_root: Root directory for file uploads (optional)
        extra_params: Extra parameters for API calls (optional)
        abort_event: threading.Event-like object; when set, the loop aborts
        multimodal: If True, generated figures are sent back to the model
        tab_id: Identifier used to namespace figure names per UI tab
        figure_store: Shared dict to accumulate figure data across calls (optional)
    """
    from .agents import call_llm

    turns = 0
    done = False
    figure_counter = 0  # Track figure numbers
    figure_prefix = f"figure_T{tab_id}_"
    # Use shared global store if provided, otherwise create local one
    if figure_store is None:
        figure_store = {}
    figure_data = figure_store  # Alias for clarity in this function
    has_result = False
    debug_call_number = 0

    while not done and turns < MAX_TURNS:
        # Check abort before each turn
        if abort_event and abort_event.is_set():
            yield {"type": "aborted"}
            return
        turns += 1

        # LLM call with retries and debug events
        response = None
        for event in call_llm(client, model, messages, tools=TOOLS,
                              extra_params=extra_params, abort_event=abort_event,
                              call_number=debug_call_number):
            if "_response" in event:
                response = event["_response"]
                debug_call_number = event["_call_number"]
            else:
                yield event
                if event.get("type") in ("error", "aborted"):
                    return
        if response is None:
            return

        # Get response
        assistant_message = response.choices[0].message
        content = assistant_message.content or ""
        tool_calls = assistant_message.tool_calls or []

        # Check for <result> tags
        # NOTE(review): the tag literals were stripped by garbling in the
        # reviewed copy; '<result>' reconstructed from surrounding comments —
        # confirm against the prompt that instructs the model to emit them.
        result_match = re.search(r'<result>(.*?)</result>', content,
                                 re.DOTALL | re.IGNORECASE)
        result_content = None
        thinking_content = content
        if result_match:
            logger.debug(f"Result found: {content[:200]}...")
            result_content = result_match.group(1).strip()
            # Remove result tags from thinking display
            thinking_content = re.sub(r'<result>.*?</result>', '', content,
                                      flags=re.DOTALL | re.IGNORECASE).strip()

        # Send thinking if there's content (excluding result tags)
        if thinking_content.strip():
            yield format_thinking_cell(thinking_content)

        # Send result as a special highlighted message in the CODE notebook
        if result_content:
            yield {"type": "result_preview", "content": result_content,
                   "figures": figure_data}

        # Handle tool calls
        if tool_calls:
            for tool_call in tool_calls:
                # Check abort between tool calls
                if abort_event and abort_event.is_set():
                    yield {"type": "aborted"}
                    return

                if tool_call.function.name == "execute_code":
                    # Parse arguments
                    try:
                        args = json.loads(tool_call.function.arguments)
                        code = args["code"]
                    except json.JSONDecodeError as e:
                        error_msg = (f"JSON parse error: {e}. Raw arguments: "
                                     f"{tool_call.function.arguments[:500]}")
                        logger.error(error_msg)
                        # Treat as tool error so LLM can recover
                        output = f"Error parsing code arguments: {e}"
                        _append_tool_exchange(messages, content, tool_call, output)
                        yield {"type": "error",
                               "content": f"Failed to parse code arguments: {e}"}
                        continue
                    except KeyError as e:
                        error_msg = (f"Missing required key {e} in arguments: "
                                     f"{tool_call.function.arguments[:500]}")
                        logger.error(error_msg)
                        output = "Error: Missing required 'code' parameter"
                        _append_tool_exchange(messages, content, tool_call, output)
                        yield {"type": "error", "content": output}
                        continue

                    # Send code cell (before execution)
                    yield {"type": "code_start", "code": code}

                    # BUG FIX: initialize before the try so an exception from
                    # run_code can't leave 'images' unbound (NameError below)
                    # or stale from a previous tool call.
                    images = []
                    # Execute code
                    try:
                        execution = sbx.run_code(code)
                        output = parse_execution_result(execution)
                        has_error = execution.error is not None

                        # Extract images and assign figure names
                        figure_names = []
                        for result in execution.results:
                            if not (result.png or result.jpeg or result.svg):
                                continue
                            figure_counter += 1
                            figure_name = f"{figure_prefix}{figure_counter}"
                            figure_names.append(figure_name)
                            if result.png:
                                images.append({"type": "png", "data": result.png,
                                               "name": figure_name})
                                figure_data[figure_name] = {"type": "png",
                                                            "data": result.png}
                            elif result.jpeg:
                                images.append({"type": "jpeg", "data": result.jpeg,
                                               "name": figure_name})
                                figure_data[figure_name] = {"type": "jpeg",
                                                            "data": result.jpeg}
                            elif result.svg:
                                images.append({"type": "svg", "data": result.svg,
                                               "name": figure_name})
                                figure_data[figure_name] = {"type": "svg",
                                                            "data": result.svg}

                        # Add figure info to output for LLM
                        if figure_names:
                            figure_info = f"\n[Generated figures: {', '.join(figure_names)}]"
                            output = (output + figure_info) if output else figure_info.strip()

                        # Send execution result
                        yield format_code_cell(code, output, has_error, images)
                    except Exception as e:
                        error_str = str(e)
                        # Check if this is a sandbox timeout error - if so,
                        # re-raise to trigger cleanup
                        if ("502" in error_str
                                or "sandbox was not found" in error_str.lower()
                                or "timeout" in error_str.lower()):
                            raise  # Re-raise to be caught by main.py handler
                        yield format_code_cell(code, f"Execution error: {str(e)}", True)
                        output = f"Execution failed: {str(e)}"
                        has_error = True

                    # Build tool response — include figures if multimodal
                    if multimodal and images:
                        tool_content = [{"type": "text", "text": output}]
                        for img in images:
                            if img["type"] in ("png", "jpeg"):
                                vlm_img = resize_image_for_vlm(img["data"])
                                tool_content.append({
                                    "type": "image_url",
                                    "image_url": {"url": f"data:image/jpeg;base64,{vlm_img}"}
                                })
                        _append_tool_exchange(messages, content, tool_call, tool_content)
                    else:
                        _append_tool_exchange(messages, content, tool_call, output)

                elif tool_call.function.name == "upload_files":
                    # Parse arguments
                    try:
                        args = json.loads(tool_call.function.arguments)
                        paths = args["paths"]
                    except (json.JSONDecodeError, KeyError) as e:
                        error_msg = (f"Failed to parse upload_files arguments: {e}. "
                                     f"Raw: {tool_call.function.arguments[:500]}")
                        logger.error(error_msg)
                        output = f"Error parsing upload_files arguments: {e}"
                        _append_tool_exchange(messages, content, tool_call, output)
                        yield {"type": "error", "content": output}
                        continue

                    # Check if files_root is available
                    if not files_root:
                        output = "Error: File upload not available - no workspace configured"
                    else:
                        # Upload files
                        output = upload_files_to_sandbox(sbx, paths, files_root)

                    # Send upload notification to UI
                    yield {"type": "upload", "paths": paths, "output": output}
                    _append_tool_exchange(messages, content, tool_call, output)

                elif tool_call.function.name == "download_files":
                    # Parse arguments
                    try:
                        args = json.loads(tool_call.function.arguments)
                        files = args["files"]
                    except (json.JSONDecodeError, KeyError) as e:
                        error_msg = (f"Failed to parse download_files arguments: {e}. "
                                     f"Raw: {tool_call.function.arguments[:500]}")
                        logger.error(error_msg)
                        output = f"Error parsing download_files arguments: {e}"
                        _append_tool_exchange(messages, content, tool_call, output)
                        yield {"type": "error", "content": output}
                        continue

                    # Check if files_root is available
                    if not files_root:
                        output = "Error: File download not available - no workspace configured"
                    else:
                        # Download files
                        output = download_files_from_sandbox(sbx, files, files_root)

                    # Extract paths for UI display
                    paths = [f"{f.get('sandbox_path', '')} -> {f.get('local_path', '')}"
                             for f in files]
                    # Send download notification to UI
                    yield {"type": "download", "paths": paths, "output": output}
                    _append_tool_exchange(messages, content, tool_call, output)
        else:
            # No tool calls - we're done
            messages.append({"role": "assistant", "content": content})
            done = True
            # If we found a result tag, send it with figure data
            if result_content:
                has_result = True
                yield {"type": "result", "content": result_content,
                       "figures": figure_data}

        # Yield generating state between turns
        if not done:
            yield {"type": "generating"}

    # If agent finished without a <result>, nudge it for one
    if not has_result:
        from .agents import nudge_for_result
        yield from nudge_for_result(client, model, messages,
                                    extra_params=extra_params,
                                    extra_result_data={"figures": figure_data},
                                    call_number=debug_call_number)

    # Send done signal
    yield {"type": "done"}