File size: 16,502 Bytes
d7b3d84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
import asyncio
import os
import re
import shutil
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any

from pydantic import BaseModel, Field
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer

# Message returned to the caller when a filename is rejected by FileSystem._is_valid_filename.
INVALID_FILENAME_ERROR_MESSAGE = 'Error: Invalid filename format. Must be alphanumeric with supported extension.'
# Name of the subdirectory, created under FileSystem.base_dir, in which FileSystem stores its files.
DEFAULT_FILE_SYSTEM_PATH = 'browseruse_agent_data'


class FileSystemError(Exception):
	"""Raised for file system failures whose message is meant to be shown to the LLM."""


class BaseFile(BaseModel, ABC):
	"""Base class for all file types.

	Keeps the file name (without extension) and its text content in memory and
	knows how to persist that content to ``<directory>/<name>.<extension>``.
	Subclasses must provide ``extension``; they may override
	``sync_to_disk_sync`` to change how the content is written to disk.
	"""

	name: str
	content: str = ''

	# --- Subclass must define this ---
	@property
	@abstractmethod
	def extension(self) -> str:
		"""File extension without the leading dot (e.g. 'txt', 'md')"""
		pass

	def write_file_content(self, content: str) -> None:
		"""Replace the in-memory content with ``content`` (nothing is written to disk)."""
		self.update_content(content)

	def append_file_content(self, content: str) -> None:
		"""Append ``content`` to the in-memory content (nothing is written to disk)."""
		self.update_content(self.content + content)

	# --- These are shared and implemented here ---

	def update_content(self, content: str) -> None:
		"""Set the in-memory content; the single place where ``content`` is assigned."""
		self.content = content

	def sync_to_disk_sync(self, path: Path) -> None:
		"""Write the current content to ``path / full_name`` (blocking).

		The file is always encoded as UTF-8 so that non-ASCII content is written
		the same way on every platform instead of depending on the locale's
		default encoding.
		"""
		file_path = path / self.full_name
		file_path.write_text(self.content, encoding='utf-8')

	async def sync_to_disk(self, path: Path) -> None:
		"""Write the current content to ``path / full_name`` from a worker thread.

		Delegates to ``sync_to_disk_sync`` so the sync and async writers cannot
		drift apart, and so a subclass that overrides only the synchronous
		writer gets the matching async behaviour.
		"""
		await asyncio.to_thread(self.sync_to_disk_sync, path)

	async def write(self, content: str, path: Path) -> None:
		"""Replace the content in memory, then persist it under directory ``path``."""
		self.write_file_content(content)
		await self.sync_to_disk(path)

	async def append(self, content: str, path: Path) -> None:
		"""Append to the content in memory, then persist the whole file under directory ``path``."""
		self.append_file_content(content)
		await self.sync_to_disk(path)

	def read(self) -> str:
		"""Return the in-memory content (the disk is not read)."""
		return self.content

	@property
	def full_name(self) -> str:
		"""File name including extension, e.g. 'todo.md'."""
		return f'{self.name}.{self.extension}'

	@property
	def get_size(self) -> int:
		"""Length of the content in characters (not encoded bytes)."""
		return len(self.content)

	@property
	def get_line_count(self) -> int:
		"""Number of lines in the content as split by ``str.splitlines``."""
		return len(self.content.splitlines())


class MarkdownFile(BaseFile):
	"""Markdown file implementation.

	Inherits content handling and disk syncing from BaseFile; only the
	extension is specialised.
	"""

	@property
	def extension(self) -> str:
		"""Extension used for Markdown files."""
		return 'md'


class TxtFile(BaseFile):
	"""Plain text file implementation.

	Inherits content handling and disk syncing from BaseFile; only the
	extension is specialised.
	"""

	@property
	def extension(self) -> str:
		"""Extension used for plain text files."""
		return 'txt'


class JsonFile(BaseFile):
	"""JSON file implementation.

	Inherits content handling and disk syncing from BaseFile; the content is
	stored as given and is not parsed or validated as JSON by this class.
	"""

	@property
	def extension(self) -> str:
		"""Extension used for JSON files."""
		return 'json'


