PYAE1994 commited on
Commit
9f5c2db
Β·
verified Β·
1 Parent(s): 87fb933

feat: GOD MODE+ v4.0 - tools/filesystem.py

Browse files
Files changed (1) hide show
  1. tools/filesystem.py +306 -0
tools/filesystem.py ADDED
@@ -0,0 +1,306 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ FileSystem Tool β€” Real Agent File Operations
3
+ readFile / writeFile / patchFile / deleteFile / moveFile / searchFiles / tree
4
+ """
5
+
6
+ import asyncio
7
+ import difflib
8
+ import fnmatch
9
+ import json
10
+ import os
11
+ import re
12
+ import shutil
13
+ from pathlib import Path
14
+ from typing import Dict, List, Optional, Tuple
15
+
16
+ import structlog
17
+
18
+ log = structlog.get_logger()
19
+
20
+ WORKSPACE = os.environ.get("WORKSPACE_DIR", "/tmp/god_workspace")
21
+
22
+
23
+ class FileSystemTool:
24
+ def __init__(self, workspace: str = WORKSPACE):
25
+ self.workspace = workspace
26
+ os.makedirs(workspace, exist_ok=True)
27
+
28
+ def _safe_path(self, filename: str) -> str:
29
+ """Resolve path safely within workspace."""
30
+ if filename.startswith("/tmp/god_workspace") or filename.startswith(self.workspace):
31
+ resolved = filename
32
+ else:
33
+ resolved = os.path.join(self.workspace, filename.lstrip("/"))
34
+ # Security: ensure within workspace
35
+ real = os.path.realpath(resolved)
36
+ ws_real = os.path.realpath(self.workspace)
37
+ if not real.startswith(ws_real):
38
+ raise PermissionError(f"Path escape attempt: {filename}")
39
+ return resolved
40
+
41
+ # ─── Read ─────────────────────────────────────────────────────────────────
42
+
43
+ async def read_file(self, filename: str, encoding: str = "utf-8") -> Dict:
44
+ try:
45
+ path = self._safe_path(filename)
46
+ with open(path, "r", encoding=encoding, errors="replace") as f:
47
+ content = f.read()
48
+ lines = content.split("\n")
49
+ return {
50
+ "success": True,
51
+ "filename": filename,
52
+ "content": content,
53
+ "lines": len(lines),
54
+ "size": len(content),
55
+ "path": path,
56
+ }
57
+ except FileNotFoundError:
58
+ return {"success": False, "error": f"File not found: {filename}"}
59
+ except Exception as e:
60
+ return {"success": False, "error": str(e)}
61
+
62
+ # ─── Write ────────────────────────────────────────────────────────────────
63
+
64
+ async def write_file(self, filename: str, content: str, encoding: str = "utf-8") -> Dict:
65
+ try:
66
+ path = self._safe_path(filename)
67
+ os.makedirs(os.path.dirname(path), exist_ok=True)
68
+ with open(path, "w", encoding=encoding) as f:
69
+ f.write(content)
70
+ return {
71
+ "success": True,
72
+ "filename": filename,
73
+ "path": path,
74
+ "size": len(content),
75
+ "lines": len(content.split("\n")),
76
+ "action": "written",
77
+ }
78
+ except PermissionError as e:
79
+ return {"success": False, "error": str(e)}
80
+ except Exception as e:
81
+ return {"success": False, "error": str(e)}
82
+
83
+ # ─── Patch (smart diff apply) ─────────────────────────────────────────────
84
+
85
+ async def patch_file(self, filename: str, old_str: str, new_str: str) -> Dict:
86
+ """Replace old_str with new_str in file (like Cursor/Devin style edit)."""
87
+ try:
88
+ result = await self.read_file(filename)
89
+ if not result["success"]:
90
+ return result
91
+ content = result["content"]
92
+ if old_str not in content:
93
+ return {
94
+ "success": False,
95
+ "error": f"Pattern not found in {filename}",
96
+ "hint": "Use write_file to create from scratch",
97
+ }
98
+ new_content = content.replace(old_str, new_str, 1)
99
+ diff = list(difflib.unified_diff(
100
+ content.splitlines(keepends=True),
101
+ new_content.splitlines(keepends=True),
102
+ fromfile=f"a/{filename}",
103
+ tofile=f"b/{filename}",
104
+ ))
105
+ await self.write_file(filename, new_content)
106
+ return {
107
+ "success": True,
108
+ "filename": filename,
109
+ "action": "patched",
110
+ "diff": "".join(diff[:50]),
111
+ "lines_changed": len([l for l in diff if l.startswith(("+", "-")) and not l.startswith(("+++", "---"))]),
112
+ }
113
+ except Exception as e:
114
+ return {"success": False, "error": str(e)}
115
+
116
+ # ─── Delete ───────────────────────────────────────────────────────────────
117
+
118
+ async def delete_file(self, filename: str) -> Dict:
119
+ try:
120
+ path = self._safe_path(filename)
121
+ if os.path.isdir(path):
122
+ shutil.rmtree(path)
123
+ return {"success": True, "filename": filename, "action": "directory_deleted"}
124
+ os.remove(path)
125
+ return {"success": True, "filename": filename, "action": "deleted"}
126
+ except FileNotFoundError:
127
+ return {"success": False, "error": f"Not found: {filename}"}
128
+ except Exception as e:
129
+ return {"success": False, "error": str(e)}
130
+
131
+ # ─── Move / Rename ────────────────────────────────────────────────────────
132
+
133
+ async def move_file(self, src: str, dst: str) -> Dict:
134
+ try:
135
+ src_path = self._safe_path(src)
136
+ dst_path = self._safe_path(dst)
137
+ os.makedirs(os.path.dirname(dst_path), exist_ok=True)
138
+ shutil.move(src_path, dst_path)
139
+ return {"success": True, "src": src, "dst": dst, "action": "moved"}
140
+ except Exception as e:
141
+ return {"success": False, "error": str(e)}
142
+
143
+ # ─── Copy ─────────────────────────────────────────────────────────────────
144
+
145
+ async def copy_file(self, src: str, dst: str) -> Dict:
146
+ try:
147
+ src_path = self._safe_path(src)
148
+ dst_path = self._safe_path(dst)
149
+ os.makedirs(os.path.dirname(dst_path), exist_ok=True)
150
+ shutil.copy2(src_path, dst_path)
151
+ return {"success": True, "src": src, "dst": dst, "action": "copied"}
152
+ except Exception as e:
153
+ return {"success": False, "error": str(e)}
154
+
155
+ # ─── Search Files ─────────────────────────────────────────────────────────
156
+
157
+ async def search_files(
158
+ self,
159
+ query: str,
160
+ path: str = "",
161
+ pattern: str = "*",
162
+ max_results: int = 50,
163
+ ) -> Dict:
164
+ """Search file contents for query string."""
165
+ try:
166
+ base = self._safe_path(path) if path else self.workspace
167
+ results = []
168
+ for root, dirs, files in os.walk(base):
169
+ dirs[:] = [d for d in dirs if not d.startswith(".") and d not in ("node_modules", "__pycache__", ".git", ".next", "dist", "build")]
170
+ for fname in files:
171
+ if not fnmatch.fnmatch(fname, pattern):
172
+ continue
173
+ fpath = os.path.join(root, fname)
174
+ try:
175
+ with open(fpath, "r", encoding="utf-8", errors="replace") as f:
176
+ lines = f.readlines()
177
+ for i, line in enumerate(lines, 1):
178
+ if query.lower() in line.lower():
179
+ rel = os.path.relpath(fpath, self.workspace)
180
+ results.append({
181
+ "file": rel,
182
+ "line": i,
183
+ "content": line.strip()[:200],
184
+ })
185
+ if len(results) >= max_results:
186
+ break
187
+ except Exception:
188
+ continue
189
+ if len(results) >= max_results:
190
+ break
191
+ return {"success": True, "query": query, "results": results, "count": len(results)}
192
+ except Exception as e:
193
+ return {"success": False, "error": str(e)}
194
+
195
+ # ─── Tree ─────────────────────────────────────────────────────────────────
196
+
197
+ async def tree(self, path: str = "", max_depth: int = 4) -> Dict:
198
+ """Generate directory tree."""
199
+ try:
200
+ base = self._safe_path(path) if path else self.workspace
201
+ lines = []
202
+ self._walk_tree(base, lines, prefix="", depth=0, max_depth=max_depth)
203
+ return {
204
+ "success": True,
205
+ "path": os.path.relpath(base, self.workspace) or ".",
206
+ "tree": "\n".join(lines),
207
+ "full_path": base,
208
+ }
209
+ except Exception as e:
210
+ return {"success": False, "error": str(e)}
211
+
212
+ def _walk_tree(self, path: str, lines: list, prefix: str, depth: int, max_depth: int):
213
+ if depth > max_depth:
214
+ return
215
+ try:
216
+ entries = sorted(os.listdir(path))
217
+ except PermissionError:
218
+ return
219
+ skip = {".git", "node_modules", "__pycache__", ".next", "dist", "build", ".venv", "venv"}
220
+ entries = [e for e in entries if e not in skip]
221
+ for i, entry in enumerate(entries):
222
+ full = os.path.join(path, entry)
223
+ is_last = i == len(entries) - 1
224
+ connector = "└── " if is_last else "β”œβ”€β”€ "
225
+ suffix = "/" if os.path.isdir(full) else ""
226
+ lines.append(f"{prefix}{connector}{entry}{suffix}")
227
+ if os.path.isdir(full):
228
+ ext = " " if is_last else "β”‚ "
229
+ self._walk_tree(full, lines, prefix + ext, depth + 1, max_depth)
230
+
231
+ # ─── Make Directory ───────────────────────────────────────────────────────
232
+
233
+ async def mkdir(self, path: str) -> Dict:
234
+ try:
235
+ full = self._safe_path(path)
236
+ os.makedirs(full, exist_ok=True)
237
+ return {"success": True, "path": path, "action": "created"}
238
+ except Exception as e:
239
+ return {"success": False, "error": str(e)}
240
+
241
+ # ─── List Directory ───────────────────────────────────────────────────────
242
+
243
+ async def list_dir(self, path: str = "") -> Dict:
244
+ try:
245
+ base = self._safe_path(path) if path else self.workspace
246
+ entries = []
247
+ for e in sorted(os.listdir(base)):
248
+ full = os.path.join(base, e)
249
+ stat = os.stat(full)
250
+ entries.append({
251
+ "name": e,
252
+ "type": "dir" if os.path.isdir(full) else "file",
253
+ "size": stat.st_size,
254
+ "modified": int(stat.st_mtime),
255
+ })
256
+ return {"success": True, "path": path or ".", "entries": entries}
257
+ except Exception as e:
258
+ return {"success": False, "error": str(e)}
259
+
260
+ # ─── Grep ─────────────────────────────────────────────────────────────────
261
+
262
+ async def grep(self, pattern: str, path: str = "", flags: str = "i") -> Dict:
263
+ """Regex grep across workspace files."""
264
+ try:
265
+ base = self._safe_path(path) if path else self.workspace
266
+ re_flags = re.IGNORECASE if "i" in flags else 0
267
+ regex = re.compile(pattern, re_flags)
268
+ results = []
269
+ for root, dirs, files in os.walk(base):
270
+ dirs[:] = [d for d in dirs if d not in {"node_modules", "__pycache__", ".git", ".next"}]
271
+ for fname in files:
272
+ fpath = os.path.join(root, fname)
273
+ try:
274
+ with open(fpath, "r", encoding="utf-8", errors="replace") as f:
275
+ for i, line in enumerate(f, 1):
276
+ if regex.search(line):
277
+ rel = os.path.relpath(fpath, self.workspace)
278
+ results.append({"file": rel, "line": i, "content": line.strip()[:200]})
279
+ if len(results) >= 100:
280
+ break
281
+ except Exception:
282
+ continue
283
+ if len(results) >= 100:
284
+ break
285
+ return {"success": True, "pattern": pattern, "results": results, "count": len(results)}
286
+ except re.error as e:
287
+ return {"success": False, "error": f"Invalid regex: {e}"}
288
+ except Exception as e:
289
+ return {"success": False, "error": str(e)}
290
+
291
+ # ─── Bulk Write (multiple files at once) ──────────────────────────────────
292
+
293
+ async def write_files(self, files: List[Dict]) -> Dict:
294
+ """Write multiple files at once. files = [{"filename": ..., "content": ...}]"""
295
+ results = []
296
+ for f in files:
297
+ r = await self.write_file(f["filename"], f["content"])
298
+ results.append(r)
299
+ success_count = sum(1 for r in results if r.get("success"))
300
+ return {
301
+ "success": success_count == len(files),
302
+ "total": len(files),
303
+ "succeeded": success_count,
304
+ "failed": len(files) - success_count,
305
+ "results": results,
306
+ }