|
|
import asyncio
import os
import pathlib
import re
import time
from hashlib import md5
from typing import Tuple, Optional, IO, Union, Dict

import docker
from werkzeug.datastructures import FileStorage

from ..exceptions.exceptions import InputErrorException, SandBoxFileUploadException
from ..tools.base_tool import BaseTool, BaseToolRequest, BaseToolResponse
from ..utils import get_logger
|
|
|
|
|
# Module-level logger obtained from the project's get_logger helper.
logger = get_logger()

# Optional-dependency guard: if the Docker SDK cannot be imported, `docker`
# is bound to None instead of raising at import time.
# NOTE(review): `docker` is also imported unconditionally in the import block
# at the top of this file, so this fallback looks unreachable unless that
# import is removed -- confirm which behaviour is intended.
try:
    import docker
except ImportError:
    docker = None
|
|
|
|
|
# Host-side scratch directories, each resolved against the working directory
# the process has when this module is imported:
#   WORKING_DIR - default directory for the scripts to execute
#   OUTPUT_DIR  - default base directory for run outputs
#   UPLOAD_PATH - directory uploaded files are looked up in
WORKING_DIR, OUTPUT_DIR, UPLOAD_PATH = (
    os.path.join(os.getcwd(), relative)
    for relative in ("tmp/code_space", "tmp/output_space", "tmp/upload_files")
)
|
|
|
|
|
|
|
|
class CodeToolRequest(BaseToolRequest):
    """
    Request for Code Tool

    Pulls the source to execute out of a Markdown-style message. Every
    triple-backtick fenced section (optionally tagged ``python``) is
    captured, the captured sections are joined with newlines, and the
    result is stripped of surrounding whitespace.

    Attributes:
        code: The extracted source. It is the empty string when the
            message contains no fenced section.
    """

    def __init__(self, code_str: str):
        """Extract the fenced code from ``code_str`` and store it in ``code``."""
        # DOTALL lets `.` span newlines so multi-line fences are captured.
        fence_pattern = re.compile(r'```(?:python)?\s*(.*?)\s*```', re.DOTALL)
        snippets = fence_pattern.findall(code_str)
        self.code = '\n'.join(snippets).strip()
|
|
|
|
|
class PythonSandBoxToolResponseDocker:
    """Pairs a raw sandbox output with the formatter used to render it.

    Attributes:
        formatter: Any object exposing ``format(raw_output)``.
        raw_output: The unformatted output handed to ``formatter.format``.
    """

    def __init__(self, formatter, raw_output) -> None:
        """Store the formatter and the raw output without touching either."""
        self.raw_output = raw_output
        self.formatter = formatter

    @property
    def output_text(self):
        """Return ``raw_output`` rendered by ``formatter.format`` on each access."""
        render = self.formatter.format
        return render(self.raw_output)
|
|
|
|
|
|
|
|
class CodeToolResponse(BaseToolResponse):
    """
    Response for Code Tool

    Attributes:
        exit_code: Exit code reported for the executed code.
        log: Log text of the run.
        output_dir: Directory associated with the run's output.
        output_text: Same value as ``log``.
    """

    def __init__(self, exit_code: int, log: str, output_dir: str):
        """Store the run result; ``output_text`` mirrors ``log``."""
        self.exit_code, self.log, self.output_dir = exit_code, log, output_dir
        self.output_text = log

    def to_dict(self):
        """Return ``exit_code``, ``log`` and ``output_dir`` as a plain dict."""
        exported = ("exit_code", "log", "output_dir")
        return {field: getattr(self, field) for field in exported}
