""" File System Operations Tool - Session-based file isolation Each session gets its own isolated folder for complete data separation. """ import json import logging import mimetypes import os from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) from agent.core.session import Event @dataclass class FileCreated: """Result of file creation""" path: str full_path: str created: bool size: int @dataclass class DirectoryListing: """Result of directory listing""" path: str items: List[Dict[str, Any]] @dataclass class FileTree: """File tree structure""" root: str session_id: str structure: List[Dict[str, Any]] class SessionFileOperator: """ File operations restricted to session folder. Cannot escape session boundaries. """ def __init__(self, session_folder: Path): self.session_folder = Path(session_folder).resolve() self.files_folder = self.session_folder / "files" self.documents_folder = self.session_folder / "documents" # Ensure folders exist self.files_folder.mkdir(parents=True, exist_ok=True) self.documents_folder.mkdir(parents=True, exist_ok=True) def _validate_path(self, relative_path: str) -> Path: """ Validate that the path is within session folder. Raises PermissionError if path escapes session boundaries. """ # Prevent path traversal if ".." in relative_path: raise PermissionError("Path traversal not allowed") # Normalize path target = (self.session_folder / relative_path).resolve() # Double-check file is within session folder try: target.relative_to(self.session_folder) except ValueError: raise PermissionError("Cannot access outside session folder") return target def _detect_language(self, extension: str) -> Optional[str]: """Detect programming language from file extension""" language_map = { ".py": "python", ".js": "javascript", ".ts": "typescript", ".jsx": "jsx", ".tsx": "tsx", ".html": "html", ".css": "css", ".scss": "scss", ".sass": "sass", ".json": "json", ".yaml": "yaml", ".yml": "yaml", ".md": "markdown", ".rs": "rust", ".go": "go", ".java": "java", ".kt": "kotlin", ".swift": "swift", ".cpp": "cpp", ".c": "c", ".h": "c", ".hpp": "cpp", ".rb": "ruby", ".php": "php", ".sh": "bash", ".sql": "sql", ".xml": "xml", ".dockerfile": "dockerfile", ".vue": "vue", ".svelte": "svelte", } return language_map.get(extension.lower()) async def create_file( self, relative_path: str, content: str = "", folder: str = "files" ) -> FileCreated: """Create file only within session folder.""" # Determine target folder if folder == "documents": base_folder = self.documents_folder else: base_folder = self.files_folder # Validate path if ".." in relative_path or relative_path.startswith("/"): raise PermissionError("Invalid path") target = base_folder / relative_path # Double-check file is within session folder if not str(target.resolve()).startswith(str(self.session_folder.resolve())): raise PermissionError("Cannot create outside session folder") # Create parent directories target.parent.mkdir(parents=True, exist_ok=True) # Write file target.write_text(content, encoding='utf-8') # Get relative path from session folder rel_path = str(target.relative_to(self.session_folder)) return FileCreated( path=rel_path, full_path=str(target), created=True, size=len(content.encode('utf-8')) ) async def read_file(self, relative_path: str) -> str: """Read file content from session.""" target = self._validate_path(relative_path) if not target.exists(): raise FileNotFoundError(f"File not found: {relative_path}") if not target.is_file(): raise IsADirectoryError(f"Path is a directory: {relative_path}") return target.read_text(encoding='utf-8') async def update_file(self, relative_path: str, content: str) -> FileCreated: """Update file content.""" target = self._validate_path(relative_path) if not target.exists(): raise FileNotFoundError(f"File not found: {relative_path}") # Write new content target.write_text(content, encoding='utf-8') rel_path = str(target.relative_to(self.session_folder)) return FileCreated( path=rel_path, full_path=str(target), created=True, size=len(content.encode('utf-8')) ) async def delete_file(self, relative_path: str) -> bool: """Delete file from session.""" target = self._validate_path(relative_path) if not target.exists(): raise FileNotFoundError(f"File not found: {relative_path}") if target.is_dir(): raise IsADirectoryError(f"Cannot delete directory with this method: {relative_path}") target.unlink() return True async def list_files(self, subfolder: str = "files") -> DirectoryListing: """List all files in session folder.""" if subfolder == "documents": target_folder = self.documents_folder else: target_folder = self.files_folder items = [] if target_folder.exists(): for item in sorted(target_folder.iterdir()): if item.name.startswith('.'): continue # Skip hidden files stat = item.stat() item_info = { "name": item.name, "type": "folder" if item.is_dir() else "file", "path": str(item.relative_to(self.session_folder)), "size": stat.st_size if item.is_file() else None, "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), } if item.is_file(): item_info["language"] = self._detect_language(item.suffix) items.append(item_info) return DirectoryListing( path=str(target_folder.relative_to(self.session_folder)), items=items ) async def get_file_tree(self, max_depth: int = 5) -> FileTree: """Get folder tree for this session only.""" def build_tree(path: Path, depth: int = 0) -> List[Dict[str, Any]]: if depth > max_depth: return [] items = [] if not path.exists(): return items for item in sorted(path.iterdir()): if item.name.startswith('.') or item.name in ['metadata.json', 'chat_history.json']: continue # Skip hidden and metadata files child = { "name": item.name, "type": "folder" if item.is_dir() else "file", "path": str(item.relative_to(self.session_folder)) } if item.is_dir(): children = build_tree(item, depth + 1) if children: child["children"] = children else: stat = item.stat() child["size"] = stat.st_size child["language"] = self._detect_language(item.suffix) child["modified"] = datetime.fromtimestamp(stat.st_mtime).isoformat() items.append(child) return items return FileTree( root=str(self.session_folder), session_id=self.session_folder.name, structure=build_tree(self.session_folder) ) async def create_directory(self, relative_path: str) -> bool: """Create a directory within session folder.""" target = self._validate_path(relative_path) target.mkdir(parents=True, exist_ok=True) return True async def get_file_info(self, relative_path: str) -> Dict[str, Any]: """Get detailed file information.""" target = self._validate_path(relative_path) if not target.exists(): raise FileNotFoundError(f"File not found: {relative_path}") stat = target.stat() info = { "name": target.name, "path": str(target.relative_to(self.session_folder)), "full_path": str(target), "type": "folder" if target.is_dir() else "file", "size": stat.st_size, "created": datetime.fromtimestamp(stat.st_ctime).isoformat(), "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), } if target.is_file(): info["language"] = self._detect_language(target.suffix) info["extension"] = target.suffix mime_type, _ = mimetypes.guess_type(str(target)) info["mime_type"] = mime_type return info # Tool specs for LLM CREATE_FILE_TOOL_SPEC = { "name": "create_file", "description": ( "Create a new file in the session workspace. " "Files are isolated to the current session and cannot be accessed by other sessions. " "Use this to: write code, create configuration files, generate documentation, etc." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Relative path for the file (e.g., 'src/main.py', 'README.md')", }, "content": { "type": "string", "description": "File content to write", }, "folder": { "type": "string", "enum": ["files", "documents"], "description": "Target folder (default: files)", }, }, "required": ["path", "content"], }, } READ_FILE_TOOL_SPEC = { "name": "read_file", "description": ( "Read the content of a file in the session workspace. " "Can only access files within the current session." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Relative path of the file to read", }, }, "required": ["path"], }, } LIST_FILES_TOOL_SPEC = { "name": "list_files", "description": ( "List all files in the session workspace. " "Returns files and folders with their metadata." ), "parameters": { "type": "object", "properties": { "folder": { "type": "string", "enum": ["files", "documents"], "description": "Target folder (default: files)", }, }, }, } # Tool handlers async def create_file_handler(arguments: Dict[str, Any], session=None) -> tuple[str, bool]: """Handler for creating files.""" try: path = arguments.get("path", "").strip() content = arguments.get("content", "") folder = arguments.get("folder", "files") if not path: return "Error: Path is required", False # Get session folder from session if session and hasattr(session, 'session_folder'): session_folder = Path(session.session_folder) else: # Fallback to temp directory import tempfile session_folder = Path(tempfile.gettempdir()) / "session_default" session_folder.mkdir(parents=True, exist_ok=True) fs = SessionFileOperator(session_folder) result = await fs.create_file(path, content, folder) # If a session object was provided, register generated file and emit event try: if session and hasattr(session, 'add_generated_file'): file_data = { "path": result.path, "full_path": result.full_path, "size": result.size, "created": result.created, "timestamp": datetime.utcnow().isoformat(), } # Track generated file on the session session.add_generated_file(file_data) # Emit file_generated event so front-end can pick it up if hasattr(session, 'send_event'): await session.send_event( Event(event_type="file_generated", data={"file": file_data}) ) except Exception: # Non-fatal: don't break file creation on event emission failure logger.exception("Failed to emit file_generated event") return ( f"✅ File created successfully\n" f"📄 Path: {result.path}\n" f"💾 Size: {result.size} bytes", True ) except PermissionError as e: return f"❌ Permission denied: {str(e)}", False except Exception as e: logger.error(f"File creation error: {e}") return f"❌ Error creating file: {str(e)}", False async def read_file_handler(arguments: Dict[str, Any], session=None) -> tuple[str, bool]: """Handler for reading files.""" try: path = arguments.get("path", "").strip() if not path: return "Error: Path is required", False # Get session folder from session if session and hasattr(session, 'session_folder'): session_folder = Path(session.session_folder) else: import tempfile session_folder = Path(tempfile.gettempdir()) / "session_default" fs = SessionFileOperator(session_folder) content = await fs.read_file(path) return content, True except FileNotFoundError: return f"❌ File not found: {path}", False except PermissionError as e: return f"❌ Permission denied: {str(e)}", False except Exception as e: logger.error(f"File read error: {e}") return f"❌ Error reading file: {str(e)}", False async def list_files_handler(arguments: Dict[str, Any], session=None) -> tuple[str, bool]: """Handler for listing files.""" try: folder = arguments.get("folder", "files") # Get session folder from session if session and hasattr(session, 'session_folder'): session_folder = Path(session.session_folder) else: import tempfile session_folder = Path(tempfile.gettempdir()) / "session_default" fs = SessionFileOperator(session_folder) listing = await fs.list_files(folder) if not listing.items: return f"📁 Folder '{listing.path}' is empty", True lines = [f"📁 Contents of '{listing.path}':", ""] for item in listing.items: icon = "📂" if item["type"] == "folder" else "📄" size_str = f" ({item['size']} bytes)" if item.get("size") else "" lines.append(f"{icon} {item['name']}{size_str}") return "\n".join(lines), True except Exception as e: logger.error(f"File listing error: {e}") return f"❌ Error listing files: {str(e)}", False