# Listing residue from the original page: Spaces: Sleeping; file size: 16,502 bytes; d7b3d84 (presumably the revision hash)
import asyncio
import os
import re
import shutil
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Field
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer
# Message returned by FileSystem operations when a filename fails validation.
INVALID_FILENAME_ERROR_MESSAGE = 'Error: Invalid filename format. Must be alphanumeric with supported extension.'
# Name of the subfolder created under a FileSystem's base_dir to hold its files.
DEFAULT_FILE_SYSTEM_PATH = 'browseruse_agent_data'
class FileSystemError(Exception):
    """File system failure whose message is meant to be shown to the LLM."""
class BaseFile(BaseModel, ABC):
    """Base class for all file types.

    A file is a ``name`` (without extension) plus its text ``content``.
    Subclasses supply the extension. Content is held in memory and is
    mirrored to disk by the ``sync_to_disk*`` methods.
    """

    name: str
    content: str = ''

    # --- Subclass must define this ---

    @property
    @abstractmethod
    def extension(self) -> str:
        """File extension without the leading dot (e.g. 'txt', 'md')."""

    # --- These are shared and implemented here ---

    def write_file_content(self, content: str) -> None:
        """Replace the in-memory content with ``content``."""
        self.update_content(content)

    def append_file_content(self, content: str) -> None:
        """Append ``content`` to the in-memory content."""
        self.update_content(self.content + content)

    def update_content(self, content: str) -> None:
        """Set the in-memory content; does not touch the disk."""
        self.content = content

    def sync_to_disk_sync(self, path: Path) -> None:
        """Write the current content to ``path / full_name`` (blocking).

        Args:
            path: Directory the file is written into.
        """
        # Explicit UTF-8: without it the platform default encoding is used,
        # which may be unable to encode non-ASCII content.
        (path / self.full_name).write_text(self.content, encoding='utf-8')

    async def sync_to_disk(self, path: Path) -> None:
        """Write the current content to ``path / full_name`` without blocking the event loop."""
        # Delegating to sync_to_disk_sync keeps a single write implementation,
        # and to_thread avoids building a fresh thread pool on every call.
        await asyncio.to_thread(self.sync_to_disk_sync, path)

    async def write(self, content: str, path: Path) -> None:
        """Replace the content with ``content`` and persist it under ``path``."""
        self.write_file_content(content)
        await self.sync_to_disk(path)

    async def append(self, content: str, path: Path) -> None:
        """Append ``content`` and persist the result under ``path``."""
        self.append_file_content(content)
        await self.sync_to_disk(path)

    def read(self) -> str:
        """Return the in-memory content."""
        return self.content

    @property
    def full_name(self) -> str:
        """Filename including extension, i.e. ``<name>.<extension>``."""
        return f'{self.name}.{self.extension}'

    @property
    def get_size(self) -> int:
        """Number of characters in the content."""
        return len(self.content)

    @property
    def get_line_count(self) -> int:
        """Number of lines in the content, as counted by ``str.splitlines``."""
        return len(self.content.splitlines())
class MarkdownFile(BaseFile):
    """File type for Markdown documents."""

    @property
    def extension(self) -> str:
        """Extension used for Markdown files."""
        return 'md'
class TxtFile(BaseFile):
    """File type for plain text documents."""

    @property
    def extension(self) -> str:
        """Extension used for plain text files."""
        return 'txt'
class JsonFile(BaseFile):
    """File type for JSON documents."""

    @property
    def extension(self) -> str:
        """Extension used for JSON files."""
        return 'json'
class CsvFile(BaseFile):
    """File type for CSV documents."""

    @property
    def extension(self) -> str:
        """Extension used for CSV files."""
        return 'csv'
class JsonlFile(BaseFile):
    """File type for JSONL (JSON Lines) documents."""

    @property
    def extension(self) -> str:
        """Extension used for JSON Lines files."""
        return 'jsonl'
