# agent-ui / backend / code.py
# Commit 4c24c65 ("Unify figure store globally, enable cross-agent figure
# references") by lvwerra (HF Staff).
"""
Code agent backend - handles code execution with E2B
"""
import json
import logging
import os
import re
from typing import List, Dict, Optional
from e2b_code_interpreter import Sandbox
from .tools import execute_code, upload_files, download_files
from .image import resize_image_for_vlm
logger = logging.getLogger(__name__)
TOOLS = [execute_code, upload_files, download_files]
MAX_TURNS = 40
def parse_execution_result(execution, max_output_length=4000):
    """Parse execution result for LLM feedback"""

    def clip(text):
        # Keep the head and tail halves of over-long output and note the cut.
        if len(text) <= max_output_length:
            return text
        half = max_output_length // 2
        notice = f"\n\n[... truncated {len(text) - max_output_length} of {len(text)} chars ...]\n\n"
        return text[:half] + notice + text[-half:]

    sections = []

    # Check for images/plots
    if any(r.png or r.jpeg or r.svg for r in execution.results):
        sections.append("[Plot/Image generated]")

    # Collect textual results, dropping matplotlib Figure reprs ("<Figure ...>")
    texts = [r.text for r in execution.results
             if r.text and not r.text.startswith('<Figure')]
    if texts:
        sections.append(clip("\n".join(texts)))

    if execution.logs.stdout:
        sections.append(clip("\n".join(execution.logs.stdout)))
    if execution.logs.stderr:
        sections.append(clip("\n".join(execution.logs.stderr)))
    if execution.error is not None:
        sections.append(clip(execution.error.traceback))

    return "\n".join(filter(None, sections))
def format_code_cell(code: str, execution_result: str = None, error: bool = False, images: list = None):
    """Format a code cell for display in the UI"""
    cell = dict(type="code", code=code, output=execution_result, error=error)
    cell["images"] = images or []
    return cell
def format_thinking_cell(content: str):
    """Format assistant thinking for display"""
    return dict(type="thinking", content=content)
def upload_files_to_sandbox(sbx: Sandbox, paths: List[str], files_root: str) -> str:
    """
    Upload multiple files to the sandbox.

    Args:
        sbx: E2B sandbox instance
        paths: List of relative file paths
        files_root: Root directory to resolve relative paths

    Returns:
        String describing what was uploaded or errors encountered
    """
    results = []
    real_root = os.path.realpath(files_root)
    for rel_path in paths:
        # Normalize the path: drop a single leading "./" only.
        # (str.lstrip('./') strips a *character set*, so it would also eat
        # leading dots of names like "..foo" and rewrite "../x" to "x".)
        if rel_path.startswith('./'):
            rel_path = rel_path[2:]
        local_path = os.path.join(files_root, rel_path)
        # Security check: resolved path must be files_root itself or inside it.
        # A bare startswith() prefix test would also accept sibling dirs such
        # as "/workspace2" when the root is "/workspace".
        real_local = os.path.realpath(local_path)
        if real_local != real_root and not real_local.startswith(real_root + os.sep):
            results.append(f"Error: {rel_path} - path outside workspace")
            continue
        if not os.path.exists(local_path):
            results.append(f"Error: {rel_path} - file not found")
            continue
        if not os.path.isfile(local_path):
            results.append(f"Error: {rel_path} - not a file")
            continue
        try:
            # Upload flat into /home/user using just the basename.
            filename = os.path.basename(rel_path)
            sandbox_path = f"/home/user/{filename}"
            with open(local_path, "rb") as f:
                sbx.files.write(sandbox_path, f)
            results.append(f"Uploaded: {rel_path} -> {sandbox_path}")
        except Exception as e:
            results.append(f"Error uploading {rel_path}: {str(e)}")
    return "\n".join(results)
