"""File system tools: read, write, edit, list, glob, grep.

All tools are sandboxed to a configurable workspace root (default: ./workspace).
They return JSON-serializable dicts so they can be exposed via the API.
"""

from __future__ import annotations

import fnmatch
import os
import re
import shutil
from collections.abc import Iterator
from pathlib import Path
from typing import Any

# ─── Workspace sandbox ──────────────────────────────────────────────────

# Default workspace: the ``workspace`` directory two levels above this file's
# directory, unless overridden through the SONICODER_WORKSPACE env variable.
_DEFAULT_WORKSPACE = os.environ.get(
    "SONICODER_WORKSPACE",
    os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "workspace")),
)

# Directory names that are never descended into when snapshotting.
_SNAPSHOT_SKIP_DIRS = frozenset({"node_modules", "__pycache__", ".venv", "venv"})


def get_workspace_root() -> str:
    """Return the absolute path of the agent's workspace root.

    The directory is created if it does not exist. The returned path is
    normalized and has symlinks resolved so that it can be compared
    directly against paths produced by ``_resolve_safe``.
    """
    root = os.path.abspath(_DEFAULT_WORKSPACE)
    os.makedirs(root, exist_ok=True)
    return os.path.realpath(root)


def _resolve_safe(path: str) -> str:
    """Resolve a path safely within the workspace root.

    Relative paths are interpreted relative to the workspace root. Symlinks
    are resolved before the containment check, so a link inside the
    workspace that points outside of it is rejected.

    Raises ValueError if the resolved path escapes the workspace.
    """
    root = get_workspace_root()
    candidate = path if os.path.isabs(path) else os.path.join(root, path)
    # realpath (not abspath): collapse ".." AND follow symlinks before checking.
    full = os.path.realpath(candidate)

    # Ensure path is within the workspace
    if not (full == root or full.startswith(root + os.sep)):
        raise ValueError(
            f"Path '{path}' resolves outside the workspace root ({root}). "
            "Agent tools are sandboxed."
        )
    return full


def _entry_size(path: str) -> int | None:
    """Return the size of *path* in bytes, or None if it cannot be stat'ed.

    Falls back to ``lstat`` so that broken symlinks still report a size
    instead of raising.
    """
    try:
        return os.stat(path).st_size
    except OSError:
        pass
    try:
        return os.lstat(path).st_size
    except OSError:
        return None


# ─── read_file ──────────────────────────────────────────────────────────

def read_file(path: str, offset: int = 0, limit: int | None = None) -> dict[str, Any]:
    """Read a text file from the workspace.

    Args:
        path: Relative path inside the workspace, or absolute within it.
        offset: 1-indexed line to start reading from (0 or less means the
            first line).
        limit: Maximum number of lines to read. ``None``, 0 or a negative
            value means "no limit".

    Returns:
        On success, a dict with: success, path, content (each line prefixed
        with its 1-indexed line number and a tab), line_count (total lines
        in the file), returned_lines, truncated (True when lines remain
        after the returned window). On failure: success=False and error.
    """
    try:
        full = _resolve_safe(path)
        if not os.path.exists(full):
            return {"success": False, "error": f"File not found: {path}"}
        if os.path.isdir(full):
            return {"success": False, "error": f"Path is a directory: {path}"}

        with open(full, "r", encoding="utf-8", errors="replace") as f:
            lines = f.readlines()

        total = len(lines)
        start = offset - 1 if offset > 0 else 0
        # A non-positive limit would otherwise yield a nonsensical slice end.
        has_limit = limit is not None and limit > 0
        end = start + limit if has_limit else total
        selected = lines[start:end]

        # Re-number for display
        numbered = "".join(
            f"{lineno:6}\t{line}" for lineno, line in enumerate(selected, start + 1)
        )

        return {
            "success": True,
            "path": path,
            "content": numbered,
            "line_count": total,
            "returned_lines": len(selected),
            "truncated": end < total,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}


