File size: 8,779 Bytes
ba2ada2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
"""
Code Forge — generates and executes optimized Python code.

"Code-as-action" paradigm: instead of calling predefined tools,
the agent generates standalone Python scripts to accomplish goals.
Generated code is sandboxed, has resource limits, and includes
automatic error recovery.

Ultra-lightweight: uses Python's ast module for pre-validation and runs
the code in-process via exec() on an executor thread under an asyncio timeout.
"""
import asyncio
import os
import sys
import json
import ast
import traceback
import io
import time
from typing import Optional
from schemas.agent import ToolOutput

# Wall-clock budget, in seconds, for one run of generated code (env: ADAM_CODE_TIMEOUT).
_MAX_EXECUTION_TIME = int(os.environ.get("ADAM_CODE_TIMEOUT", "20"))
# Cap, in characters, applied to captured stdout and to captured stderr (env: ADAM_CODE_MAX_OUTPUT).
_MAX_OUTPUT_SIZE = int(os.environ.get("ADAM_CODE_MAX_OUTPUT", "10000"))
# Execution stays enabled unless ADAM_ENABLE_CODE is set to something other than "true" (any case).
_ENABLE_CODE_EXEC = "true" == os.environ.get("ADAM_ENABLE_CODE", "true").lower()


class CodeForge:
    """
    Generates and executes Python code to accomplish agent goals.

    Features:
    - Self-healing: if generated code fails, analyzes error and fixes it
    - Sandboxed execution: fresh globals, captured output, output-size cap and timeout
    - Static pre-validation: checks code safety before running
    - Optimized generation: produces minimal, efficient code
    """

    def __init__(self, llm_call_fn=None):
        self._llm = llm_call_fn
        self._auto_fix = True
        self._synthesis_count = 0
        self._success_count = 0

    async def execute(self, goal: str, context: str = "",
                      previous_results: Optional[dict] = None,
                      fast_mode: bool = False) -> str:
        """
        Generate, validate and run code for ``goal``; return its output text.

        Args:
            goal: Natural-language description of what the script must do.
            context: Extra text passed to the code-generation prompt.
            previous_results: Optional mapping of earlier results, also passed
                to the code-generation prompt.
            fast_mode: Forwarded unchanged to ``_generate_code``.

        Returns:
            The captured output of the script on success. Otherwise a
            human-readable message: "Failed to generate code.",
            "Code validation failed: ...", or "Execution error: ..." (error
            text truncated to 500 characters). When execution is disabled by
            configuration, the result of ``_generate_only``.
        """
        if not _ENABLE_CODE_EXEC:
            return await self._generate_only(goal)

        # 1. Generate code
        code = await self._generate_code(goal, context, previous_results, fast_mode)
        if not code:
            return "Failed to generate code."

        # 2. Validate code safety
        is_safe, error = self._validate_code(code)
        if not is_safe:
            return f"Code validation failed: {error}"

        # 3. Execute; every script that reaches this point counts as one attempt.
        result = await self._execute_safe(code, goal)
        self._synthesis_count += 1

        if result.error and self._auto_fix:
            # Self-healing: ask the LLM for a repaired script (single attempt).
            fixed = await self._fix_code(code, result.error, goal)
            if fixed and fixed != code:
                # The repaired script is exactly as untrusted as the first one,
                # so it must pass the same safety gate before it is run. If it
                # does not, the error from the first run is what gets reported.
                fixed_is_safe, _ = self._validate_code(fixed)
                if fixed_is_safe:
                    result = await self._execute_safe(fixed, goal)

        if result.error:
            return f"Execution error: {result.error[:500]}"

        # Count every successful attempt (first-try or repaired) so that
        # success_rate reflects successes / attempts.
        self._success_count += 1
        return result.output

    async def _generate_code(self, goal: str, context: str,
                              previous_results: dict = None,
                              fast_mode: bool = False) -> Optional[str]:
        """Generate Python code using the LLM."""
        if not self._llm:
            return None

        context_str = ""
        if previous_results:
            context_str = "\nPrevious results:\n" + json.dumps(
                {k: str(v)[:200] for k, v in previous_results.items()},
                indent=2
            )[:1000]

        prompt = f"""Generate Python code to accomplish this goal.

Goal: {goal}
Context: {context[:500]}{context_str}

Requirements:
- Use ONLY standard library modules (os, json, sys, math, time, re, collections, itertools, typing, dataclasses, hashlib)
- Handle errors gracefully with try/except
- Print the result at the end
- No external API calls unless goal explicitly requires it
- Max 50 lines
- Return ONLY the Python code in a ```python code block

The code must be complete and runnable.
"""
        try:
            raw = await self._llm(prompt, model_hint="fast", max_tokens=2000)
            return self._extract_code(raw)
        except Exception:
            return None

    def _extract_code(self, text: str) -> Optional[str]:
        """Extract Python code from LLM output."""
        import re
        # Match ```python ... ``` blocks
        match = re.search(r'```(?:python|py)?\s*\n?(.*?)\n?```', text, re.DOTALL)
        if match:
            return match.group(1).strip()
        # Fallback: match any code-looking block
        match = re.search(r'```\s*\n?(.*?)\n?```', text, re.DOTALL)
        if match:
            return match.group(1).strip()
        return text.strip()

    def _validate_code(self, code: str) -> tuple[bool, Optional[str]]:
        """Pre-validate code for safety before execution."""
        if not code:
            return False, "Empty code"

        # Check for dangerous operations
        dangerous = ["__import__", "eval(", "exec(", "compile(",
                      "open(", "os.system", "subprocess", "shutil",
                      "socket", "requests.get", "urllib.request"]
        for d in dangerous:
            if d in code:
                return False, f"Dangerous operation blocked: {d}"

        # AST validation
        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            return False, f"Syntax error: {e}"

        # Check for unsafe AST nodes
        for node in ast.walk(tree):
            if isinstance(node, (ast.Import, ast.ImportFrom)):
                for alias in node.names:
                    if alias.name in ("os", "subprocess", "shutil", "socket",
                                       "ctypes", "multiprocessing"):
                        if not any(getattr(n, 'attr', '') == 'path' for n in ast.walk(node)
                                   if isinstance(n, ast.Attribute)):
                            return False, f"Unsafe import: {alias.name}"

        return True, None

    async def _execute_safe(self, code: str, goal: str) -> ToolOutput:
        """Run ``code`` in-process with captured output and a timeout.

        The stripped source is compiled under the filename ``<agent_code>``
        and run with ``exec`` on the event loop's default executor, using a
        fresh globals dict that contains only ``__builtins__``.

        NOTE: ``sys.stdout`` / ``sys.stderr`` are swapped process-wide for
        the duration of the call, so anything else printing meanwhile is
        captured too. On timeout only the wait is abandoned; the worker
        thread is not interrupted and keeps running in the background.

        Args:
            code: Python source to run.
            goal: Not used by this method.

        Returns:
            A ToolOutput with ``tool_name="code_forge"``. ``output`` is the
            captured stdout ("(no output)" when empty) and ``error`` the
            captured stderr (None when empty), each cut to _MAX_OUTPUT_SIZE
            characters. On timeout or on a failure outside the script itself
            (e.g. compilation), ``output`` is "" and ``error`` describes it.
        """
        start = time.time()

        def elapsed_ms() -> int:
            return int((time.time() - start) * 1000)

        local_vars = {"__builtins__": __builtins__}
        stdout_capture = io.StringIO()
        stderr_capture = io.StringIO()
        old_stdout = sys.stdout
        old_stderr = sys.stderr

        try:
            sys.stdout = stdout_capture
            sys.stderr = stderr_capture

            compiled = compile(code.strip(), "<agent_code>", "exec")
            loop = asyncio.get_running_loop()

            def run():
                try:
                    exec(compiled, local_vars)
                except SystemExit as exc:
                    # sys.exit()/exit() must end the script, not the host: a
                    # SystemExit leaving this thread would be re-raised by
                    # the awaiting coroutine and escape `except Exception`.
                    if exc.code not in (None, 0):
                        stderr_capture.write(f"SystemExit: {exc.code}\n")
                except (Exception, KeyboardInterrupt):
                    # Write to the capture buffer explicitly: after a timeout
                    # sys.stderr has already been restored to the real stream.
                    traceback.print_exc(file=stderr_capture)

            try:
                await asyncio.wait_for(
                    loop.run_in_executor(None, run),
                    timeout=_MAX_EXECUTION_TIME
                )
            except asyncio.TimeoutError:
                return ToolOutput(
                    tool_name="code_forge",
                    output="",
                    error=f"Execution timed out ({_MAX_EXECUTION_TIME}s)",
                    latency_ms=elapsed_ms(),
                )

            output = stdout_capture.getvalue()[:_MAX_OUTPUT_SIZE]
            error = stderr_capture.getvalue()[:_MAX_OUTPUT_SIZE]

            return ToolOutput(
                tool_name="code_forge",
                output=output or "(no output)",
                error=error if error else None,
                latency_ms=elapsed_ms(),
            )

        except Exception as e:
            # Failures outside the script itself, e.g. compile() rejecting it.
            return ToolOutput(
                tool_name="code_forge",
                output="",
                error=str(e)[:500],
                latency_ms=elapsed_ms(),
            )
        finally:
            sys.stdout = old_stdout
            sys.stderr = old_stderr

    async def _fix_code(self, code: str, error: str, goal: str) -> Optional[str]:
        """Self-heal: analyze error and fix the code."""
        if not self._llm:
            return None

        prompt = f"""The following Python code had an error. Fix it.

Code:
```python
{code}
```

Error:
{error[:500]}

Goal: {goal}

Return the FIXED code in a ```python block.
"""
        try:
            raw = await self._llm(prompt, model_hint="fast", max_tokens=2000)
            return self._extract_code(raw)
        except Exception:
            return None

    async def _generate_only(self, goal: str) -> str:
        """Generate code without executing (display-only mode)."""
        if not self._llm:
            return "Code generation disabled."
        code = await self._generate_code(goal, "", fast_mode=True)
        if code:
            return f"Generated code:\n```python\n{code}\n```\n\n(Execution disabled. Set ADAM_ENABLE_CODE=true to run.)"
        return "Failed to generate code."

    @property
    def success_rate(self) -> float:
        if self._synthesis_count == 0:
            return 1.0
        return self._success_count / self._synthesis_count