"""
Code Forge — generates and executes optimized Python code.

"Code-as-action" paradigm: instead of calling predefined tools,
the agent generates standalone Python scripts to accomplish goals.
Generated code is statically pre-validated, run under a timeout with
truncated output, and given one automatic repair attempt on failure.

Ultra-lightweight: uses Python's ast module for pre-validation
and an asyncio executor thread (in-process ``exec``) for execution.
"""
import ast
import asyncio
import contextlib
import io
import json
import os
import re
import sys
import time
import traceback
from typing import Optional

from schemas.agent import ToolOutput
|
|
def _env_int(name: str, default: int) -> int:
    """Return the positive integer stored in environment variable *name*.

    Falls back to *default* when the variable is unset, is not a valid
    integer, or is not greater than zero. A malformed value must not crash
    the import, and a zero/negative value would turn the timeout into an
    immediate expiry or make the ``[:n]`` output cap slice from the end.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        value = int(raw)
    except ValueError:
        return default
    return value if value > 0 else default


# Seconds to wait for generated code before reporting a timeout.
_MAX_EXECUTION_TIME = _env_int("ADAM_CODE_TIMEOUT", 20)
# Maximum number of characters kept from captured stdout / stderr.
_MAX_OUTPUT_SIZE = _env_int("ADAM_CODE_MAX_OUTPUT", 10000)
# Only the value "true" (any case) enables execution; anything else
# switches the tool to generate-only mode.
_ENABLE_CODE_EXEC = os.getenv("ADAM_ENABLE_CODE", "true").strip().lower() == "true"
|
|
|
|
class CodeForge:
    """
    Generates and executes Python code to accomplish agent goals.

    Features:
    - Self-healing: if generated code fails, the error is sent back to the
      LLM for one repair attempt
    - Static pre-validation: a substring blacklist plus an AST import check
      run before any code (original or repaired) is executed
    - Timeout and output cap: the caller stops waiting after
      ``_MAX_EXECUTION_TIME`` seconds and output is truncated to
      ``_MAX_OUTPUT_SIZE`` characters

    NOTE(review): execution is an in-process ``exec`` on an executor thread
    with the host's builtins. The static checks are best-effort filters,
    not a security boundary, and a timed-out thread cannot be killed.
    """

    # Substrings that reject the code outright when present anywhere in it.
    _DANGEROUS_SNIPPETS = (
        "__import__", "eval(", "exec(", "compile(",
        "open(", "os.system", "subprocess", "shutil",
        "socket", "requests.get", "urllib.request",
    )

    # Top-level packages generated code may not import.
    _UNSAFE_MODULES = frozenset(
        {"os", "subprocess", "shutil", "socket", "ctypes", "multiprocessing"}
    )

    # First fenced block in an LLM reply; the language tag is optional.
    _CODE_FENCE_RE = re.compile(r'```(?:python|py)?\s*\n?(.*?)\n?```', re.DOTALL)

    def __init__(self, llm_call_fn=None):
        """Store the async LLM callable and reset the run counters.

        Args:
            llm_call_fn: awaitable called as
                ``llm_call_fn(prompt, model_hint=..., max_tokens=...)``;
                when ``None`` no code can be generated or repaired.
        """
        self._llm = llm_call_fn
        self._auto_fix = True  # attempt one LLM repair after a failed run
        self._synthesis_count = 0  # goals whose code was actually executed
        self._success_count = 0  # of those, how many ended without error

    async def execute(self, goal: str, context: str = "",
                      previous_results: Optional[dict] = None,
                      fast_mode: bool = False) -> str:
        """
        Generate and execute code to accomplish a goal.

        Returns the execution output, or a human-readable failure message
        (generation failure, validation failure, or execution error).
        """
        if not _ENABLE_CODE_EXEC:
            return await self._generate_only(goal)

        code = await self._generate_code(goal, context, previous_results, fast_mode)
        if not code:
            return "Failed to generate code."

        is_safe, error = self._validate_code(code)
        if not is_safe:
            return f"Code validation failed: {error}"

        result = await self._execute_safe(code, goal)
        self._synthesis_count += 1

        if result.error and self._auto_fix:
            fixed = await self._fix_code(code, result.error, goal)
            if fixed and fixed != code:
                # The repaired code is as untrusted as the first attempt, so
                # it must pass the same validation before it is run. If it
                # does not, the original execution error is reported.
                fixed_is_safe, _ = self._validate_code(fixed)
                if fixed_is_safe:
                    result = await self._execute_safe(fixed, goal)

        if result.error:
            return f"Execution error: {result.error[:500]}"

        # Count every error-free outcome, first try or after repair, so that
        # success_rate reflects all successful goals.
        self._success_count += 1
        return result.output

    async def _generate_code(self, goal: str, context: str,
                             previous_results: Optional[dict] = None,
                             fast_mode: bool = False) -> Optional[str]:
        """Generate Python code using the LLM.

        Returns the extracted code, or ``None`` when no LLM is configured or
        the LLM call fails. ``fast_mode`` is accepted for interface
        compatibility and is not currently used.
        """
        if not self._llm:
            return None

        context_text = (context or "")[:500]

        context_str = ""
        if previous_results:
            # Keys are stringified so non-string keys cannot break json.dumps.
            summary = {str(k): str(v)[:200] for k, v in previous_results.items()}
            context_str = "\nPrevious results:\n" + json.dumps(summary, indent=2)[:1000]

        # NOTE(review): the module list below advertises ``os``, but
        # _validate_code rejects ``import os``; confirm which is intended.
        prompt = f"""Generate Python code to accomplish this goal.

Goal: {goal}
Context: {context_text}{context_str}

