""" FileSystem Tool — Real Agent File Operations readFile / writeFile / patchFile / deleteFile / moveFile / searchFiles / tree """ import asyncio import difflib import fnmatch import json import os import re import shutil from pathlib import Path from typing import Dict, List, Optional, Tuple import structlog log = structlog.get_logger() WORKSPACE = os.environ.get("WORKSPACE_DIR", "/tmp/god_workspace") class FileSystemTool: def __init__(self, workspace: str = WORKSPACE): self.workspace = workspace os.makedirs(workspace, exist_ok=True) def _safe_path(self, filename: str) -> str: """Resolve path safely within workspace.""" if filename.startswith("/tmp/god_workspace") or filename.startswith(self.workspace): resolved = filename else: resolved = os.path.join(self.workspace, filename.lstrip("/")) # Security: ensure within workspace real = os.path.realpath(resolved) ws_real = os.path.realpath(self.workspace) if not real.startswith(ws_real): raise PermissionError(f"Path escape attempt: {filename}") return resolved # ─── Read ───────────────────────────────────────────────────────────────── async def read_file(self, filename: str, encoding: str = "utf-8") -> Dict: try: path = self._safe_path(filename) with open(path, "r", encoding=encoding, errors="replace") as f: content = f.read() lines = content.split("\n") return { "success": True, "filename": filename, "content": content, "lines": len(lines), "size": len(content), "path": path, } except FileNotFoundError: return {"success": False, "error": f"File not found: {filename}"} except Exception as e: return {"success": False, "error": str(e)} # ─── Write ──────────────────────────────────────────────────────────────── async def write_file(self, filename: str, content: str, encoding: str = "utf-8") -> Dict: try: path = self._safe_path(filename) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding=encoding) as f: f.write(content) return { "success": True, "filename": filename, "path": path, "size": len(content), "lines": len(content.split("\n")), "action": "written", } except PermissionError as e: return {"success": False, "error": str(e)} except Exception as e: return {"success": False, "error": str(e)} # ─── Patch (smart diff apply) ───────────────────────────────────────────── async def patch_file(self, filename: str, old_str: str, new_str: str) -> Dict: """Replace old_str with new_str in file (like Cursor/Devin style edit).""" try: result = await self.read_file(filename) if not result["success"]: return result content = result["content"] if old_str not in content: return { "success": False, "error": f"Pattern not found in {filename}", "hint": "Use write_file to create from scratch", } new_content = content.replace(old_str, new_str, 1) diff = list(difflib.unified_diff( content.splitlines(keepends=True), new_content.splitlines(keepends=True), fromfile=f"a/{filename}", tofile=f"b/{filename}", )) await self.write_file(filename, new_content) return { "success": True, "filename": filename, "action": "patched", "diff": "".join(diff[:50]), "lines_changed": len([l for l in diff if l.startswith(("+", "-")) and not l.startswith(("+++", "---"))]), } except Exception as e: return {"success": False, "error": str(e)} # ─── Delete ─────────────────────────────────────────────────────────────── async def delete_file(self, filename: str) -> Dict: try: path = self._safe_path(filename) if os.path.isdir(path): shutil.rmtree(path) return {"success": True, "filename": filename, "action": "directory_deleted"} os.remove(path) return {"success": True, "filename": filename, "action": "deleted"} except FileNotFoundError: return {"success": False, "error": f"Not found: {filename}"} except Exception as e: return {"success": False, "error": str(e)} # ─── Move / Rename ──────────────────────────────────────────────────────── async def move_file(self, src: str, dst: str) -> Dict: try: src_path = self._safe_path(src) dst_path = self._safe_path(dst) os.makedirs(os.path.dirname(dst_path), exist_ok=True) shutil.move(src_path, dst_path) return {"success": True, "src": src, "dst": dst, "action": "moved"} except Exception as e: return {"success": False, "error": str(e)} # ─── Copy ───────────────────────────────────────────────────────────────── async def copy_file(self, src: str, dst: str) -> Dict: try: src_path = self._safe_path(src) dst_path = self._safe_path(dst) os.makedirs(os.path.dirname(dst_path), exist_ok=True) shutil.copy2(src_path, dst_path) return {"success": True, "src": src, "dst": dst, "action": "copied"} except Exception as e: return {"success": False, "error": str(e)} # ─── Search Files ───────────────────────────────────────────────────────── async def search_files( self, query: str, path: str = "", pattern: str = "*", max_results: int = 50, ) -> Dict: """Search file contents for query string.""" try: base = self._safe_path(path) if path else self.workspace results = [] for root, dirs, files in os.walk(base): dirs[:] = [d for d in dirs if not d.startswith(".") and d not in ("node_modules", "__pycache__", ".git", ".next", "dist", "build")] for fname in files: if not fnmatch.fnmatch(fname, pattern): continue fpath = os.path.join(root, fname) try: with open(fpath, "r", encoding="utf-8", errors="replace") as f: lines = f.readlines() for i, line in enumerate(lines, 1): if query.lower() in line.lower(): rel = os.path.relpath(fpath, self.workspace) results.append({ "file": rel, "line": i, "content": line.strip()[:200], }) if len(results) >= max_results: break except Exception: continue if len(results) >= max_results: break return {"success": True, "query": query, "results": results, "count": len(results)} except Exception as e: return {"success": False, "error": str(e)} # ─── Tree ───────────────────────────────────────────────────────────────── async def tree(self, path: str = "", max_depth: int = 4) -> Dict: """Generate directory tree.""" try: base = self._safe_path(path) if path else self.workspace lines = [] self._walk_tree(base, lines, prefix="", depth=0, max_depth=max_depth) return { "success": True, "path": os.path.relpath(base, self.workspace) or ".", "tree": "\n".join(lines), "full_path": base, } except Exception as e: return {"success": False, "error": str(e)} def _walk_tree(self, path: str, lines: list, prefix: str, depth: int, max_depth: int): if depth > max_depth: return try: entries = sorted(os.listdir(path)) except PermissionError: return skip = {".git", "node_modules", "__pycache__", ".next", "dist", "build", ".venv", "venv"} entries = [e for e in entries if e not in skip] for i, entry in enumerate(entries): full = os.path.join(path, entry) is_last = i == len(entries) - 1 connector = "└── " if is_last else "├── " suffix = "/" if os.path.isdir(full) else "" lines.append(f"{prefix}{connector}{entry}{suffix}") if os.path.isdir(full): ext = " " if is_last else "│ " self._walk_tree(full, lines, prefix + ext, depth + 1, max_depth) # ─── Make Directory ─────────────────────────────────────────────────────── async def mkdir(self, path: str) -> Dict: try: full = self._safe_path(path) os.makedirs(full, exist_ok=True) return {"success": True, "path": path, "action": "created"} except Exception as e: return {"success": False, "error": str(e)} # ─── List Directory ─────────────────────────────────────────────────────── async def list_dir(self, path: str = "") -> Dict: try: base = self._safe_path(path) if path else self.workspace entries = [] for e in sorted(os.listdir(base)): full = os.path.join(base, e) stat = os.stat(full) entries.append({ "name": e, "type": "dir" if os.path.isdir(full) else "file", "size": stat.st_size, "modified": int(stat.st_mtime), }) return {"success": True, "path": path or ".", "entries": entries} except Exception as e: return {"success": False, "error": str(e)} # ─── Grep ───────────────────────────────────────────────────────────────── async def grep(self, pattern: str, path: str = "", flags: str = "i") -> Dict: """Regex grep across workspace files.""" try: base = self._safe_path(path) if path else self.workspace re_flags = re.IGNORECASE if "i" in flags else 0 regex = re.compile(pattern, re_flags) results = [] for root, dirs, files in os.walk(base): dirs[:] = [d for d in dirs if d not in {"node_modules", "__pycache__", ".git", ".next"}] for fname in files: fpath = os.path.join(root, fname) try: with open(fpath, "r", encoding="utf-8", errors="replace") as f: for i, line in enumerate(f, 1): if regex.search(line): rel = os.path.relpath(fpath, self.workspace) results.append({"file": rel, "line": i, "content": line.strip()[:200]}) if len(results) >= 100: break except Exception: continue if len(results) >= 100: break return {"success": True, "pattern": pattern, "results": results, "count": len(results)} except re.error as e: return {"success": False, "error": f"Invalid regex: {e}"} except Exception as e: return {"success": False, "error": str(e)} # ─── Bulk Write (multiple files at once) ────────────────────────────────── async def write_files(self, files: List[Dict]) -> Dict: """Write multiple files at once. files = [{"filename": ..., "content": ...}]""" results = [] for f in files: r = await self.write_file(f["filename"], f["content"]) results.append(r) success_count = sum(1 for r in results if r.get("success")) return { "success": success_count == len(files), "total": len(files), "succeeded": success_count, "failed": len(files) - success_count, "results": results, }