# ─── write_file ─────────────────────────────────────────────────────────

def write_file(path: str, content: str) -> dict[str, Any]:
    """Write content to a file, creating parent directories as needed.

    An existing file is overwritten. Returns a dict with success, path and
    bytes_written (UTF-8 encoded length of *content*), or success=False
    and error on failure.
    """
    try:
        full = _resolve_safe(path)
        os.makedirs(os.path.dirname(full), exist_ok=True)
        with open(full, "w", encoding="utf-8") as f:
            f.write(content)
        return {
            "success": True,
            "path": path,
            "bytes_written": len(content.encode("utf-8")),
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}


# ─── edit_file ──────────────────────────────────────────────────────────

def edit_file(
    path: str,
    old_str: str,
    new_str: str,
    replace_all: bool = False,
) -> dict[str, Any]:
    """Replace occurrences of old_str with new_str in a file.

    Without ``replace_all`` the edit is only performed when ``old_str``
    occurs exactly once; an ambiguous match is reported as an error rather
    than silently editing the first occurrence. With ``replace_all`` every
    occurrence is replaced.

    Returns a dict with success, path and replacements (number of
    occurrences replaced), or success=False and error. The file is left
    untouched on failure.
    """
    try:
        full = _resolve_safe(path)
        if not os.path.exists(full):
            return {"success": False, "error": f"File not found: {path}"}
        # "" is a substring of everything; replacing it would just prepend.
        if not old_str:
            return {"success": False, "error": "old_str must not be empty."}

        with open(full, "r", encoding="utf-8") as f:
            content = f.read()

        count = content.count(old_str)
        if count == 0:
            return {
                "success": False,
                "error": f"old_str not found in {path}. Edit aborted.",
            }
        if old_str == new_str:
            return {"success": False, "error": "old_str and new_str are identical."}
        if count > 1 and not replace_all:
            return {
                "success": False,
                "error": (
                    f"old_str is not unique ({count} matches) in {path}. "
                    "Provide more context or use replace_all=true."
                ),
            }

        # At this point either replace_all is set or there is exactly one match.
        new_content = content.replace(old_str, new_str)

        with open(full, "w", encoding="utf-8") as f:
            f.write(new_content)

        return {
            "success": True,
            "path": path,
            "replacements": count,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}


def multi_edit(path: str, edits: list[dict[str, Any]]) -> dict[str, Any]:
    """Apply multiple edits to a file atomically (all-or-nothing).

    Each edit is a dict with ``old_str``, ``new_str`` and optional
    ``replace_all``. Edits are applied in order to an in-memory copy; each
    edit sees the result of the previous ones. Without ``replace_all`` only
    the first occurrence is replaced. The file is written once, after all
    edits succeeded; if any edit fails nothing is written.

    Returns a dict with success, path and applied. On failure ``applied``
    is the number of edits that validated before the failing one (none of
    them were persisted).
    """
    try:
        full = _resolve_safe(path)
        if not os.path.exists(full):
            return {"success": False, "error": f"File not found: {path}"}

        with open(full, "r", encoding="utf-8") as f:
            content = f.read()

        applied = 0
        for edit in edits:
            old_str = edit.get("old_str", "")
            new_str = edit.get("new_str", "")
            replace_all = edit.get("replace_all", False)
            number = applied + 1

            # A missing/empty old_str would match anywhere and prepend new_str.
            if not old_str:
                return {
                    "success": False,
                    "error": f"old_str must not be empty in edit #{number}.",
                    "applied": applied,
                }
            if old_str not in content:
                return {
                    "success": False,
                    "error": f"old_str not found in {path} for edit #{number}.",
                    "applied": applied,
                }
            if old_str == new_str:
                return {
                    "success": False,
                    "error": f"old_str and new_str identical in edit #{number}.",
                    "applied": applied,
                }

            if replace_all:
                content = content.replace(old_str, new_str)
            else:
                content = content.replace(old_str, new_str, 1)
            applied += 1

        with open(full, "w", encoding="utf-8") as f:
            f.write(content)

        return {"success": True, "path": path, "applied": applied}
    except Exception as exc:
        return {"success": False, "error": str(exc)}


