File size: 8,077 Bytes
891669b
d715ed0
 
 
 
891669b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
adbf39e
 
891669b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e404f67
 
891669b
 
 
 
 
d715ed0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
891669b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
adbf39e
 
 
 
 
891669b
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# ---- Changelog ----
# [2026-04-06] Josh + Claude β€” Add edit_file method (find-and-replace)
# What: New edit_file method on FilesystemTool for targeted file edits
# Why: Gap 3 β€” write_file does full overwrite which risks corruption on cross-repo work
# How: Read file, verify old_text exists exactly once, replace with new_text, write back
# [2026-03-29] Chisel/TQB β€” Block C: FilesystemTool
# What: read_file, write_file, list_files extracted from RecursiveContextManager
# Why: PRD Block C β€” single-responsibility tool classes
# How: Each method gates through policy_engine when present; list_files now respects max_depth
# [2026-03-29] Razor/TQB β€” Block A: Security Hardening
# What: Path traversal protection, file size guard, proper PolicyEngine integration
# Why: PRD Block A β€” is_relative_to() enforced, file size check before read, content secret scan
# How: Resolve + is_relative_to on every op; 10MB read guard; PolicyEngine returns (bool, reason)
# -------------------

import logging
from pathlib import Path

logger = logging.getLogger("tools.filesystem")

# Maximum file size to read into memory (bytes)
MAX_READ_SIZE = 10 * 1024 * 1024  # 10MB


class FilesystemTool:
    """Filesystem read/write/list operations scoped to repo_path."""

    def __init__(self, repo_path: Path, policy_engine=None):
        self.repo_path = repo_path
        self.policy_engine = policy_engine

    def _check_path(self, path: str, mode: str) -> tuple:
        """Resolve path and enforce workspace boundary.

        Returns (resolved_path, error_dict_or_None).
        """
        target = (self.repo_path / path).resolve()
        if not target.is_relative_to(self.repo_path.resolve()):
            msg = f"Path outside workspace boundary: {path}"
            logger.warning(msg)
            return None, {"status": "error", "tool": "filesystem", "error": msg, "type": "PermissionError"}

        if self.policy_engine:
            from policy_engine import check_tool_call
            tool_name = "read_file" if mode == "read" else "write_file"
            args = {"path": str(target)}
            allowed, reason = check_tool_call(tool_name, args, self.repo_path)
            if not allowed:
                return None, {"status": "error", "tool": "filesystem", "error": reason, "type": "PermissionError"}

        return target, None

    def read_file(self, path: str, start_line: int = None, end_line: int = None) -> str:
        target, err = self._check_path(path, "read")
        if err:
            return err

        try:
            # File size guard β€” check BEFORE reading into memory
            if target.exists() and target.stat().st_size > MAX_READ_SIZE:
                msg = f"File too large ({target.stat().st_size:,} bytes). Max: {MAX_READ_SIZE:,} bytes."
                logger.warning("read_file rejected: %s β€” %s", path, msg)
                return {"status": "error", "tool": "filesystem", "error": msg, "type": "ValueError"}

            content = target.read_text(encoding='utf-8', errors='ignore')
            lines = content.splitlines()
            if start_line is not None and end_line is not None:
                # Tool schema says 1-based β€” convert to 0-based for Python slicing
                lines = lines[max(0, start_line - 1):end_line]
            return "\n".join(lines)
        except FileNotFoundError:
            return {"status": "error", "tool": "filesystem", "error": f"File not found: {path}", "type": "FileNotFoundError"}
        except PermissionError as e:
            return {"status": "error", "tool": "filesystem", "error": str(e), "type": "PermissionError"}
        except OSError as e:
            return {"status": "error", "tool": "filesystem", "error": str(e), "type": type(e).__name__}

    def write_file(self, path: str, content: str) -> str:
        target, err = self._check_path(path, "write")
        if err:
            return err

        # Content secret scan via PolicyEngine
        if self.policy_engine:
            from policy_engine import can_write_content
            allowed, reason = can_write_content(path, content)
            if not allowed:
                return {"status": "error", "tool": "filesystem", "error": reason, "type": "PermissionError"}

        try:
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(content, encoding='utf-8')
            size = target.stat().st_size
            return f"Written to {target} ({size:,} bytes)"
        except PermissionError as e:
            return {"status": "error", "tool": "filesystem", "error": str(e), "type": "PermissionError"}
        except OSError as e:
            return {"status": "error", "tool": "filesystem", "error": str(e), "type": type(e).__name__}

    def edit_file(self, path: str, old_text: str, new_text: str) -> str:
        """Find-and-replace edit: verify old_text exists exactly once, replace with new_text."""
        target, err = self._check_path(path, "write")
        if err:
            return err

        # Content secret scan on the new text via PolicyEngine
        if self.policy_engine:
            from policy_engine import can_write_content
            allowed, reason = can_write_content(path, new_text)
            if not allowed:
                return {"status": "error", "tool": "filesystem", "error": reason, "type": "PermissionError"}

        try:
            if not target.exists():
                return {"status": "error", "tool": "filesystem", "error": f"File not found: {path}", "type": "FileNotFoundError"}

            # File size guard
            if target.stat().st_size > MAX_READ_SIZE:
                msg = f"File too large ({target.stat().st_size:,} bytes). Max: {MAX_READ_SIZE:,} bytes."
                return {"status": "error", "tool": "filesystem", "error": msg, "type": "ValueError"}

            content = target.read_text(encoding='utf-8', errors='ignore')
            count = content.count(old_text)

            if count == 0:
                return {"status": "error", "tool": "filesystem", "error": f"old_text not found in {path}", "type": "ValueError"}
            if count > 1:
                return {"status": "error", "tool": "filesystem", "error": f"old_text found {count} times in {path} β€” must be unique (provide more context)", "type": "ValueError"}

            new_content = content.replace(old_text, new_text, 1)
            target.write_text(new_content, encoding='utf-8')
            size = target.stat().st_size
            return f"Edited {target} β€” replaced 1 occurrence ({size:,} bytes)"

        except PermissionError as e:
            return {"status": "error", "tool": "filesystem", "error": str(e), "type": "PermissionError"}
        except OSError as e:
            return {"status": "error", "tool": "filesystem", "error": str(e), "type": type(e).__name__}

    def list_files(self, path: str = ".", max_depth: int = 3) -> str:
        target, err = self._check_path(path, "read")
        if err:
            return err

        try:
            if not target.exists():
                return "Path not found."
            files = []
            for p in target.rglob("*"):
                if not p.is_file():
                    continue
                if any(part.startswith(".") for part in p.parts):
                    continue
                try:
                    rel = p.relative_to(target)
                except ValueError:
                    continue
                if len(rel.parts) > max_depth:
                    continue
                files.append(str(p.relative_to(self.repo_path)))
            total = len(files)
            listing = "\n".join(files[:50])
            if total > 50:
                listing += f"\n\n(showing 50 of {total} files)"
            return listing
        except PermissionError as e:
            return {"status": "error", "tool": "filesystem", "error": str(e), "type": "PermissionError"}
        except OSError as e:
            return {"status": "error", "tool": "filesystem", "error": str(e), "type": type(e).__name__}