| import asyncio |
| import sys |
| import io |
| import os |
| import json |
| import traceback |
| from schemas.agent import ToolOutput |
|
|
| MAX_EXECUTION_TIME = 15 |
| MAX_OUTPUT_SIZE = 10000 |
|
|
|
|
| async def execute_python(code: str) -> ToolOutput: |
| if not code: |
| return ToolOutput(tool_name="python", output="", error="No code provided", latency_ms=0) |
| local_vars = {} |
| stdout_capture = io.StringIO() |
| stderr_capture = io.StringIO() |
| old_stdout = sys.stdout |
| old_stderr = sys.stderr |
| try: |
| sys.stdout = stdout_capture |
| sys.stderr = stderr_capture |
| compiled = compile(code.strip(), "<agent_exec>", "exec", flags=0) |
| loop = asyncio.get_running_loop() |
|
|
| def run_code(): |
| try: |
| exec(compiled, {"__builtins__": __builtins__, "os": os, "json": json}, local_vars) |
| except Exception: |
| traceback.print_exc() |
|
|
| try: |
| await asyncio.wait_for(loop.run_in_executor(None, run_code), timeout=MAX_EXECUTION_TIME) |
| except asyncio.TimeoutError: |
| sys.stdout = old_stdout |
| sys.stderr = old_stderr |
| return ToolOutput(tool_name="python", output="", error=f"Execution timed out ({MAX_EXECUTION_TIME}s)", latency_ms=0) |
|
|
| output = stdout_capture.getvalue()[:MAX_OUTPUT_SIZE] |
| error = stderr_capture.getvalue()[:MAX_OUTPUT_SIZE] |
| if error and not output: |
| output = error |
| return ToolOutput(tool_name="python", output=output or "(no output)", latency_ms=0) |
| except SyntaxError as e: |
| return ToolOutput(tool_name="python", output="", error=f"SyntaxError: {e}", latency_ms=0) |
| except Exception as e: |
| return ToolOutput(tool_name="python", output="", error=f"{type(e).__name__}: {e}", latency_ms=0) |
| finally: |
| sys.stdout = old_stdout |
| sys.stderr = old_stderr |
|
|