Requirements:
- Use ONLY standard library modules (os, json, sys, math, time, re, collections, itertools, typing, dataclasses, hashlib)
- Handle errors gracefully with try/except
- Print the result at the end
- No external API calls unless goal explicitly requires it
- Max 50 lines
- Return ONLY the Python code in a ```python code block

The code must be complete and runnable.
"""
        try:
            raw = await self._llm(prompt, model_hint="fast", max_tokens=2000)
        except Exception:
            # Best-effort boundary: any LLM failure means "no code".
            return None
        return self._extract_code(raw)

    def _extract_code(self, text: str) -> Optional[str]:
        """Extract Python code from LLM output.

        Returns the contents of the first fenced block, or the whole reply
        stripped when there is no fence. Returns ``None`` when the reply is
        not a string or nothing remains after stripping.
        """
        if not isinstance(text, str):
            return None
        match = self._CODE_FENCE_RE.search(text)
        extracted = match.group(1) if match else text
        return extracted.strip() or None

    def _validate_code(self, code: str) -> tuple[bool, Optional[str]]:
        """Pre-validate code for safety before execution.

        Returns ``(True, None)`` when the code passes, otherwise
        ``(False, reason)``.
        """
        if not code:
            return False, "Empty code"

        for snippet in self._DANGEROUS_SNIPPETS:
            if snippet in code:
                return False, f"Dangerous operation blocked: {snippet}"

        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError) as e:
            # ValueError: source containing null bytes on older interpreters.
            return False, f"Syntax error: {e}"

        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    # Compare the top-level package so dotted imports such as
                    # ``import multiprocessing.pool`` are caught as well.
                    if alias.name.split(".")[0] in self._UNSAFE_MODULES:
                        return False, f"Unsafe import: {alias.name}"
            elif isinstance(node, ast.ImportFrom):
                module = node.module or ""  # None for ``from . import x``
                if module.split(".")[0] not in self._UNSAFE_MODULES:
                    continue
                # os.path helpers are the one tolerated exception.
                only_os_path = module == "os.path" or (
                    module == "os"
                    and all(alias.name == "path" for alias in node.names)
                )
                if not only_os_path:
                    return False, f"Unsafe import: {module}"

        return True, None

    async def _execute_safe(self, code: str, goal: str) -> "ToolOutput":
        """Execute Python code on an executor thread and capture its output.

        stdout/stderr are redirected process-wide while waiting, so output
        from other tasks in that window is captured too. Anything written to
        stderr, including a traceback, is reported as the error. On timeout
        the worker thread is abandoned, not stopped. ``goal`` is accepted for
        interface compatibility and is not used.
        """
        start = time.monotonic()

        def elapsed_ms() -> int:
            return int((time.monotonic() - start) * 1000)

        sandbox_globals = {"__builtins__": __builtins__}
        stdout_capture = io.StringIO()
        stderr_capture = io.StringIO()

        try:
            compiled = compile(code.strip(), "<agent_code>", "exec")
            loop = asyncio.get_running_loop()

            def run() -> None:
                try:
                    exec(compiled, sandbox_globals)
                except SystemExit as exc:
                    # sys.exit()/exit() in generated code must not propagate
                    # through the future and terminate the host event loop.
                    if exc.code not in (None, 0):
                        stderr_capture.write(f"SystemExit: {exc.code}\n")
                except BaseException:
                    # Write to the buffer explicitly so a traceback raised
                    # after a timeout does not leak onto the real stderr.
                    traceback.print_exc(file=stderr_capture)

            with contextlib.redirect_stdout(stdout_capture), \
                    contextlib.redirect_stderr(stderr_capture):
                await asyncio.wait_for(
                    loop.run_in_executor(None, run),
                    timeout=_MAX_EXECUTION_TIME,
                )
        except asyncio.TimeoutError:
            return ToolOutput(
                tool_name="code_forge",
                output="",
                error=f"Execution timed out ({_MAX_EXECUTION_TIME}s)",
                latency_ms=elapsed_ms(),
            )
        except Exception as e:
            # Covers compile errors and executor failures.
            return ToolOutput(
                tool_name="code_forge",
                output="",
                error=str(e)[:500],
                latency_ms=elapsed_ms(),
            )

        output = stdout_capture.getvalue()[:_MAX_OUTPUT_SIZE]
        error = stderr_capture.getvalue()[:_MAX_OUTPUT_SIZE]

        return ToolOutput(
            tool_name="code_forge",
            output=output or "(no output)",
            error=error if error else None,
            latency_ms=elapsed_ms(),
        )

    async def _fix_code(self, code: str, error: str, goal: str) -> Optional[str]:
        """Self-heal: ask the LLM to repair code given its error output.

        Returns the extracted replacement code, or ``None`` when no LLM is
        configured or the LLM call fails.
        """
        if not self._llm:
            return None

        prompt = f"""The following Python code had an error. Fix it.

Code:
```python
{code}
```

Error:
{error[:500]}

Goal: {goal}

Return the FIXED code in a ```python block.
"""
        try:
            raw = await self._llm(prompt, model_hint="fast", max_tokens=2000)
        except Exception:
            # Best-effort boundary: a failed repair request means "no fix".
            return None
        return self._extract_code(raw)

    async def _generate_only(self, goal: str) -> str:
        """Generate code without executing (display-only mode)."""
        if not self._llm:
            return "Code generation disabled."
        code = await self._generate_code(goal, "", fast_mode=True)
        if code:
            return f"Generated code:\n```python\n{code}\n```\n\n(Execution disabled. Set ADAM_ENABLE_CODE=true to run.)"
        return "Failed to generate code."

    @property
    def success_rate(self) -> float:
        """Fraction of executed goals that ended without error (1.0 if none ran)."""
        if self._synthesis_count == 0:
            return 1.0
        return self._success_count / self._synthesis_count
|
|