Spaces:
Sleeping
Sleeping
| import asyncio | |
| import os | |
| import re | |
| import shutil | |
| from abc import ABC, abstractmethod | |
| from concurrent.futures import ThreadPoolExecutor | |
| from pathlib import Path | |
| from typing import Any | |
| from pydantic import BaseModel, Field | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.lib.styles import getSampleStyleSheet | |
| from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer | |
# Message returned to the LLM when a user-supplied filename fails validation.
INVALID_FILENAME_ERROR_MESSAGE = 'Error: Invalid filename format. Must be alphanumeric with supported extension.'
# Name of the dedicated subdirectory (under base_dir) where all managed files live.
DEFAULT_FILE_SYSTEM_PATH = 'browseruse_agent_data'
class FileSystemError(Exception):
	"""Raised by file-system operations; the message is meant to be surfaced to the LLM."""
class BaseFile(BaseModel, ABC):
	"""Base class for all file types.

	Content lives in memory; `write`/`append` update it and then persist it to
	disk. Subclasses only have to provide the `extension` property.
	"""

	name: str  # filename without extension
	content: str = ''  # current in-memory file content

	# --- Subclass must define this ---
	@property
	@abstractmethod
	def extension(self) -> str:
		"""File extension (e.g. 'txt', 'md')"""

	def write_file_content(self, content: str) -> None:
		"""Update internal content (formatted)"""
		self.update_content(content)

	def append_file_content(self, content: str) -> None:
		"""Append content to internal content"""
		self.update_content(self.content + content)

	# --- These are shared and implemented here ---
	def update_content(self, content: str) -> None:
		"""Replace the in-memory content."""
		self.content = content

	def sync_to_disk_sync(self, path: Path) -> None:
		"""Write the in-memory content to `path / full_name` (blocking)."""
		file_path = path / self.full_name
		file_path.write_text(self.content)

	async def sync_to_disk(self, path: Path) -> None:
		"""Write the in-memory content to disk without blocking the event loop.

		NOTE(review): a new ThreadPoolExecutor is created per call; fine for
		occasional writes, wasteful on a hot path.
		"""
		file_path = path / self.full_name
		with ThreadPoolExecutor() as executor:
			await asyncio.get_event_loop().run_in_executor(executor, lambda: file_path.write_text(self.content))

	async def write(self, content: str, path: Path) -> None:
		"""Replace the content and persist it to disk."""
		self.write_file_content(content)
		await self.sync_to_disk(path)

	async def append(self, content: str, path: Path) -> None:
		"""Append to the content and persist it to disk."""
		self.append_file_content(content)
		await self.sync_to_disk(path)

	def read(self) -> str:
		"""Return the in-memory content."""
		return self.content

	@property
	def full_name(self) -> str:
		"""Filename including extension, e.g. 'todo.md'."""
		ext = self.extension
		# Tolerate subclasses that still define `extension` as a plain method.
		if callable(ext):
			ext = ext()
		return f'{self.name}.{ext}'

	def get_size(self) -> int:
		"""Content length in characters."""
		return len(self.content)

	def get_line_count(self) -> int:
		"""Number of lines in the content."""
		return len(self.content.splitlines())
class MarkdownFile(BaseFile):
	"""Markdown file implementation"""

	@property
	def extension(self) -> str:
		return 'md'
class TxtFile(BaseFile):
	"""Plain text file implementation"""

	@property
	def extension(self) -> str:
		return 'txt'
class JsonFile(BaseFile):
	"""JSON file implementation"""

	@property
	def extension(self) -> str:
		return 'json'
class CsvFile(BaseFile):
	"""CSV file implementation"""

	@property
	def extension(self) -> str:
		return 'csv'
class JsonlFile(BaseFile):
	"""JSONL (JSON Lines) file implementation"""

	@property
	def extension(self) -> str:
		return 'jsonl'
class PdfFile(BaseFile):
	"""PDF file implementation: renders the text content to a PDF via reportlab."""

	@property
	def extension(self) -> str:
		return 'pdf'

	def sync_to_disk_sync(self, path: Path) -> None:
		"""Render the content to a PDF at `path / full_name` (blocking).

		Raises:
			FileSystemError: if the PDF could not be built/written.
		"""
		file_path = path / self.full_name
		try:
			# Create PDF document
			doc = SimpleDocTemplate(str(file_path), pagesize=letter)
			styles = getSampleStyleSheet()
			story = []

			# Convert markdown content to simple text and add to PDF
			# For basic implementation, we'll treat content as plain text
			# This avoids the AGPL license issue while maintaining functionality
			content_lines = self.content.split('\n')
			for line in content_lines:
				if line.strip():
					# Handle basic markdown headers
					if line.startswith('# '):
						para = Paragraph(line[2:], styles['Title'])
					elif line.startswith('## '):
						para = Paragraph(line[3:], styles['Heading1'])
					elif line.startswith('### '):
						para = Paragraph(line[4:], styles['Heading2'])
					else:
						para = Paragraph(line, styles['Normal'])
					story.append(para)
				else:
					# Blank source line -> small vertical gap
					story.append(Spacer(1, 6))

			doc.build(story)
		except Exception as e:
			# Chain the cause so the underlying reportlab error stays visible.
			raise FileSystemError(f"Error: Could not write to file '{self.full_name}'. {str(e)}") from e

	async def sync_to_disk(self, path: Path) -> None:
		"""Run the blocking PDF build in a worker thread."""
		with ThreadPoolExecutor() as executor:
			await asyncio.get_event_loop().run_in_executor(executor, lambda: self.sync_to_disk_sync(path))
