|
|
import io |
|
|
import shlex |
|
|
import tarfile |
|
|
import uuid |
|
|
import docker |
|
|
from pathlib import Path |
|
|
from typing import ClassVar, Dict, List, Optional |
|
|
from .interpreter_base import BaseInterpreter |
|
|
from .tool import Tool,Toolkit |
|
|
from .storage_handler import FileStorageHandler |
|
|
from pydantic import Field |
|
|
|
|
|
class DockerInterpreter(BaseInterpreter): |
|
|
""" |
|
|
A Docker-based interpreter for executing Python, Bash, and R scripts in an isolated environment. |
|
|
""" |
|
|
|
|
|
CODE_EXECUTE_CMD_MAPPING: ClassVar[Dict[str, str]] = { |
|
|
"python": "python {file_name}", |
|
|
} |
|
|
|
|
|
CODE_TYPE_MAPPING: ClassVar[Dict[str, str]] = { |
|
|
"python": "python", |
|
|
"py3": "python", |
|
|
"python3": "python", |
|
|
"py": "python", |
|
|
} |
|
|
|
|
|
require_confirm:bool = Field(default=False, description="Whether to require confirmation before executing code") |
|
|
print_stdout:bool = Field(default=True, description="Whether to print stdout") |
|
|
print_stderr:bool = Field(default=True, description="Whether to print stderr") |
|
|
host_directory:str = Field(default="", description="The path to the host directory to use for the container") |
|
|
container_directory:str = Field(default="/home/app/", description="The directory to use for the container") |
|
|
container_command:str = Field(default="tail -f /dev/null", description="The command to use for the container") |
|
|
tmp_directory:str = Field(default="/tmp", description="The directory to use for the container") |
|
|
image_tag:Optional[str] = Field(default=None, description="The Docker image tag to use") |
|
|
dockerfile_path:Optional[str] = Field(default=None, description="Path to the Dockerfile to build") |
|
|
auto_cleanup:bool = Field(default=True, description="Whether to automatically cleanup container on cleanup() call") |
|
|
auto_destroy:bool = Field(default=True, description="Whether to automatically cleanup container on object destruction") |
|
|
|
|
|
class Config: |
|
|
arbitrary_types_allowed = True |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
name:str = "DockerInterpreter", |
|
|
image_tag:Optional[str] = None, |
|
|
dockerfile_path:Optional[str] = None, |
|
|
require_confirm:bool = False, |
|
|
print_stdout:bool = True, |
|
|
print_stderr:bool = True, |
|
|
host_directory:str = "", |
|
|
container_directory:str = "/home/app/", |
|
|
container_command:str = "tail -f /dev/null", |
|
|
tmp_directory:str = "/tmp", |
|
|
storage_handler: FileStorageHandler = None, |
|
|
auto_cleanup:bool = True, |
|
|
auto_destroy:bool = True, |
|
|
**data |
|
|
): |
|
|
""" |
|
|
Initialize a Docker-based interpreter for executing code in an isolated environment. |
|
|
|
|
|
Args: |
|
|
name (str): The name of the interpreter |
|
|
image_tag (str, optional): The Docker image tag to use. Must be provided if dockerfile_path is not. |
|
|
dockerfile_path (str, optional): Path to the Dockerfile to build. Must be provided if image_tag is not. |
|
|
require_confirm (bool): Whether to require confirmation before executing code |
|
|
print_stdout (bool): Whether to print stdout from code execution |
|
|
print_stderr (bool): Whether to print stderr from code execution |
|
|
host_directory (str): The path to the host directory to mount in the container |
|
|
container_directory (str): The target directory inside the container |
|
|
container_command (str): The command to run in the container |
|
|
tmp_directory (str): The temporary directory to use for file creation in the container |
|
|
**data: Additional data to pass to the parent class |
|
|
""" |
|
|
|
|
|
super().__init__(name=name, **data) |
|
|
|
|
|
self.require_confirm = require_confirm |
|
|
self.print_stdout = print_stdout |
|
|
self.print_stderr = print_stderr |
|
|
self.host_directory = host_directory |
|
|
self.container_directory = container_directory |
|
|
self.container_command = container_command |
|
|
self.tmp_directory = tmp_directory |
|
|
|
|
|
|
|
|
self.client = docker.from_env() |
|
|
self.container = None |
|
|
self.image_tag = image_tag |
|
|
self.dockerfile_path = dockerfile_path |
|
|
self.storage_handler = storage_handler |
|
|
self.auto_cleanup = auto_cleanup |
|
|
self.auto_destroy = auto_destroy |
|
|
self._initialize_if_needed() |
|
|
|
|
|
|
|
|
if self.host_directory: |
|
|
self._upload_directory_to_container(self.host_directory) |
|
|
|
|
|
def __del__(self): |
|
|
try: |
|
|
if hasattr(self, 'auto_destroy') and self.auto_destroy and hasattr(self, 'container') and self.container is not None: |
|
|
self.container.remove(force=True) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
def __enter__(self): |
|
|
return self |
|
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb): |
|
|
self.cleanup() |
|
|
|
|
|
def cleanup(self): |
|
|
"""Explicitly clean up the container and Docker client.""" |
|
|
if self.auto_cleanup: |
|
|
try: |
|
|
if hasattr(self, 'container') and self.container is not None: |
|
|
self.container.remove(force=True) |
|
|
self.container = None |
|
|
except Exception: |
|
|
pass |
|
|
try: |
|
|
if hasattr(self, 'client') and self.client is not None: |
|
|
self.client.close() |
|
|
self.client = None |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
def _initialize_if_needed(self): |
|
|
image_tag = self.image_tag |
|
|
dockerfile_path = self.dockerfile_path |
|
|
if image_tag: |
|
|
try: |
|
|
|
|
|
self.client.images.get(image_tag) |
|
|
except Exception as e: |
|
|
raise ValueError(f"Image provided in image_tag but not found: {e}") |
|
|
else: |
|
|
|
|
|
if not dockerfile_path: |
|
|
raise ValueError("dockerfile_path or image_tag must be provided to build the image") |
|
|
|
|
|
dockerfile_path = Path(dockerfile_path) |
|
|
if not dockerfile_path.exists(): |
|
|
raise FileNotFoundError(f"Dockerfile not found at provided path: {dockerfile_path}") |
|
|
|
|
|
dockerfile_dir = dockerfile_path.parent |
|
|
self.client.images.build(path=str(dockerfile_dir), tag=image_tag, rm=True, buildargs={}) |
|
|
|
|
|
|
|
|
try: |
|
|
self.client.ping() |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Docker daemon is not running: {e}") |
|
|
|
|
|
|
|
|
self.container = self.client.containers.run( |
|
|
image_tag, |
|
|
detach=True, |
|
|
command=self.container_command, |
|
|
working_dir=self.container_directory |
|
|
) |
|
|
|
|
|
def _upload_directory_to_container(self, host_directory: str): |
|
|
""" |
|
|
Uploads all files and directories from the given host directory to the container directory. |
|
|
|
|
|
:param host_directory: Path to the local directory containing files to upload. |
|
|
:param container_directory: Target directory inside the container (defaults to self.container_directory). |
|
|
""" |
|
|
host_directory = Path(host_directory).resolve() |
|
|
if not host_directory.exists() or not host_directory.is_dir(): |
|
|
raise FileNotFoundError(f"Directory not found: {host_directory}") |
|
|
|
|
|
tar_stream = io.BytesIO() |
|
|
|
|
|
with tarfile.open(fileobj=tar_stream, mode="w") as tar: |
|
|
for file_path in host_directory.rglob("*"): |
|
|
if file_path.is_file(): |
|
|
|
|
|
relative_path = file_path.relative_to(host_directory) |
|
|
target_path = Path(self.container_directory) / relative_path |
|
|
|
|
|
tarinfo = tarfile.TarInfo(name=str(target_path.relative_to(self.container_directory))) |
|
|
tarinfo.size = file_path.stat().st_size |
|
|
with open(file_path, "rb") as f: |
|
|
tar.addfile(tarinfo, f) |
|
|
|
|
|
tar_stream.seek(0) |
|
|
|
|
|
if self.container is None: |
|
|
raise RuntimeError("Container is not initialized.") |
|
|
|
|
|
self.container.put_archive(self.container_directory, tar_stream) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _create_file_in_container(self, content: str) -> Path: |
|
|
filename = str(uuid.uuid4()) |
|
|
tar_stream = io.BytesIO() |
|
|
with tarfile.open(fileobj=tar_stream, mode='w') as tar: |
|
|
tarinfo = tarfile.TarInfo(name=filename) |
|
|
tarinfo.size = len(content.encode('utf-8')) |
|
|
tar.addfile(tarinfo, io.BytesIO(content.encode('utf-8'))) |
|
|
tar_stream.seek(0) |
|
|
|
|
|
if self.container is None: |
|
|
raise RuntimeError("Container is not initialized.") |
|
|
|
|
|
try: |
|
|
self.container.put_archive(self.tmp_directory, tar_stream) |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Failed to create file in container: {e}") |
|
|
|
|
|
return Path(f"{self.tmp_directory}/{filename}") |
|
|
|
|
|
def _run_file_in_container(self, file: Path, language: str) -> str: |
|
|
"""Execute a file in the container with timeout and security checks.""" |
|
|
if not self.container: |
|
|
raise RuntimeError("Container is not initialized") |
|
|
|
|
|
|
|
|
container_info = self.client.api.inspect_container(self.container.id) |
|
|
if not container_info['State']['Running']: |
|
|
raise RuntimeError("Container is not running") |
|
|
|
|
|
language = self._check_language(language) |
|
|
command = shlex.split(self.CODE_EXECUTE_CMD_MAPPING[language].format(file_name=file.as_posix())) |
|
|
if self.container is None: |
|
|
raise RuntimeError("Container is not initialized.") |
|
|
result = self.container.exec_run(command, demux=True) |
|
|
|
|
|
stdout, stderr = result.output |
|
|
if self.print_stdout and stdout: |
|
|
print(stdout.decode()) |
|
|
if self.print_stderr and stderr: |
|
|
print(stderr.decode()) |
|
|
|
|
|
stdout_str = stdout.decode() if stdout else "" |
|
|
stderr_str = stderr.decode() if stderr else "" |
|
|
return stdout_str + stderr_str |
|
|
|
|
|
def execute(self, code: str, language: str) -> str: |
|
|
""" |
|
|
Executes code in a Docker container. |
|
|
|
|
|
Args: |
|
|
code (str): The code to execute |
|
|
language (str): The programming language to use |
|
|
|
|
|
Returns: |
|
|
str: The execution output |
|
|
|
|
|
Raises: |
|
|
RuntimeError: If container is not properly initialized or execution fails |
|
|
ValueError: If code content is invalid or exceeds limits |
|
|
""" |
|
|
if not code or not code.strip(): |
|
|
raise ValueError("Code content cannot be empty") |
|
|
|
|
|
if not self.container: |
|
|
raise RuntimeError("Container is not initialized") |
|
|
|
|
|
|
|
|
try: |
|
|
container_info = self.client.api.inspect_container(self.container.id) |
|
|
if not container_info['State']['Running']: |
|
|
raise RuntimeError("Container is not running") |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Failed to check container status: {e}") |
|
|
|
|
|
if self.host_directory: |
|
|
code = f"import sys; sys.path.insert(0, '{self.container_directory}');" + code |
|
|
|
|
|
language = self._check_language(language) |
|
|
|
|
|
if self.require_confirm: |
|
|
confirmation = input(f"Confirm execution of {language} code? [Y/n]: ") |
|
|
if confirmation.lower() not in ["y", "yes", ""]: |
|
|
raise RuntimeError("Execution aborted by user.") |
|
|
|
|
|
try: |
|
|
file_path = self._create_file_in_container(code) |
|
|
return self._run_file_in_container(file_path, language) |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Code execution failed: {e}") |
|
|
finally: |
|
|
|
|
|
try: |
|
|
if hasattr(self, 'container') and self.container: |
|
|
self.container.exec_run(f"rm -f {file_path}") |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
def execute_script(self, file_path: str, language: str = None) -> str: |
|
|
""" |
|
|
Reads code from a file and executes it in a Docker container. |
|
|
|
|
|
Args: |
|
|
file_path (str): The path to the script file to execute |
|
|
language (str, optional): The programming language of the code. If None, will be determined from the file extension. |
|
|
|
|
|
Returns: |
|
|
str: The execution output |
|
|
|
|
|
Raises: |
|
|
FileNotFoundError: If the script file does not exist |
|
|
RuntimeError: If container is not properly initialized or execution fails |
|
|
ValueError: If file content is invalid or exceeds limits |
|
|
""" |
|
|
|
|
|
result = self.storage_handler.read(file_path) |
|
|
if result["success"]: |
|
|
code = result["content"] |
|
|
else: |
|
|
raise RuntimeError(f"Could not read file '{file_path}': {result.get('error', 'Unknown error')}") |
|
|
|
|
|
|
|
|
return self.execute(code, language) |
|
|
|
|
|
def _check_language(self, language: str) -> str: |
|
|
if language not in self.CODE_TYPE_MAPPING: |
|
|
raise ValueError(f"Unsupported language: {language}") |
|
|
return self.CODE_TYPE_MAPPING[language] |
|
|
|
|
|
|
|
|
class DockerExecuteTool(Tool): |
|
|
name: str = "docker_execute" |
|
|
description: str = "Execute code in a secure Docker container environment" |
|
|
inputs: Dict[str, Dict[str, str]] = { |
|
|
"code": { |
|
|
"type": "string", |
|
|
"description": "The code to execute" |
|
|
}, |
|
|
"language": { |
|
|
"type": "string", |
|
|
"description": "The programming language of the code (e.g., python, py, python3)" |
|
|
} |
|
|
} |
|
|
required: Optional[List[str]] = ["code", "language"] |
|
|
|
|
|
def __init__(self, docker_interpreter: DockerInterpreter = None): |
|
|
super().__init__() |
|
|
self.docker_interpreter = docker_interpreter |
|
|
|
|
|
def __call__(self, code: str, language: str) -> str: |
|
|
"""Execute code using the Docker interpreter.""" |
|
|
if not self.docker_interpreter: |
|
|
raise RuntimeError("Docker interpreter not initialized") |
|
|
|
|
|
try: |
|
|
return self.docker_interpreter.execute(code, language) |
|
|
except Exception as e: |
|
|
return f"Error executing code: {str(e)}" |
|
|
|
|
|
|
|
|
class DockerExecuteScriptTool(Tool): |
|
|
name: str = "docker_execute_script" |
|
|
description: str = "Execute code from a script file in a secure Docker container environment" |
|
|
inputs: Dict[str, Dict[str, str]] = { |
|
|
"file_path": { |
|
|
"type": "string", |
|
|
"description": "The path to the script file to execute" |
|
|
}, |
|
|
"language": { |
|
|
"type": "string", |
|
|
"description": "The programming language of the code. If not provided, will be determined from file extension" |
|
|
} |
|
|
} |
|
|
required: Optional[List[str]] = ["file_path", "language"] |
|
|
|
|
|
def __init__(self, docker_interpreter: DockerInterpreter = None): |
|
|
super().__init__() |
|
|
self.docker_interpreter = docker_interpreter |
|
|
|
|
|
def __call__(self, file_path: str, language: str) -> str: |
|
|
"""Execute script file using the Docker interpreter.""" |
|
|
if not self.docker_interpreter: |
|
|
raise RuntimeError("Docker interpreter not initialized") |
|
|
|
|
|
try: |
|
|
return self.docker_interpreter.execute_script(file_path, language) |
|
|
except Exception as e: |
|
|
return f"Error executing script: {str(e)}" |
|
|
|
|
|
|
|
|
class DockerInterpreterToolkit(Toolkit): |
|
|
def __init__( |
|
|
self, |
|
|
name: str = "DockerInterpreterToolkit", |
|
|
image_tag: Optional[str] = None, |
|
|
dockerfile_path: Optional[str] = None, |
|
|
require_confirm: bool = False, |
|
|
print_stdout: bool = True, |
|
|
print_stderr: bool = True, |
|
|
host_directory: str = "", |
|
|
container_directory: str = "/home/app/", |
|
|
container_command: str = "tail -f /dev/null", |
|
|
tmp_directory: str = "/tmp", |
|
|
storage_handler: FileStorageHandler = None, |
|
|
auto_cleanup: bool = True, |
|
|
auto_destroy: bool = True, |
|
|
**kwargs |
|
|
): |
|
|
|
|
|
if storage_handler is None: |
|
|
from .storage_handler import LocalStorageHandler |
|
|
storage_handler = LocalStorageHandler(base_path="./workplace/docker") |
|
|
|
|
|
|
|
|
docker_interpreter = DockerInterpreter( |
|
|
name="DockerInterpreter", |
|
|
image_tag=image_tag, |
|
|
dockerfile_path=dockerfile_path, |
|
|
require_confirm=require_confirm, |
|
|
print_stdout=print_stdout, |
|
|
print_stderr=print_stderr, |
|
|
host_directory=host_directory, |
|
|
container_directory=container_directory, |
|
|
container_command=container_command, |
|
|
tmp_directory=tmp_directory, |
|
|
storage_handler=storage_handler, |
|
|
auto_cleanup=auto_cleanup, |
|
|
auto_destroy=auto_destroy, |
|
|
**kwargs |
|
|
) |
|
|
|
|
|
|
|
|
tools = [ |
|
|
DockerExecuteTool(docker_interpreter=docker_interpreter), |
|
|
DockerExecuteScriptTool(docker_interpreter=docker_interpreter) |
|
|
] |
|
|
|
|
|
|
|
|
super().__init__(name=name, tools=tools) |
|
|
|
|
|
|
|
|
self.docker_interpreter = docker_interpreter |
|
|
self.storage_handler = storage_handler |
|
|
self.auto_cleanup = auto_cleanup |
|
|
self.auto_destroy = auto_destroy |
|
|
|
|
|
def cleanup(self): |
|
|
"""Clean up the Docker interpreter and storage handler.""" |
|
|
try: |
|
|
if hasattr(self, 'auto_cleanup') and self.auto_cleanup: |
|
|
if hasattr(self, 'docker_interpreter') and self.docker_interpreter: |
|
|
self.docker_interpreter.cleanup() |
|
|
if hasattr(self, 'storage_handler') and self.storage_handler: |
|
|
try: |
|
|
self.storage_handler.cleanup() |
|
|
except Exception: |
|
|
pass |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
def __del__(self): |
|
|
"""Cleanup when toolkit is destroyed.""" |
|
|
try: |
|
|
if hasattr(self, 'auto_destroy') and self.auto_destroy: |
|
|
self.cleanup() |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|