def download_files_from_sandbox(sbx: Sandbox, files: List[Dict], files_root: str) -> str:
    """
    Download multiple files from the sandbox to the local workspace.

    Args:
        sbx: E2B sandbox instance
        files: List of dicts with 'sandbox_path' and 'local_path' keys
        files_root: Root directory to resolve relative paths

    Returns:
        String describing what was downloaded or errors encountered
    """
    results = []
    real_root = os.path.realpath(files_root)
    for file_spec in files:
        sandbox_path = file_spec.get('sandbox_path', '')
        local_rel_path = file_spec.get('local_path', '')
        if not sandbox_path or not local_rel_path:
            results.append("Error: Missing sandbox_path or local_path")
            continue
        # Normalize the path: drop a single leading "./" only.
        # (str.lstrip('./') strips a *character set* and would mangle names
        # like "..foo" or rewrite "../x" to "x".)
        if local_rel_path.startswith('./'):
            local_rel_path = local_rel_path[2:]
        local_path = os.path.join(files_root, local_rel_path)
        # Security check: resolve the deepest *existing* ancestor of the
        # target (parent dirs may not exist yet) and require it to be
        # files_root or inside it. Comparing against root + os.sep avoids
        # accepting sibling dirs like "/workspace2" for root "/workspace".
        test_path = local_path
        while not os.path.exists(os.path.dirname(test_path)):
            test_path = os.path.dirname(test_path)
        real_local = os.path.realpath(test_path)
        if real_local != real_root and not real_local.startswith(real_root + os.sep):
            results.append(f"Error: {local_rel_path} - path outside workspace")
            continue
        try:
            # Read file content from sandbox (use bytes for binary files)
            content = sbx.files.read(sandbox_path, format='bytes')
            # Create parent directories if needed
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            # Write to local file
            with open(local_path, 'wb') as f:
                f.write(content)
            results.append(f"Downloaded: {sandbox_path} -> {local_rel_path}")
        except Exception as e:
            results.append(f"Error downloading {sandbox_path}: {str(e)}")
    return "\n".join(results)
def _assistant_tool_call_message(content: str, tool_call) -> Dict:
    """Build the assistant history entry recording a single tool call (OpenAI format)."""
    return {
        "role": "assistant",
        "content": content,
        "tool_calls": [{
            "id": tool_call.id,
            "type": "function",
            "function": {
                "name": tool_call.function.name,
                "arguments": tool_call.function.arguments,
            }
        }]
    }


