""" Code Forge — generates and executes optimized Python code. "Code-as-action" paradigm: instead of calling predefined tools, the agent generates standalone Python scripts to accomplish goals. Generated code is sandboxed, has resource limits, and includes automatic error recovery. Ultra-lightweight: uses Python's ast module for pre-validation and asyncio subprocess for safe execution. """ import asyncio import os import sys import json import ast import traceback import io import time from typing import Optional from schemas.agent import ToolOutput _MAX_EXECUTION_TIME = int(os.getenv("ADAM_CODE_TIMEOUT", "20")) _MAX_OUTPUT_SIZE = int(os.getenv("ADAM_CODE_MAX_OUTPUT", "10000")) _ENABLE_CODE_EXEC = os.getenv("ADAM_ENABLE_CODE", "true").lower() == "true" class CodeForge: """ Generates and executes Python code to accomplish agent goals. Features: - Self-healing: if generated code fails, analyzes error and fixes it - Sandboxed execution: resource limits and timeout - Static pre-validation: checks code safety before running - Optimized generation: produces minimal, efficient code """ def __init__(self, llm_call_fn=None): self._llm = llm_call_fn self._auto_fix = True self._synthesis_count = 0 self._success_count = 0 async def execute(self, goal: str, context: str = "", previous_results: dict = None, fast_mode: bool = False) -> str: """ Generate and execute code to accomplish a goal. Returns the execution output. """ if not _ENABLE_CODE_EXEC: return await self._generate_only(goal) # 1. Generate code code = await self._generate_code(goal, context, previous_results, fast_mode) if not code: return "Failed to generate code." # 2. Validate code safety is_safe, error = self._validate_code(code) if not is_safe: return f"Code validation failed: {error}" # 3. Execute with sandbox result = await self._execute_safe(code, goal) self._synthesis_count += 1 if result.error and self._auto_fix: # Self-healing: try to fix the code fixed = await self._fix_code(code, result.error, goal) if fixed and fixed != code: result = await self._execute_safe(fixed, goal) if not result.error: self._success_count += 1 return result.output if result.error: return f"Execution error: {result.error[:500]}" return result.output async def _generate_code(self, goal: str, context: str, previous_results: dict = None, fast_mode: bool = False) -> Optional[str]: """Generate Python code using the LLM.""" if not self._llm: return None context_str = "" if previous_results: context_str = "\nPrevious results:\n" + json.dumps( {k: str(v)[:200] for k, v in previous_results.items()}, indent=2 )[:1000] prompt = f"""Generate Python code to accomplish this goal. Goal: {goal} Context: {context[:500]}{context_str} Requirements: - Use ONLY standard library modules (os, json, sys, math, time, re, collections, itertools, typing, dataclasses, hashlib) - Handle errors gracefully with try/except - Print the result at the end - No external API calls unless goal explicitly requires it - Max 50 lines - Return ONLY the Python code in a ```python code block The code must be complete and runnable. """ try: raw = await self._llm(prompt, model_hint="fast", max_tokens=2000) return self._extract_code(raw) except Exception: return None def _extract_code(self, text: str) -> Optional[str]: """Extract Python code from LLM output.""" import re # Match ```python ... ``` blocks match = re.search(r'```(?:python|py)?\s*\n?(.*?)\n?```', text, re.DOTALL) if match: return match.group(1).strip() # Fallback: match any code-looking block match = re.search(r'```\s*\n?(.*?)\n?```', text, re.DOTALL) if match: return match.group(1).strip() return text.strip() def _validate_code(self, code: str) -> tuple[bool, Optional[str]]: """Pre-validate code for safety before execution.""" if not code: return False, "Empty code" # Check for dangerous operations dangerous = ["__import__", "eval(", "exec(", "compile(", "open(", "os.system", "subprocess", "shutil", "socket", "requests.get", "urllib.request"] for d in dangerous: if d in code: return False, f"Dangerous operation blocked: {d}" # AST validation try: tree = ast.parse(code) except SyntaxError as e: return False, f"Syntax error: {e}" # Check for unsafe AST nodes for node in ast.walk(tree): if isinstance(node, (ast.Import, ast.ImportFrom)): for alias in node.names: if alias.name in ("os", "subprocess", "shutil", "socket", "ctypes", "multiprocessing"): if not any(getattr(n, 'attr', '') == 'path' for n in ast.walk(node) if isinstance(n, ast.Attribute)): return False, f"Unsafe import: {alias.name}" return True, None async def _execute_safe(self, code: str, goal: str) -> ToolOutput: """Execute Python code in a sandboxed environment.""" start = time.time() local_vars = {"__builtins__": __builtins__} stdout_capture = io.StringIO() stderr_capture = io.StringIO() old_stdout = sys.stdout old_stderr = sys.stderr try: sys.stdout = stdout_capture sys.stderr = stderr_capture compiled = compile(code.strip(), "", "exec") loop = asyncio.get_running_loop() def run(): try: exec(compiled, local_vars) except Exception: traceback.print_exc() try: await asyncio.wait_for( loop.run_in_executor(None, run), timeout=_MAX_EXECUTION_TIME ) except asyncio.TimeoutError: return ToolOutput( tool_name="code_forge", output="", error=f"Execution timed out ({_MAX_EXECUTION_TIME}s)", latency_ms=int((time.time() - start) * 1000), ) output = stdout_capture.getvalue()[:_MAX_OUTPUT_SIZE] error = stderr_capture.getvalue()[:_MAX_OUTPUT_SIZE] return ToolOutput( tool_name="code_forge", output=output or "(no output)", error=error if error else None, latency_ms=int((time.time() - start) * 1000), ) except Exception as e: return ToolOutput( tool_name="code_forge", output="", error=str(e)[:500], latency_ms=int((time.time() - start) * 1000), ) finally: sys.stdout = old_stdout sys.stderr = old_stderr async def _fix_code(self, code: str, error: str, goal: str) -> Optional[str]: """Self-heal: analyze error and fix the code.""" if not self._llm: return None prompt = f"""The following Python code had an error. Fix it. Code: ```python {code} ``` Error: {error[:500]} Goal: {goal} Return the FIXED code in a ```python block. """ try: raw = await self._llm(prompt, model_hint="fast", max_tokens=2000) return self._extract_code(raw) except Exception: return None async def _generate_only(self, goal: str) -> str: """Generate code without executing (display-only mode).""" if not self._llm: return "Code generation disabled." code = await self._generate_code(goal, "", fast_mode=True) if code: return f"Generated code:\n```python\n{code}\n```\n\n(Execution disabled. Set ADAM_ENABLE_CODE=true to run.)" return "Failed to generate code." @property def success_rate(self) -> float: if self._synthesis_count == 0: return 1.0 return self._success_count / self._synthesis_count