|
|
|
|
|
|
|
|
class CodeTool(BaseTool):
    """
    Code Tool for code execution

    Runs Python snippets inside a Docker container. Every call to
    ``async_run`` appends the new snippet to a single per-instance script in
    the work directory, re-runs the whole script in a fresh container, and
    returns only the log text beyond what earlier runs already reported.
    """

    # Seconds to sleep between container status polls.
    _POLL_INTERVAL = 0.1

    def __init__(self,
                 name: Optional[str] = "Code Tool",
                 description: Optional[str] = "tool for code_exec",
                 image: Optional[str] = "myimg",
                 time_out: Optional[int] = 60,
                 work_dir: Optional[str] = WORKING_DIR,
                 output_dir: Optional[str] = OUTPUT_DIR,
                 **kwargs
                 ):
        """
        Args:
            name: Tool name forwarded to ``BaseTool``.
            description: Tool description forwarded to ``BaseTool``.
            image: Docker image the code is run in.
            time_out: Maximum number of seconds to wait for the container
                to exit.
            work_dir: Host directory holding the script; mounted at
                ``/workspace`` in the container.
            output_dir: Host directory under which the per-instance output
                folder is placed. Falls back to ``OUTPUT_DIR`` when falsy.
            **kwargs: Extra arguments forwarded to ``BaseTool``.

        Raises:
            ImportError: If the ``docker`` package could not be imported.
        """
        super().__init__(name, description, **kwargs)
        if docker is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise ImportError("The 'docker' package is required to use CodeTool.")
        self._client = docker.from_env()
        self._image = image
        self._time_out = time_out
        self._work_dir = work_dir
        # Keep the configured base separately: `_output_dir` is re-pointed at
        # the per-instance sub-folder after every run.
        self._output_root = output_dir or OUTPUT_DIR
        self._output_dir = self._output_root
        self._upload_file_name = None
        self._upload_file_path = None
        self._code_idx = md5(str(time.time()).encode()).hexdigest()
        # Length of the log already returned by previous runs.
        self._log_len = 0
        # Initialised here so the property and save_file never hit a missing
        # attribute before set_sandbox_id / async_run have been called.
        self._sandbox_id = None
        self._file_path = None
        self._file_dir = None

    @classmethod
    async def create(cls, config_data, **params):
        """Build an instance from ``config_data['name']`` and
        ``config_data['description']``; ``params`` are passed to ``__init__``.
        """
        return cls(name=config_data['name'],
                   description=config_data['description'],
                   **params)

    async def set_sandbox_id(self, sandbox_id):
        """Remember ``sandbox_id`` on this instance."""
        self._sandbox_id = sandbox_id

    @property
    def sandbox_id(self):
        """Getter for sandbox_id; ``None`` when unset or falsy."""
        return self._sandbox_id or None

    async def async_run(self, req: str):
        """Append the fenced code in ``req`` to the script and execute it.

        Args:
            req: Message text; the code is extracted by ``CodeToolRequest``.

        Returns:
            CodeToolResponse: On success, the container's exit code, the log
            text not yet returned by earlier runs, and the output directory.
            When there is no code, or the container does not exit within
            ``time_out`` seconds, a response with exit code 1 and the log
            ``"No code to execute"`` / ``"TIMEOUT"`` respectively.
        """
        code = CodeToolRequest(req).code
        # The extracted code is a string (possibly empty), never None.
        if not code:
            return CodeToolResponse(1, "No code to execute", "")

        abs_path = str(pathlib.Path(self._work_dir).absolute())
        code_hash = self._code_idx
        file_name = f"exec_code_{code_hash}.py"
        file_path = os.path.join(self._work_dir, file_name)
        file_dir = os.path.dirname(file_path)
        self._file_path = file_path
        self._file_dir = file_dir
        os.makedirs(file_dir, exist_ok=True)
        os.makedirs(self._output_root, exist_ok=True)

        # Append mode: the script accumulates across calls (the log offset
        # handling below relies on that). The trailing newline keeps the next
        # snippet from being glued onto this snippet's last line.
        with open(file_path, "a", encoding="utf-8") as fout:
            fout.write(code + "\n")
        cmd = f'python3 {file_name}'

        volumes = {abs_path: {'bind': '/workspace', 'mode': 'rw'}}
        # NOTE(review): 10m is very small for a Python process, while the
        # upload path gets 1024m -- confirm the lower limit is intentional.
        mem_limit = '10m'
        if self._upload_file_name:
            upload_file_path = os.path.join(UPLOAD_PATH, self._upload_file_name)
            volumes[upload_file_path] = {
                'bind': f'/tmp/upload_files/{self._upload_file_name}',
                'mode': 'rw',
            }
            mem_limit = '1024m'

        start_time = time.time()
        container = self._client.containers.run(
            image=self._image,
            command=cmd,
            detach=True,
            working_dir="/workspace",
            mem_limit=mem_limit,
            volumes=volumes,
        )
        try:
            while (container.status != "exited"
                   and time.time() - start_time < self._time_out):
                # Yield to the event loop between polls instead of spinning.
                await asyncio.sleep(self._POLL_INTERVAL)
                container.reload()

            if container.status != "exited":
                # NOTE(review): the snippet stays in the accumulated script,
                # so later runs will execute it again.
                container.stop()
                return CodeToolResponse(1, "TIMEOUT", "")

            logs = container.logs().decode("utf-8", errors="replace").rstrip()
            exit_code = container.attrs["State"]["ExitCode"]
        finally:
            # Always clean up, even if stopping or reading the logs failed.
            container.remove(force=True)

        with open(os.path.join(file_dir, 'log.txt'), 'w', encoding="utf-8") as log_file:
            log_file.write(logs)

        # The whole script is re-run each time, so drop the prefix that
        # earlier runs already returned.
        new_logs = logs[self._log_len:]
        self._log_len = len(logs)

        output_dir = os.path.join(self._output_root, f'output_{code_hash}')
        self._output_dir = output_dir

        return CodeToolResponse(exit_code, new_logs, output_dir)

    async def sync_to_sandbox(self, file: Union[str, Dict, FileStorage]) -> str:
        """Register ``file`` for mounting into the container.

        Only ``str`` paths are handled; any other type is rejected.

        Raises:
            InputErrorException: If ``file`` is not a ``str``.
        """
        if not isinstance(file, str):
            err_msg = f"Invalid file input type. Expected str, FileStorage, or Dict. Got {type(file)}"
            logger.error(err_msg)
            raise InputErrorException(err_msg)

        logger.info(f"Upload File As FilePath: {file}")
        return await self.upload_file(file)

    async def upload_file(self, file_path: str):
        """Record ``file_path`` and its base name for the next run.

        Nothing is copied here; ``async_run`` mounts the file with that base
        name from ``UPLOAD_PATH``.

        Returns:
            The ``file_path`` that was passed in.
        """
        self._upload_file_path = file_path
        self._upload_file_name = os.path.basename(file_path)
        return file_path

    async def save_file(self):
        """Move everything in the work directory to the output directory.

        The script is first renamed to ``exec_code.py``; afterwards the
        (now empty) work directory is removed.

        Raises:
            RuntimeError: If no code has been run yet.
        """
        if self._file_path is None:
            raise RuntimeError("save_file() called before any code was run")

        output_dir = self._output_dir
        abs_path = pathlib.Path(self._work_dir).absolute()
        os.makedirs(output_dir, exist_ok=True)
        os.rename(self._file_path, os.path.join(self._file_dir, 'exec_code.py'))
        for entry in os.listdir(abs_path):
            os.rename(os.path.join(abs_path, entry), os.path.join(output_dir, entry))
        os.rmdir(abs_path)
        # The accumulated script is gone, so the next run starts a new log.
        self._log_len = 0
|
|
|