| import asyncio | |
| import json | |
| import os | |
| import difflib | |
| import fnmatch | |
| import shutil | |
| from typing import Optional, Dict, Any, Tuple | |
| from src.constants import MAX_READ_CHARS, MAX_DIFF_LINES, MAX_OUTPUT_CHARS | |
# Directory names that the glob and grep tools leave out of their results.
_CODENAV_SKIP_DIRS = frozenset(
    (
        ".git",
        ".hg",
        ".svn",
        "node_modules",
        "venv",
        ".venv",
        "__pycache__",
        ".mypy_cache",
        ".pytest_cache",
        ".ruff_cache",
        "dist",
        "build",
        ".next",
        ".cache",
        "site-packages",
        ".idea",
        ".tox",
    )
)

# Cap on the number of entries/matches returned by ls, glob and grep.
_CODENAV_MAX_HITS = 200

# Cap, in characters, on a single line of grep output.
_CODENAV_MAX_LINE = 400
def _unified_diff(
    old: str,
    new: str,
    path: str,
    max_lines: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
    """Build a display-ready unified diff between two versions of a text.

    Args:
        old: Content before the change. An empty string is reported as a
            new file.
        new: Content after the change.
        path: Path used for the ``a/``/``b/`` labels; ``"file"`` is used
            when it is empty.
        max_lines: Maximum number of diff lines kept in ``text``. Defaults
            to ``MAX_DIFF_LINES``.

    Returns:
        ``None`` when ``old == new``. Otherwise a dict with ``text`` (the
        possibly truncated diff), ``added`` and ``removed`` (line counts
        taken over the full, untruncated diff), ``new_file`` and ``file``
        (the base name of ``path``).
    """
    if old == new:
        return None

    limit = MAX_DIFF_LINES if max_lines is None else max_lines
    label = path or "file"
    # Line terminators are dropped by splitlines(), so a change that only
    # touches them produces an empty diff body with zero counts.
    diff_lines = list(
        difflib.unified_diff(
            old.splitlines(),
            new.splitlines(),
            fromfile=f"a/{label}",
            tofile=f"b/{label}",
            lineterm="",
        )
    )

    # The first two entries are the "---"/"+++" file headers. Skipping them
    # by position (instead of by prefix) keeps changed lines whose own text
    # starts with "--" or "++" in the counts.
    body = diff_lines[2:]
    added = sum(1 for line in body if line.startswith("+"))
    removed = sum(1 for line in body if line.startswith("-"))

    truncated = len(diff_lines) > limit
    if truncated:
        diff_lines = diff_lines[:limit]
    text = "\n".join(diff_lines)
    if truncated:
        text += f"\n… diff truncated at {limit} lines"

    return {
        "text": text,
        "added": added,
        "removed": removed,
        "new_file": old == "",
        "file": os.path.basename(path) or (path or "file"),
    }
class EditFileTool:
    """Tool that replaces an exact substring inside an existing UTF-8 text file."""

    async def execute(self, content: str, ctx: dict) -> dict:
        """Apply one string replacement described by a JSON payload.

        Args:
            content: JSON object with ``path``, ``old_string``, ``new_string``
                and an optional ``replace_all`` flag. Input that is not a
                JSON object is treated as an empty request.
            ctx: Execution context; not used by this tool.

        Returns:
            ``{"output": ..., "exit_code": 0}`` on success, with a ``diff``
            entry when ``_unified_diff`` produces one; otherwise
            ``{"error": ..., "exit_code": 1}``.
        """
        try:
            args = json.loads(content) if (content or "").strip().startswith("{") else {}
        except (json.JSONDecodeError, TypeError):
            args = {}

        raw_path = args.get("path") or ""
        if not isinstance(raw_path, str):
            return {"error": "edit_file: path must be a string", "exit_code": 1}
        raw_path = raw_path.strip()
        if not raw_path:
            return {"error": "edit_file: path required", "exit_code": 1}

        old = args.get("old_string", "")
        new = args.get("new_string", "")
        # JSON null/number values would otherwise blow up inside str.count /
        # str.replace in the worker thread with an unhandled TypeError.
        if not isinstance(old, str) or not isinstance(new, str):
            return {"error": "edit_file: old_string and new_string must be strings", "exit_code": 1}
        replace_all = bool(args.get("replace_all", False))

        # Imported here, as in the rest of this module, and only once the
        # request is known to be well-formed.
        from src.tool_execution import _resolve_tool_path

        try:
            path = _resolve_tool_path(raw_path)
        except ValueError as e:
            return {"error": f"edit_file: {e}", "exit_code": 1}
        if old == "":
            return {"error": "edit_file: old_string required (use write_file to create a file)", "exit_code": 1}
        if old == new:
            return {"error": "edit_file: old_string and new_string are identical", "exit_code": 1}

        def _apply():
            """Read the file, replace, write back; return (original, updated, matches).

            ``updated`` is ``None`` when nothing was written (no match, or an
            ambiguous match without ``replace_all``).
            """
            with open(path, "r", encoding="utf-8") as f:
                original = f.read()
            matches = original.count(old)
            if matches == 0 or (matches > 1 and not replace_all):
                return original, None, matches
            updated = original.replace(old, new) if replace_all else original.replace(old, new, 1)
            with open(path, "w", encoding="utf-8") as f:
                f.write(updated)
            return original, updated, matches

        try:
            original, updated, matches = await asyncio.to_thread(_apply)
        except FileNotFoundError:
            return {"error": f"edit_file: {path}: not found (use write_file to create it)", "exit_code": 1}
        except (IsADirectoryError, UnicodeDecodeError):
            return {"error": f"edit_file: {path}: not an editable text file", "exit_code": 1}
        except PermissionError:
            return {"error": f"edit_file: {path}: permission denied", "exit_code": 1}
        except OSError as e:
            return {"error": f"edit_file: {path}: {e}", "exit_code": 1}

        if updated is None:
            if matches == 0:
                return {"error": f"edit_file: old_string not found in {path}. Read the file and match it exactly.", "exit_code": 1}
            return {"error": f"edit_file: old_string is not unique in {path} ({matches} matches). Add surrounding context or set replace_all=true.", "exit_code": 1}

        result = {"output": f"Edited {path} ({matches} replacement{'s' if matches != 1 else ''})", "exit_code": 0}
        diff = _unified_diff(original, updated, path)
        if diff:
            result["diff"] = diff
        return result
class ReadFileTool:
    """Tool that returns the text of a file, optionally limited to a line window."""

    async def execute(self, content: str, ctx: dict) -> dict:
        """Read a file and return its (possibly truncated) text.

        Args:
            content: Either a bare path on the first line, or a JSON object
                with ``path`` and optional ``offset`` (1-based first line)
                and ``limit`` (maximum number of lines).
            ctx: Execution context; not used by this tool.

        Returns:
            ``{"output": text, "exit_code": 0}`` on success, otherwise
            ``{"error": ..., "exit_code": 1}``. Output is capped at
            ``MAX_READ_CHARS`` characters plus a truncation notice.
        """
        text = content or ""
        raw_path, offset, limit = text.split("\n", 1)[0].strip(), 0, 0
        stripped = text.strip()
        if stripped.startswith("{"):
            try:
                parsed = json.loads(stripped)
                # `or ""` keeps a JSON null from turning into the path "None".
                raw_path = str(parsed.get("path") or "").strip()
                offset = int(parsed.get("offset") or 0)
                limit = int(parsed.get("limit") or 0)
            except (json.JSONDecodeError, TypeError, ValueError, OverflowError):
                pass
        if not raw_path:
            return {"error": "read_file: path required", "exit_code": 1}

        from src.tool_execution import _resolve_tool_path

        try:
            path = _resolve_tool_path(raw_path)
        except ValueError as e:
            return {"error": f"read_file: {e}", "exit_code": 1}

        try:
            if offset > 0 or limit > 0:
                data = await asyncio.to_thread(
                    self._read_window, path, offset, limit, MAX_READ_CHARS
                )
            else:
                data = await asyncio.to_thread(self._read_head, path, MAX_READ_CHARS)
        except FileNotFoundError:
            return {"error": f"read_file: {path}: not found", "exit_code": 1}
        except PermissionError:
            return {"error": f"read_file: {path}: permission denied", "exit_code": 1}
        except IsADirectoryError:
            return {"error": f"read_file: {path}: is a directory (use ls)", "exit_code": 1}
        except OSError as e:
            return {"error": f"read_file: {path}: {e}", "exit_code": 1}
        return {"output": data, "exit_code": 0}

    @staticmethod
    def _read_window(path: str, offset: int, limit: int, max_chars: int) -> str:
        """Return up to ``limit`` lines starting at 1-based line ``offset``.

        ``limit <= 0`` means "no line limit". At most ``max_chars``
        characters of file text are returned; the line that would cross the
        cap is cut, and the truncation notice is appended only when text was
        actually left out.
        """
        first_line = max(offset, 1)
        pieces = []
        taken = 0
        budget = max_chars
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            for lineno, line in enumerate(f, 1):
                if lineno < first_line:
                    continue
                if limit > 0 and taken >= limit:
                    break
                if len(line) > budget:
                    pieces.append(line[:budget])
                    pieces.append(f"\n... [truncated at {max_chars} chars]")
                    break
                pieces.append(line)
                taken += 1
                budget -= len(line)
        return "".join(pieces)

    @staticmethod
    def _read_head(path: str, max_chars: int) -> str:
        """Return the first ``max_chars`` characters, flagging any overflow."""
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            # One extra character tells us whether the file continues.
            data = f.read(max_chars + 1)
        if len(data) > max_chars:
            return data[:max_chars] + f"\n... [truncated at {max_chars} chars]"
        return data
class WriteFileTool:
    """Tool that creates or overwrites a UTF-8 text file."""

    async def execute(self, content: str, ctx: dict) -> dict:
        """Write a file whose path is the first line of ``content``.

        Args:
            content: The target path on the first line; everything after the
                first newline is written verbatim as the file body.
            ctx: Execution context; not used by this tool.

        Returns:
            ``{"output": ..., "exit_code": 0}`` on success, with a ``diff``
            entry when ``_unified_diff`` produces one; otherwise
            ``{"error": ..., "exit_code": 1}``.
        """
        head, _, body = (content or "").partition("\n")
        raw_path = head.strip()
        if not raw_path:
            return {"error": "write_file: path required", "exit_code": 1}

        from src.tool_execution import _resolve_tool_path

        try:
            path = _resolve_tool_path(raw_path)
        except ValueError as e:
            return {"error": f"write_file: {e}", "exit_code": 1}

        try:
            old_content, size = await asyncio.to_thread(self._write_text, path, body)
        except PermissionError:
            return {"error": f"write_file: {path}: permission denied", "exit_code": 1}
        except UnicodeEncodeError as e:
            return {"error": f"write_file: {path}: {e}", "exit_code": 1}
        except OSError as e:
            return {"error": f"write_file: {path}: {e}", "exit_code": 1}

        diff = _unified_diff(old_content, body, path)
        result = {"output": f"Wrote {size} bytes to {path}", "exit_code": 0}
        if diff:
            result["diff"] = diff
        return result

    @staticmethod
    def _write_text(path: str, body: str) -> Tuple[str, int]:
        """Overwrite ``path`` with ``body``; return (previous text, UTF-8 size).

        The previous content is read best-effort for diffing: a missing,
        unreadable or non-UTF-8 file counts as empty. Missing parent
        directories are created.
        """
        # Encode up front: it yields the real byte count (len(body) counts
        # characters) and rejects unencodable text before the existing file
        # is truncated by open(..., "w").
        size = len(body.encode("utf-8"))
        try:
            with open(path, "r", encoding="utf-8") as f:
                previous = f.read()
        except (OSError, UnicodeDecodeError):
            previous = ""
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write(body)
        return previous, size
class LsTool:
    """Tool that lists the non-hidden entries of a directory."""

    async def execute(self, content: str, ctx: dict) -> dict:
        """List a directory, directories first, then files with their sizes.

        Args:
            content: Either a bare path on the first line or a JSON object
                with an optional ``path``. An empty/missing/null path is
                passed to ``_resolve_search_root`` as ``""``.
            ctx: Execution context; not used by this tool.

        Returns:
            ``{"output": listing, "exit_code": 0}`` on success, otherwise
            ``{"error": ..., "exit_code": 1}``.
        """
        from src.tool_execution import _resolve_search_root, _truncate

        try:
            root = _resolve_search_root(self._parse_path(content))
        except ValueError as e:
            return {"error": f"ls: {e}", "exit_code": 1}

        out, err = await asyncio.to_thread(self._list_dir, root, _CODENAV_MAX_HITS)
        if err:
            return {"error": err, "exit_code": 1}
        return {"output": _truncate(out), "exit_code": 0}

    @staticmethod
    def _parse_path(content: Optional[str]) -> str:
        """Extract the requested path from raw tool input ("" when absent)."""
        text = (content or "").strip()
        if not text.startswith("{"):
            return text.split("\n", 1)[0].strip()
        try:
            value = json.loads(text).get("path")
        except json.JSONDecodeError:
            return ""
        # A JSON null means "no path given"; str(None) would look up "None".
        return "" if value is None else str(value).strip()

    @staticmethod
    def _list_dir(root: str, max_entries: int) -> Tuple[Optional[str], Optional[str]]:
        """Return ``(listing, None)`` for ``root`` or ``(None, error message)``.

        Names starting with "." are skipped, as are entries that cannot be
        stat'ed. At most ``max_entries`` rows are printed; the remainder is
        summarised in a trailing "more" line.
        """
        if not os.path.isdir(root):
            return None, f"ls: {root}: not a directory"

        rows = []
        try:
            with os.scandir(root) as it:
                for entry in it:
                    if entry.name.startswith("."):
                        continue
                    try:
                        is_dir = entry.is_dir(follow_symlinks=False)
                        size = 0 if is_dir else entry.stat(follow_symlinks=False).st_size
                    except OSError:
                        continue
                    rows.append((is_dir, entry.name, size))
        except OSError as exc:
            return None, f"ls: {exc}"

        # Directories first, then case-insensitive by name.
        rows.sort(key=lambda r: (not r[0], r[1].lower()))
        lines = [f"{root}:"]
        lines.extend(
            f" {name}/" if is_dir else f" {name} ({size} B)"
            for is_dir, name, size in rows[:max_entries]
        )
        overflow = len(rows) - max_entries
        if overflow > 0:
            lines.append(f" ... [{overflow} more]")
        if not rows:
            lines.append(" (empty)")
        return "\n".join(lines), None
class GlobTool:
    """Tool that finds files by glob pattern, newest first."""

    async def execute(self, content: str, ctx: dict) -> dict:
        """Recursively match ``pattern`` under the search root.

        Args:
            content: Either the bare pattern, or a JSON object with
                ``pattern`` and an optional ``path``.
            ctx: Execution context; not used by this tool.

        Returns:
            ``{"output": newline-joined paths, "exit_code": 0}`` (or a
            "No files matching" message), otherwise
            ``{"error": ..., "exit_code": 1}``.
        """
        args: Dict[str, Any] = {}
        text = (content or "").strip()
        if text.startswith("{"):
            try:
                args = json.loads(text)
            except json.JSONDecodeError:
                args = {}
        else:
            args = {"pattern": text}

        # JSON null must read as "not provided", not as the string "None".
        raw_pattern = args.get("pattern")
        pattern = "" if raw_pattern is None else str(raw_pattern).strip()
        if not pattern:
            return {"error": "glob: pattern is required", "exit_code": 1}

        from src.tool_execution import _resolve_search_root, _truncate

        raw_root = args.get("path")
        try:
            root = _resolve_search_root("" if raw_root is None else str(raw_root))
        except ValueError as e:
            return {"error": f"glob: {e}", "exit_code": 1}

        paths, err = await asyncio.to_thread(
            self._collect, root, pattern, _CODENAV_SKIP_DIRS, _CODENAV_MAX_HITS
        )
        if err:
            return {"error": err, "exit_code": 1}
        if not paths:
            return {"output": f"No files matching {pattern!r} under {root}", "exit_code": 0}
        out = "\n".join(paths)
        if len(paths) >= _CODENAV_MAX_HITS:
            out += f"\n... [capped at {_CODENAV_MAX_HITS} files]"
        return {"output": _truncate(out), "exit_code": 0}

    @staticmethod
    def _collect(root: str, pattern: str, skip_dirs, max_hits: int) -> Tuple[Optional[list], Optional[str]]:
        """Return ``(paths, None)`` sorted by mtime descending, or ``(None, error)``.

        Matches with a path component in ``skip_dirs`` are dropped. Scanning
        stops once more than ``5 * max_hits`` candidates were gathered; the
        newest ``max_hits`` of those are returned.
        """
        from pathlib import Path

        base = Path(root)
        if not base.is_dir():
            return None, f"glob: {root}: not a directory"

        found = []
        try:
            for match in base.rglob(pattern):
                parts = match.relative_to(base).parts
                # A ".." segment in the pattern can walk out of the search
                # root; such matches are discarded.
                if ".." in parts or not skip_dirs.isdisjoint(parts):
                    continue
                try:
                    mtime = match.stat().st_mtime
                except OSError:
                    mtime = 0
                found.append((mtime, str(match)))
                if len(found) > max_hits * 5:
                    break
        except (OSError, ValueError, NotImplementedError) as exc:
            # rglob raises NotImplementedError for absolute patterns and
            # ValueError for malformed ones.
            return None, f"glob: {exc}"

        found.sort(key=lambda item: item[0], reverse=True)
        return [path for _, path in found[:max_hits]], None
class GrepTool:
    """Tool that searches file contents with a regular expression."""

    async def execute(self, content: str, ctx: dict) -> dict:
        """Search for ``pattern`` under the search root.

        ripgrep is used when an ``rg`` executable is on PATH; otherwise a
        pure-Python scan is performed.

        Args:
            content: Either the bare pattern, or a JSON object with
                ``pattern`` and optional ``path``, ``glob``, ``ignore_case``
                and ``max_results``.
            ctx: Execution context; not used by this tool.

        Returns:
            ``{"output": "path:line:text" lines, "exit_code": 0}`` (or a
            "No matches" message), otherwise
            ``{"error": ..., "exit_code": 1}``.
        """
        args: Dict[str, Any] = {}
        text = (content or "").strip()
        if text.startswith("{"):
            try:
                args = json.loads(text)
            except json.JSONDecodeError:
                args = {}
        else:
            args = {"pattern": text}

        # JSON null must read as "not provided", not as the string "None".
        raw_pattern = args.get("pattern")
        pattern = "" if raw_pattern is None else str(raw_pattern).strip()
        if not pattern:
            return {"error": "grep: pattern is required", "exit_code": 1}

        ignore_case = bool(args.get("ignore_case"))
        glob_pat = str(args.get("glob", "") or "").strip()
        try:
            max_hits = int(args.get("max_results") or _CODENAV_MAX_HITS)
        except (TypeError, ValueError, OverflowError):
            max_hits = _CODENAV_MAX_HITS
        max_hits = max(1, min(max_hits, _CODENAV_MAX_HITS))

        from src.tool_execution import _resolve_search_root, _truncate

        raw_root = args.get("path")
        try:
            root = _resolve_search_root("" if raw_root is None else str(raw_root))
        except ValueError as e:
            return {"error": f"grep: {e}", "exit_code": 1}

        def _grep():
            import shutil

            rg = shutil.which("rg")
            if rg:
                return self._run_ripgrep(
                    rg, root, pattern, ignore_case, glob_pat, max_hits, _CODENAV_SKIP_DIRS
                )
            return self._scan_files(
                root, pattern, ignore_case, glob_pat, max_hits,
                _CODENAV_SKIP_DIRS, _CODENAV_MAX_LINE,
            )

        lines, err = await asyncio.to_thread(_grep)
        if err:
            return {"error": err, "exit_code": 1}
        if not lines:
            return {"output": f"No matches for {pattern!r} under {root}", "exit_code": 0}
        out = "\n".join(ln[:_CODENAV_MAX_LINE] for ln in lines)
        if len(lines) >= max_hits:
            out += f"\n... [capped at {max_hits} matches]"
        return {"output": _truncate(out), "exit_code": 0}

    @staticmethod
    def _run_ripgrep(rg: str, root: str, pattern: str, ignore_case: bool,
                     glob_pat: str, max_hits: int, skip_dirs) -> Tuple[Optional[list], Optional[str]]:
        """Run ripgrep and return ``(lines, None)`` or ``(None, error)``."""
        import subprocess

        # --with-filename keeps the "path:line:text" shape even when `root`
        # is a single file, matching the pure-Python fallback.
        cmd = [rg, "--line-number", "--no-heading", "--with-filename",
               "--color=never", "--max-count", str(max_hits)]
        if ignore_case:
            cmd.append("--ignore-case")
        if glob_pat:
            cmd += ["--glob", glob_pat]
        for name in skip_dirs:
            cmd += ["--glob", f"!**/{name}/**"]
        cmd += ["--regexp", pattern, root]

        try:
            # errors="replace": one undecodable byte in a matched line must
            # not fail the whole search.
            proc = subprocess.run(cmd, capture_output=True, encoding="utf-8",
                                  errors="replace", timeout=20)
        except subprocess.TimeoutExpired:
            return None, "grep: timed out"
        except Exception as exc:
            return None, f"grep: {exc}"

        hits = [ln for ln in (proc.stdout or "").splitlines() if ln][:max_hits]
        # ripgrep exits 0 on a match and 1 on no match; any other status
        # with no output is a real failure (e.g. an invalid regex) and must
        # not be reported as "no matches".
        if not hits and proc.returncode not in (0, 1):
            detail = (proc.stderr or "").strip() or f"rg exited with status {proc.returncode}"
            return None, f"grep: {detail}"
        return hits, None

    @staticmethod
    def _scan_files(root: str, pattern: str, ignore_case: bool, glob_pat: str,
                    max_hits: int, skip_dirs, max_line: int) -> Tuple[Optional[list], Optional[str]]:
        """Pure-Python search; return ``(lines, None)`` or ``(None, error)``.

        Directories named in ``skip_dirs`` are pruned, ``glob_pat`` is
        matched against file names only, and files that are unreadable or
        not valid UTF-8 are skipped.
        """
        import re

        try:
            rx = re.compile(pattern, re.IGNORECASE if ignore_case else 0)
        except re.error as exc:
            return None, f"grep: bad pattern: {exc}"

        def _candidates():
            # Lazily yield files so the walk stops as soon as enough hits
            # have been collected.
            if os.path.isfile(root):
                yield root
                return
            for dirpath, dirnames, filenames in os.walk(root):
                dirnames[:] = [d for d in dirnames if d not in skip_dirs]
                for name in filenames:
                    if glob_pat and not fnmatch.fnmatch(name, glob_pat):
                        continue
                    yield os.path.join(dirpath, name)

        hits = []
        for file_path in _candidates():
            if len(hits) >= max_hits:
                break
            try:
                with open(file_path, "r", encoding="utf-8", errors="strict") as f:
                    for lineno, line in enumerate(f, 1):
                        if rx.search(line):
                            hits.append(f"{file_path}:{lineno}:{line.rstrip()[:max_line]}")
                            if len(hits) >= max_hits:
                                break
            except (UnicodeDecodeError, OSError):
                continue
        return hits, None
class GetWorkspaceTool:
    """Tell the caller which workspace folder is active; takes no arguments.

    The reply reminds the caller that file tools are restricted to that
    folder, while the shell merely starts there and is not sandboxed.
    """

    async def execute(self, content: str, ctx: dict) -> dict:
        """Return the active workspace path, or a notice that none is set.

        Both ``content`` and ``ctx`` are ignored. The result always has
        ``exit_code`` 0.
        """
        from src.tool_execution import get_active_workspace

        workspace = get_active_workspace()
        if not workspace:
            message = (
                "No workspace is set. File tools use the default allowed roots; "
                "resolve paths from the user or use absolute paths."
            )
        else:
            message = (
                f"{workspace}\n(File tools are confined to this folder; the shell starts "
                "here but is not sandboxed and can reach outside it.)"
            )
        return {"output": message, "exit_code": 0}