"""In-memory file system with on-disk mirroring and multiple file type support."""

import asyncio
import os
import re
import shutil
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor  # noqa: F401  (no longer used here; kept so the module namespace is unchanged)
from pathlib import Path
from typing import Any
from xml.sax.saxutils import escape

from pydantic import BaseModel, Field
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer

INVALID_FILENAME_ERROR_MESSAGE = 'Error: Invalid filename format. Must be alphanumeric with supported extension.'
DEFAULT_FILE_SYSTEM_PATH = 'browseruse_agent_data'


class FileSystemError(Exception):
    """Custom exception for file system operations that should be shown to LLM"""


class BaseFile(BaseModel, ABC):
    """Base class for all file types.

    Keeps the file content in memory (``content``) and writes it to
    ``<path>/<name>.<extension>`` when synced.
    """

    name: str
    content: str = ''

    # --- Subclass must define this ---
    @property
    @abstractmethod
    def extension(self) -> str:
        """File extension (e.g. 'txt', 'md')"""

    def write_file_content(self, content: str) -> None:
        """Replace the in-memory content with ``content``."""
        self.update_content(content)

    def append_file_content(self, content: str) -> None:
        """Append ``content`` to the in-memory content."""
        self.update_content(self.content + content)

    # --- These are shared and implemented here ---
    def update_content(self, content: str) -> None:
        """Set the in-memory content."""
        self.content = content

    def sync_to_disk_sync(self, path: Path) -> None:
        """Write the in-memory content to ``path / full_name`` (blocking)."""
        file_path = path / self.full_name
        file_path.write_text(self.content)

    async def sync_to_disk(self, path: Path) -> None:
        """Write the in-memory content to disk without blocking the event loop.

        Delegates to ``sync_to_disk_sync`` in a worker thread, so a subclass
        only has to override the synchronous writer.
        """
        await asyncio.to_thread(self.sync_to_disk_sync, path)

    async def write(self, content: str, path: Path) -> None:
        """Replace the content and sync the file to the directory ``path``."""
        self.write_file_content(content)
        await self.sync_to_disk(path)

    async def append(self, content: str, path: Path) -> None:
        """Append to the content and sync the file to the directory ``path``."""
        self.append_file_content(content)
        await self.sync_to_disk(path)

    def read(self) -> str:
        """Return the in-memory content."""
        return self.content

    @property
    def full_name(self) -> str:
        """File name including extension, e.g. ``todo.md``."""
        return f'{self.name}.{self.extension}'

    @property
    def get_size(self) -> int:
        """Number of characters in the in-memory content."""
        return len(self.content)

    @property
    def get_line_count(self) -> int:
        """Number of lines in the in-memory content."""
        return len(self.content.splitlines())


class MarkdownFile(BaseFile):
    """Markdown file implementation"""

    @property
    def extension(self) -> str:
        return 'md'


class TxtFile(BaseFile):
    """Plain text file implementation"""

    @property
    def extension(self) -> str:
        return 'txt'


class JsonFile(BaseFile):
    """JSON file implementation"""

    @property
    def extension(self) -> str:
        return 'json'


class CsvFile(BaseFile):
    """CSV file implementation"""

    @property
    def extension(self) -> str:
        return 'csv'


class JsonlFile(BaseFile):
    """JSONL (JSON Lines) file implementation"""

    @property
    def extension(self) -> str:
        return 'jsonl'


class PdfFile(BaseFile):
    """PDF file implementation"""

    @property
    def extension(self) -> str:
        return 'pdf'

    def sync_to_disk_sync(self, path: Path) -> None:
        """Render the in-memory content to a PDF at ``path / full_name``.

        Lines starting with ``# ``, ``## `` or ``### `` are rendered with the
        'Title', 'Heading1' and 'Heading2' styles; every other non-blank line
        uses 'Normal'; blank lines become a small vertical spacer.

        Raises:
            FileSystemError: if the PDF could not be built or written.
        """
        file_path = path / self.full_name
        # (markdown prefix, reportlab style name), checked in this order
        header_styles = (('# ', 'Title'), ('## ', 'Heading1'), ('### ', 'Heading2'))
        try:
            doc = SimpleDocTemplate(str(file_path), pagesize=letter)
            styles = getSampleStyleSheet()
            story = []

            # Content is treated as plain text (apart from basic headers).
            # This avoids the AGPL license issue while maintaining functionality.
            for line in self.content.split('\n'):
                if not line.strip():
                    story.append(Spacer(1, 6))
                    continue

                text, style_name = line, 'Normal'
                for prefix, header_style in header_styles:
                    if line.startswith(prefix):
                        text, style_name = line[len(prefix):], header_style
                        break

                # Paragraph parses its text as XML-like markup; escape it so
                # literal '<', '>' and '&' in the content cannot break the build.
                story.append(Paragraph(escape(text), styles[style_name]))

            doc.build(story)
        except Exception as e:
            raise FileSystemError(f"Error: Could not write to file '{self.full_name}'. {str(e)}") from e


