"""Sandboxed Python execution tool backed by SandboxFusion endpoints."""
import os
import random
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional, Union

import json5
from qwen_agent.tools.base import BaseToolWithFileAccess, register_tool
from qwen_agent.utils.utils import extract_code
from requests.exceptions import Timeout
from sandbox_fusion import run_code, RunCodeRequest, RunStatus

SANDBOX_URL = os.getenv('SANDBOX_URL', '')
SANDBOX_FUSION_ENDPOINTS = [SANDBOX_URL]

# SANDBOX_FUSION_ENDPOINT, when set, takes precedence over SANDBOX_URL and is
# split on commas into the list of candidate endpoints.
if 'SANDBOX_FUSION_ENDPOINT' in os.environ:
    SANDBOX_FUSION_ENDPOINTS = os.environ['SANDBOX_FUSION_ENDPOINT'].split(',')

# Compiled once at import time instead of on every call.
_CHINESE_CHAR_RE = re.compile(r'[\u4e00-\u9fff]')
# Captures the body of the first fenced Markdown code block.
_FENCED_CODE_RE = re.compile(r'```[^\n]*\n(.+?)```', re.DOTALL)


def has_chinese_chars(data) -> bool:
    """Return True if ``str(data)`` contains a character in U+4E00..U+9FFF."""
    return bool(_CHINESE_CHAR_RE.search(f'{data}'))