# ─── list_dir ───────────────────────────────────────────────────────────

def list_dir(path: str = ".") -> dict[str, Any]:
    """List directory contents.

    Returns a dict with success, path and entries, where each entry has
    name, type ("dir" or "file"), size in bytes and path relative to the
    workspace root. Entries are sorted by name. Entries that cannot be
    stat'ed at all are skipped instead of failing the whole listing.
    """
    try:
        full = _resolve_safe(path)
        if not os.path.exists(full):
            return {"success": False, "error": f"Path not found: {path}"}
        if not os.path.isdir(full):
            return {"success": False, "error": f"Not a directory: {path}"}

        root = get_workspace_root()
        entries = []
        for name in sorted(os.listdir(full)):
            entry_path = os.path.join(full, name)
            size = _entry_size(entry_path)
            if size is None:
                continue
            entries.append({
                "name": name,
                "type": "dir" if os.path.isdir(entry_path) else "file",
                "size": size,
                "path": os.path.relpath(entry_path, root),
            })

        return {
            "success": True,
            "path": path,
            "entries": entries,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}


# ─── glob ───────────────────────────────────────────────────────────────

def glob_paths(pattern: str, path: str = ".") -> dict[str, Any]:
    """Glob file paths matching a pattern, recursively.

    A file matches when *pattern* (fnmatch syntax) matches either its bare
    file name or its path relative to *path*. Returned matches are sorted
    and expressed relative to the workspace root.
    """
    try:
        full = _resolve_safe(path)
        root = get_workspace_root()
        matches: list[str] = []
        for root_dir, _dirs, files in os.walk(full):
            for fname in files:
                fpath = os.path.join(root_dir, fname)
                if fnmatch.fnmatch(fname, pattern) or fnmatch.fnmatch(
                    os.path.relpath(fpath, full), pattern
                ):
                    matches.append(os.path.relpath(fpath, root))
        matches.sort()
        return {"success": True, "pattern": pattern, "matches": matches}
    except Exception as exc:
        return {"success": False, "error": str(exc)}


# ─── grep ───────────────────────────────────────────────────────────────

def _iter_search_files(base: str, include: str | None) -> Iterator[str]:
    """Yield the files grep should scan below *base*.

    If *base* is itself a file it is yielded as-is (the ``include`` filter
    only applies to files discovered by walking a directory).
    """
    if os.path.isfile(base):
        yield base
        return
    for root_dir, _dirs, files in os.walk(base):
        for fname in files:
            if include and not fnmatch.fnmatch(fname, include):
                continue
            yield os.path.join(root_dir, fname)