class FileSystemState(BaseModel):
    """Serializable state of the file system"""

    files: dict[str, dict[str, Any]] = Field(default_factory=dict)  # full filename -> file data
    base_dir: str
    extracted_content_count: int = 0


class FileSystem:
    """Enhanced file system with in-memory storage and multiple file type support"""

    def __init__(self, base_dir: str | Path, create_default_files: bool = True):
        """Create the file system under ``base_dir``.

        All files live in the ``DEFAULT_FILE_SYSTEM_PATH`` subfolder of
        ``base_dir``. If that subfolder already exists it is deleted first.

        Args:
            base_dir: Directory that will contain the data subfolder.
            create_default_files: When true, create the default files
                (``todo.md``) immediately.
        """
        self.base_dir = Path(base_dir) if isinstance(base_dir, str) else base_dir
        self.base_dir.mkdir(parents=True, exist_ok=True)

        # Create and use a dedicated subfolder for all operations
        self.data_dir = self.base_dir / DEFAULT_FILE_SYSTEM_PATH
        if self.data_dir.exists():
            # clean the data directory
            shutil.rmtree(self.data_dir)
        self.data_dir.mkdir(exist_ok=True)

        self._file_types: dict[str, type[BaseFile]] = {
            'md': MarkdownFile,
            'txt': TxtFile,
            'json': JsonFile,
            'jsonl': JsonlFile,
            'csv': CsvFile,
            'pdf': PdfFile,
        }

        self.files: dict[str, BaseFile] = {}
        # Always defined so the attribute exists even when no defaults are created.
        self.default_files: list[str] = ['todo.md'] if create_default_files else []
        if create_default_files:
            self._create_default_files()

        self.extracted_content_count = 0

    def get_allowed_extensions(self) -> list[str]:
        """Get allowed extensions"""
        return list(self._file_types.keys())

    def _get_file_type_class(self, extension: str) -> type[BaseFile] | None:
        """Get the appropriate file class for an extension (case-insensitive)."""
        return self._file_types.get(extension.lower(), None)

    def _create_default_files(self) -> None:
        """Create every file listed in ``self.default_files`` and write it to disk.

        Raises:
            ValueError: if a default file has an unsupported extension.
        """
        for full_filename in self.default_files:
            name_without_ext, extension = self._parse_filename(full_filename)
            file_class = self._get_file_type_class(extension)
            if not file_class:
                raise ValueError(f"Error: Invalid file extension '{extension}' for file '{full_filename}'.")

            file_obj = file_class(name=name_without_ext)
            self.files[full_filename] = file_obj  # Use full filename as key
            file_obj.sync_to_disk_sync(self.data_dir)

    def _is_valid_filename(self, file_name: str) -> bool:
        """Check if filename matches the required pattern: name.extension

        The name must be a bare file name. Anything with a directory
        component (``../x.md``, ``/tmp/x.md``, ``sub/x.md``) is rejected,
        because the name is later joined onto ``data_dir`` and could
        otherwise resolve to a location outside of it.
        """
        if os.path.basename(file_name) != file_name:
            return False

        # Build extensions pattern from _file_types
        extensions = '|'.join(re.escape(ext) for ext in self._file_types)
        pattern = rf'[a-zA-Z0-9_\-\u4e00-\u9fff]+\.({extensions})'
        # fullmatch: unlike match() with '$', a trailing newline is not accepted
        return re.fullmatch(pattern, file_name) is not None

    def _parse_filename(self, filename: str) -> tuple[str, str]:
        """Parse filename into name and (lower-cased) extension.

        Always check _is_valid_filename first: a name without a '.' raises
        ValueError here.
        """
        name, extension = filename.rsplit('.', 1)
        return name, extension.lower()

    def get_dir(self) -> Path:
        """Get the file system directory"""
        return self.data_dir

    def get_file(self, full_filename: str) -> BaseFile | None:
        """Get a file object by full filename, or None if invalid or unknown."""
        if not self._is_valid_filename(full_filename):
            return None

        # Use full filename as key
        return self.files.get(full_filename)

    def list_files(self) -> list[str]:
        """List all files in the system"""
        return [file_obj.full_name for file_obj in self.files.values()]

    def display_file(self, full_filename: str) -> str | None:
        """Return the content of a file, or None if the name is invalid or unknown."""
        # get_file() already validates the filename.
        file_obj = self.get_file(full_filename)
        if not file_obj:
            return None

        return file_obj.read()

    async def read_file(self, full_filename: str, external_file: bool = False) -> str:
        """Read a file and return a message suitable for the LLM.

        Args:
            full_filename: Name of a managed file, or, when ``external_file``
                is true, a path that is opened directly from disk.
            external_file: Read from disk at ``full_filename`` instead of
                from the in-memory files.

        Returns:
            The content wrapped in a short message, or an error message.
            Errors are reported in the returned string rather than raised.
        """
        if external_file:
            try:
                try:
                    _, extension = self._parse_filename(full_filename)
                except Exception:
                    return f'Error: Invalid filename format {full_filename}. Must be alphanumeric with a supported extension.'

                if extension in ['md', 'txt', 'json', 'jsonl', 'csv']:
                    import anyio

                    async with await anyio.open_file(full_filename, 'r') as f:
                        content = await f.read()
                        return f'Read from file {full_filename}.\n\n{content}\n'
                elif extension == 'pdf':
                    import pypdf

                    reader = pypdf.PdfReader(full_filename)
                    num_pages = len(reader.pages)
                    MAX_PDF_PAGES = 20  # only the first pages are extracted
                    extra_pages = num_pages - MAX_PDF_PAGES
                    extracted_text = ''.join(page.extract_text() or '' for page in reader.pages[:MAX_PDF_PAGES])
                    extra_pages_text = f'{extra_pages} more pages...' if extra_pages > 0 else ''
                    return f'Read from file {full_filename}.\n\n{extracted_text}\n{extra_pages_text}'
                else:
                    return f'Error: Cannot read file {full_filename} as {extension} extension is not supported.'
            except FileNotFoundError:
                return f"Error: File '{full_filename}' not found."
            except PermissionError:
                return f"Error: Permission denied to read file '{full_filename}'."
            except Exception:
                return f"Error: Could not read file '{full_filename}'."

        if not self._is_valid_filename(full_filename):
            return INVALID_FILENAME_ERROR_MESSAGE

        file_obj = self.get_file(full_filename)
        if not file_obj:
            return f"File '{full_filename}' not found."

        try:
            content = file_obj.read()
            return f'Read from file {full_filename}.\n\n{content}\n'
        except FileSystemError as e:
            return str(e)
        except Exception:
            return f"Error: Could not read file '{full_filename}'."

    async def write_file(self, full_filename: str, content: str) -> str:
        """Create or overwrite a file and return a status message for the LLM."""
        if not self._is_valid_filename(full_filename):
            return INVALID_FILENAME_ERROR_MESSAGE

        try:
            name_without_ext, extension = self._parse_filename(full_filename)
            file_class = self._get_file_type_class(extension)
            if not file_class:
                raise ValueError(f"Error: Invalid file extension '{extension}' for file '{full_filename}'.")

            # Create or get existing file using full filename as key
            if full_filename in self.files:
                file_obj = self.files[full_filename]
            else:
                file_obj = file_class(name=name_without_ext)
                self.files[full_filename] = file_obj  # Use full filename as key

            # Use file-specific write method
            await file_obj.write(content, self.data_dir)
            return f'Data written to file {full_filename} successfully.'
        except FileSystemError as e:
            return str(e)
        except Exception as e:
            return f"Error: Could not write to file '{full_filename}'. {str(e)}"

    async def append_file(self, full_filename: str, content: str) -> str:
        """Append content to an existing file and return a status message."""
        if not self._is_valid_filename(full_filename):
            return INVALID_FILENAME_ERROR_MESSAGE

        file_obj = self.get_file(full_filename)
        if not file_obj:
            return f"File '{full_filename}' not found."

        try:
            await file_obj.append(content, self.data_dir)
            return f'Data appended to file {full_filename} successfully.'
        except FileSystemError as e:
            return str(e)
        except Exception as e:
            return f"Error: Could not append to file '{full_filename}'. {str(e)}"

    async def replace_file_str(self, full_filename: str, old_str: str, new_str: str) -> str:
        """Replace every occurrence of old_str with new_str in an existing file."""
        if not self._is_valid_filename(full_filename):
            return INVALID_FILENAME_ERROR_MESSAGE

        if not old_str:
            return 'Error: Cannot replace empty string. Please provide a non-empty string to replace.'

        file_obj = self.get_file(full_filename)
        if not file_obj:
            return f"File '{full_filename}' not found."

        try:
            content = file_obj.read()
            content = content.replace(old_str, new_str)
            await file_obj.write(content, self.data_dir)
            return f'Successfully replaced all occurrences of "{old_str}" with "{new_str}" in file {full_filename}'
        except FileSystemError as e:
            return str(e)
        except Exception as e:
            return f"Error: Could not replace string in file '{full_filename}'. {str(e)}"

    async def save_extracted_content(self, content: str) -> str:
        """Save content to ``extracted_content_<n>.md`` and return that filename.

        ``n`` is ``self.extracted_content_count``, which is incremented after
        a successful write.
        """
        initial_filename = f'extracted_content_{self.extracted_content_count}'
        extracted_filename = f'{initial_filename}.md'
        file_obj = MarkdownFile(name=initial_filename)
        await file_obj.write(content, self.data_dir)
        self.files[extracted_filename] = file_obj
        self.extracted_content_count += 1
        return extracted_filename

    @staticmethod
    def _take_lines_within_budget(lines: list[str], max_chars: int) -> list[str]:
        """Return the leading lines whose total length (+1 per line) fits in max_chars."""
        taken: list[str] = []
        used = 0
        for line in lines:
            cost = len(line) + 1  # +1 accounts for the newline
            if used + cost > max_chars:
                break
            taken.append(line)
            used += cost
        return taken

    def describe(self) -> str:
        """Describe all files (except todo.md) with a preview of their content.

        Small files are shown in full. Larger files show a preview of the
        first and last lines with a count of the omitted lines in between.
        """
        DISPLAY_CHARS = 400
        half_display_chars = DISPLAY_CHARS // 2
        parts: list[str] = []

        for file_obj in self.files.values():
            # Skip todo.md from description
            if file_obj.full_name == 'todo.md':
                continue

            content = file_obj.read()

            # Handle empty files
            if not content:
                parts.append(f'\n{file_obj.full_name} - [empty file]\n\n')
                continue

            lines = content.splitlines()
            line_count = len(lines)
            whole_file_description = f'\n{file_obj.full_name} - {line_count} lines\n\n{content}\n\n\n'

            # For small files, display the entire content
            if len(content) < int(1.5 * DISPLAY_CHARS):
                parts.append(whole_file_description)
                continue

            # For larger files, display start and end previews
            start_lines = self._take_lines_within_budget(lines, half_display_chars)
            end_lines = self._take_lines_within_budget(lines[::-1], half_display_chars)[::-1]

            # Calculate lines in between
            middle_line_count = line_count - len(start_lines) - len(end_lines)
            if middle_line_count <= 0:
                # Previews already cover the whole file
                parts.append(whole_file_description)
                continue

            start_preview = ''.join(line + '\n' for line in start_lines).strip('\n').rstrip()
            end_preview = ''.join(line + '\n' for line in end_lines).strip('\n').rstrip()

            # Format output
            if not (start_preview or end_preview):
                parts.append(f'\n{file_obj.full_name} - {line_count} lines\n\n{middle_line_count} lines...\n\n\n')
            else:
                parts.append(f'\n{file_obj.full_name} - {line_count} lines\n\n{start_preview}\n')
                parts.append(f'... {middle_line_count} more lines ...\n')
                parts.append(f'{end_preview}\n')
                parts.append('\n\n')

        return ''.join(parts).strip('\n')

    def get_todo_contents(self) -> str:
        """Get todo file contents ('' when there is no todo.md)."""
        todo_file = self.get_file('todo.md')
        return todo_file.read() if todo_file else ''

    def get_state(self) -> FileSystemState:
        """Get serializable state of the file system"""
        files_data = {
            full_filename: {'type': file_obj.__class__.__name__, 'data': file_obj.model_dump()}
            for full_filename, file_obj in self.files.items()
        }
        return FileSystemState(
            files=files_data,
            base_dir=str(self.base_dir),
            extracted_content_count=self.extracted_content_count,
        )

    def nuke(self) -> None:
        """Delete the file system directory"""
        shutil.rmtree(self.data_dir)

    @classmethod
    def from_state(cls, state: FileSystemState) -> 'FileSystem':
        """Restore file system from serializable state at the exact same location.

        Entries whose recorded type is not a registered file class are skipped.
        """
        # Create file system without default files
        fs = cls(base_dir=Path(state.base_dir), create_default_files=False)
        fs.extracted_content_count = state.extracted_content_count

        # Class name (as stored by get_state) -> file class
        classes_by_name = {file_cls.__name__: file_cls for file_cls in fs._file_types.values()}

        # Restore all files
        for full_filename, file_data in state.files.items():
            file_type = file_data['type']
            file_info = file_data['data']

            file_cls = classes_by_name.get(file_type)
            if file_cls is None:
                # Skip unknown file types
                continue

            # Add to files dict and sync to disk
            file_obj = file_cls(**file_info)
            fs.files[full_filename] = file_obj
            file_obj.sync_to_disk_sync(fs.data_dir)

        return fs