# ---- Changelog ----
# [2026-04-06] Josh + Claude — Add edit_file method (find-and-replace)
# What: New edit_file method on FilesystemTool for targeted file edits
# Why: Gap 3 — write_file does a full overwrite, which risks corruption on cross-repo work
# How: Read file, verify old_text exists exactly once, replace with new_text, write back
# [2026-03-29] Chisel/TQB — Block C: FilesystemTool
# What: read_file, write_file, list_files extracted from RecursiveContextManager
# Why: PRD Block C — single-responsibility tool classes
# How: Each method gates through policy_engine when present; list_files now respects max_depth
# [2026-03-29] Razor/TQB — Block A: Security Hardening
# What: Path traversal protection, file size guard, proper PolicyEngine integration
# Why: PRD Block A — is_relative_to() enforced, file size check before read, content secret scan
# How: Resolve + is_relative_to on every op; 10MB read guard; PolicyEngine returns (bool, reason)
# -------------------
import logging
from pathlib import Path
# Logger for this module, registered under the "tools.filesystem" name.
logger = logging.getLogger("tools.filesystem")

# Largest file, in bytes, that may be read into memory: 10 * 2**20 = 10 MiB.
MAX_READ_SIZE = 10 * (1 << 20)
class FilesystemTool:
    """Filesystem read/write/edit/list operations scoped to ``repo_path``.

    Every public method returns a plain string on success. On failure it
    returns an error dict of the form
    ``{"status": "error", "tool": "filesystem", "error": <message>,
    "type": <exception class name>}`` instead of raising.

    When ``policy_engine`` is truthy, each operation is additionally gated
    through the ``policy_engine`` module's ``check_tool_call`` and, for
    writes, ``can_write_content``.
    """

    def __init__(self, repo_path: Path, policy_engine=None):
        # Coerce so a plain string path also supports the `/` operator below.
        self.repo_path = Path(repo_path)
        self.policy_engine = policy_engine

    @staticmethod
    def _error(message: str, type_name: str) -> dict:
        """Build the error dict shared by every method of this tool."""
        return {
            "status": "error",
            "tool": "filesystem",
            "error": message,
            "type": type_name,
        }

    def _check_path(self, path: str, mode: str) -> tuple:
        """Resolve ``path`` and enforce the workspace boundary.

        ``mode`` is ``"read"`` for read-style operations; any other value
        is treated as a write when consulting the policy engine.

        Returns ``(resolved_path, None)`` when access is allowed, or
        ``(None, error_dict)`` when it is not.
        """
        try:
            root = self.repo_path.resolve()
            target = (self.repo_path / path).resolve()
        except (OSError, RuntimeError, ValueError) as e:
            # resolve() can fail on malformed input (e.g. an embedded NUL
            # byte) or a symlink loop; report it instead of raising.
            return None, self._error(str(e), type(e).__name__)

        if not target.is_relative_to(root):
            msg = f"Path outside workspace boundary: {path}"
            logger.warning(msg)
            return None, self._error(msg, "PermissionError")

        if self.policy_engine:
            # NOTE(review): the module-level function is used here, not the
            # policy_engine object passed to __init__ — confirm this is intended.
            from policy_engine import check_tool_call

            tool_name = "read_file" if mode == "read" else "write_file"
            allowed, reason = check_tool_call(
                tool_name, {"path": str(target)}, self.repo_path
            )
            if not allowed:
                return None, self._error(reason, "PermissionError")

        return target, None

    def _scan_content(self, path: str, content: str):
        """Return an error dict if the policy engine rejects ``content``, else None."""
        if not self.policy_engine:
            return None
        from policy_engine import can_write_content

        allowed, reason = can_write_content(path, content)
        if allowed:
            return None
        return self._error(reason, "PermissionError")

    def read_file(
        self,
        path: str,
        start_line: "int | None" = None,
        end_line: "int | None" = None,
    ) -> "str | dict":
        """Return the text of ``path``, optionally limited to a line range.

        ``start_line`` and ``end_line`` are 1-based and inclusive. Either
        may be given on its own: a missing ``start_line`` means "from the
        first line", a missing ``end_line`` means "to the last line".

        The file is decoded as UTF-8 with undecodable bytes dropped, and
        lines are re-joined with ``"\\n"`` (no trailing newline).

        Returns the text, or an error dict if the path is rejected, the
        file is missing, larger than ``MAX_READ_SIZE``, or unreadable.
        """
        target, err = self._check_path(path, "read")
        if err:
            return err
        try:
            # File size guard — check BEFORE reading into memory.
            # stat() raises FileNotFoundError for a missing file (handled below).
            size = target.stat().st_size
            if size > MAX_READ_SIZE:
                msg = f"File too large ({size:,} bytes). Max: {MAX_READ_SIZE:,} bytes."
                logger.warning("read_file rejected: %s — %s", path, msg)
                return self._error(msg, "ValueError")

            lines = target.read_text(encoding="utf-8", errors="ignore").splitlines()
            if start_line is not None or end_line is not None:
                # Tool schema says 1-based — convert to 0-based for slicing.
                # A None end_line slices through to the end of the file.
                first = 0 if start_line is None else max(0, start_line - 1)
                lines = lines[first:end_line]
            return "\n".join(lines)
        except FileNotFoundError:
            return self._error(f"File not found: {path}", "FileNotFoundError")
        except PermissionError as e:
            return self._error(str(e), "PermissionError")
        except OSError as e:
            return self._error(str(e), type(e).__name__)

    def write_file(self, path: str, content: str) -> "str | dict":
        """Overwrite (or create) ``path`` with ``content`` encoded as UTF-8.

        Missing parent directories are created. Returns a confirmation
        string with the resulting size, or an error dict if the path or
        the content is rejected or the write fails.
        """
        target, err = self._check_path(path, "write")
        if err:
            return err

        # Content secret scan via PolicyEngine.
        denied = self._scan_content(path, content)
        if denied:
            return denied

        try:
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(content, encoding="utf-8")
            size = target.stat().st_size
            return f"Written to {target} ({size:,} bytes)"
        except PermissionError as e:
            return self._error(str(e), "PermissionError")
        except OSError as e:
            return self._error(str(e), type(e).__name__)

    def edit_file(self, path: str, old_text: str, new_text: str) -> "str | dict":
        """Replace the single occurrence of ``old_text`` in ``path`` with ``new_text``.

        The edit is refused (error dict returned, file untouched) when:
        ``old_text`` is empty, is not found, or occurs more than once;
        the file is missing or larger than ``MAX_READ_SIZE``; the file is
        not valid UTF-8; or the path / new text is rejected by policy.

        Returns a confirmation string with the resulting size on success.
        """
        target, err = self._check_path(path, "write")
        if err:
            return err

        # An empty needle "matches" between every character, which would
        # otherwise surface as a misleading multiple-occurrence error.
        if not old_text:
            return self._error("old_text must not be empty", "ValueError")

        # Content secret scan on the new text via PolicyEngine.
        denied = self._scan_content(path, new_text)
        if denied:
            return denied

        try:
            if not target.exists():
                return self._error(f"File not found: {path}", "FileNotFoundError")

            # File size guard.
            size = target.stat().st_size
            if size > MAX_READ_SIZE:
                msg = f"File too large ({size:,} bytes). Max: {MAX_READ_SIZE:,} bytes."
                return self._error(msg, "ValueError")

            try:
                # Decode strictly: the content is written back below, so
                # dropping undecodable bytes would silently corrupt the file.
                content = target.read_text(encoding="utf-8")
            except UnicodeDecodeError:
                return self._error(
                    f"{path} is not valid UTF-8; refusing to edit because "
                    "undecodable bytes would be lost",
                    "UnicodeDecodeError",
                )

            count = content.count(old_text)
            if count == 0:
                return self._error(f"old_text not found in {path}", "ValueError")
            if count > 1:
                return self._error(
                    f"old_text found {count} times in {path} — must be unique (provide more context)",
                    "ValueError",
                )

            target.write_text(content.replace(old_text, new_text, 1), encoding="utf-8")
            size = target.stat().st_size
            return f"Edited {target} — replaced 1 occurrence ({size:,} bytes)"
        except PermissionError as e:
            return self._error(str(e), "PermissionError")
        except OSError as e:
            return self._error(str(e), type(e).__name__)

    def list_files(self, path: str = ".", max_depth: int = 3) -> "str | dict":
        """List files under ``path``, one repo-relative path per line.

        Files nested more than ``max_depth`` path components below
        ``path`` are skipped, as is anything with a component (relative to
        the repo root) that starts with ``"."``. The listing is sorted and
        capped at 50 entries; when more exist a trailer line reports the
        total.

        Returns the listing, ``"Path not found."`` if ``path`` does not
        exist, or an error dict if the path is rejected or unreadable.
        """
        target, err = self._check_path(path, "read")
        if err:
            return err
        try:
            if not target.exists():
                return "Path not found."

            # rglob yields paths under the *resolved* target, so relative
            # paths must be computed against the resolved root as well.
            root = self.repo_path.resolve()
            files = []
            for p in target.rglob("*"):
                if not p.is_file():
                    continue
                try:
                    rel_to_target = p.relative_to(target)
                    rel_to_root = p.relative_to(root)
                except ValueError:
                    continue
                # Only components inside the repo count as hidden; a repo
                # that itself lives under a dot-directory is still listed.
                if any(part.startswith(".") for part in rel_to_root.parts):
                    continue
                if len(rel_to_target.parts) > max_depth:
                    continue
                files.append(str(rel_to_root))

            # Sort so the 50-entry cut-off is deterministic.
            files.sort()
            total = len(files)
            listing = "\n".join(files[:50])
            if total > 50:
                listing += f"\n\n(showing 50 of {total} files)"
            return listing
        except PermissionError as e:
            return self._error(str(e), "PermissionError")
        except OSError as e:
            return self._error(str(e), type(e).__name__)
|