class PdfFile(BaseFile):
    """PDF file implementation.

    Content is kept as text in memory and rendered to a PDF with reportlab
    each time the file is synced to disk.
    """

    @property
    def extension(self) -> str:
        """Extension used for PDF files."""
        return 'pdf'

    def sync_to_disk_sync(self, path: Path) -> None:
        """Render the current content to ``path / full_name`` as a PDF (blocking).

        Lines starting with ``# ``, ``## `` or ``### `` become headings, blank
        lines become vertical spacing, and every other line is a normal paragraph.

        Args:
            path: Directory the PDF is written into.

        Raises:
            FileSystemError: If the PDF cannot be built or written.
        """
        # Local import so this block does not depend on a new file-level import.
        from xml.sax.saxutils import escape

        file_path = path / self.full_name
        # Markdown header prefix -> reportlab sample style name.
        heading_styles = (('### ', 'Heading2'), ('## ', 'Heading1'), ('# ', 'Title'))
        try:
            doc = SimpleDocTemplate(str(file_path), pagesize=letter)
            styles = getSampleStyleSheet()
            story = []
            # Content is treated as plain text apart from basic markdown headers.
            # This avoids the AGPL license issue while maintaining functionality.
            for line in self.content.split('\n'):
                if not line.strip():
                    story.append(Spacer(1, 6))
                    continue
                text, style_name = line, 'Normal'
                for prefix, heading_style in heading_styles:
                    if line.startswith(prefix):
                        text, style_name = line[len(prefix):], heading_style
                        break
                # Paragraph parses its text as XML-like markup, so literal
                # '<', '>' and '&' must be escaped to stay plain text.
                story.append(Paragraph(escape(text), styles[style_name]))
            doc.build(story)
        except Exception as e:
            raise FileSystemError(f"Error: Could not write to file '{self.full_name}'. {str(e)}") from e

    async def sync_to_disk(self, path: Path) -> None:
        """Render the PDF in a worker thread so the event loop is not blocked."""
        await asyncio.to_thread(self.sync_to_disk_sync, path)
class FileSystemState(BaseModel):
    """Serializable snapshot of a file system."""

    # Maps full filename -> serialized file data.
    files: dict[str, dict[str, Any]] = Field(default_factory=dict)
    base_dir: str
    extracted_content_count: int = 0