def grep_search(
    pattern: str,
    path: str = ".",
    include: str | None = None,
    ignore_case: bool = False,
    max_results: int = 100,
) -> dict[str, Any]:
    """Search file contents with a regex pattern.

    Args:
        pattern: Regular expression, matched per line with ``re.search``.
        path: Directory to search recursively, or a single file.
        include: Optional fnmatch pattern applied to file names when
            walking a directory.
        ignore_case: Match case-insensitively.
        max_results: Stop once this many matching lines were collected.

    Returns:
        dict with success, pattern, matches (file relative to the workspace
        root, 1-indexed line, text capped at 500 chars) and truncated
        (True when the search stopped because max_results was reached).
        Files that cannot be opened or read are skipped.
    """
    try:
        full = _resolve_safe(path)
        flags = re.IGNORECASE if ignore_case else 0
        regex = re.compile(pattern, flags)
        root = get_workspace_root()

        matches: list[dict[str, Any]] = []
        for fpath in _iter_search_files(full, include):
            try:
                with open(fpath, "r", encoding="utf-8", errors="replace") as f:
                    for lineno, line in enumerate(f, 1):
                        if not regex.search(line):
                            continue
                        matches.append({
                            "file": os.path.relpath(fpath, root),
                            "line": lineno,
                            "text": line.rstrip()[:500],
                        })
                        if len(matches) >= max_results:
                            return {
                                "success": True,
                                "pattern": pattern,
                                "matches": matches,
                                "truncated": True,
                            }
            except (UnicodeDecodeError, OSError):
                # Unreadable file (permissions, broken symlink, ...): skip it
                # rather than aborting the whole search.
                continue

        return {
            "success": True,
            "pattern": pattern,
            "matches": matches,
            "truncated": False,
        }
    except re.error as exc:
        return {"success": False, "error": f"Invalid regex: {exc}"}
    except Exception as exc:
        return {"success": False, "error": str(exc)}


# ─── Workspace management ───────────────────────────────────────────────

def list_workspace_tree(max_depth: int = 3) -> dict[str, Any]:
    """Return a tree view of the workspace.

    Directories nested deeper than *max_depth* are reported with
    ``truncated: True`` and no children. Unreadable directories appear
    with the children that could be listed (possibly none); files that
    cannot be stat'ed are omitted.
    """
    try:
        root = get_workspace_root()

        def _walk(path: str, depth: int) -> dict[str, Any]:
            if depth > max_depth:
                return {"name": os.path.basename(path), "type": "dir", "truncated": True}
            entries = []
            try:
                for name in sorted(os.listdir(path)):
                    full = os.path.join(path, name)
                    if os.path.isdir(full):
                        entries.append(_walk(full, depth + 1))
                        continue
                    size = _entry_size(full)
                    if size is None:
                        continue
                    entries.append({
                        "name": name,
                        "type": "file",
                        "size": size,
                    })
            except PermissionError:
                pass
            return {"name": os.path.basename(path), "type": "dir", "children": entries}

        tree = _walk(root, 0)
        return {"success": True, "tree": tree}
    except Exception as exc:
        return {"success": False, "error": str(exc)}


def reset_workspace() -> dict[str, Any]:
    """Clear all files in the workspace (used by /new command).

    The workspace root itself is kept. Symlinks are unlinked, never
    followed, so their targets are left untouched.
    """
    try:
        root = get_workspace_root()
        for entry in os.listdir(root):
            full = os.path.join(root, entry)
            # shutil.rmtree refuses to operate on a symlink, so test for a
            # link before testing for a directory.
            if os.path.isdir(full) and not os.path.islink(full):
                shutil.rmtree(full)
            else:
                os.remove(full)
        return {"success": True, "message": "Workspace cleared"}
    except Exception as exc:
        return {"success": False, "error": str(exc)}


def snapshot_workspace() -> dict[str, str]:
    """Return a dict of {relative_path: content} for all text files in the workspace.

    Used to package workspace files for ZIP/HF deploy. Hidden files,
    hidden directories, node_modules, __pycache__ and virtualenv
    directories are excluded, as are files that are not valid UTF-8 or
    cannot be read.
    """
    root = get_workspace_root()
    files: dict[str, str] = {}
    for dirpath, dirnames, fnames in os.walk(root):
        # Prune skipped directories in place so os.walk never descends into
        # them. The workspace root itself is never filtered.
        dirnames[:] = [
            d for d in dirnames
            if not d.startswith(".") and d not in _SNAPSHOT_SKIP_DIRS
        ]
        for fname in fnames:
            if fname.startswith("."):
                continue
            full = os.path.join(dirpath, fname)
            try:
                with open(full, "r", encoding="utf-8") as f:
                    files[os.path.relpath(full, root)] = f.read()
            except (UnicodeDecodeError, OSError):
                continue
    return files