| | """ |
| | File System Operations Tool - Session-based file isolation |
| | |
| | Each session gets its own isolated folder for complete data separation. |
| | """ |
| |
|
| | import json |
| | import logging |
| | import mimetypes |
| | import os |
| | from dataclasses import dataclass |
| | from datetime import datetime |
| | from pathlib import Path |
| | from typing import Any, Dict, List, Optional |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | from agent.core.session import Event |
| |
|
| |
|
| | @dataclass |
| | class FileCreated: |
| | """Result of file creation""" |
| | path: str |
| | full_path: str |
| | created: bool |
| | size: int |
| |
|
| |
|
| | @dataclass |
| | class DirectoryListing: |
| | """Result of directory listing""" |
| | path: str |
| | items: List[Dict[str, Any]] |
| |
|
| |
|
| | @dataclass |
| | class FileTree: |
| | """File tree structure""" |
| | root: str |
| | session_id: str |
| | structure: List[Dict[str, Any]] |
| |
|
| |
|
| | class SessionFileOperator: |
| | """ |
| | File operations restricted to session folder. |
| | Cannot escape session boundaries. |
| | """ |
| | |
| | def __init__(self, session_folder: Path): |
| | self.session_folder = Path(session_folder).resolve() |
| | self.files_folder = self.session_folder / "files" |
| | self.documents_folder = self.session_folder / "documents" |
| | |
| | |
| | self.files_folder.mkdir(parents=True, exist_ok=True) |
| | self.documents_folder.mkdir(parents=True, exist_ok=True) |
| | |
| | def _validate_path(self, relative_path: str) -> Path: |
| | """ |
| | Validate that the path is within session folder. |
| | Raises PermissionError if path escapes session boundaries. |
| | """ |
| | |
| | if ".." in relative_path: |
| | raise PermissionError("Path traversal not allowed") |
| | |
| | |
| | target = (self.session_folder / relative_path).resolve() |
| | |
| | |
| | try: |
| | target.relative_to(self.session_folder) |
| | except ValueError: |
| | raise PermissionError("Cannot access outside session folder") |
| | |
| | return target |
| | |
| | def _detect_language(self, extension: str) -> Optional[str]: |
| | """Detect programming language from file extension""" |
| | language_map = { |
| | ".py": "python", |
| | ".js": "javascript", |
| | ".ts": "typescript", |
| | ".jsx": "jsx", |
| | ".tsx": "tsx", |
| | ".html": "html", |
| | ".css": "css", |
| | ".scss": "scss", |
| | ".sass": "sass", |
| | ".json": "json", |
| | ".yaml": "yaml", |
| | ".yml": "yaml", |
| | ".md": "markdown", |
| | ".rs": "rust", |
| | ".go": "go", |
| | ".java": "java", |
| | ".kt": "kotlin", |
| | ".swift": "swift", |
| | ".cpp": "cpp", |
| | ".c": "c", |
| | ".h": "c", |
| | ".hpp": "cpp", |
| | ".rb": "ruby", |
| | ".php": "php", |
| | ".sh": "bash", |
| | ".sql": "sql", |
| | ".xml": "xml", |
| | ".dockerfile": "dockerfile", |
| | ".vue": "vue", |
| | ".svelte": "svelte", |
| | } |
| | return language_map.get(extension.lower()) |
| | |
| | async def create_file( |
| | self, |
| | relative_path: str, |
| | content: str = "", |
| | folder: str = "files" |
| | ) -> FileCreated: |
| | """Create file only within session folder.""" |
| | |
| | |
| | if folder == "documents": |
| | base_folder = self.documents_folder |
| | else: |
| | base_folder = self.files_folder |
| | |
| | |
| | if ".." in relative_path or relative_path.startswith("/"): |
| | raise PermissionError("Invalid path") |
| | |
| | target = base_folder / relative_path |
| | |
| | |
| | if not str(target.resolve()).startswith(str(self.session_folder.resolve())): |
| | raise PermissionError("Cannot create outside session folder") |
| | |
| | |
| | target.parent.mkdir(parents=True, exist_ok=True) |
| | |
| | |
| | target.write_text(content, encoding='utf-8') |
| | |
| | |
| | rel_path = str(target.relative_to(self.session_folder)) |
| | |
| | return FileCreated( |
| | path=rel_path, |
| | full_path=str(target), |
| | created=True, |
| | size=len(content.encode('utf-8')) |
| | ) |
| | |
| | async def read_file(self, relative_path: str) -> str: |
| | """Read file content from session.""" |
| | |
| | target = self._validate_path(relative_path) |
| | |
| | if not target.exists(): |
| | raise FileNotFoundError(f"File not found: {relative_path}") |
| | |
| | if not target.is_file(): |
| | raise IsADirectoryError(f"Path is a directory: {relative_path}") |
| | |
| | return target.read_text(encoding='utf-8') |
| | |
| | async def update_file(self, relative_path: str, content: str) -> FileCreated: |
| | """Update file content.""" |
| | |
| | target = self._validate_path(relative_path) |
| | |
| | if not target.exists(): |
| | raise FileNotFoundError(f"File not found: {relative_path}") |
| | |
| | |
| | target.write_text(content, encoding='utf-8') |
| | |
| | rel_path = str(target.relative_to(self.session_folder)) |
| | |
| | return FileCreated( |
| | path=rel_path, |
| | full_path=str(target), |
| | created=True, |
| | size=len(content.encode('utf-8')) |
| | ) |
| | |
| | async def delete_file(self, relative_path: str) -> bool: |
| | """Delete file from session.""" |
| | |
| | target = self._validate_path(relative_path) |
| | |
| | if not target.exists(): |
| | raise FileNotFoundError(f"File not found: {relative_path}") |
| | |
| | if target.is_dir(): |
| | raise IsADirectoryError(f"Cannot delete directory with this method: {relative_path}") |
| | |
| | target.unlink() |
| | return True |
| | |
| | async def list_files(self, subfolder: str = "files") -> DirectoryListing: |
| | """List all files in session folder.""" |
| | |
| | if subfolder == "documents": |
| | target_folder = self.documents_folder |
| | else: |
| | target_folder = self.files_folder |
| | |
| | items = [] |
| | |
| | if target_folder.exists(): |
| | for item in sorted(target_folder.iterdir()): |
| | if item.name.startswith('.'): |
| | continue |
| | |
| | stat = item.stat() |
| | item_info = { |
| | "name": item.name, |
| | "type": "folder" if item.is_dir() else "file", |
| | "path": str(item.relative_to(self.session_folder)), |
| | "size": stat.st_size if item.is_file() else None, |
| | "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), |
| | } |
| | |
| | if item.is_file(): |
| | item_info["language"] = self._detect_language(item.suffix) |
| | |
| | items.append(item_info) |
| | |
| | return DirectoryListing( |
| | path=str(target_folder.relative_to(self.session_folder)), |
| | items=items |
| | ) |
| | |
| | async def get_file_tree(self, max_depth: int = 5) -> FileTree: |
| | """Get folder tree for this session only.""" |
| | |
| | def build_tree(path: Path, depth: int = 0) -> List[Dict[str, Any]]: |
| | if depth > max_depth: |
| | return [] |
| | |
| | items = [] |
| | if not path.exists(): |
| | return items |
| | |
| | for item in sorted(path.iterdir()): |
| | if item.name.startswith('.') or item.name in ['metadata.json', 'chat_history.json']: |
| | continue |
| | |
| | child = { |
| | "name": item.name, |
| | "type": "folder" if item.is_dir() else "file", |
| | "path": str(item.relative_to(self.session_folder)) |
| | } |
| | |
| | if item.is_dir(): |
| | children = build_tree(item, depth + 1) |
| | if children: |
| | child["children"] = children |
| | else: |
| | stat = item.stat() |
| | child["size"] = stat.st_size |
| | child["language"] = self._detect_language(item.suffix) |
| | child["modified"] = datetime.fromtimestamp(stat.st_mtime).isoformat() |
| | |
| | items.append(child) |
| | |
| | return items |
| | |
| | return FileTree( |
| | root=str(self.session_folder), |
| | session_id=self.session_folder.name, |
| | structure=build_tree(self.session_folder) |
| | ) |
| | |
| | async def create_directory(self, relative_path: str) -> bool: |
| | """Create a directory within session folder.""" |
| | |
| | target = self._validate_path(relative_path) |
| | |
| | target.mkdir(parents=True, exist_ok=True) |
| | |
| | return True |
| | |
| | async def get_file_info(self, relative_path: str) -> Dict[str, Any]: |
| | """Get detailed file information.""" |
| | |
| | target = self._validate_path(relative_path) |
| | |
| | if not target.exists(): |
| | raise FileNotFoundError(f"File not found: {relative_path}") |
| | |
| | stat = target.stat() |
| | |
| | info = { |
| | "name": target.name, |
| | "path": str(target.relative_to(self.session_folder)), |
| | "full_path": str(target), |
| | "type": "folder" if target.is_dir() else "file", |
| | "size": stat.st_size, |
| | "created": datetime.fromtimestamp(stat.st_ctime).isoformat(), |
| | "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), |
| | } |
| | |
| | if target.is_file(): |
| | info["language"] = self._detect_language(target.suffix) |
| | info["extension"] = target.suffix |
| | mime_type, _ = mimetypes.guess_type(str(target)) |
| | info["mime_type"] = mime_type |
| | |
| | return info |
| |
|
| |
|
| | |
| | CREATE_FILE_TOOL_SPEC = { |
| | "name": "create_file", |
| | "description": ( |
| | "Create a new file in the session workspace. " |
| | "Files are isolated to the current session and cannot be accessed by other sessions. " |
| | "Use this to: write code, create configuration files, generate documentation, etc." |
| | ), |
| | "parameters": { |
| | "type": "object", |
| | "properties": { |
| | "path": { |
| | "type": "string", |
| | "description": "Relative path for the file (e.g., 'src/main.py', 'README.md')", |
| | }, |
| | "content": { |
| | "type": "string", |
| | "description": "File content to write", |
| | }, |
| | "folder": { |
| | "type": "string", |
| | "enum": ["files", "documents"], |
| | "description": "Target folder (default: files)", |
| | }, |
| | }, |
| | "required": ["path", "content"], |
| | }, |
| | } |
| |
|
| | READ_FILE_TOOL_SPEC = { |
| | "name": "read_file", |
| | "description": ( |
| | "Read the content of a file in the session workspace. " |
| | "Can only access files within the current session." |
| | ), |
| | "parameters": { |
| | "type": "object", |
| | "properties": { |
| | "path": { |
| | "type": "string", |
| | "description": "Relative path of the file to read", |
| | }, |
| | }, |
| | "required": ["path"], |
| | }, |
| | } |
| |
|
| | LIST_FILES_TOOL_SPEC = { |
| | "name": "list_files", |
| | "description": ( |
| | "List all files in the session workspace. " |
| | "Returns files and folders with their metadata." |
| | ), |
| | "parameters": { |
| | "type": "object", |
| | "properties": { |
| | "folder": { |
| | "type": "string", |
| | "enum": ["files", "documents"], |
| | "description": "Target folder (default: files)", |
| | }, |
| | }, |
| | }, |
| | } |
| |
|
| |
|
| | |
| | async def create_file_handler(arguments: Dict[str, Any], session=None) -> tuple[str, bool]: |
| | """Handler for creating files.""" |
| | try: |
| | path = arguments.get("path", "").strip() |
| | content = arguments.get("content", "") |
| | folder = arguments.get("folder", "files") |
| | |
| | if not path: |
| | return "Error: Path is required", False |
| | |
| | |
| | if session and hasattr(session, 'session_folder'): |
| | session_folder = Path(session.session_folder) |
| | else: |
| | |
| | import tempfile |
| | session_folder = Path(tempfile.gettempdir()) / "session_default" |
| | session_folder.mkdir(parents=True, exist_ok=True) |
| | |
| | fs = SessionFileOperator(session_folder) |
| | result = await fs.create_file(path, content, folder) |
| |
|
| | |
| | try: |
| | if session and hasattr(session, 'add_generated_file'): |
| | file_data = { |
| | "path": result.path, |
| | "full_path": result.full_path, |
| | "size": result.size, |
| | "created": result.created, |
| | "timestamp": datetime.utcnow().isoformat(), |
| | } |
| | |
| | session.add_generated_file(file_data) |
| |
|
| | |
| | if hasattr(session, 'send_event'): |
| | await session.send_event( |
| | Event(event_type="file_generated", data={"file": file_data}) |
| | ) |
| | except Exception: |
| | |
| | logger.exception("Failed to emit file_generated event") |
| |
|
| | return ( |
| | f"β
File created successfully\n" |
| | f"π Path: {result.path}\n" |
| | f"πΎ Size: {result.size} bytes", |
| | True |
| | ) |
| | |
| | except PermissionError as e: |
| | return f"β Permission denied: {str(e)}", False |
| | except Exception as e: |
| | logger.error(f"File creation error: {e}") |
| | return f"β Error creating file: {str(e)}", False |
| |
|
| |
|
| | async def read_file_handler(arguments: Dict[str, Any], session=None) -> tuple[str, bool]: |
| | """Handler for reading files.""" |
| | try: |
| | path = arguments.get("path", "").strip() |
| | |
| | if not path: |
| | return "Error: Path is required", False |
| | |
| | |
| | if session and hasattr(session, 'session_folder'): |
| | session_folder = Path(session.session_folder) |
| | else: |
| | import tempfile |
| | session_folder = Path(tempfile.gettempdir()) / "session_default" |
| | |
| | fs = SessionFileOperator(session_folder) |
| | content = await fs.read_file(path) |
| | |
| | return content, True |
| | |
| | except FileNotFoundError: |
| | return f"β File not found: {path}", False |
| | except PermissionError as e: |
| | return f"β Permission denied: {str(e)}", False |
| | except Exception as e: |
| | logger.error(f"File read error: {e}") |
| | return f"β Error reading file: {str(e)}", False |
| |
|
| |
|
| | async def list_files_handler(arguments: Dict[str, Any], session=None) -> tuple[str, bool]: |
| | """Handler for listing files.""" |
| | try: |
| | folder = arguments.get("folder", "files") |
| | |
| | |
| | if session and hasattr(session, 'session_folder'): |
| | session_folder = Path(session.session_folder) |
| | else: |
| | import tempfile |
| | session_folder = Path(tempfile.gettempdir()) / "session_default" |
| | |
| | fs = SessionFileOperator(session_folder) |
| | listing = await fs.list_files(folder) |
| | |
| | if not listing.items: |
| | return f"π Folder '{listing.path}' is empty", True |
| | |
| | lines = [f"π Contents of '{listing.path}':", ""] |
| | |
| | for item in listing.items: |
| | icon = "π" if item["type"] == "folder" else "π" |
| | size_str = f" ({item['size']} bytes)" if item.get("size") else "" |
| | lines.append(f"{icon} {item['name']}{size_str}") |
| | |
| | return "\n".join(lines), True |
| | |
| | except Exception as e: |
| | logger.error(f"File listing error: {e}") |
| | return f"β Error listing files: {str(e)}", False |
| |
|