class FileSystemState(BaseModel):
	"""Serializable state of the file system"""

	# full filename (e.g. 'todo.md') -> {'type': file class name, 'data': model_dump()}
	files: dict[str, dict[str, Any]] = Field(default_factory=dict)
	# String form of the base directory the FileSystem was created with.
	base_dir: str
	# Counter used to number extracted_content_N.md files.
	extracted_content_count: int = 0
class FileSystem:
	"""Enhanced file system with in-memory storage and multiple file type support.

	Files are kept in memory (keyed by full filename) and mirrored to a
	dedicated data directory on every write/append.
	"""

	def __init__(self, base_dir: str | Path, create_default_files: bool = True):
		"""Create the file system rooted at `base_dir / DEFAULT_FILE_SYSTEM_PATH`.

		Any pre-existing data directory is removed so each instance starts clean.
		"""
		# Handle the Path conversion before calling super().__init__
		self.base_dir = Path(base_dir) if isinstance(base_dir, str) else base_dir
		self.base_dir.mkdir(parents=True, exist_ok=True)

		# Create and use a dedicated subfolder for all operations
		self.data_dir = self.base_dir / DEFAULT_FILE_SYSTEM_PATH
		if self.data_dir.exists():
			# clean the data directory
			shutil.rmtree(self.data_dir)
		self.data_dir.mkdir(exist_ok=True)

		# Registry mapping supported extensions to their file classes.
		self._file_types: dict[str, type[BaseFile]] = {
			'md': MarkdownFile,
			'txt': TxtFile,
			'json': JsonFile,
			'jsonl': JsonlFile,
			'csv': CsvFile,
			'pdf': PdfFile,
		}

		# In-memory store: full filename -> file object.
		self.files: dict[str, BaseFile] = {}
		# Always define default_files so the attribute exists even when
		# create_default_files is False (e.g. when restoring from state).
		self.default_files: list[str] = []
		if create_default_files:
			self.default_files = ['todo.md']
			self._create_default_files()

		self.extracted_content_count = 0

	def get_allowed_extensions(self) -> list[str]:
		"""Get allowed extensions"""
		return list(self._file_types.keys())

	def _get_file_type_class(self, extension: str) -> type[BaseFile] | None:
		"""Get the appropriate file class for an extension."""
		return self._file_types.get(extension.lower(), None)

	def _create_default_files(self) -> None:
		"""Create default results and todo files"""
		for full_filename in self.default_files:
			name_without_ext, extension = self._parse_filename(full_filename)
			file_class = self._get_file_type_class(extension)
			if not file_class:
				raise ValueError(f"Error: Invalid file extension '{extension}' for file '{full_filename}'.")

			file_obj = file_class(name=name_without_ext)
			self.files[full_filename] = file_obj  # Use full filename as key
			file_obj.sync_to_disk_sync(self.data_dir)

	def _is_valid_filename(self, file_name: str) -> bool:
		"""Check if filename matches the required pattern: name.extension"""
		# Build extensions pattern from _file_types
		extensions = '|'.join(self._file_types.keys())
		# Name part allows ASCII word characters, dashes, and CJK characters.
		pattern = rf'^[a-zA-Z0-9_\-\u4e00-\u9fff]+\.({extensions})$'
		file_name_base = os.path.basename(file_name)
		return bool(re.match(pattern, file_name_base))

	def _parse_filename(self, filename: str) -> tuple[str, str]:
		"""Parse filename into name and extension. Always check _is_valid_filename first."""
		name, extension = filename.rsplit('.', 1)
		return name, extension.lower()

	def get_dir(self) -> Path:
		"""Get the file system directory"""
		return self.data_dir

	def get_file(self, full_filename: str) -> BaseFile | None:
		"""Get a file object by full filename, or None if invalid/unknown."""
		if not self._is_valid_filename(full_filename):
			return None

		# Use full filename as key
		return self.files.get(full_filename)

	def list_files(self) -> list[str]:
		"""List all files in the system"""
		return [file_obj.full_name for file_obj in self.files.values()]

	def display_file(self, full_filename: str) -> str | None:
		"""Return a file's content, or None if the name is invalid or unknown."""
		if not self._is_valid_filename(full_filename):
			return None

		file_obj = self.get_file(full_filename)
		if not file_obj:
			return None

		return file_obj.read()

	async def read_file(self, full_filename: str, external_file: bool = False) -> str:
		"""Read file content and return an LLM-facing message.

		With external_file=True, reads from an arbitrary path on disk (text
		formats via anyio, PDFs via pypdf, capped at 20 pages); otherwise
		reads from the in-memory store. Errors are returned as strings, never
		raised.
		"""
		if external_file:
			try:
				try:
					_, extension = self._parse_filename(full_filename)
				except Exception:
					return f'Error: Invalid filename format {full_filename}. Must be alphanumeric with a supported extension.'

				if extension in ['md', 'txt', 'json', 'jsonl', 'csv']:
					import anyio

					async with await anyio.open_file(full_filename, 'r') as f:
						content = await f.read()
						return f'Read from file {full_filename}.\n<content>\n{content}\n</content>'
				elif extension == 'pdf':
					import pypdf

					reader = pypdf.PdfReader(full_filename)
					num_pages = len(reader.pages)
					MAX_PDF_PAGES = 20
					extra_pages = num_pages - MAX_PDF_PAGES
					extracted_text = ''
					for page in reader.pages[:MAX_PDF_PAGES]:
						extracted_text += page.extract_text()
					extra_pages_text = f'{extra_pages} more pages...' if extra_pages > 0 else ''
					return f'Read from file {full_filename}.\n<content>\n{extracted_text}\n{extra_pages_text}</content>'
				else:
					return f'Error: Cannot read file {full_filename} as {extension} extension is not supported.'
			except FileNotFoundError:
				return f"Error: File '{full_filename}' not found."
			except PermissionError:
				return f"Error: Permission denied to read file '{full_filename}'."
			except Exception:
				return f"Error: Could not read file '{full_filename}'."

		if not self._is_valid_filename(full_filename):
			return INVALID_FILENAME_ERROR_MESSAGE

		file_obj = self.get_file(full_filename)
		if not file_obj:
			return f"File '{full_filename}' not found."

		try:
			content = file_obj.read()
			return f'Read from file {full_filename}.\n<content>\n{content}\n</content>'
		except FileSystemError as e:
			return str(e)
		except Exception:
			return f"Error: Could not read file '{full_filename}'."

	async def write_file(self, full_filename: str, content: str) -> str:
		"""Write content to a (possibly new) file; returns an LLM-facing message."""
		if not self._is_valid_filename(full_filename):
			return INVALID_FILENAME_ERROR_MESSAGE

		try:
			name_without_ext, extension = self._parse_filename(full_filename)
			file_class = self._get_file_type_class(extension)
			if not file_class:
				raise ValueError(f"Error: Invalid file extension '{extension}' for file '{full_filename}'.")

			# Create or get existing file using full filename as key
			if full_filename in self.files:
				file_obj = self.files[full_filename]
			else:
				file_obj = file_class(name=name_without_ext)
				self.files[full_filename] = file_obj  # Use full filename as key

			# Use file-specific write method
			await file_obj.write(content, self.data_dir)
			return f'Data written to file {full_filename} successfully.'
		except FileSystemError as e:
			return str(e)
		except Exception as e:
			return f"Error: Could not write to file '{full_filename}'. {str(e)}"

	async def append_file(self, full_filename: str, content: str) -> str:
		"""Append content to an existing file; returns an LLM-facing message."""
		if not self._is_valid_filename(full_filename):
			return INVALID_FILENAME_ERROR_MESSAGE

		file_obj = self.get_file(full_filename)
		if not file_obj:
			return f"File '{full_filename}' not found."

		try:
			await file_obj.append(content, self.data_dir)
			return f'Data appended to file {full_filename} successfully.'
		except FileSystemError as e:
			return str(e)
		except Exception as e:
			return f"Error: Could not append to file '{full_filename}'. {str(e)}"

	async def replace_file_str(self, full_filename: str, old_str: str, new_str: str) -> str:
		"""Replace all occurrences of old_str with new_str in the named file."""
		if not self._is_valid_filename(full_filename):
			return INVALID_FILENAME_ERROR_MESSAGE

		if not old_str:
			return 'Error: Cannot replace empty string. Please provide a non-empty string to replace.'

		file_obj = self.get_file(full_filename)
		if not file_obj:
			return f"File '{full_filename}' not found."

		try:
			content = file_obj.read()
			content = content.replace(old_str, new_str)
			await file_obj.write(content, self.data_dir)
			return f'Successfully replaced all occurrences of "{old_str}" with "{new_str}" in file {full_filename}'
		except FileSystemError as e:
			return str(e)
		except Exception as e:
			return f"Error: Could not replace string in file '{full_filename}'. {str(e)}"

	async def save_extracted_content(self, content: str) -> str:
		"""Save extracted content to a numbered markdown file; returns its filename."""
		initial_filename = f'extracted_content_{self.extracted_content_count}'
		extracted_filename = f'{initial_filename}.md'
		file_obj = MarkdownFile(name=initial_filename)
		await file_obj.write(content, self.data_dir)
		self.files[extracted_filename] = file_obj
		self.extracted_content_count += 1
		return extracted_filename

	def describe(self) -> str:
		"""List all files (except todo.md) with size info and content previews.

		Small files are shown whole; larger files show ~200-char head and tail
		previews around a '... N more lines ...' marker.
		"""
		DISPLAY_CHARS = 400
		description = ''

		for file_obj in self.files.values():
			# Skip todo.md from description
			if file_obj.full_name == 'todo.md':
				continue

			content = file_obj.read()

			# Handle empty files
			if not content:
				description += f'<file>\n{file_obj.full_name} - [empty file]\n</file>\n'
				continue

			lines = content.splitlines()
			line_count = len(lines)

			# For small files, display the entire content
			whole_file_description = (
				f'<file>\n{file_obj.full_name} - {line_count} lines\n<content>\n{content}\n</content>\n</file>\n'
			)
			if len(content) < int(1.5 * DISPLAY_CHARS):
				description += whole_file_description
				continue

			# For larger files, display start and end previews
			half_display_chars = DISPLAY_CHARS // 2

			# Get start preview: whole lines until the char budget is spent
			start_preview = ''
			start_line_count = 0
			chars_count = 0
			for line in lines:
				if chars_count + len(line) + 1 > half_display_chars:
					break
				start_preview += line + '\n'
				chars_count += len(line) + 1
				start_line_count += 1

			# Get end preview: whole lines from the bottom, same budget
			end_preview = ''
			end_line_count = 0
			chars_count = 0
			for line in reversed(lines):
				if chars_count + len(line) + 1 > half_display_chars:
					break
				end_preview = line + '\n' + end_preview
				chars_count += len(line) + 1
				end_line_count += 1

			# Calculate lines in between; if previews overlap, show the whole file
			middle_line_count = line_count - start_line_count - end_line_count
			if middle_line_count <= 0:
				description += whole_file_description
				continue

			start_preview = start_preview.strip('\n').rstrip()
			end_preview = end_preview.strip('\n').rstrip()

			# Format output
			if not (start_preview or end_preview):
				description += f'<file>\n{file_obj.full_name} - {line_count} lines\n<content>\n{middle_line_count} lines...\n</content>\n</file>\n'
			else:
				description += f'<file>\n{file_obj.full_name} - {line_count} lines\n<content>\n{start_preview}\n'
				description += f'... {middle_line_count} more lines ...\n'
				description += f'{end_preview}\n'
				description += '</content>\n</file>\n'

		return description.strip('\n')

	def get_todo_contents(self) -> str:
		"""Get todo file contents ('' if todo.md does not exist)."""
		todo_file = self.get_file('todo.md')
		return todo_file.read() if todo_file else ''

	def get_state(self) -> FileSystemState:
		"""Get serializable state of the file system"""
		files_data = {}
		for full_filename, file_obj in self.files.items():
			files_data[full_filename] = {'type': file_obj.__class__.__name__, 'data': file_obj.model_dump()}
		return FileSystemState(
			files=files_data, base_dir=str(self.base_dir), extracted_content_count=self.extracted_content_count
		)

	def nuke(self) -> None:
		"""Delete the file system directory"""
		shutil.rmtree(self.data_dir)

	@classmethod
	def from_state(cls, state: FileSystemState) -> 'FileSystem':
		"""Restore file system from serializable state at the exact same location"""
		# Create file system without default files
		fs = cls(base_dir=Path(state.base_dir), create_default_files=False)
		fs.extracted_content_count = state.extracted_content_count

		# Map serialized class names back to file classes via the registry.
		type_map: dict[str, type[BaseFile]] = {file_class.__name__: file_class for file_class in fs._file_types.values()}

		# Restore all files
		for full_filename, file_data in state.files.items():
			file_class = type_map.get(file_data['type'])
			if file_class is None:
				# Skip unknown file types
				continue

			# Add to files dict and sync to disk
			file_obj = file_class(**file_data['data'])
			fs.files[full_filename] = file_obj
			file_obj.sync_to_disk_sync(fs.data_dir)

		return fs