class CsvFile(BaseFile):
	"""CSV file implementation.

	Inherits content handling and disk syncing from BaseFile; the content is
	stored as given and is not parsed or validated as CSV by this class.
	"""

	@property
	def extension(self) -> str:
		"""Extension used for CSV files."""
		return 'csv'


class JsonlFile(BaseFile):
	"""JSONL (JSON Lines) file implementation.

	Inherits content handling and disk syncing from BaseFile; the content is
	stored as given and is not parsed or validated line by line by this class.
	"""

	@property
	def extension(self) -> str:
		"""Extension used for JSON Lines files."""
		return 'jsonl'


class PdfFile(BaseFile):
	"""PDF file implementation.

	The in-memory content is treated as plain text with minimal Markdown
	heading support and is rendered to a PDF with reportlab when synced to disk.
	"""

	@property
	def extension(self) -> str:
		"""Extension used for PDF files."""
		return 'pdf'

	def sync_to_disk_sync(self, path: Path) -> None:
		"""Render the content to ``path / full_name`` as a PDF (blocking).

		Lines starting with '# ', '## ' or '### ' become Title / Heading1 /
		Heading2 paragraphs, other non-blank lines become Normal paragraphs and
		blank lines become a small vertical spacer.

		Raises:
			FileSystemError: if the document cannot be built or written; the
				original exception is chained as ``__cause__``.
		"""
		# Function-scope import: stdlib helper needed only by this method.
		from xml.sax.saxutils import escape

		file_path = path / self.full_name
		# (markdown prefix, reportlab style name); the prefixes are mutually exclusive.
		heading_styles = (('# ', 'Title'), ('## ', 'Heading1'), ('### ', 'Heading2'))
		try:
			doc = SimpleDocTemplate(str(file_path), pagesize=letter)
			styles = getSampleStyleSheet()
			story = []

			# Content is treated as plain text rather than run through a full
			# markdown renderer (the original implementation notes this avoids
			# an AGPL-licensed dependency).
			for line in self.content.split('\n'):
				if not line.strip():
					story.append(Spacer(1, 6))
					continue

				style_name, text = 'Normal', line
				for prefix, heading_style in heading_styles:
					if line.startswith(prefix):
						style_name, text = heading_style, line[len(prefix) :]
						break

				# Paragraph parses its text as XML-like markup, so literal
				# '&', '<' and '>' must be escaped to be rendered as written.
				story.append(Paragraph(escape(text), styles[style_name]))

			doc.build(story)
		except Exception as e:
			raise FileSystemError(f"Error: Could not write to file '{self.full_name}'. {str(e)}") from e

	async def sync_to_disk(self, path: Path) -> None:
		"""Render the PDF in a worker thread so the event loop is not blocked."""
		await asyncio.to_thread(self.sync_to_disk_sync, path)


class FileSystemState(BaseModel):
	"""Serializable state of the file system.

	Produced by FileSystem.get_state and consumed by FileSystem.from_state.
	"""

	# Each value is {'type': <file class name>, 'data': <model_dump() of the file object>}.
	files: dict[str, dict[str, Any]] = Field(default_factory=dict)  # full filename -> file data
	# String form of FileSystem.base_dir (the parent of the data directory).
	base_dir: str
	# Counter used to number files created by FileSystem.save_extracted_content.
	extracted_content_count: int = 0


