File size: 13,756 Bytes
d12a6df
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# agentflow/tools/python_coder/tool.py

import os
import re
import sys
from io import StringIO
import contextlib

import threading
from agentflow.tools.base import BaseTool
from agentflow.engine.factory import create_llm_engine

import signal
from contextlib import contextmanager

import platform

# Tool name mapping - this defines the external name for this tool
TOOL_NAME = "Python_Code_Generator_Tool"

# Custom exception for code execution timeout
class TimeoutException(Exception):
    pass

# Custom context manager for code execution timeout
@contextmanager
def timeout(seconds):
    timer = threading.Timer(seconds, lambda: (_ for _ in ()).throw(TimeoutException("Code execution timed out")))
    timer.start()
    try:
        yield
    finally:
        timer.cancel()


LIMITATION = f"""
The {TOOL_NAME} has several limitations:
1. Restricted to basic Python arithmetic operations and built-in mathematical functions.
2. Cannot use any external libraries or modules, including those in the Python standard library.
3. Limited to simple mathematical calculations and problems.
4. Cannot perform any string processing, data structure manipulation, or complex algorithms.
5. No access to any system resources, file operations, or network requests.
6. Cannot use 'import' statements.
7. All calculations must be self-contained within a single function or script.
8. Input must be provided directly in the query string.
9. Output is limited to numerical results or simple lists/tuples of numbers.
10. Output should be kept to a single numerical result or a simple list/tuple of numbers.
11. DO NOT generate loop output.
"""

BEST_PRACTICE = f"""
For optimal results with the {TOOL_NAME}:
1. Provide clear and specific queries that describe the desired mathematical calculation.
2. Include all necessary numerical inputs directly in the query string.
3. Keep tasks focused on basic arithmetic, algebraic calculations, or simple mathematical algorithms.
4. Ensure all required numerical data is included in the query.
5. Verify that the query only involves mathematical operations and does not require any data processing or complex algorithms.
6. Review generated code to ensure it only uses basic Python arithmetic operations and built-in math functions.
"""

