"""Tool system for Data Analyzer Agent. Provides execute_code function for running Python code in E2B sandbox, along with tool schema and routing infrastructure. """ import json from typing import Callable from e2b_code_interpreter import Sandbox def execute_code(code: str, sbx: Sandbox): """Execute Python code in E2B sandbox and return results with metadata. Args: code: Python code string to execute sbx: E2B Sandbox instance Returns: Tuple of (execution.to_json(), metadata dict) - execution.to_json() contains results and errors - metadata contains images list with PNG data extracted from results """ execution = sbx.run_code(code) metadata = {} # Extract PNG data from execution results into metadata if hasattr(execution, "results") and execution.results: for result in execution.results: if getattr(result, "png", None): metadata.setdefault("images", []).append(result.png) result.png = None # Clear PNG from result to avoid duplication if execution.error: metadata["error"] = str(execution.error) return execution.to_json(), metadata execute_code_schema = { "type": "function", "name": "execute_code", "description": "Execute Python code and return result", "parameters": { "type": "object", "properties": { "code": {"type": "string", "description": "Python code"} }, "required": ["code"], "additionalProperties": False } } tools = {"execute_code": execute_code} def execute_tool(name: str, args: str, tools: dict, **kwargs): """Route tool calls from LLM to implementation functions. Args: name: Tool name to execute args: JSON string with tool arguments tools: Dict mapping tool names to functions **kwargs: Additional parameters passed to tool function (e.g., sbx) Returns: Tuple of (result dict, metadata dict) - result contains execution output or error - metadata contains additional data like images """ try: args_dict = json.loads(args) if name not in tools: return {"error": f"Tool {name} does not exist."}, {} return tools[name](**args_dict, **kwargs) except json.JSONDecodeError as e: return {"error": f"Failed to parse JSON arguments: {str(e)}"}, {} except KeyError as e: return {"error": f"Missing key in arguments: {str(e)}"}, {} except Exception as e: return {"error": str(e)}, {}