class FileSystem:
	"""Enhanced file system with in-memory storage and multiple file type support"""

	def __init__(self, base_dir: str | Path, create_default_files: bool = True):
		"""Create a file system rooted at ``base_dir``.

		All files live in the ``DEFAULT_FILE_SYSTEM_PATH`` subfolder of
		``base_dir``. If that subfolder already exists it is deleted and
		recreated, so every instance starts from an empty data directory.

		Args:
			base_dir: Directory under which the data folder is created; it is
				created (with parents) when missing.
			create_default_files: When true, the files named in
				``default_files`` are created in memory and on disk.
		"""
		# Path() accepts str as well as Path / os.PathLike inputs.
		self.base_dir = Path(base_dir)
		self.base_dir.mkdir(parents=True, exist_ok=True)

		# Create and use a dedicated subfolder for all operations
		self.data_dir = self.base_dir / DEFAULT_FILE_SYSTEM_PATH
		if self.data_dir.exists():
			# clean the data directory
			shutil.rmtree(self.data_dir)
		self.data_dir.mkdir(exist_ok=True)

		# Registry: lower-case extension -> file class used for that extension.
		self._file_types: dict[str, type[BaseFile]] = {
			'md': MarkdownFile,
			'txt': TxtFile,
			'json': JsonFile,
			'jsonl': JsonlFile,
			'csv': CsvFile,
			'pdf': PdfFile,
		}

		# In-memory store: full filename (with extension) -> file object.
		self.files: dict[str, BaseFile] = {}
		self.extracted_content_count = 0

		# Always define the attribute so it can be read even when no default
		# files were requested (it used to exist only in the other case).
		self.default_files: list[str] = ['todo.md'] if create_default_files else []
		if create_default_files:
			self._create_default_files()

	def get_allowed_extensions(self) -> list[str]:
		"""Get allowed extensions"""
		return list(self._file_types.keys())

	def _get_file_type_class(self, extension: str) -> type[BaseFile] | None:
		"""Get the appropriate file class for an extension."""
		return self._file_types.get(extension.lower(), None)

	def _create_default_files(self) -> None:
		"""Create default results and todo files"""
		for full_filename in self.default_files:
			name_without_ext, extension = self._parse_filename(full_filename)
			file_class = self._get_file_type_class(extension)
			if not file_class:
				raise ValueError(f"Error: Invalid file extension '{extension}' for file '{full_filename}'.")

			file_obj = file_class(name=name_without_ext)
			self.files[full_filename] = file_obj  # Use full filename as key
			file_obj.sync_to_disk_sync(self.data_dir)

	def _is_valid_filename(self, file_name: str) -> bool:
		"""Check if filename matches the required pattern: name.extension"""
		# Build extensions pattern from _file_types
		extensions = '|'.join(self._file_types.keys())
		pattern = rf'^[a-zA-Z0-9_\-\u4e00-\u9fff]+\.({extensions})$'
		file_name_base = os.path.basename(file_name)
		return bool(re.match(pattern, file_name_base))

	def _parse_filename(self, filename: str) -> tuple[str, str]:
		"""Parse filename into name and extension. Always check _is_valid_filename first."""
		name, extension = filename.rsplit('.', 1)
		return name, extension.lower()

	def get_dir(self) -> Path:
		"""Return the data directory in which this file system stores its files.

		This is ``self.data_dir`` (the ``DEFAULT_FILE_SYSTEM_PATH`` subfolder of
		``base_dir``), not ``base_dir`` itself.
		"""
		return self.data_dir

	def get_file(self, full_filename: str) -> BaseFile | None:
		"""Get a file object by full filename"""
		if not self._is_valid_filename(full_filename):
			return None

		# Use full filename as key
		return self.files.get(full_filename)

	def list_files(self) -> list[str]:
		"""List all files in the system"""
		return [file_obj.full_name for file_obj in self.files.values()]

	def display_file(self, full_filename: str) -> str | None:
		"""Display file content using file-specific display method"""
		if not self._is_valid_filename(full_filename):
			return None

		file_obj = self.get_file(full_filename)
		if not file_obj:
			return None

		return file_obj.read()

	async def read_file(self, full_filename: str, external_file: bool = False) -> str:
		"""Read file content using file-specific read method and return appropriate message to LLM"""
		if external_file:
			try:
				try:
					_, extension = self._parse_filename(full_filename)
				except Exception:
					return f'Error: Invalid filename format {full_filename}. Must be alphanumeric with a supported extension.'
				if extension in ['md', 'txt', 'json', 'jsonl', 'csv']:
					import anyio

					async with await anyio.open_file(full_filename, 'r') as f:
						content = await f.read()
						return f'Read from file {full_filename}.\n<content>\n{content}\n</content>'
				elif extension == 'pdf':
					import pypdf

					reader = pypdf.PdfReader(full_filename)
					num_pages = len(reader.pages)
					MAX_PDF_PAGES = 20
					extra_pages = num_pages - MAX_PDF_PAGES
					extracted_text = ''
					for page in reader.pages[:MAX_PDF_PAGES]:
						extracted_text += page.extract_text()
					extra_pages_text = f'{extra_pages} more pages...' if extra_pages > 0 else ''
					return f'Read from file {full_filename}.\n<content>\n{extracted_text}\n{extra_pages_text}</content>'
				else:
					return f'Error: Cannot read file {full_filename} as {extension} extension is not supported.'
			except FileNotFoundError:
				return f"Error: File '{full_filename}' not found."
			except PermissionError:
				return f"Error: Permission denied to read file '{full_filename}'."
			except Exception as e:
				return f"Error: Could not read file '{full_filename}'."

		if not self._is_valid_filename(full_filename):
			return INVALID_FILENAME_ERROR_MESSAGE

		file_obj = self.get_file(full_filename)
		if not file_obj:
			return f"File '{full_filename}' not found."

		try:
			content = file_obj.read()
			return f'Read from file {full_filename}.\n<content>\n{content}\n</content>'
		except FileSystemError as e:
			return str(e)
		except Exception:
			return f"Error: Could not read file '{full_filename}'."

	async def write_file(self, full_filename: str, content: str) -> str:
		"""Write content to file using file-specific write method"""
		if not self._is_valid_filename(full_filename):
			return INVALID_FILENAME_ERROR_MESSAGE

		try:
			name_without_ext, extension = self._parse_filename(full_filename)
			file_class = self._get_file_type_class(extension)
			if not file_class:
				raise ValueError(f"Error: Invalid file extension '{extension}' for file '{full_filename}'.")

			# Create or get existing file using full filename as key
			if full_filename in self.files:
				file_obj = self.files[full_filename]
			else:
				file_obj = file_class(name=name_without_ext)
				self.files[full_filename] = file_obj  # Use full filename as key

			# Use file-specific write method
			await file_obj.write(content, self.data_dir)
			return f'Data written to file {full_filename} successfully.'
		except FileSystemError as e:
			return str(e)
		except Exception as e:
			return f"Error: Could not write to file '{full_filename}'. {str(e)}"

	async def append_file(self, full_filename: str, content: str) -> str:
		"""Append content to file using file-specific append method"""
		if not self._is_valid_filename(full_filename):
			return INVALID_FILENAME_ERROR_MESSAGE

		file_obj = self.get_file(full_filename)
		if not file_obj:
			return f"File '{full_filename}' not found."

		try:
			await file_obj.append(content, self.data_dir)
			return f'Data appended to file {full_filename} successfully.'
		except FileSystemError as e:
			return str(e)
		except Exception as e:
			return f"Error: Could not append to file '{full_filename}'. {str(e)}"

	async def replace_file_str(self, full_filename: str, old_str: str, new_str: str) -> str:
		"""Replace old_str with new_str in file_name"""
		if not self._is_valid_filename(full_filename):
			return INVALID_FILENAME_ERROR_MESSAGE

		if not old_str:
			return 'Error: Cannot replace empty string. Please provide a non-empty string to replace.'

		file_obj = self.get_file(full_filename)
		if not file_obj:
			return f"File '{full_filename}' not found."

		try:
			content = file_obj.read()
			content = content.replace(old_str, new_str)
			await file_obj.write(content, self.data_dir)
			return f'Successfully replaced all occurrences of "{old_str}" with "{new_str}" in file {full_filename}'
		except FileSystemError as e:
			return str(e)
		except Exception as e:
			return f"Error: Could not replace string in file '{full_filename}'. {str(e)}"

	async def save_extracted_content(self, content: str) -> str:
		"""Store ``content`` in a new ``extracted_content_<n>.md`` file and return that filename.

		``<n>`` is the current ``extracted_content_count``; the counter is
		incremented only after the file has been written and registered.
		"""
		stem = f'extracted_content_{self.extracted_content_count}'
		markdown_file = MarkdownFile(name=stem)
		await markdown_file.write(content, self.data_dir)

		saved_name = f'{stem}.md'
		self.files[saved_name] = markdown_file
		self.extracted_content_count += 1
		return saved_name

	def describe(self) -> str:
		"""List all files with their content information using file-specific display methods"""
		DISPLAY_CHARS = 400
		description = ''

		for file_obj in self.files.values():
			# Skip todo.md from description
			if file_obj.full_name == 'todo.md':
				continue

			content = file_obj.read()

			# Handle empty files
			if not content:
				description += f'<file>\n{file_obj.full_name} - [empty file]\n</file>\n'
				continue

			lines = content.splitlines()
			line_count = len(lines)

			# For small files, display the entire content
			whole_file_description = (
				f'<file>\n{file_obj.full_name} - {line_count} lines\n<content>\n{content}\n</content>\n</file>\n'
			)
			if len(content) < int(1.5 * DISPLAY_CHARS):
				description += whole_file_description
				continue

			# For larger files, display start and end previews
			half_display_chars = DISPLAY_CHARS // 2

			# Get start preview
			start_preview = ''
			start_line_count = 0
			chars_count = 0
			for line in lines:
				if chars_count + len(line) + 1 > half_display_chars:
					break
				start_preview += line + '\n'
				chars_count += len(line) + 1
				start_line_count += 1

			# Get end preview
			end_preview = ''
			end_line_count = 0
			chars_count = 0
			for line in reversed(lines):
				if chars_count + len(line) + 1 > half_display_chars:
					break
				end_preview = line + '\n' + end_preview
				chars_count += len(line) + 1
				end_line_count += 1

			# Calculate lines in between
			middle_line_count = line_count - start_line_count - end_line_count
			if middle_line_count <= 0:
				description += whole_file_description
				continue

			start_preview = start_preview.strip('\n').rstrip()
			end_preview = end_preview.strip('\n').rstrip()

			# Format output
			if not (start_preview or end_preview):
				description += f'<file>\n{file_obj.full_name} - {line_count} lines\n<content>\n{middle_line_count} lines...\n</content>\n</file>\n'
			else:
				description += f'<file>\n{file_obj.full_name} - {line_count} lines\n<content>\n{start_preview}\n'
				description += f'... {middle_line_count} more lines ...\n'
				description += f'{end_preview}\n'
				description += '</content>\n</file>\n'

		return description.strip('\n')

	def get_todo_contents(self) -> str:
		"""Get todo file contents"""
		todo_file = self.get_file('todo.md')
		return todo_file.read() if todo_file else ''

	def get_state(self) -> FileSystemState:
		"""Capture the file system as a serializable FileSystemState.

		Each file is stored as its class name plus its ``model_dump()`` so that
		``from_state`` can rebuild an object of the same type.
		"""
		snapshot = {
			filename: {'type': type(stored).__name__, 'data': stored.model_dump()}
			for filename, stored in self.files.items()
		}
		return FileSystemState(
			files=snapshot,
			base_dir=str(self.base_dir),
			extracted_content_count=self.extracted_content_count,
		)

	def nuke(self) -> None:
		"""Delete the data directory and everything in it from disk.

		Only the on-disk directory is removed; the in-memory ``self.files``
		mapping is not touched here. ``shutil.rmtree`` is called without
		``ignore_errors``, so its errors propagate to the caller.
		"""
		shutil.rmtree(self.data_dir)

	@classmethod
	def from_state(cls, state: FileSystemState) -> 'FileSystem':
		"""Restore file system from serializable state at the exact same location.

		A new instance is created for ``state.base_dir`` without default files
		(which, as in ``__init__``, recreates the data directory from scratch);
		every file in the state is then rebuilt and written to disk.

		Entries whose ``'type'`` is not the class name of a registered file
		type are skipped.
		"""
		# Create file system without default files
		fs = cls(base_dir=Path(state.base_dir), create_default_files=False)
		fs.extracted_content_count = state.extracted_content_count

		# Resolve serialized class names through the instance's own registry
		# instead of a second hard-coded list that has to be kept in sync.
		classes_by_name = {file_class.__name__: file_class for file_class in fs._file_types.values()}

		# Restore all files
		for full_filename, file_data in state.files.items():
			file_class = classes_by_name.get(file_data['type'])
			if file_class is None:
				# Skip unknown file types
				continue

			# Add to files dict and sync to disk
			file_obj = file_class(**file_data['data'])
			fs.files[full_filename] = file_obj
			file_obj.sync_to_disk_sync(fs.data_dir)

		return fs