# PYAE1994's picture
# feat: GOD MODE+ v4.0 - tools/filesystem.py
# 9f5c2db verified
"""
FileSystem Tool — Real Agent File Operations
readFile / writeFile / patchFile / deleteFile / moveFile / searchFiles / tree
"""
import asyncio
import difflib
import fnmatch
import json
import os
import re
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import structlog
# Module-level structlog logger (not referenced by the code visible in this file).
log = structlog.get_logger()
# Default workspace root; override with the WORKSPACE_DIR environment variable.
WORKSPACE = os.environ.get("WORKSPACE_DIR", "/tmp/god_workspace")


class FileSystemTool:
    """File operations for an agent, confined to a single workspace directory.

    Every public method is a coroutine returning a plain ``dict`` with a
    ``"success"`` flag; failures are reported through an ``"error"`` key
    instead of being raised. The file I/O itself is ordinary blocking I/O.
    """

    # Directory names pruned by search_files (dot-directories are pruned too).
    _SEARCH_SKIP_DIRS = frozenset(
        {"node_modules", "__pycache__", ".git", ".next", "dist", "build"}
    )
    # Directory names pruned by grep.
    _GREP_SKIP_DIRS = frozenset({"node_modules", "__pycache__", ".git", ".next"})
    # Entry names hidden from tree output.
    _TREE_SKIP = frozenset(
        {".git", "node_modules", "__pycache__", ".next", "dist", "build", ".venv", "venv"}
    )

    def __init__(self, workspace: str = WORKSPACE):
        """Remember *workspace* as the root directory and create it if missing."""
        self.workspace = workspace
        os.makedirs(workspace, exist_ok=True)

    def _safe_path(self, filename: str) -> str:
        """Map *filename* to a path inside the workspace.

        Paths that already start with the workspace (or the legacy default
        ``/tmp/god_workspace``) are used as given; anything else is joined onto
        the workspace with leading slashes stripped.

        Raises:
            PermissionError: if the path, after resolving symlinks and ``..``,
                is neither the workspace itself nor located beneath it.
        """
        if filename.startswith(("/tmp/god_workspace", self.workspace)):
            resolved = filename
        else:
            resolved = os.path.join(self.workspace, filename.lstrip("/"))

        real = os.path.realpath(resolved)
        ws_real = os.path.realpath(self.workspace)
        # Compare on a directory boundary: a bare startswith() would also accept
        # sibling directories such as "<workspace>_evil".
        inside_prefix = ws_real.rstrip(os.sep) + os.sep
        if real != ws_real and not real.startswith(inside_prefix):
            raise PermissionError(f"Path escape attempt: {filename}")
        return resolved

    def _scan_lines(self, base: str, line_matches, keep_dir, name_pattern: str = "*"):
        """Yield a ``{"file", "line", "content"}`` dict for each matching line.

        Walks *base*, descending only into directories for which
        ``keep_dir(name)`` is true and opening only files whose name matches
        the fnmatch pattern *name_pattern*. Files that cannot be read are
        skipped. ``"file"`` is relative to the workspace and ``"content"`` is
        the stripped line truncated to 200 characters.
        """
        for root, dirs, files in os.walk(base):
            # Prune in place so os.walk does not descend into skipped dirs.
            dirs[:] = [d for d in dirs if keep_dir(d)]
            for fname in files:
                if not fnmatch.fnmatch(fname, name_pattern):
                    continue
                fpath = os.path.join(root, fname)
                try:
                    rel = os.path.relpath(fpath, self.workspace)
                    with open(fpath, "r", encoding="utf-8", errors="replace") as f:
                        for lineno, line in enumerate(f, 1):
                            if line_matches(line):
                                yield {
                                    "file": rel,
                                    "line": lineno,
                                    "content": line.strip()[:200],
                                }
                except (OSError, ValueError):
                    # Best effort: an unreadable file must not abort the scan.
                    continue

    # ─── Read ─────────────────────────────────────────────────────────────────
    async def read_file(self, filename: str, encoding: str = "utf-8") -> Dict:
        """Read a text file from the workspace.

        Undecodable bytes are replaced rather than raising. On success the
        result holds ``content``, ``lines`` (number of ``"\\n"``-separated
        pieces), ``size`` (character count) and the resolved ``path``.
        """
        try:
            path = self._safe_path(filename)
            with open(path, "r", encoding=encoding, errors="replace") as f:
                content = f.read()
            return {
                "success": True,
                "filename": filename,
                "content": content,
                "lines": len(content.split("\n")),
                "size": len(content),
                "path": path,
            }
        except FileNotFoundError:
            return {"success": False, "error": f"File not found: {filename}"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ─── Write ────────────────────────────────────────────────────────────────
    async def write_file(self, filename: str, content: str, encoding: str = "utf-8") -> Dict:
        """Create or overwrite a text file, creating parent directories as needed.

        On success the result holds the resolved ``path``, ``size`` (character
        count of *content*), ``lines`` and ``action == "written"``.
        """
        try:
            path = self._safe_path(filename)
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, "w", encoding=encoding) as f:
                f.write(content)
            return {
                "success": True,
                "filename": filename,
                "path": path,
                "size": len(content),
                "lines": len(content.split("\n")),
                "action": "written",
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ─── Patch (smart diff apply) ─────────────────────────────────────────────
    async def patch_file(self, filename: str, old_str: str, new_str: str) -> Dict:
        """Replace the first occurrence of *old_str* with *new_str* in a file.

        On success the result holds ``diff`` (the first 50 lines of a unified
        diff) and ``lines_changed`` (number of added plus removed lines). If
        the file cannot be read, *old_str* is absent, or the write fails, a
        failure dict is returned.
        """
        try:
            result = await self.read_file(filename)
            if not result["success"]:
                return result
            content = result["content"]
            if old_str not in content:
                return {
                    "success": False,
                    "error": f"Pattern not found in {filename}",
                    "hint": "Use write_file to create from scratch",
                }
            new_content = content.replace(old_str, new_str, 1)
            diff = list(difflib.unified_diff(
                content.splitlines(keepends=True),
                new_content.splitlines(keepends=True),
                fromfile=f"a/{filename}",
                tofile=f"b/{filename}",
            ))
            written = await self.write_file(filename, new_content)
            if not written["success"]:
                # Do not claim the patch was applied when the write failed.
                return written
            # The first two diff lines are the ---/+++ file headers; skipping
            # them by position keeps changed lines whose own text starts with
            # "--" or "++" in the count.
            changed = sum(1 for line in diff[2:] if line.startswith(("+", "-")))
            return {
                "success": True,
                "filename": filename,
                "action": "patched",
                "diff": "".join(diff[:50]),
                "lines_changed": changed,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ─── Delete ───────────────────────────────────────────────────────────────
    async def delete_file(self, filename: str) -> Dict:
        """Delete a file, or a directory together with everything inside it."""
        try:
            path = self._safe_path(filename)
            if os.path.isdir(path):
                shutil.rmtree(path)
                return {"success": True, "filename": filename, "action": "directory_deleted"}
            os.remove(path)
            return {"success": True, "filename": filename, "action": "deleted"}
        except FileNotFoundError:
            return {"success": False, "error": f"Not found: {filename}"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ─── Move / Rename ────────────────────────────────────────────────────────
    async def move_file(self, src: str, dst: str) -> Dict:
        """Move or rename *src* to *dst*, creating the destination's parent directory."""
        try:
            src_path = self._safe_path(src)
            dst_path = self._safe_path(dst)
            os.makedirs(os.path.dirname(dst_path), exist_ok=True)
            shutil.move(src_path, dst_path)
            return {"success": True, "src": src, "dst": dst, "action": "moved"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ─── Copy ─────────────────────────────────────────────────────────────────
    async def copy_file(self, src: str, dst: str) -> Dict:
        """Copy the file *src* to *dst* (via ``shutil.copy2``), creating the parent directory."""
        try:
            src_path = self._safe_path(src)
            dst_path = self._safe_path(dst)
            os.makedirs(os.path.dirname(dst_path), exist_ok=True)
            shutil.copy2(src_path, dst_path)
            return {"success": True, "src": src, "dst": dst, "action": "copied"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ─── Search Files ─────────────────────────────────────────────────────────
    async def search_files(
        self,
        query: str,
        path: str = "",
        pattern: str = "*",
        max_results: int = 50,
    ) -> Dict:
        """Case-insensitive substring search over file contents.

        Args:
            query: text to look for in each line.
            path: sub-path to search; empty means the whole workspace.
            pattern: fnmatch pattern that file names must match.
            max_results: hard cap on the number of matches returned.
        """
        try:
            base = self._safe_path(path) if path else self.workspace
            needle = query.lower()
            skip = self._SEARCH_SKIP_DIRS
            results = []
            if max_results > 0:
                hits = self._scan_lines(
                    base,
                    lambda line: needle in line.lower(),
                    lambda d: not d.startswith(".") and d not in skip,
                    pattern,
                )
                for hit in hits:
                    results.append(hit)
                    if len(results) >= max_results:
                        # Stop the whole walk, not just the current file.
                        break
            return {"success": True, "query": query, "results": results, "count": len(results)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ─── Tree ─────────────────────────────────────────────────────────────────
    async def tree(self, path: str = "", max_depth: int = 4) -> Dict:
        """Render the directory at *path* (default: workspace root) as a text tree.

        Directories more than *max_depth* levels below *path* are listed but
        not expanded.
        """
        try:
            base = self._safe_path(path) if path else self.workspace
            lines = []
            self._walk_tree(base, lines, prefix="", depth=0, max_depth=max_depth)
            return {
                "success": True,
                "path": os.path.relpath(base, self.workspace) or ".",
                "tree": "\n".join(lines),
                "full_path": base,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _walk_tree(self, path: str, lines: list, prefix: str, depth: int, max_depth: int):
        """Append the tree lines for directory *path* to *lines*, recursing into subdirectories."""
        if depth > max_depth:
            return
        try:
            entries = sorted(os.listdir(path))
        except PermissionError:
            return
        entries = [e for e in entries if e not in self._TREE_SKIP]
        last_index = len(entries) - 1
        for i, entry in enumerate(entries):
            full = os.path.join(path, entry)
            is_last = i == last_index
            is_dir = os.path.isdir(full)
            connector = "└── " if is_last else "├── "
            suffix = "/" if is_dir else ""
            lines.append(f"{prefix}{connector}{entry}{suffix}")
            if is_dir:
                # Keep the vertical rule running while siblings remain below.
                ext = "    " if is_last else "│   "
                self._walk_tree(full, lines, prefix + ext, depth + 1, max_depth)

    # ─── Make Directory ───────────────────────────────────────────────────────
    async def mkdir(self, path: str) -> Dict:
        """Create directory *path* (and any missing parents) inside the workspace."""
        try:
            full = self._safe_path(path)
            os.makedirs(full, exist_ok=True)
            return {"success": True, "path": path, "action": "created"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ─── List Directory ───────────────────────────────────────────────────────
    async def list_dir(self, path: str = "") -> Dict:
        """List the entries of *path* with name, type, size and modification time."""
        try:
            base = self._safe_path(path) if path else self.workspace
            entries = []
            for name in sorted(os.listdir(base)):
                full = os.path.join(base, name)
                try:
                    info = os.stat(full)
                except OSError:
                    # e.g. a dangling symlink: describe the link itself rather
                    # than failing the whole listing.
                    info = os.lstat(full)
                entries.append({
                    "name": name,
                    "type": "dir" if os.path.isdir(full) else "file",
                    "size": info.st_size,
                    "modified": int(info.st_mtime),
                })
            return {"success": True, "path": path or ".", "entries": entries}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ─── Grep ─────────────────────────────────────────────────────────────────
    async def grep(
        self,
        pattern: str,
        path: str = "",
        flags: str = "i",
        max_results: int = 100,
    ) -> Dict:
        """Regex search across files under *path* (default: the whole workspace).

        Args:
            pattern: regular expression applied to each line with ``re.search``.
            path: sub-path to search; empty means the whole workspace.
            flags: an ``"i"`` anywhere in this string enables case-insensitive
                matching; no other flag is recognised.
            max_results: hard cap on the number of matches returned.
        """
        try:
            base = self._safe_path(path) if path else self.workspace
            regex = re.compile(pattern, re.IGNORECASE if "i" in flags else 0)
            skip = self._GREP_SKIP_DIRS
            results = []
            if max_results > 0:
                hits = self._scan_lines(
                    base,
                    lambda line: regex.search(line) is not None,
                    lambda d: d not in skip,
                )
                for hit in hits:
                    results.append(hit)
                    if len(results) >= max_results:
                        # Stop the whole walk, not just the current file.
                        break
            return {"success": True, "pattern": pattern, "results": results, "count": len(results)}
        except re.error as e:
            return {"success": False, "error": f"Invalid regex: {e}"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ─── Bulk Write (multiple files at once) ──────────────────────────────────
    async def write_files(self, files: List[Dict]) -> Dict:
        """Write multiple files at once. files = [{"filename": ..., "content": ...}]

        Each entry is written independently; a malformed entry is recorded as a
        failed result instead of aborting the batch. The overall ``success`` is
        true only when every entry succeeded.
        """
        results = []
        for spec in files:
            try:
                name, content = spec["filename"], spec["content"]
            except (KeyError, TypeError):
                results.append({
                    "success": False,
                    "error": "Each entry needs 'filename' and 'content' keys",
                })
                continue
            results.append(await self.write_file(name, content))
        success_count = sum(1 for r in results if r.get("success"))
        return {
            "success": success_count == len(files),
            "total": len(files),
            "succeeded": success_count,
            "failed": len(files) - success_count,
            "results": results,
        }