@register_tool('PythonInterpreter', allow_overwrite=True)
class PythonInterpreter(BaseToolWithFileAccess):
    """Tool that sends Python code to a sandbox endpoint and returns its output."""

    name = "PythonInterpreter"
    # NOTE(review): the example inside this prompt looks like it lost its XML
    # tag names and placeholder values (empty values after "purpose"/"name",
    # runs of blank lines). The text is kept exactly as found -- confirm
    # against the intended prompt before relying on it.
    description = 'Execute Python code in a sandboxed environment. Use this to run Python code and get the execution results.\n**Make sure to use print() for any output you want to see in the results.**\nFor code parameters, use placeholders first, and then put the code within XML tags, such as:\n\n{"purpose": , "name": , "arguments": {"code": ""}}\n\nHere is the code.\n\n\n'
    parameters = {
        "type": "object",
        "properties": {
            "code": {
                "type": "string",
                "description": "The Python code to execute. Must be provided within XML tags. Remember to use print() statements for any output you want to see.",
            }
        },
        "required": ["code"],
    }

    # Number of endpoint attempts made by ``call``; the original loop treated
    # ``attempt == 4`` as the last attempt.
    _MAX_ATTEMPTS = 5

    def __init__(self, cfg: Optional[Dict] = None):
        super().__init__(cfg)

    @property
    def args_format(self) -> str:
        """Return the argument-format hint shown for this tool.

        ``cfg['args_format']`` wins when present; otherwise one of two default
        hints is chosen depending on whether the tool metadata contains
        characters matched by ``has_chinese_chars``.
        """
        fmt = self.cfg.get('args_format')
        if fmt is None:
            if has_chinese_chars([self.name_for_human, self.name, self.description, self.parameters]):
                fmt = 'The input for this tool should be a Markdown code block.'
            else:
                fmt = 'Enclose the code within triple backticks (`) at the beginning and end of the code.'
        return fmt

    def observation(self, tool: dict, tool_dict: dict, tool_results, empty_mode: bool = False,
                    readpage: bool = False, max_observation_length: Optional[int] = None, tokenizer=None):
        """Return ``tool_results`` unchanged after checking that it is a string.

        The remaining parameters are accepted but not used by this method.
        """
        assert isinstance(tool_results, str), f"result of python code should be str, instead of {type(tool_results)}. {tool_results}"
        return tool_results

    @property
    def function(self) -> dict:
        """Return the tool's name, description and parameter schema as a dict."""
        return {
            'name': self.name,
            'description': self.description,
            'parameters': self.parameters,
        }

    @staticmethod
    def _parse_code(params: Union[str, dict]) -> str:
        """Extract the Python source to run from a tool-call payload.

        A string payload is first parsed with ``json5``; the ``code`` key (or
        ``raw`` when ``code`` is empty) is used, and if that text contains a
        fenced Markdown block only the block body is kept. When the payload
        is not JSON of that shape, the original string is handed to
        ``extract_code``. Returns ``''`` when no source text can be recovered.
        """
        raw = params
        try:
            if isinstance(params, str):
                params = json5.loads(params)
            code = params.get('code', '') or params.get('raw', '')
            fenced = _FENCED_CODE_RE.search(code)
            if fenced:
                code = fenced.group(1)
        except Exception:
            # Fall back on the ORIGINAL text: ``params`` may already have been
            # rebound to a non-string JSON value that extract_code cannot take.
            code = extract_code(raw) if isinstance(raw, str) else ''
        return code if isinstance(code, str) else ''

    @staticmethod
    def _collect_output(code_result) -> List[str]:
        """Return labelled stdout/stderr sections for the non-empty streams."""
        run_result = code_result.run_result
        sections = []
        if run_result.stdout:
            sections.append(f"stdout:\n{run_result.stdout}")
        if run_result.stderr:
            sections.append(f"stderr:\n{run_result.stderr}")
        return sections

    def call(self, params, files=None, timeout=50, **kwargs) -> str:
        """Run the code in ``params`` on a sandbox endpoint, retrying on failure.

        Each attempt picks a random entry of ``SANDBOX_FUSION_ENDPOINTS``.

        Args:
            params: Tool-call payload (JSON string, dict, or plain code text).
            files: Accepted for interface compatibility; not used here.
            timeout: Seconds passed as both run and client timeout.

        Returns:
            The labelled stdout/stderr text, ``'Finished execution.'`` when
            there was no output, or an error message string. This method does
            not raise; every failure is reported in the returned string.
        """
        try:
            # NOTE(review): the original first tried to cut the code out of
            # ``params`` with a delimiter-based ``split``, but the delimiter
            # literals are missing from the source (an empty separator always
            # raises ValueError). Restore that step here if the tags are known.
            code = self._parse_code(params)
            if not code.strip():
                return '[Python Interpreter Error]: Empty code.'

            last_error = None
            for attempt in range(self._MAX_ATTEMPTS):
                # Chosen before the try so the handlers can always name it.
                endpoint = random.choice(SANDBOX_FUSION_ENDPOINTS)
                try:
                    code_result = run_code(
                        RunCodeRequest(code=code, language='python', run_timeout=timeout),
                        max_attempts=1,
                        client_timeout=timeout,
                        endpoint=endpoint,
                    )
                    result = self._collect_output(code_result)
                    # NOTE(review): the left-hand side of this comparison was
                    # lost in the source; the sandbox-reported execution time
                    # is assumed -- confirm.
                    elapsed = code_result.run_result.execution_time
                    if elapsed is not None and elapsed >= timeout - 1:
                        result.append("[PythonInterpreter Error] TimeoutError: Execution timed out.")
                    result = '\n'.join(result)
                    print('SUCCESS RUNNING TOOL')
                    return result if result.strip() else 'Finished execution.'
                except Timeout:
                    last_error = f'[Python Interpreter Error] TimeoutError: Execution timed out on endpoint {endpoint}.'
                    print(f"Timeout on attempt {attempt + 1}: {last_error}")
                except Exception as e:
                    last_error = f'[Python Interpreter Error]: {str(e)} on endpoint {endpoint}'
                    print(f"Error on attempt {attempt + 1}: {last_error}")

            # Reached only when every attempt failed.
            return last_error if last_error else '[Python Interpreter Error]: All attempts failed.'
        except Exception as e:
            return f"[Python Interpreter Error]: {str(e)}"

    def call_specific_endpoint(self, params: Union[str, dict], endpoint: str, timeout: Optional[int] = 30, **kwargs) -> tuple:
        """Run the code in ``params`` once against ``endpoint``.

        Returns:
            A 3-tuple ``(ok, message, elapsed_seconds)``. ``elapsed_seconds``
            is the wall-clock duration of the ``run_code`` call on success
            and ``None`` on every failure path, including empty code.
        """
        code = self._parse_code(params)
        if not code.strip():
            # Same arity as every other return so callers can always unpack three.
            return False, '[Python Interpreter Error]: Empty code.', None

        try:
            start_time = time.time()
            code_result = run_code(
                RunCodeRequest(code=code, language='python', run_timeout=timeout),
                max_attempts=1,
                client_timeout=timeout,
                endpoint=endpoint,
            )
            execution_time = time.time() - start_time
            result = '\n'.join(self._collect_output(code_result))
            return True, result if result.strip() else 'Finished execution.', execution_time
        except Timeout:
            return False, '[Python Interpreter Error] TimeoutError: Execution timed out.', None
        except Exception as e:
            return False, f'[Python Interpreter Error]: {str(e)}', None