InfiAgent / src /infiagent /tools /code_tool_docker.py
g3eIL's picture
Upload 80 files
77320e4 verified
import os
import pathlib
from typing import Tuple, Optional, IO, Union, Dict
import time
from hashlib import md5
import docker
from ..tools.base_tool import BaseTool, BaseToolRequest, BaseToolResponse
import re
from ..exceptions.exceptions import InputErrorException, SandBoxFileUploadException
from werkzeug.datastructures import FileStorage
from ..utils import get_logger
logger = get_logger()
try:
import docker
except ImportError:
docker = None
WORKING_DIR = os.path.join(os.getcwd(), "tmp/code_space")
OUTPUT_DIR = os.path.join(os.getcwd(), "tmp/output_space")
UPLOAD_PATH = os.path.join(os.getcwd(), "tmp/upload_files")
class CodeToolRequest(BaseToolRequest):
"""
Request for Code Tool
"""
def __init__(self, code_str: str):
# code_str = 'import pandas as pd\nimport numpy as np\n'+ code_str
code_blocks = re.findall(r'```(?:python)?\s*(.*?)\s*```', code_str, re.DOTALL)
python_code_cleaned = '\n'.join(code_blocks).strip()
self.code = python_code_cleaned
class PythonSandBoxToolResponseDocker:
def __init__(self, formatter, raw_output) -> None:
self.formatter = formatter
self.raw_output = raw_output
@property
def output_text(self):
return self.formatter.format(self.raw_output)
class CodeToolResponse(BaseToolResponse):
"""
Response for Code Tool
"""
def __init__(self, exit_code: int, log: str, output_dir: str):
self.exit_code = exit_code
self.log = log
self.output_dir = output_dir
self.output_text = log
def to_dict(self):
return {
"exit_code": self.exit_code,
"log": self.log,
"output_dir": self.output_dir
}
class CodeTool(BaseTool):
"""
Code Tool for code execution
"""
def __init__(self,
name: Optional[str] = "Code Tool",
description: Optional[str] = "tool for code_exec",
# code_tool_id: Optional[str] = "code",
image: Optional[str] = "myimg",
time_out: Optional[int] = 60,
work_dir: Optional[str] = WORKING_DIR,
output_dir: Optional[str] = OUTPUT_DIR,
**kwargs
):
super().__init__(name, description, **kwargs)
self._client = docker.from_env()
self._image = image
self._time_out = time_out
self._work_dir = work_dir
self._output_dir = output_dir
self._upload_file_name = None
self._upload_file_path = None
self._code_idx = md5(str(time.time()).encode()).digest().hex()
self._log_len = 0
@classmethod
async def create(cls, config_data, **params):
# Unpack the config_data dictionary and any additional parameters
instance = cls(name=config_data['name'], description=config_data['description'], **params)
return instance
async def set_sandbox_id(self, sandbox_id):
self._sandbox_id = sandbox_id
@property
def sandbox_id(self):
"""Getter for sandbox_id."""
return self._sandbox_id if self._sandbox_id else None
async def async_run(self, req: str):
req = CodeToolRequest(req)
code = req.code
if code is None:
return "No code to execute", 1, ""
# path and file name for python script
abs_path = pathlib.Path(self._work_dir).absolute()
code_hash = self._code_idx
file_name = f"exec_code_{code_hash}.py"
file_path = os.path.join(self._work_dir, file_name)
self._file_path = file_path
file_dir = os.path.dirname(file_path)
self._file_dir = file_dir
os.makedirs(file_dir, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
if self._upload_file_name:
upload_file_path = os.path.join(UPLOAD_PATH, self._upload_file_name)
# write code to file
with open(file_path, "a", encoding="utf-8") as fout:
fout.write(code)
cmd = f'python3 {file_name}'
# create docker container
start_time = time.time()
if self._upload_file_name:
container = self._client.containers.run(
image=self._image,
command=cmd,
detach=True,
working_dir="/workspace",
mem_limit='1024m',
volumes={abs_path: {'bind': '/workspace','mode': 'rw'},
upload_file_path: {'bind': f'/tmp/upload_files/{self._upload_file_name}','mode': 'rw'}},
)
else:
container = self._client.containers.run(
image=self._image,
command=cmd,
detach=True,
working_dir="/workspace",
mem_limit='10m',
volumes={abs_path: {'bind': '/workspace','mode': 'rw'}},
)
# hold for time_out seconds
while container.status != "exited" and time.time() - start_time < self._time_out:
container.reload()
# if time out, stop and remove container
if container.status != "exited":
container.stop()
container.remove()
return "TIMEOUT", 1, ""
# save log to file
logs = container.logs().decode("utf-8").rstrip()
with open(os.path.join(file_dir, f'log.txt'), 'w') as log_file:
log_file.write(logs)
new_len = len(logs)
logs = logs[self._log_len:]
self._log_len = new_len
exit_code = container.attrs["State"]["ExitCode"]
container.remove()
# save files to output space and rmv files in working space
output_dir = os.path.join(OUTPUT_DIR, f'output_{code_hash}')
self._output_dir = output_dir
# os.makedirs(output_dir, exist_ok=True)
# os.rename(file_path, os.path.join(file_dir, 'exec_code.py'))
# for f in os.listdir(abs_path):
# os.rename(os.path.join(abs_path, f), os.path.join(output_dir, f))
# os.rmdir(abs_path)
response = CodeToolResponse(exit_code, logs, output_dir)
return response
async def sync_to_sandbox(self, file: Union[str, Dict, FileStorage]) -> str:
if isinstance(file, str):
logger.info(f"Upload File As FilePath: {file}")
file_path = await self.upload_file(file)
else:
err_msg = f"Invalid file input type. Expected str, FileStorage, or Dict. Got {type(file)}"
logger.error(err_msg)
raise InputErrorException(err_msg)
return file_path
async def upload_file(self, file_path: str):
file_name = file_path.split("/")[-1] # Extract the file name from the path
self._upload_file_path = file_path
self._upload_file_name = file_name
return file_path
async def save_file(self):
output_dir = self._output_dir
file_path = self._file_path
file_dir = self._file_dir
abs_path = pathlib.Path(self._work_dir).absolute()
os.makedirs(output_dir, exist_ok=True)
os.rename(file_path, os.path.join(file_dir, 'exec_code.py'))
for f in os.listdir(abs_path):
os.rename(os.path.join(abs_path, f), os.path.join(output_dir, f))
os.rmdir(abs_path)