Nymbo commited on
Commit
7295cd8
·
verified ·
1 Parent(s): c09e8d8

adding `edit` Action to File_System, allows search & replace for editing files and creating code diffs

Browse files
Files changed (1) hide show
  1. Modules/File_System.py +292 -6
Modules/File_System.py CHANGED
@@ -1,7 +1,9 @@
1
  from __future__ import annotations
2
 
 
3
  import json
4
  import os
 
5
  import shutil
6
  from typing import Annotated, Optional
7
 
@@ -20,7 +22,7 @@ from ._core import (
20
 
21
  TOOL_SUMMARY = (
22
  "Browse, search, and manage files within a safe root. "
23
- "Actions: list, read, write, append, mkdir, move, copy, delete, info, search, help. "
24
  "Fill other fields as needed. "
25
  "Use paths like `/` or `/notes/todo.txt` because all paths are relative to the root (`/`). "
26
  "Use 'help' to see action-specific required fields and examples."
@@ -36,6 +38,7 @@ HELP_TEXT = (
36
  "- read: path (e.g., /notes/todo.txt), offset=0, max_chars=4000 (shows next_cursor when truncated)\n"
37
  "- write: path, content (UTF-8), create_dirs=true\n"
38
  "- append: path, content (UTF-8), create_dirs=true\n"
 
39
  "- mkdir: path (directory), exist_ok=true\n"
40
  "- move: path (src), dest_path (dst), overwrite=false\n"
41
  "- copy: path (src), dest_path (dst), overwrite=false\n"
@@ -43,12 +46,22 @@ HELP_TEXT = (
43
  "- info: path\n"
44
  "- search: path (dir or file), content=query text, recursive=false, show_hidden=false, max_entries=20, case_sensitive=false, offset=0\n"
45
  "- help: show this guide\n\n"
 
 
 
 
 
 
 
 
 
46
  "Errors are returned as JSON with fields: {status:'error', code, message, path?, hint?, data?}.\n\n"
47
  "Examples:\n"
48
  "- list current: action=list, path='/'\n"
49
  "- make folder: action=mkdir, path='/notes'\n"
50
  "- write file: action=write, path='/notes/todo.txt', content='hello'\n"
51
  "- read file: action=read, path='/notes/todo.txt', max_chars=200\n"
 
52
  "- move file: action=move, path='/notes/todo.txt', dest_path='/notes/todo-old.txt', overwrite=true\n"
53
  "- delete dir: action=delete, path='/notes', recursive=true\n"
54
  "- search text: action=search, path='/notes', content='TODO', recursive=true, max_entries=50\n"
@@ -156,6 +169,277 @@ def _delete(abs_path: str, *, recursive: bool) -> str:
156
  except Exception as exc:
157
  return _err("delete_failed", "Failed to delete path.", path=_display_path(abs_path), data={"error": _safe_err(exc)})
158
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159
  # ---------------------------------------------------------------------------
160
  # Read-only operations delegated to sandbox
161
  # ---------------------------------------------------------------------------
@@ -200,7 +484,7 @@ def _info(abs_path: str) -> str:
200
 
201
  @autodoc(summary=TOOL_SUMMARY)
202
  def File_System(
203
- action: Annotated[str, "Operation to perform: 'list', 'read', 'write', 'append', 'mkdir', 'move', 'copy', 'delete', 'info', 'search'."],
204
  path: Annotated[str, "Target path, relative to root unless UNSAFE_ALLOW_ABS_PATHS=1."] = "/",
205
  content: Annotated[Optional[str], "Content for write/append actions or search query (UTF-8)."] = None,
206
  dest_path: Annotated[Optional[str], "Destination for move/copy (relative to root unless unsafe absolute allowed)."] = None,
@@ -228,11 +512,11 @@ def File_System(
228
  case_sensitive=case_sensitive,
229
  )
230
  action = (action or "").strip().lower()
231
- if action not in {"list", "read", "write", "append", "mkdir", "move", "copy", "delete", "info", "search", "help"}:
232
  result = _err(
233
  "invalid_action",
234
  "Invalid action.",
235
- hint="Choose from: list, read, write, append, mkdir, move, copy, delete, info, search, help."
236
  )
237
  _log_call_end("File_System", _truncate_for_log(result))
238
  return result
@@ -263,6 +547,8 @@ def File_System(
263
  )
264
  else:
265
  result = _write_file(abs_path, content or "", append=(action == "append"), create_dirs=create_dirs)
 
 
266
  elif action == "mkdir":
267
  result = _mkdir(abs_path, exist_ok=True)
268
  elif action in {"move", "copy"}:
@@ -309,12 +595,12 @@ def build_interface() -> gr.Interface:
309
  inputs=[
310
  gr.Radio(
311
  label="Action",
312
- choices=["list", "read", "write", "append", "mkdir", "move", "copy", "delete", "info", "search", "help"],
313
  value="help",
314
  info="Operation to perform",
315
  ),
316
  gr.Textbox(label="Path", placeholder="/ or /src/file.txt", max_lines=1, value="/", info="Target path (relative to root)"),
317
- gr.Textbox(label="Content", lines=6, placeholder="Text to write or search for...", info="Content for write/append actions or search query"),
318
  gr.Textbox(label="Destination", max_lines=1, info="Destination path (Move/Copy only)"),
319
  gr.Checkbox(label="Recursive", value=False, info="Recurse into subfolders (List/Delete/Search)"),
320
  gr.Checkbox(label="Show hidden", value=False, info="Include hidden files (List/Search)"),
 
1
  from __future__ import annotations
2
 
3
+ import difflib
4
  import json
5
  import os
6
+ import re
7
  import shutil
8
  from typing import Annotated, Optional
9
 
 
22
 
23
  TOOL_SUMMARY = (
24
  "Browse, search, and manage files within a safe root. "
25
+ "Actions: list, read, write, append, edit, mkdir, move, copy, delete, info, search, help. "
26
  "Fill other fields as needed. "
27
  "Use paths like `/` or `/notes/todo.txt` because all paths are relative to the root (`/`). "
28
  "Use 'help' to see action-specific required fields and examples."
 
38
  "- read: path (e.g., /notes/todo.txt), offset=0, max_chars=4000 (shows next_cursor when truncated)\n"
39
  "- write: path, content (UTF-8), create_dirs=true\n"
40
  "- append: path, content (UTF-8), create_dirs=true\n"
41
+ "- edit: path, content (SEARCH/REPLACE blocks, see format below)\n"
42
  "- mkdir: path (directory), exist_ok=true\n"
43
  "- move: path (src), dest_path (dst), overwrite=false\n"
44
  "- copy: path (src), dest_path (dst), overwrite=false\n"
 
46
  "- info: path\n"
47
  "- search: path (dir or file), content=query text, recursive=false, show_hidden=false, max_entries=20, case_sensitive=false, offset=0\n"
48
  "- help: show this guide\n\n"
49
+ "Edit format (SEARCH/REPLACE blocks):\n"
50
+ "<<<<<<< SEARCH\n"
51
+ "[exact content to find]\n"
52
+ "=======\n"
53
+ "[new content to replace with]\n"
54
+ ">>>>>>> REPLACE\n\n"
55
+ "Multiple blocks can be included; each is applied in order. "
56
+ "Search text must match exactly (whitespace, indentation). "
57
+ "Only the first occurrence of each search text is replaced.\n\n"
58
  "Errors are returned as JSON with fields: {status:'error', code, message, path?, hint?, data?}.\n\n"
59
  "Examples:\n"
60
  "- list current: action=list, path='/'\n"
61
  "- make folder: action=mkdir, path='/notes'\n"
62
  "- write file: action=write, path='/notes/todo.txt', content='hello'\n"
63
  "- read file: action=read, path='/notes/todo.txt', max_chars=200\n"
64
+ "- edit file: action=edit, path='/notes/todo.txt', content='<<<<<<< SEARCH\\nhello\\n=======\\nhi\\n>>>>>>> REPLACE'\n"
65
  "- move file: action=move, path='/notes/todo.txt', dest_path='/notes/todo-old.txt', overwrite=true\n"
66
  "- delete dir: action=delete, path='/notes', recursive=true\n"
67
  "- search text: action=search, path='/notes', content='TODO', recursive=true, max_entries=50\n"
 
169
  except Exception as exc:
170
  return _err("delete_failed", "Failed to delete path.", path=_display_path(abs_path), data={"error": _safe_err(exc)})
171
 
172
+
173
+ # ---------------------------------------------------------------------------
174
+ # Edit (search/replace) operation
175
+ # ---------------------------------------------------------------------------
176
+
177
+ # Regex to parse SEARCH/REPLACE blocks (5+ markers required)
178
+ SEARCH_REPLACE_BLOCK_RE = re.compile(
179
+ r"<{5,} SEARCH\r?\n(.*?)\r?\n?={5,}\r?\n(.*?)\r?\n?>{5,} REPLACE", flags=re.DOTALL
180
+ )
181
+
182
+
183
+ def _parse_search_replace_blocks(content: str) -> list[tuple[str, str]]:
184
+ """Parse SEARCH/REPLACE blocks from content. Returns list of (search, replace) tuples."""
185
+ matches = SEARCH_REPLACE_BLOCK_RE.findall(content)
186
+ return [
187
+ (search.rstrip("\r\n"), replace.rstrip("\r\n"))
188
+ for search, replace in matches
189
+ ]
190
+
191
+
192
+ def _find_search_context(content: str, search_text: str, max_context: int = 5) -> str:
193
+ """Find where parts of the search text appear in the content for debugging."""
194
+ lines = content.split("\n")
195
+ search_lines = search_text.split("\n")
196
+
197
+ if not search_lines:
198
+ return "Search text is empty"
199
+
200
+ first_search_line = search_lines[0].strip()
201
+ if not first_search_line:
202
+ return "First line of search text is empty or whitespace only"
203
+
204
+ matches = []
205
+ for i, line in enumerate(lines):
206
+ if first_search_line in line:
207
+ matches.append(i)
208
+
209
+ if not matches:
210
+ return f"First search line '{first_search_line[:60]}...' not found anywhere in file"
211
+
212
+ context_lines = []
213
+ for match_idx in matches[:3]:
214
+ start = max(0, match_idx - max_context)
215
+ end = min(len(lines), match_idx + max_context + 1)
216
+
217
+ context_lines.append(f"\nPotential match area around line {match_idx + 1}:")
218
+ for i in range(start, end):
219
+ marker = ">>>" if i == match_idx else " "
220
+ context_lines.append(f"{marker} {i + 1:3d}: {lines[i]}")
221
+
222
+ return "\n".join(context_lines)
223
+
224
+
225
+ def _find_best_fuzzy_match(
226
+ content: str, search_text: str, threshold: float = 0.9
227
+ ) -> tuple[float, int, int, str] | None:
228
+ """
229
+ Find the best fuzzy match for search_text within content.
230
+ Returns (similarity, start_line, end_line, matched_text) or None if no match above threshold.
231
+ """
232
+ content_lines = content.split("\n")
233
+ search_lines = search_text.split("\n")
234
+ window_size = len(search_lines)
235
+
236
+ if window_size == 0:
237
+ return None
238
+
239
+ non_empty_search = [line for line in search_lines if line.strip()]
240
+ if not non_empty_search:
241
+ return None
242
+
243
+ first_anchor = non_empty_search[0]
244
+ last_anchor = non_empty_search[-1] if len(non_empty_search) > 1 else first_anchor
245
+
246
+ # Find candidate starting positions near anchor lines
247
+ candidate_starts = set()
248
+ spread = 5
249
+
250
+ for i, line in enumerate(content_lines):
251
+ if first_anchor in line or last_anchor in line:
252
+ start_min = max(0, i - spread)
253
+ start_max = min(len(content_lines) - window_size + 1, i + spread + 1)
254
+ for s in range(start_min, start_max):
255
+ candidate_starts.add(s)
256
+
257
+ # Fall back to first N positions if no anchors found
258
+ if not candidate_starts:
259
+ max_positions = min(len(content_lines) - window_size + 1, 100)
260
+ candidate_starts = set(range(0, max(0, max_positions)))
261
+
262
+ best_match = None
263
+ best_similarity = 0.0
264
+
265
+ for start in candidate_starts:
266
+ end = start + window_size
267
+ window_text = "\n".join(content_lines[start:end])
268
+
269
+ matcher = difflib.SequenceMatcher(None, search_text, window_text)
270
+ similarity = matcher.ratio()
271
+
272
+ if similarity >= threshold and similarity > best_similarity:
273
+ best_similarity = similarity
274
+ best_match = (similarity, start + 1, end, window_text) # 1-based line numbers
275
+
276
+ return best_match
277
+
278
+
279
+ def _create_unified_diff(text1: str, text2: str, label1: str = "SEARCH", label2: str = "CLOSEST MATCH") -> str:
280
+ """Create a unified diff between two texts."""
281
+ lines1 = text1.splitlines(keepends=True)
282
+ lines2 = text2.splitlines(keepends=True)
283
+
284
+ lines1 = [line if line.endswith("\n") else line + "\n" for line in lines1]
285
+ lines2 = [line if line.endswith("\n") else line + "\n" for line in lines2]
286
+
287
+ diff = difflib.unified_diff(lines1, lines2, fromfile=label1, tofile=label2, lineterm="", n=3)
288
+ diff_lines = list(diff)
289
+
290
+ if diff_lines and not diff_lines[0].startswith("==="):
291
+ diff_lines.insert(2, "=" * 67 + "\n")
292
+
293
+ result = "".join(diff_lines)
294
+
295
+ # Truncate long diffs
296
+ max_chars = 2000
297
+ if len(result) > max_chars:
298
+ result = result[:max_chars] + "\n...(diff truncated)"
299
+
300
+ return result.rstrip()
301
+
302
+
303
+ def _edit_file(abs_path: str, content: str) -> str:
304
+ """
305
+ Apply SEARCH/REPLACE blocks to a file.
306
+
307
+ Returns a success message with stats, or an error message with debugging context.
308
+ """
309
+ display = _display_path(abs_path)
310
+
311
+ # Validate file exists
312
+ if not os.path.exists(abs_path):
313
+ return _err("file_not_found", f"File not found: {display}", path=display)
314
+ if os.path.isdir(abs_path):
315
+ return _err("is_directory", f"Path is a directory, not a file: {display}", path=display, hint="Provide a file path.")
316
+
317
+ # Validate content has blocks
318
+ content = (content or "").strip()
319
+ if not content:
320
+ return _err(
321
+ "empty_content",
322
+ "No content provided for edit action.",
323
+ hint="Provide SEARCH/REPLACE blocks in the Content field."
324
+ )
325
+
326
+ blocks = _parse_search_replace_blocks(content)
327
+ if not blocks:
328
+ return _err(
329
+ "no_blocks_found",
330
+ "No valid SEARCH/REPLACE blocks found in content.",
331
+ hint=(
332
+ "Expected format:\n"
333
+ "<<<<<<< SEARCH\n"
334
+ "[exact content to find]\n"
335
+ "=======\n"
336
+ "[new content to replace with]\n"
337
+ ">>>>>>> REPLACE"
338
+ )
339
+ )
340
+
341
+ # Read file
342
+ try:
343
+ with open(abs_path, "r", encoding="utf-8", errors="replace") as f:
344
+ file_content = f.read()
345
+ except Exception as exc:
346
+ return _err("read_failed", "Failed to read file.", path=display, data={"error": _safe_err(exc)})
347
+
348
+ # Apply blocks
349
+ current_content = file_content
350
+ applied = 0
351
+ errors: list[str] = []
352
+ warnings: list[str] = []
353
+
354
+ for i, (search, replace) in enumerate(blocks, 1):
355
+ if search not in current_content:
356
+ # Search text not found - provide helpful debugging context
357
+ context = _find_search_context(current_content, search)
358
+
359
+ error_msg = (
360
+ f"Block {i} failed: Search text not found in {display}\n"
361
+ f"Search text was:\n{search!r}\n"
362
+ f"Context analysis:\n{context}"
363
+ )
364
+
365
+ # Try fuzzy matching
366
+ fuzzy = _find_best_fuzzy_match(current_content, search, 0.9)
367
+ if fuzzy:
368
+ similarity, start_line, end_line, matched_text = fuzzy
369
+ diff = _create_unified_diff(search, matched_text)
370
+ error_msg += (
371
+ f"\n\nClosest fuzzy match (similarity {similarity * 100:.1f}%) "
372
+ f"at lines {start_line}–{end_line}:\n```diff\n{diff}\n```"
373
+ )
374
+
375
+ error_msg += (
376
+ "\n\nDebugging tips:\n"
377
+ "1. Check for exact whitespace/indentation match\n"
378
+ "2. Verify line endings match the file exactly (\\r\\n vs \\n)\n"
379
+ "3. Ensure the search text hasn't been modified by previous blocks\n"
380
+ "4. Check for typos or case sensitivity issues"
381
+ )
382
+
383
+ errors.append(error_msg)
384
+ continue
385
+
386
+ # Check for multiple occurrences
387
+ occurrences = current_content.count(search)
388
+ if occurrences > 1:
389
+ warnings.append(
390
+ f"Block {i}: Search text appears {occurrences} times. "
391
+ f"Only the first occurrence will be replaced."
392
+ )
393
+
394
+ # Apply replacement (first occurrence only)
395
+ current_content = current_content.replace(search, replace, 1)
396
+ applied += 1
397
+
398
+ # If all blocks failed, return error
399
+ if errors and applied == 0:
400
+ error_message = "All SEARCH/REPLACE blocks failed:\n\n" + "\n\n---\n\n".join(errors)
401
+ if warnings:
402
+ error_message += "\n\nWarnings:\n" + "\n".join(warnings)
403
+ return _err("edit_failed", error_message)
404
+
405
+ # If some blocks failed, still try to apply successful ones but report errors
406
+ if errors:
407
+ partial_msg = f"Partial edit: {applied} of {len(blocks)} blocks applied.\n\n"
408
+ partial_msg += "Failed blocks:\n\n" + "\n\n---\n\n".join(errors)
409
+ if warnings:
410
+ partial_msg += "\n\nWarnings:\n" + "\n".join(warnings)
411
+
412
+ # Write partial changes
413
+ try:
414
+ with open(abs_path, "w", encoding="utf-8") as f:
415
+ f.write(current_content)
416
+ except Exception as exc:
417
+ return _err("write_failed", "Failed to write file after partial edit.", path=display, data={"error": _safe_err(exc)})
418
+
419
+ return partial_msg
420
+
421
+ # All blocks succeeded - write file
422
+ try:
423
+ with open(abs_path, "w", encoding="utf-8") as f:
424
+ f.write(current_content)
425
+ except Exception as exc:
426
+ return _err("write_failed", "Failed to write file.", path=display, data={"error": _safe_err(exc)})
427
+
428
+ # Build success message
429
+ original_lines = len(file_content.splitlines())
430
+ new_lines = len(current_content.splitlines())
431
+ line_delta = new_lines - original_lines
432
+ delta_str = f"+{line_delta}" if line_delta > 0 else str(line_delta)
433
+
434
+ result = f"Edited {display}: {applied} block{'s' if applied != 1 else ''} applied"
435
+ if line_delta != 0:
436
+ result += f" ({delta_str} lines)"
437
+
438
+ if warnings:
439
+ result += "\n\nWarnings:\n" + "\n".join(warnings)
440
+
441
+ return result
442
+
443
  # ---------------------------------------------------------------------------
444
  # Read-only operations delegated to sandbox
445
  # ---------------------------------------------------------------------------
 
484
 
485
  @autodoc(summary=TOOL_SUMMARY)
486
  def File_System(
487
+ action: Annotated[str, "Operation to perform: 'list', 'read', 'write', 'append', 'edit', 'mkdir', 'move', 'copy', 'delete', 'info', 'search'."],
488
  path: Annotated[str, "Target path, relative to root unless UNSAFE_ALLOW_ABS_PATHS=1."] = "/",
489
  content: Annotated[Optional[str], "Content for write/append actions or search query (UTF-8)."] = None,
490
  dest_path: Annotated[Optional[str], "Destination for move/copy (relative to root unless unsafe absolute allowed)."] = None,
 
512
  case_sensitive=case_sensitive,
513
  )
514
  action = (action or "").strip().lower()
515
+ if action not in {"list", "read", "write", "append", "edit", "mkdir", "move", "copy", "delete", "info", "search", "help"}:
516
  result = _err(
517
  "invalid_action",
518
  "Invalid action.",
519
+ hint="Choose from: list, read, write, append, edit, mkdir, move, copy, delete, info, search, help."
520
  )
521
  _log_call_end("File_System", _truncate_for_log(result))
522
  return result
 
547
  )
548
  else:
549
  result = _write_file(abs_path, content or "", append=(action == "append"), create_dirs=create_dirs)
550
+ elif action == "edit":
551
+ result = _edit_file(abs_path, content or "")
552
  elif action == "mkdir":
553
  result = _mkdir(abs_path, exist_ok=True)
554
  elif action in {"move", "copy"}:
 
595
  inputs=[
596
  gr.Radio(
597
  label="Action",
598
+ choices=["list", "read", "write", "append", "edit", "mkdir", "move", "copy", "delete", "info", "search", "help"],
599
  value="help",
600
  info="Operation to perform",
601
  ),
602
  gr.Textbox(label="Path", placeholder="/ or /src/file.txt", max_lines=1, value="/", info="Target path (relative to root)"),
603
+ gr.Textbox(label="Content", lines=6, placeholder="Text to write, SEARCH/REPLACE blocks for edit, or search query...", value="<<<<<<< SEARCH\n[exact text to find in the file]\n=======\n[exact text to replace it with]\n>>>>>>> REPLACE", info="Content for write/append/edit actions or search query"),
604
  gr.Textbox(label="Destination", max_lines=1, info="Destination path (Move/Copy only)"),
605
  gr.Checkbox(label="Recursive", value=False, info="Recurse into subfolders (List/Delete/Search)"),
606
  gr.Checkbox(label="Show hidden", value=False, info="Include hidden files (List/Search)"),