import asyncio
from typing import Optional, TypeVar

from pydantic import Field, PrivateAttr

from app.daytona.tool_base import Sandbox, SandboxToolsBase
from app.tool.base import ToolResult
from app.utils.files_utils import clean_path, should_exclude_file
from app.utils.logger import logger
|
|
|
|
| Context = TypeVar("Context") |
|
|
| _FILES_DESCRIPTION = """\ |
| A sandbox-based file system tool that allows file operations in a secure sandboxed environment. |
| * This tool provides commands for creating, reading, updating, and deleting files in the workspace |
| * All operations are performed relative to the /workspace directory for security |
| * Use this when you need to manage files, edit code, or manipulate file contents in a sandbox |
| * Each action requires specific parameters as defined in the tool's dependencies |
| Key capabilities include: |
| * File creation: Create new files with specified content and permissions |
| * File modification: Replace specific strings or completely rewrite files |
| * File deletion: Remove files from the workspace |
| * File reading: Read file contents with optional line range specification |
| """ |
|
|
|
|
class SandboxFilesTool(SandboxToolsBase):
    """Create, edit, rewrite and delete files inside a sandbox workspace.

    Every path is cleaned with ``clean_path`` and then joined onto
    ``self.workspace_path``. All public operations go through ``execute``,
    which dispatches on ``action`` and returns a ``ToolResult``.
    """

    name: str = "sandbox_files"
    description: str = _FILES_DESCRIPTION
    parameters: dict = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": [
                    "create_file",
                    "str_replace",
                    "full_file_rewrite",
                    "delete_file",
                ],
                "description": "The file operation to perform",
            },
            "file_path": {
                "type": "string",
                "description": "Path to the file, relative to /workspace (e.g., 'src/main.py')",
            },
            "file_contents": {
                "type": "string",
                "description": "Content to write to the file",
            },
            "old_str": {
                "type": "string",
                "description": "Text to be replaced (must appear exactly once)",
            },
            "new_str": {
                "type": "string",
                "description": "Replacement text",
            },
            "permissions": {
                "type": "string",
                "description": "File permissions in octal format (e.g., '644')",
                "default": "644",
            },
        },
        "required": ["action"],
        "dependencies": {
            "create_file": ["file_path", "file_contents"],
            "str_replace": ["file_path", "old_str", "new_str"],
            "full_file_rewrite": ["file_path", "file_contents"],
            "delete_file": ["file_path"],
        },
    }
    # Kept for backward compatibility; no method in this class reads it.
    SNIPPET_LINES: int = Field(default=4, exclude=True)
    # Per-instance lock serializing execute(). Created lazily inside execute()
    # so it is always constructed while an event loop is running.
    _execute_lock: Optional[asyncio.Lock] = PrivateAttr(default=None)

    def __init__(
        self, sandbox: Optional[Sandbox] = None, thread_id: Optional[str] = None, **data
    ):
        """Initialize the tool, optionally binding an existing sandbox.

        Args:
            sandbox: Sandbox to operate on; stored on ``_sandbox`` when given.
            thread_id: Accepted for call-site compatibility; not used by this
                initializer.
            **data: Forwarded unchanged to the base-class initializer.
        """
        super().__init__(**data)
        if sandbox is not None:
            self._sandbox = sandbox

    def clean_path(self, path: str) -> str:
        """Clean and normalize a path to be relative to /workspace."""
        return clean_path(path, self.workspace_path)

    def _should_exclude_file(self, rel_path: str) -> bool:
        """Check if a file should be excluded based on path, name, or extension."""
        return should_exclude_file(rel_path)

    def _file_exists(self, path: str) -> bool:
        """Return True if ``get_file_info`` succeeds for ``path`` in the sandbox.

        Any exception raised by the lookup is treated as "does not exist";
        this is a deliberate best-effort check.
        """
        try:
            self.sandbox.fs.get_file_info(path)
            return True
        except Exception:
            return False

    def _index_html_notice(self, file_path: str) -> str:
        """Return the preview-server note to append after writing index.html.

        Returns an empty string when ``file_path`` is not ``index.html``
        (case-insensitive) or when the preview link cannot be obtained; the
        latter is logged as a warning and never fails the file operation.
        """
        if file_path.lower() != "index.html":
            return ""
        try:
            website_link = self.sandbox.get_preview_link(8080)
            # Prefer the ``url`` attribute; otherwise parse it out of the
            # object's string form.
            website_url = (
                website_link.url
                if hasattr(website_link, "url")
                else str(website_link).split("url='")[1].split("'")[0]
            )
        except Exception as e:
            logger.warning(f"Failed to get website URL for index.html: {str(e)}")
            return ""
        return (
            f"\n\n[Auto-detected index.html - HTTP server available at: {website_url}]"
            "\n[Note: Use the provided HTTP server URL above instead of starting a new server]"
        )

    async def get_workspace_state(self) -> dict:
        """Get the current workspace state by reading all files.

        Lists the entries of ``self.workspace_path`` and downloads every entry
        that is neither a directory nor excluded by ``_should_exclude_file``.

        Returns:
            A dict mapping each entry name to a dict with ``content``,
            ``is_dir``, ``size`` and ``modified``. Files that cannot be
            decoded or read are skipped. Returns ``{}`` if the listing itself
            fails.
        """
        files_state = {}
        try:
            await self._ensure_sandbox()

            files = self.sandbox.fs.list_files(self.workspace_path)
            for file_info in files:
                rel_path = file_info.name

                if self._should_exclude_file(rel_path) or file_info.is_dir:
                    continue

                try:
                    full_path = f"{self.workspace_path}/{rel_path}"
                    content = self.sandbox.fs.download_file(full_path).decode()
                    files_state[rel_path] = {
                        "content": content,
                        "is_dir": file_info.is_dir,
                        "size": file_info.size,
                        "modified": file_info.mod_time,
                    }
                except UnicodeDecodeError:
                    # Must come before the generic handler: UnicodeDecodeError
                    # is an Exception subclass and would otherwise never
                    # reach this branch.
                    logger.info(f"Skipping binary file: {rel_path}")
                except Exception as e:
                    logger.warning(f"Error reading file {rel_path}: {e}")

            return files_state

        except Exception as e:
            logger.error(f"Error getting workspace state: {str(e)}")
            return {}

    async def execute(
        self,
        action: str,
        file_path: Optional[str] = None,
        file_contents: Optional[str] = None,
        old_str: Optional[str] = None,
        new_str: Optional[str] = None,
        permissions: Optional[str] = "644",
        **kwargs,
    ) -> ToolResult:
        """
        Execute a file operation in the sandbox environment.

        Calls on the same tool instance are serialized with a per-instance
        lock so that concurrent operations do not interleave.

        Args:
            action: The file operation to perform
            file_path: Path to the file relative to /workspace
            file_contents: Content to write to the file; an empty string is
                allowed (creates or rewrites to an empty file)
            old_str: Text to be replaced (for str_replace); must be non-empty
            new_str: Replacement text (for str_replace); an empty string is
                allowed (deletes ``old_str``)
            permissions: File permissions in octal format; ``None`` falls
                back to "644"
        Returns:
            ToolResult with the operation's output or error
        """
        # A lock created inside the call would be private to that call and
        # never contended, so one lock is shared per instance instead.
        if self._execute_lock is None:
            self._execute_lock = asyncio.Lock()
        if permissions is None:
            permissions = "644"

        async with self._execute_lock:
            try:
                if action == "create_file":
                    # ``is None`` rather than truthiness: "" is valid content.
                    if not file_path or file_contents is None:
                        return self.fail_response(
                            "file_path and file_contents are required for create_file"
                        )
                    return await self._create_file(
                        file_path, file_contents, permissions
                    )

                elif action == "str_replace":
                    # old_str must be non-empty; new_str may be "".
                    if not file_path or not old_str or new_str is None:
                        return self.fail_response(
                            "file_path, old_str, and new_str are required for str_replace"
                        )
                    return await self._str_replace(file_path, old_str, new_str)

                elif action == "full_file_rewrite":
                    if not file_path or file_contents is None:
                        return self.fail_response(
                            "file_path and file_contents are required for full_file_rewrite"
                        )
                    return await self._full_file_rewrite(
                        file_path, file_contents, permissions
                    )

                elif action == "delete_file":
                    if not file_path:
                        return self.fail_response(
                            "file_path is required for delete_file"
                        )
                    return await self._delete_file(file_path)

                else:
                    return self.fail_response(f"Unknown action: {action}")

            except Exception as e:
                logger.error(f"Error executing file action: {e}")
                return self.fail_response(f"Error executing file action: {e}")

    async def _create_file(
        self, file_path: str, file_contents: str, permissions: str = "644"
    ) -> ToolResult:
        """Create a new file with the provided contents.

        Fails if the file already exists. The parent folder is created with
        mode "755" before the upload, and ``permissions`` is applied to the
        new file afterwards.
        """
        try:
            await self._ensure_sandbox()

            file_path = self.clean_path(file_path)
            full_path = f"{self.workspace_path}/{file_path}"
            if self._file_exists(full_path):
                return self.fail_response(
                    f"File '{file_path}' already exists. Use full_file_rewrite to modify existing files."
                )

            # Everything before the final "/" is the parent directory.
            parent_dir = full_path.rpartition("/")[0]
            if parent_dir:
                self.sandbox.fs.create_folder(parent_dir, "755")

            self.sandbox.fs.upload_file(file_contents.encode(), full_path)
            self.sandbox.fs.set_file_permissions(full_path, permissions)

            message = f"File '{file_path}' created successfully."
            message += self._index_html_notice(file_path)

            return self.success_response(message)
        except Exception as e:
            return self.fail_response(f"Error creating file: {str(e)}")

    async def _str_replace(
        self, file_path: str, old_str: str, new_str: str
    ) -> ToolResult:
        """Replace the single occurrence of ``old_str`` in a file with ``new_str``.

        Fails if the file does not exist, if ``old_str`` is not found, or if
        it occurs more than once (the error lists the 1-based line numbers on
        which the matches start).
        """
        try:
            await self._ensure_sandbox()

            file_path = self.clean_path(file_path)
            full_path = f"{self.workspace_path}/{file_path}"
            if not self._file_exists(full_path):
                return self.fail_response(f"File '{file_path}' does not exist")

            content = self.sandbox.fs.download_file(full_path).decode()
            # NOTE(review): tabs are expanded in the search/replacement text
            # but not in the file content, so an old_str containing a tab
            # cannot match a tab in the file. Kept as-is; confirm intent.
            old_str = old_str.expandtabs()
            new_str = new_str.expandtabs()

            occurrences = content.count(old_str)
            if occurrences == 0:
                return self.fail_response(f"String '{old_str}' not found in file")
            if occurrences > 1:
                # Derive line numbers from match offsets so that a multi-line
                # old_str is reported too (a per-line ``in`` test misses it).
                lines = []
                step = max(len(old_str), 1)  # never loop forever on ""
                pos = content.find(old_str)
                while pos != -1:
                    line_no = content.count("\n", 0, pos) + 1
                    if line_no not in lines:
                        lines.append(line_no)
                    pos = content.find(old_str, pos + step)
                return self.fail_response(
                    f"Multiple occurrences found in lines {lines}. Please ensure string is unique"
                )

            new_content = content.replace(old_str, new_str)
            self.sandbox.fs.upload_file(new_content.encode(), full_path)

            return self.success_response("Replacement successful.")

        except Exception as e:
            return self.fail_response(f"Error replacing string: {str(e)}")

    async def _full_file_rewrite(
        self, file_path: str, file_contents: str, permissions: str = "644"
    ) -> ToolResult:
        """Completely rewrite an existing file with new content.

        Fails if the file does not exist. ``permissions`` is re-applied to
        the file after the upload.
        """
        try:
            await self._ensure_sandbox()

            file_path = self.clean_path(file_path)
            full_path = f"{self.workspace_path}/{file_path}"
            if not self._file_exists(full_path):
                return self.fail_response(
                    f"File '{file_path}' does not exist. Use create_file to create a new file."
                )

            self.sandbox.fs.upload_file(file_contents.encode(), full_path)
            self.sandbox.fs.set_file_permissions(full_path, permissions)

            message = f"File '{file_path}' completely rewritten successfully."
            message += self._index_html_notice(file_path)

            return self.success_response(message)
        except Exception as e:
            return self.fail_response(f"Error rewriting file: {str(e)}")

    async def _delete_file(self, file_path: str) -> ToolResult:
        """Delete a file at the given path; fails if it does not exist."""
        try:
            await self._ensure_sandbox()

            file_path = self.clean_path(file_path)
            full_path = f"{self.workspace_path}/{file_path}"
            if not self._file_exists(full_path):
                return self.fail_response(f"File '{file_path}' does not exist")

            self.sandbox.fs.delete_file(full_path)
            return self.success_response(f"File '{file_path}' deleted successfully.")
        except Exception as e:
            return self.fail_response(f"Error deleting file: {str(e)}")

    async def cleanup(self):
        """Clean up sandbox resources (currently a no-op)."""

    @classmethod
    def create_with_context(cls, context: Context) -> "SandboxFilesTool[Context]":
        """Factory method to create a SandboxFilesTool with a specific context.

        Raises:
            NotImplementedError: Always; this factory is not implemented.
        """
        raise NotImplementedError(
            "create_with_context not implemented for SandboxFilesTool"
        )
|
|