| """ |
| FileSystem Tool β Real Agent File Operations |
| readFile / writeFile / patchFile / deleteFile / moveFile / searchFiles / tree |
| """ |
|
|
| import asyncio |
| import difflib |
| import fnmatch |
| import json |
| import os |
| import re |
| import shutil |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple |
|
|
| import structlog |
|
|
| log = structlog.get_logger() |
|
|
| WORKSPACE = os.environ.get("WORKSPACE_DIR", "/tmp/god_workspace") |
|
|
|
|
| class FileSystemTool: |
| def __init__(self, workspace: str = WORKSPACE): |
| self.workspace = workspace |
| os.makedirs(workspace, exist_ok=True) |
|
|
| def _safe_path(self, filename: str) -> str: |
| """Resolve path safely within workspace.""" |
| if filename.startswith("/tmp/god_workspace") or filename.startswith(self.workspace): |
| resolved = filename |
| else: |
| resolved = os.path.join(self.workspace, filename.lstrip("/")) |
| |
| real = os.path.realpath(resolved) |
| ws_real = os.path.realpath(self.workspace) |
| if not real.startswith(ws_real): |
| raise PermissionError(f"Path escape attempt: {filename}") |
| return resolved |
|
|
| |
|
|
| async def read_file(self, filename: str, encoding: str = "utf-8") -> Dict: |
| try: |
| path = self._safe_path(filename) |
| with open(path, "r", encoding=encoding, errors="replace") as f: |
| content = f.read() |
| lines = content.split("\n") |
| return { |
| "success": True, |
| "filename": filename, |
| "content": content, |
| "lines": len(lines), |
| "size": len(content), |
| "path": path, |
| } |
| except FileNotFoundError: |
| return {"success": False, "error": f"File not found: {filename}"} |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
|
|
| |
|
|
| async def write_file(self, filename: str, content: str, encoding: str = "utf-8") -> Dict: |
| try: |
| path = self._safe_path(filename) |
| os.makedirs(os.path.dirname(path), exist_ok=True) |
| with open(path, "w", encoding=encoding) as f: |
| f.write(content) |
| return { |
| "success": True, |
| "filename": filename, |
| "path": path, |
| "size": len(content), |
| "lines": len(content.split("\n")), |
| "action": "written", |
| } |
| except PermissionError as e: |
| return {"success": False, "error": str(e)} |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
|
|
| |
|
|
| async def patch_file(self, filename: str, old_str: str, new_str: str) -> Dict: |
| """Replace old_str with new_str in file (like Cursor/Devin style edit).""" |
| try: |
| result = await self.read_file(filename) |
| if not result["success"]: |
| return result |
| content = result["content"] |
| if old_str not in content: |
| return { |
| "success": False, |
| "error": f"Pattern not found in {filename}", |
| "hint": "Use write_file to create from scratch", |
| } |
| new_content = content.replace(old_str, new_str, 1) |
| diff = list(difflib.unified_diff( |
| content.splitlines(keepends=True), |
| new_content.splitlines(keepends=True), |
| fromfile=f"a/{filename}", |
| tofile=f"b/{filename}", |
| )) |
| await self.write_file(filename, new_content) |
| return { |
| "success": True, |
| "filename": filename, |
| "action": "patched", |
| "diff": "".join(diff[:50]), |
| "lines_changed": len([l for l in diff if l.startswith(("+", "-")) and not l.startswith(("+++", "---"))]), |
| } |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
|
|
| |
|
|
| async def delete_file(self, filename: str) -> Dict: |
| try: |
| path = self._safe_path(filename) |
| if os.path.isdir(path): |
| shutil.rmtree(path) |
| return {"success": True, "filename": filename, "action": "directory_deleted"} |
| os.remove(path) |
| return {"success": True, "filename": filename, "action": "deleted"} |
| except FileNotFoundError: |
| return {"success": False, "error": f"Not found: {filename}"} |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
|
|
| |
|
|
| async def move_file(self, src: str, dst: str) -> Dict: |
| try: |
| src_path = self._safe_path(src) |
| dst_path = self._safe_path(dst) |
| os.makedirs(os.path.dirname(dst_path), exist_ok=True) |
| shutil.move(src_path, dst_path) |
| return {"success": True, "src": src, "dst": dst, "action": "moved"} |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
|
|
| |
|
|
| async def copy_file(self, src: str, dst: str) -> Dict: |
| try: |
| src_path = self._safe_path(src) |
| dst_path = self._safe_path(dst) |
| os.makedirs(os.path.dirname(dst_path), exist_ok=True) |
| shutil.copy2(src_path, dst_path) |
| return {"success": True, "src": src, "dst": dst, "action": "copied"} |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
|
|
| |
|
|
| async def search_files( |
| self, |
| query: str, |
| path: str = "", |
| pattern: str = "*", |
| max_results: int = 50, |
| ) -> Dict: |
| """Search file contents for query string.""" |
| try: |
| base = self._safe_path(path) if path else self.workspace |
| results = [] |
| for root, dirs, files in os.walk(base): |
| dirs[:] = [d for d in dirs if not d.startswith(".") and d not in ("node_modules", "__pycache__", ".git", ".next", "dist", "build")] |
| for fname in files: |
| if not fnmatch.fnmatch(fname, pattern): |
| continue |
| fpath = os.path.join(root, fname) |
| try: |
| with open(fpath, "r", encoding="utf-8", errors="replace") as f: |
| lines = f.readlines() |
| for i, line in enumerate(lines, 1): |
| if query.lower() in line.lower(): |
| rel = os.path.relpath(fpath, self.workspace) |
| results.append({ |
| "file": rel, |
| "line": i, |
| "content": line.strip()[:200], |
| }) |
| if len(results) >= max_results: |
| break |
| except Exception: |
| continue |
| if len(results) >= max_results: |
| break |
| return {"success": True, "query": query, "results": results, "count": len(results)} |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
|
|
| |
|
|
| async def tree(self, path: str = "", max_depth: int = 4) -> Dict: |
| """Generate directory tree.""" |
| try: |
| base = self._safe_path(path) if path else self.workspace |
| lines = [] |
| self._walk_tree(base, lines, prefix="", depth=0, max_depth=max_depth) |
| return { |
| "success": True, |
| "path": os.path.relpath(base, self.workspace) or ".", |
| "tree": "\n".join(lines), |
| "full_path": base, |
| } |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
|
|
| def _walk_tree(self, path: str, lines: list, prefix: str, depth: int, max_depth: int): |
| if depth > max_depth: |
| return |
| try: |
| entries = sorted(os.listdir(path)) |
| except PermissionError: |
| return |
| skip = {".git", "node_modules", "__pycache__", ".next", "dist", "build", ".venv", "venv"} |
| entries = [e for e in entries if e not in skip] |
| for i, entry in enumerate(entries): |
| full = os.path.join(path, entry) |
| is_last = i == len(entries) - 1 |
| connector = "βββ " if is_last else "βββ " |
| suffix = "/" if os.path.isdir(full) else "" |
| lines.append(f"{prefix}{connector}{entry}{suffix}") |
| if os.path.isdir(full): |
| ext = " " if is_last else "β " |
| self._walk_tree(full, lines, prefix + ext, depth + 1, max_depth) |
|
|
| |
|
|
| async def mkdir(self, path: str) -> Dict: |
| try: |
| full = self._safe_path(path) |
| os.makedirs(full, exist_ok=True) |
| return {"success": True, "path": path, "action": "created"} |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
|
|
| |
|
|
| async def list_dir(self, path: str = "") -> Dict: |
| try: |
| base = self._safe_path(path) if path else self.workspace |
| entries = [] |
| for e in sorted(os.listdir(base)): |
| full = os.path.join(base, e) |
| stat = os.stat(full) |
| entries.append({ |
| "name": e, |
| "type": "dir" if os.path.isdir(full) else "file", |
| "size": stat.st_size, |
| "modified": int(stat.st_mtime), |
| }) |
| return {"success": True, "path": path or ".", "entries": entries} |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
|
|
| |
|
|
| async def grep(self, pattern: str, path: str = "", flags: str = "i") -> Dict: |
| """Regex grep across workspace files.""" |
| try: |
| base = self._safe_path(path) if path else self.workspace |
| re_flags = re.IGNORECASE if "i" in flags else 0 |
| regex = re.compile(pattern, re_flags) |
| results = [] |
| for root, dirs, files in os.walk(base): |
| dirs[:] = [d for d in dirs if d not in {"node_modules", "__pycache__", ".git", ".next"}] |
| for fname in files: |
| fpath = os.path.join(root, fname) |
| try: |
| with open(fpath, "r", encoding="utf-8", errors="replace") as f: |
| for i, line in enumerate(f, 1): |
| if regex.search(line): |
| rel = os.path.relpath(fpath, self.workspace) |
| results.append({"file": rel, "line": i, "content": line.strip()[:200]}) |
| if len(results) >= 100: |
| break |
| except Exception: |
| continue |
| if len(results) >= 100: |
| break |
| return {"success": True, "pattern": pattern, "results": results, "count": len(results)} |
| except re.error as e: |
| return {"success": False, "error": f"Invalid regex: {e}"} |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
|
|
| |
|
|
| async def write_files(self, files: List[Dict]) -> Dict: |
| """Write multiple files at once. files = [{"filename": ..., "content": ...}]""" |
| results = [] |
| for f in files: |
| r = await self.write_file(f["filename"], f["content"]) |
| results.append(r) |
| success_count = sum(1 for r in results if r.get("success")) |
| return { |
| "success": success_count == len(files), |
| "total": len(files), |
| "succeeded": success_count, |
| "failed": len(files) - success_count, |
| "results": results, |
| } |
|
|