def stream_code_execution(client, model: str, messages: List[Dict], sbx: Sandbox, files_root: str = None, extra_params: Optional[Dict] = None, abort_event=None, multimodal: bool = False, tab_id: str = "0", figure_store: Optional[Dict[str, dict]] = None):
    """
    Stream code execution results

    Yields:
        dict: Updates with type 'thinking', 'code', or 'done'
    Args:
        client: OpenAI-compatible client
        model: Model name to use
        messages: Conversation messages
        sbx: E2B sandbox instance
        files_root: Root directory for file uploads (optional)
        extra_params: Extra parameters for API calls (optional)
        abort_event: threading.Event-like flag; when set, the loop stops and
            yields {"type": "aborted"} (optional)
        multimodal: If True, generated figures are also fed back to the LLM as
            image_url content parts (optional)
        tab_id: UI tab identifier used to namespace figure names (optional)
        figure_store: Shared dict mapping figure name -> {"type", "data"} so
            other agents can reference figures; a local dict is used when not
            provided (optional)
    """
    from .agents import call_llm
    turns = 0
    done = False
    figure_counter = 0  # Track figure numbers
    figure_prefix = f"figure_T{tab_id}_"
    # Use shared global store if provided, otherwise create local one
    if figure_store is None:
        figure_store = {}
    figure_data = figure_store  # Alias for clarity in this function
    has_result = False
    debug_call_number = 0
    while not done and turns < MAX_TURNS:
        # Check abort before each turn
        if abort_event and abort_event.is_set():
            yield {"type": "aborted"}
            return
        turns += 1
        # LLM call with retries and debug events
        response = None
        for event in call_llm(client, model, messages, tools=TOOLS, extra_params=extra_params, abort_event=abort_event, call_number=debug_call_number):
            if "_response" in event:
                response = event["_response"]
                debug_call_number = event["_call_number"]
            else:
                yield event
                if event.get("type") in ("error", "aborted"):
                    return
        if response is None:
            return
        # Get response
        assistant_message = response.choices[0].message
        content = assistant_message.content or ""
        tool_calls = assistant_message.tool_calls or []
        # Check for result tags
        result_match = re.search(r'<result>(.*?)</result>', content, re.DOTALL | re.IGNORECASE)
        result_content = None
        thinking_content = content
        if result_match:
            logger.debug(f"Result found: {content[:200]}...")
            result_content = result_match.group(1).strip()
            # Remove result tags from thinking display
            thinking_content = re.sub(r'<result>.*?</result>', '', content, flags=re.DOTALL | re.IGNORECASE).strip()
        # Send thinking if there's content (excluding result tags)
        if thinking_content.strip():
            yield format_thinking_cell(thinking_content)
        # Send result as a special highlighted message in the CODE notebook
        if result_content:
            yield {"type": "result_preview", "content": result_content, "figures": figure_data}
        # Handle tool calls
        if tool_calls:
            for tool_call in tool_calls:
                # Check abort between tool calls
                if abort_event and abort_event.is_set():
                    yield {"type": "aborted"}
                    return
                if tool_call.function.name == "execute_code":
                    # Parse arguments
                    try:
                        args = json.loads(tool_call.function.arguments)
                        code = args["code"]
                    except json.JSONDecodeError as e:
                        error_msg = f"JSON parse error: {e}. Raw arguments: {tool_call.function.arguments[:500]}"
                        logger.error(error_msg)
                        # Treat as tool error so LLM can recover
                        output = f"Error parsing code arguments: {e}"
                        messages.append(_assistant_tool_call_message(content, tool_call))
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": output
                        })
                        yield {"type": "error", "content": f"Failed to parse code arguments: {e}"}
                        continue
                    except KeyError as e:
                        error_msg = f"Missing required key {e} in arguments: {tool_call.function.arguments[:500]}"
                        logger.error(error_msg)
                        output = f"Error: Missing required 'code' parameter"
                        messages.append(_assistant_tool_call_message(content, tool_call))
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": output
                        })
                        yield {"type": "error", "content": output}
                        continue
                    # Send code cell (before execution)
                    yield {"type": "code_start", "code": code}
                    # Execute code. Reset `images` first so that a failed run
                    # cannot reference figures left over from a previous turn
                    # (and cannot hit a NameError on the very first turn).
                    images = []
                    try:
                        execution = sbx.run_code(code)
                        output = parse_execution_result(execution)
                        has_error = execution.error is not None
                        # Extract images and assign figure names
                        figure_names = []
                        for result in execution.results:
                            if not (result.png or result.jpeg or result.svg):
                                continue
                            figure_counter += 1
                            figure_name = f"{figure_prefix}{figure_counter}"
                            figure_names.append(figure_name)
                            if result.png:
                                images.append({"type": "png", "data": result.png, "name": figure_name})
                                figure_data[figure_name] = {"type": "png", "data": result.png}
                            elif result.jpeg:
                                images.append({"type": "jpeg", "data": result.jpeg, "name": figure_name})
                                figure_data[figure_name] = {"type": "jpeg", "data": result.jpeg}
                            elif result.svg:
                                images.append({"type": "svg", "data": result.svg, "name": figure_name})
                                figure_data[figure_name] = {"type": "svg", "data": result.svg}
                        # Add figure info to output for LLM
                        if figure_names:
                            figure_info = f"\n[Generated figures: {', '.join(figure_names)}]"
                            output = (output + figure_info) if output else figure_info.strip()
                        # Send execution result
                        yield format_code_cell(code, output, has_error, images)
                    except Exception as e:
                        error_str = str(e)
                        # Check if this is a sandbox timeout error - if so, re-raise to trigger cleanup
                        if "502" in error_str or "sandbox was not found" in error_str.lower() or "timeout" in error_str.lower():
                            raise  # Re-raise to be caught by main.py handler
                        yield format_code_cell(code, f"Execution error: {str(e)}", True)
                        output = f"Execution failed: {str(e)}"
                        has_error = True
                    # Add to message history
                    messages.append(_assistant_tool_call_message(content, tool_call))
                    # Build tool response — include figures if multimodal
                    if multimodal and images:
                        tool_content = [{"type": "text", "text": output}]
                        for img in images:
                            if img["type"] in ("png", "jpeg"):
                                # SVG can't be inlined as a data-URL image; skip it
                                vlm_img = resize_image_for_vlm(img["data"])
                                tool_content.append({
                                    "type": "image_url",
                                    "image_url": {"url": f"data:image/jpeg;base64,{vlm_img}"}
                                })
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": tool_content
                        })
                    else:
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": output
                        })
                elif tool_call.function.name == "upload_files":
                    # Parse arguments
                    try:
                        args = json.loads(tool_call.function.arguments)
                        paths = args["paths"]
                    except (json.JSONDecodeError, KeyError) as e:
                        error_msg = f"Failed to parse upload_files arguments: {e}. Raw: {tool_call.function.arguments[:500]}"
                        logger.error(error_msg)
                        output = f"Error parsing upload_files arguments: {e}"
                        messages.append(_assistant_tool_call_message(content, tool_call))
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": output
                        })
                        yield {"type": "error", "content": output}
                        continue
                    # Check if files_root is available
                    if not files_root:
                        output = "Error: File upload not available - no workspace configured"
                    else:
                        # Upload files
                        output = upload_files_to_sandbox(sbx, paths, files_root)
                    # Send upload notification to UI
                    yield {"type": "upload", "paths": paths, "output": output}
                    # Add to message history
                    messages.append(_assistant_tool_call_message(content, tool_call))
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": output
                    })
                elif tool_call.function.name == "download_files":
                    # Parse arguments
                    try:
                        args = json.loads(tool_call.function.arguments)
                        files = args["files"]
                    except (json.JSONDecodeError, KeyError) as e:
                        error_msg = f"Failed to parse download_files arguments: {e}. Raw: {tool_call.function.arguments[:500]}"
                        logger.error(error_msg)
                        output = f"Error parsing download_files arguments: {e}"
                        messages.append(_assistant_tool_call_message(content, tool_call))
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": output
                        })
                        yield {"type": "error", "content": output}
                        continue
                    # Check if files_root is available
                    if not files_root:
                        output = "Error: File download not available - no workspace configured"
                    else:
                        # Download files
                        output = download_files_from_sandbox(sbx, files, files_root)
                    # Extract paths for UI display
                    paths = [f"{f.get('sandbox_path', '')} -> {f.get('local_path', '')}" for f in files]
                    # Send download notification to UI
                    yield {"type": "download", "paths": paths, "output": output}
                    # Add to message history
                    messages.append(_assistant_tool_call_message(content, tool_call))
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": output
                    })
        else:
            # No tool calls - we're done
            messages.append({"role": "assistant", "content": content})
            done = True
            # If we found a result tag, send it with figure data
            if result_content:
                has_result = True
                yield {"type": "result", "content": result_content, "figures": figure_data}
        # Yield generating state between turns
        if not done:
            yield {"type": "generating"}
    # If agent finished without a <result>, nudge it for one
    if not has_result:
        from .agents import nudge_for_result
        yield from nudge_for_result(client, model, messages, extra_params=extra_params, extra_result_data={"figures": figure_data}, call_number=debug_call_number)
    # Send done signal
    yield {"type": "done"}