File size: 7,388 Bytes
77320e4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
import os
import pathlib
from typing import Tuple, Optional, IO, Union, Dict
import time
from hashlib import md5
import docker
from ..tools.base_tool import BaseTool, BaseToolRequest, BaseToolResponse
import re
from ..exceptions.exceptions import InputErrorException, SandBoxFileUploadException
from werkzeug.datastructures import FileStorage
from ..utils import get_logger
logger = get_logger()
try:
import docker
except ImportError:
docker = None
WORKING_DIR = os.path.join(os.getcwd(), "tmp/code_space")
OUTPUT_DIR = os.path.join(os.getcwd(), "tmp/output_space")
UPLOAD_PATH = os.path.join(os.getcwd(), "tmp/upload_files")
class CodeToolRequest(BaseToolRequest):
"""
Request for Code Tool
"""
def __init__(self, code_str: str):
# code_str = 'import pandas as pd\nimport numpy as np\n'+ code_str
code_blocks = re.findall(r'```(?:python)?\s*(.*?)\s*```', code_str, re.DOTALL)
python_code_cleaned = '\n'.join(code_blocks).strip()
self.code = python_code_cleaned
class PythonSandBoxToolResponseDocker:
def __init__(self, formatter, raw_output) -> None:
self.formatter = formatter
self.raw_output = raw_output
@property
def output_text(self):
return self.formatter.format(self.raw_output)
class CodeToolResponse(BaseToolResponse):
"""
Response for Code Tool
"""
def __init__(self, exit_code: int, log: str, output_dir: str):
self.exit_code = exit_code
self.log = log
self.output_dir = output_dir
self.output_text = log
def to_dict(self):
return {
"exit_code": self.exit_code,
"log": self.log,
"output_dir": self.output_dir
}
class CodeTool(BaseTool):
"""
Code Tool for code execution
"""
def __init__(self,
name: Optional[str] = "Code Tool",
description: Optional[str] = "tool for code_exec",
# code_tool_id: Optional[str] = "code",
image: Optional[str] = "myimg",
time_out: Optional[int] = 60,
work_dir: Optional[str] = WORKING_DIR,
output_dir: Optional[str] = OUTPUT_DIR,
**kwargs
):
super().__init__(name, description, **kwargs)
self._client = docker.from_env()
self._image = image
self._time_out = time_out
self._work_dir = work_dir
self._output_dir = output_dir
self._upload_file_name = None
self._upload_file_path = None
self._code_idx = md5(str(time.time()).encode()).digest().hex()
self._log_len = 0
@classmethod
async def create(cls, config_data, **params):
# Unpack the config_data dictionary and any additional parameters
instance = cls(name=config_data['name'], description=config_data['description'], **params)
return instance
async def set_sandbox_id(self, sandbox_id):
self._sandbox_id = sandbox_id
@property
def sandbox_id(self):
"""Getter for sandbox_id."""
return self._sandbox_id if self._sandbox_id else None
async def async_run(self, req: str):
req = CodeToolRequest(req)
code = req.code
if code is None:
return "No code to execute", 1, ""
# path and file name for python script
abs_path = pathlib.Path(self._work_dir).absolute()
code_hash = self._code_idx
file_name = f"exec_code_{code_hash}.py"
file_path = os.path.join(self._work_dir, file_name)
self._file_path = file_path
file_dir = os.path.dirname(file_path)
self._file_dir = file_dir
os.makedirs(file_dir, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
if self._upload_file_name:
upload_file_path = os.path.join(UPLOAD_PATH, self._upload_file_name)
# write code to file
with open(file_path, "a", encoding="utf-8") as fout:
fout.write(code)
cmd = f'python3 {file_name}'
# create docker container
start_time = time.time()
if self._upload_file_name:
container = self._client.containers.run(
image=self._image,
command=cmd,
detach=True,
working_dir="/workspace",
mem_limit='1024m',
volumes={abs_path: {'bind': '/workspace','mode': 'rw'},
upload_file_path: {'bind': f'/tmp/upload_files/{self._upload_file_name}','mode': 'rw'}},
)
else:
container = self._client.containers.run(
image=self._image,
command=cmd,
detach=True,
working_dir="/workspace",
mem_limit='10m',
volumes={abs_path: {'bind': '/workspace','mode': 'rw'}},
)
# hold for time_out seconds
while container.status != "exited" and time.time() - start_time < self._time_out:
container.reload()
# if time out, stop and remove container
if container.status != "exited":
container.stop()
container.remove()
return "TIMEOUT", 1, ""
# save log to file
logs = container.logs().decode("utf-8").rstrip()
with open(os.path.join(file_dir, f'log.txt'), 'w') as log_file:
log_file.write(logs)
new_len = len(logs)
logs = logs[self._log_len:]
self._log_len = new_len
exit_code = container.attrs["State"]["ExitCode"]
container.remove()
# save files to output space and rmv files in working space
output_dir = os.path.join(OUTPUT_DIR, f'output_{code_hash}')
self._output_dir = output_dir
# os.makedirs(output_dir, exist_ok=True)
# os.rename(file_path, os.path.join(file_dir, 'exec_code.py'))
# for f in os.listdir(abs_path):
# os.rename(os.path.join(abs_path, f), os.path.join(output_dir, f))
# os.rmdir(abs_path)
response = CodeToolResponse(exit_code, logs, output_dir)
return response
async def sync_to_sandbox(self, file: Union[str, Dict, FileStorage]) -> str:
if isinstance(file, str):
logger.info(f"Upload File As FilePath: {file}")
file_path = await self.upload_file(file)
else:
err_msg = f"Invalid file input type. Expected str, FileStorage, or Dict. Got {type(file)}"
logger.error(err_msg)
raise InputErrorException(err_msg)
return file_path
async def upload_file(self, file_path: str):
file_name = file_path.split("/")[-1] # Extract the file name from the path
self._upload_file_path = file_path
self._upload_file_name = file_name
return file_path
async def save_file(self):
output_dir = self._output_dir
file_path = self._file_path
file_dir = self._file_dir
abs_path = pathlib.Path(self._work_dir).absolute()
os.makedirs(output_dir, exist_ok=True)
os.rename(file_path, os.path.join(file_dir, 'exec_code.py'))
for f in os.listdir(abs_path):
os.rename(os.path.join(abs_path, f), os.path.join(output_dir, f))
os.rmdir(abs_path)
|