class FileSystem:
    """Enhanced file system with in-memory storage and multiple file type support.

    Files are held in ``self.files`` (keyed by full filename) and mirrored into
    a dedicated data directory below ``base_dir``. The async read/write
    operations return a human-readable status string rather than raising.
    """

    def __init__(self, base_dir: str | Path, create_default_files: bool = True):
        """Create the data directory and, optionally, the default files.

        Args:
            base_dir: Directory under which the data subfolder is created.
            create_default_files: When true, create the files in ``default_files``.
        """
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

        # Create and use a dedicated subfolder for all operations.
        self.data_dir = self.base_dir / DEFAULT_FILE_SYSTEM_PATH
        if self.data_dir.exists():
            # Start from a clean data directory: existing contents are removed.
            shutil.rmtree(self.data_dir)
        self.data_dir.mkdir(exist_ok=True)

        self._file_types: dict[str, type[BaseFile]] = {
            'md': MarkdownFile,
            'txt': TxtFile,
            'json': JsonFile,
            'jsonl': JsonlFile,
            'csv': CsvFile,
            'pdf': PdfFile,
        }
        self.files: dict[str, BaseFile] = {}
        self.extracted_content_count = 0
        # Always defined so _create_default_files never hits a missing attribute.
        self.default_files = ['todo.md']
        if create_default_files:
            self._create_default_files()

    def get_allowed_extensions(self) -> list[str]:
        """Get allowed extensions"""
        return list(self._file_types)

    def _get_file_type_class(self, extension: str) -> type[BaseFile] | None:
        """Get the file class for an extension (case-insensitive), or None if unsupported."""
        return self._file_types.get(extension.lower())

    def _create_default_files(self) -> None:
        """Create the files listed in ``default_files`` and write them to disk.

        Raises:
            ValueError: If a default filename has an unsupported extension.
        """
        for full_filename in self.default_files:
            name_without_ext, extension = self._parse_filename(full_filename)
            file_class = self._get_file_type_class(extension)
            if not file_class:
                raise ValueError(f"Error: Invalid file extension '{extension}' for file '{full_filename}'.")
            file_obj = file_class(name=name_without_ext)
            self.files[full_filename] = file_obj  # Use full filename as key
            file_obj.sync_to_disk_sync(self.data_dir)

    def _is_valid_filename(self, file_name: str) -> bool:
        """Check if filename matches the required pattern: name.extension

        The whole string is validated, so anything containing a directory
        component (``../x.md``, ``/tmp/x.md``) is rejected. Validating only the
        basename would let such names through, and they are later joined onto
        the data directory when writing.
        """
        # Build extensions pattern from _file_types
        extensions = '|'.join(re.escape(ext) for ext in self._file_types)
        pattern = rf'[a-zA-Z0-9_\-\u4e00-\u9fff]+\.({extensions})'
        # fullmatch rather than match + '$': '$' also matches before a trailing newline.
        return re.fullmatch(pattern, file_name) is not None

    def _parse_filename(self, filename: str) -> tuple[str, str]:
        """Parse filename into name and extension. Always check _is_valid_filename first.

        Raises:
            ValueError: If ``filename`` contains no '.'.
        """
        name, extension = filename.rsplit('.', 1)
        return name, extension.lower()

    def get_dir(self) -> Path:
        """Get the file system directory"""
        return self.data_dir

    def get_file(self, full_filename: str) -> BaseFile | None:
        """Get a file object by full filename, or None if invalid or unknown."""
        if not self._is_valid_filename(full_filename):
            return None
        # Use full filename as key
        return self.files.get(full_filename)

    def list_files(self) -> list[str]:
        """List all files in the system"""
        return [file_obj.full_name for file_obj in self.files.values()]

    def display_file(self, full_filename: str) -> str | None:
        """Return the content of a file, or None if the name is invalid or unknown."""
        # get_file already validates the filename.
        file_obj = self.get_file(full_filename)
        if not file_obj:
            return None
        return file_obj.read()

    async def read_file(self, full_filename: str, external_file: bool = False) -> str:
        """Read a file and return a message for the LLM.

        Args:
            full_filename: Name of a file in this file system, or a path on
                disk when ``external_file`` is true.
            external_file: Read directly from disk instead of the in-memory store.

        Returns:
            The content wrapped in ``<content>`` tags, or an error message.
        """
        if external_file:
            return await self._read_external_file(full_filename)

        if not self._is_valid_filename(full_filename):
            return INVALID_FILENAME_ERROR_MESSAGE
        file_obj = self.get_file(full_filename)
        if not file_obj:
            return f"File '{full_filename}' not found."
        try:
            content = file_obj.read()
            return f'Read from file {full_filename}.\n<content>\n{content}\n</content>'
        except FileSystemError as e:
            return str(e)
        except Exception:
            return f"Error: Could not read file '{full_filename}'."

    async def _read_external_file(self, full_filename: str) -> str:
        """Read a file straight from disk, choosing the reader by its extension."""
        try:
            _, extension = self._parse_filename(full_filename)
        except Exception:
            return f'Error: Invalid filename format {full_filename}. Must be alphanumeric with a supported extension.'

        try:
            if extension in ('md', 'txt', 'json', 'jsonl', 'csv'):
                import anyio

                async with await anyio.open_file(full_filename, 'r') as f:
                    content = await f.read()
                return f'Read from file {full_filename}.\n<content>\n{content}\n</content>'

            if extension == 'pdf':
                import pypdf

                reader = pypdf.PdfReader(full_filename)
                # Only the first pages are extracted; the rest are summarized as a count.
                max_pdf_pages = 20
                extra_pages = len(reader.pages) - max_pdf_pages
                extracted_text = ''.join(page.extract_text() or '' for page in reader.pages[:max_pdf_pages])
                extra_pages_text = f'{extra_pages} more pages...' if extra_pages > 0 else ''
                return f'Read from file {full_filename}.\n<content>\n{extracted_text}\n{extra_pages_text}</content>'

            return f'Error: Cannot read file {full_filename} as {extension} extension is not supported.'
        except FileNotFoundError:
            return f"Error: File '{full_filename}' not found."
        except PermissionError:
            return f"Error: Permission denied to read file '{full_filename}'."
        except Exception:
            return f"Error: Could not read file '{full_filename}'."

    async def write_file(self, full_filename: str, content: str) -> str:
        """Write content to a file, creating it if needed, and return a status message."""
        if not self._is_valid_filename(full_filename):
            return INVALID_FILENAME_ERROR_MESSAGE

        try:
            name_without_ext, extension = self._parse_filename(full_filename)
            file_class = self._get_file_type_class(extension)
            if not file_class:
                raise ValueError(f"Error: Invalid file extension '{extension}' for file '{full_filename}'.")

            # Create or get existing file using full filename as key
            if full_filename in self.files:
                file_obj = self.files[full_filename]
            else:
                file_obj = file_class(name=name_without_ext)
                self.files[full_filename] = file_obj

            # Use file-specific write method
            await file_obj.write(content, self.data_dir)
            return f'Data written to file {full_filename} successfully.'
        except FileSystemError as e:
            return str(e)
        except Exception as e:
            return f"Error: Could not write to file '{full_filename}'. {str(e)}"

    async def append_file(self, full_filename: str, content: str) -> str:
        """Append content to an existing file and return a status message."""
        if not self._is_valid_filename(full_filename):
            return INVALID_FILENAME_ERROR_MESSAGE

        file_obj = self.get_file(full_filename)
        if not file_obj:
            return f"File '{full_filename}' not found."

        try:
            await file_obj.append(content, self.data_dir)
            return f'Data appended to file {full_filename} successfully.'
        except FileSystemError as e:
            return str(e)
        except Exception as e:
            return f"Error: Could not append to file '{full_filename}'. {str(e)}"

    async def replace_file_str(self, full_filename: str, old_str: str, new_str: str) -> str:
        """Replace every occurrence of old_str with new_str in a file and return a status message."""
        if not self._is_valid_filename(full_filename):
            return INVALID_FILENAME_ERROR_MESSAGE

        if not old_str:
            return 'Error: Cannot replace empty string. Please provide a non-empty string to replace.'

        file_obj = self.get_file(full_filename)
        if not file_obj:
            return f"File '{full_filename}' not found."

        try:
            content = file_obj.read()
            content = content.replace(old_str, new_str)
            await file_obj.write(content, self.data_dir)
            return f'Successfully replaced all occurrences of "{old_str}" with "{new_str}" in file {full_filename}'
        except FileSystemError as e:
            return str(e)
        except Exception as e:
            return f"Error: Could not replace string in file '{full_filename}'. {str(e)}"

    async def save_extracted_content(self, content: str) -> str:
        """Save extracted content to a numbered markdown file and return its filename."""
        initial_filename = f'extracted_content_{self.extracted_content_count}'
        extracted_filename = f'{initial_filename}.md'
        file_obj = MarkdownFile(name=initial_filename)
        await file_obj.write(content, self.data_dir)
        self.files[extracted_filename] = file_obj
        self.extracted_content_count += 1
        return extracted_filename

    @staticmethod
    def _take_lines_within(lines: list[str], max_chars: int) -> list[str]:
        """Return the longest prefix of ``lines`` that fits in ``max_chars``, counting one newline per line."""
        taken: list[str] = []
        used = 0
        for line in lines:
            cost = len(line) + 1
            if used + cost > max_chars:
                break
            taken.append(line)
            used += cost
        return taken

    def describe(self) -> str:
        """Describe every file except todo.md, with full content or a start/end preview.

        Small files are shown whole. Larger files show as many leading and
        trailing lines as fit the preview budget, with the number of omitted
        lines in between.
        """
        DISPLAY_CHARS = 400
        half_display_chars = DISPLAY_CHARS // 2
        parts: list[str] = []

        for file_obj in self.files.values():
            # Skip todo.md from description
            if file_obj.full_name == 'todo.md':
                continue

            content = file_obj.read()

            # Handle empty files
            if not content:
                parts.append(f'<file>\n{file_obj.full_name} - [empty file]\n</file>\n')
                continue

            lines = content.splitlines()
            line_count = len(lines)
            whole_file_description = (
                f'<file>\n{file_obj.full_name} - {line_count} lines\n<content>\n{content}\n</content>\n</file>\n'
            )

            # For small files, display the entire content
            if len(content) < int(1.5 * DISPLAY_CHARS):
                parts.append(whole_file_description)
                continue

            # For larger files, display start and end previews
            head = self._take_lines_within(lines, half_display_chars)
            tail = self._take_lines_within(lines[::-1], half_display_chars)

            # If the previews cover the whole file there is nothing to elide.
            middle_line_count = line_count - len(head) - len(tail)
            if middle_line_count <= 0:
                parts.append(whole_file_description)
                continue

            start_preview = '\n'.join(head).strip('\n').rstrip()
            end_preview = '\n'.join(reversed(tail)).strip('\n').rstrip()

            # Format output
            if not (start_preview or end_preview):
                parts.append(
                    f'<file>\n{file_obj.full_name} - {line_count} lines\n<content>\n{middle_line_count} lines...\n</content>\n</file>\n'
                )
            else:
                parts.append(f'<file>\n{file_obj.full_name} - {line_count} lines\n<content>\n{start_preview}\n')
                parts.append(f'... {middle_line_count} more lines ...\n')
                parts.append(f'{end_preview}\n')
                parts.append('</content>\n</file>\n')

        return ''.join(parts).strip('\n')

    def get_todo_contents(self) -> str:
        """Get todo file contents, or an empty string if there is no todo.md."""
        todo_file = self.get_file('todo.md')
        return todo_file.read() if todo_file else ''

    def get_state(self) -> FileSystemState:
        """Get serializable state of the file system"""
        files_data = {
            full_filename: {'type': file_obj.__class__.__name__, 'data': file_obj.model_dump()}
            for full_filename, file_obj in self.files.items()
        }
        return FileSystemState(
            files=files_data, base_dir=str(self.base_dir), extracted_content_count=self.extracted_content_count
        )

    def nuke(self) -> None:
        """Delete the file system directory"""
        shutil.rmtree(self.data_dir)

    @classmethod
    def from_state(cls, state: FileSystemState) -> 'FileSystem':
        """Restore file system from serializable state at the exact same location.

        Entries whose recorded type is not a supported file class are skipped.
        """
        # Create file system without default files
        fs = cls(base_dir=Path(state.base_dir), create_default_files=False)
        fs.extracted_content_count = state.extracted_content_count

        # Derive the lookup from the registered types so the two cannot drift apart.
        classes_by_name = {file_class.__name__: file_class for file_class in fs._file_types.values()}

        # Restore all files
        for full_filename, file_data in state.files.items():
            file_class = classes_by_name.get(file_data['type'])
            if file_class is None:
                # Skip unknown file types
                continue
            file_obj = file_class(**file_data['data'])
            # Add to files dict and sync to disk
            fs.files[full_filename] = file_obj
            file_obj.sync_to_disk_sync(fs.data_dir)

        return fs
|