class Python_Coder_Tool(BaseTool):
    require_llm_engine = True
    def __init__(self, model_string="dashscope-qwen2.5-coder-7b-instruct"):
        super().__init__(
            tool_name=TOOL_NAME,
            tool_description="A tool that generates and executes simple Python code snippets for basic arithmetical calculations and math-related problems. The generated code runs in a highly restricted environment with only basic mathematical operations available.",
            tool_version="1.0.0",
            input_types={
                "query": "str - A clear, specific description of the arithmetic calculation or math problem to be solved, including any necessary numerical inputs."},
            output_type="dict - A dictionary containing the generated code, calculation result, and any error messages.",
            demo_commands=[
                # {
                #     "command": 'execution = tool.execute(query="Calculate the factorial of 5")',
                #     "description": "Generate a Python code snippet to calculate the factorial of 5."
                # },
                {
                    "command": 'execution = tool.execute(query="Find the sum of prime numbers up to 50")',
                    "description": "Generate a Python code snippet to find the sum of prime numbers up to 50."
                },
                {
                    "command": 'query="Given the list [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], calculate the sum of squares of odd numbers"\nexecution = tool.execute(query=query)',
                    "description": "Generate a Python function for a specific mathematical operation on a given list of numbers."
                },
            ],
            user_metadata = {
                "limitations": LIMITATION,
                "best_practices": BEST_PRACTICE
            }
        )
        print(f"Initializing Python_Coder_Tool with model_string: {model_string}")
        # self.llm_engine = create_llm_engine(model_string=model_string, is_multimodal=False, base_url=base_url) if model_string else None

        # NOTE: deterministic mode
        self.llm_engine = create_llm_engine(
            model_string=model_string, 
            is_multimodal=False, 
            temperature=0.0, 
            top_p=1.0, 
            frequency_penalty=0.0, 
            presence_penalty=0.0
            ) if model_string else None

    @staticmethod
    def preprocess_code(code):
        """
        Preprocesses the generated code snippet by extracting it from the response.
        Returns only the first Python code block found.

        Parameters:
            code (str): The response containing the code snippet.

        Returns:
            str: The extracted code snippet from the first Python block.
            
        Raises:
            ValueError: If no Python code block is found.
        """
        # Look for the first occurrence of a Python code block
        match = re.search(r"```python\s*(.*?)\s*```", code, re.DOTALL)
        if not match:
            raise ValueError("No Python code block found in the response")
        return match.group(1).strip()

    def truncate_string(self, text, max_length):
        """
        Truncates a string using middle truncation if it exceeds max_length.

        Parameters:
            text (str): The text to truncate
            max_length (int): Maximum allowed length

        Returns:
            str: Truncated text with middle omission if needed
        """
        if len(text) <= max_length:
            return text

        # Keep first and last portions
        head_size = max_length // 2 - 50  # Leave room for truncation message
        tail_size = max_length // 2 - 50

        return (
            text[:head_size] +
            " ... (truncated: middle content omitted) ... " +
            text[-tail_size:]
        )

    def safe_repr(self, obj, max_length=2000):
        """
        Safely represent a variable with truncation for large objects.

        Parameters:
            obj: The object to represent
            max_length (int): Maximum length for representation

        Returns:
            str: Safe string representation of the object
        """
        try:
            # Handle special cases that can be extremely verbose
            import types

            # Skip function objects, modules, classes
            if isinstance(obj, (types.FunctionType, types.ModuleType, type)):
                return f"<{type(obj).__name__}: {getattr(obj, '__name__', 'unnamed')}>"

            # Handle itertools and other iterator objects
            if hasattr(obj, '__iter__') and hasattr(obj, '__next__'):
                return f"<iterator: {type(obj).__name__}>"

            # Convert to string and truncate if needed
            obj_str = str(obj)
            return self.truncate_string(obj_str, max_length)

        except Exception as e:
            return f"<repr error: {type(obj).__name__}>"

    @contextlib.contextmanager
    def capture_output(self):
        """
        Context manager to capture the standard output.

        Yields:
            StringIO: The captured output.
        """
        new_out = StringIO()
        old_out = sys.stdout
        sys.stdout = new_out
        try:
            yield sys.stdout
        finally:
            sys.stdout = old_out

    def execute_code_snippet(self, code, max_head_tail=3500, max_var_length=2000, max_vars=20):
        """
        Executes the given Python code snippet.

        Parameters:
            code (str): The Python code snippet to be executed.
            max_head_tail (int): Maximum length for printed output before truncation
            max_var_length (int): Maximum length for each variable representation
            max_vars (int): Maximum number of variables to include in output

        Returns:
            dict: A dictionary containing the printed output and local variables.
        """
        # Check for dangerous functions and remove them
        dangerous_functions = ['exit', 'quit', 'sys.exit']
        for func in dangerous_functions:
            if func in code:
                print(f"Warning: Removing unsafe '{func}' call from code")
                # Use regex to remove function calls with any arguments
                code = re.sub(rf'{func}\s*\([^)]*\)', 'break', code)

        try:
            execution_code = self.preprocess_code(code)

            # Execute with 10-second timeout
            with timeout(10):
                try:
                    exec(execution_code)
                except TimeoutException:
                    print("Error: Code execution exceeded 60 seconds timeout")
                    return {"error": "Execution timed out after 60 seconds"}
                except Exception as e:
                    print(f"Error executing code: {e}")
                    return {"error": str(e)}
                
            # Capture the output and local variables
            local_vars = {}
            with self.capture_output() as output:
                exec(execution_code, {}, local_vars)
            raw_output = output.getvalue().strip()

            # Truncate printed output using middle truncation
            printed_output = self.truncate_string(raw_output, max_head_tail)

            # Filter and safely represent variables
            """
            Only the variables used in the code are returned,
            excluding built-in variables (which start with '__') and imported modules.
            All variables are safely represented with truncation.
            """
            used_vars = {}
            var_count = 0
            for k, v in local_vars.items():
                if not k.startswith('__') and not isinstance(v, type(sys)):
                    if var_count >= max_vars:
                        used_vars["__truncated__"] = f"... ({len(local_vars) - var_count} more variables omitted)"
                        break
                    # Safely represent the variable with truncation
                    used_vars[k] = self.safe_repr(v, max_var_length)
                    var_count += 1

            return {"printed_output": printed_output, "variables": used_vars, "execution_code":execution_code}
        
        except Exception as e:
            print(f"Error executing code: {e}")
            return {"error": str(e)}

    def execute(self, query):
        """
        Generates and executes Python code based on the provided query.

        Parameters:
            query (str): A query describing the desired operation.

        Returns:
            dict: A dictionary containing the executed output, local variables, or any error message.
        """

        if not self.llm_engine:
            raise ValueError("LLM engine not initialized. Please provide a valid model_string when initializing the tool.")

        task_description = """
        Given a query, generate a Python code snippet that performs the specified operation on the provided data. Please think step by step. Ensure to break down the process into clear, logical steps. Make sure to print the final result in the generated code snippet with a descriptive message explaining what the output represents. The final output should be presented in the following format:

        ```python
        <code snippet>
        ```
        """
        task_description = task_description.strip()
        full_prompt = f"Task:\n{task_description}\n\nQuery:\n{query}"

        response = self.llm_engine(full_prompt)
        result_or_error = self.execute_code_snippet(response)
        return result_or_error

    def get_metadata(self):
        """
        Returns the metadata for the Python_Coder_Tool.

        Returns:
            dict: A dictionary containing the tool's metadata.
        """
        metadata = super().get_metadata()
        metadata["require_llm_engine"] = self.require_llm_engine # NOTE: can be removed if not needed
        return metadata


if __name__ == "__main__":
    # Test command:
    """
    Run the following commands in the terminal to test the script:
    
    cd agentflow/tools/python_coder
    python tool.py
    """

    # Get the directory of the current script
    script_dir = os.path.dirname(os.path.abspath(__file__))

    # Example usage of the Python_Coder_Tool
    # tool = Python_Coder_Tool()

    # tool = Python_Coder_Tool(model_string="gpt-4o-mini") # NOTE: strong LLM for tool
    # tool = Python_Coder_Tool(model_string="gemini-1.5-flash") # NOTE: weak 8B model for tool
    tool = Python_Coder_Tool(model_string="dashscope") # NOTE: weak Qwen2.5-7B model for tool
    # tool = Python_Coder_Tool(model_string="together-Qwen/Qwen2.5-7B-Instruct") # NOTE: weak Qwen2.5-7B model for tool

    # Get tool metadata
    metadata = tool.get_metadata()
    print(metadata)

    # Sample query for generating and executing Python code
    queries = [
        # "Given the number list: [1, 2, 3, 4, 5], calculate the sum of all the numbers in the list.",
        # "Print numbers from 1 to 1000 in a loop to test output truncation",
        "Create a list variable containing all numbers from 1 to 5000 to test variable truncation",
    ]
    for query in queries:
        print(f"\n###Query: {query}")
        # Execute the tool with the sample query
        try:
            execution = tool.execute(query=query)
            print("\n###Execution Result:", execution)
        except ValueError as e:
            print(f"Execution failed: {e}")

    print("Done!")