""" agent_tools.py — Implementasi tools untuk Gemini coding agent. Tools yang tersedia: read_file, write_file, edit_file, bash, glob, grep, ls. Workspace bersifat per-thread agar setiap sesi user terisolasi. """ from __future__ import annotations import os import re import subprocess import threading from pathlib import Path MAX_FILE_SIZE = 200_000 # ~200KB batas baca file MAX_OUTPUT = 20_000 # potong output bash jika terlalu panjang # ── Per-thread workspace ─────────────────────────────────────────────────────── _local = threading.local() def get_workspace() -> Path: """Kembalikan workspace root untuk thread saat ini.""" if hasattr(_local, "root"): return _local.root ws = os.environ.get("CLAW_WORKSPACE", "") base = Path(ws) if ws else Path("/tmp/workspace") base.mkdir(parents=True, exist_ok=True) return base def set_workspace(path: str | Path) -> Path: """Set workspace root untuk thread saat ini dan buat direktorinya.""" p = Path(path) p.mkdir(parents=True, exist_ok=True) _local.root = p return p # ── helpers ─────────────────────────────────────────────────────────────────── def _truncate(text: str, limit: int = MAX_OUTPUT) -> str: if len(text) <= limit: return text half = limit // 2 return text[:half] + f"\n\n... [terpotong {len(text) - limit} karakter] ...\n\n" + text[-half:] def _safe_path(raw: str) -> Path: """Pastikan path berada di dalam workspace.""" ws = get_workspace() p = (ws / raw).resolve() if not str(p).startswith(str(ws)): raise PermissionError(f"Akses ditolak: path di luar workspace ({raw})") return p # ── read_file ───────────────────────────────────────────────────────────────── def read_file(path: str, offset: int = 1, limit: int = 500) -> str: try: p = _safe_path(path) if not p.exists(): return f"[Error] File tidak ditemukan: {path}" if p.stat().st_size > MAX_FILE_SIZE: return f"[Error] File terlalu besar (>{MAX_FILE_SIZE//1000}KB): {path}" lines = p.read_text(encoding="utf-8", errors="replace").splitlines() total = len(lines) start = max(0, offset - 1) end = min(total, start + limit) selected = lines[start:end] numbered = "\n".join(f"{start + i + 1:6}→{line}" for i, line in enumerate(selected)) note = f"[Baris {start+1}–{end} dari total {total} baris]" return f"{note}\n{numbered}" except PermissionError as e: return f"[Error] {e}" except Exception as e: return f"[Error] Gagal membaca file: {e}" # ── write_file ──────────────────────────────────────────────────────────────── def write_file(path: str, content: str) -> str: try: p = _safe_path(path) p.parent.mkdir(parents=True, exist_ok=True) p.write_text(content, encoding="utf-8") lines = content.count("\n") + 1 return f"[OK] File ditulis: {path} ({lines} baris)" except PermissionError as e: return f"[Error] {e}" except Exception as e: return f"[Error] Gagal menulis file: {e}" # ── edit_file ───────────────────────────────────────────────────────────────── def edit_file(path: str, old_string: str, new_string: str) -> str: try: p = _safe_path(path) if not p.exists(): return f"[Error] File tidak ditemukan: {path}" original = p.read_text(encoding="utf-8") count = original.count(old_string) if count == 0: return f"[Error] String tidak ditemukan dalam file:\n{old_string!r}" if count > 1: return f"[Error] String ditemukan {count} kali — harus unik agar edit aman." updated = original.replace(old_string, new_string, 1) p.write_text(updated, encoding="utf-8") return f"[OK] Edit berhasil pada {path}" except PermissionError as e: return f"[Error] {e}" except Exception as e: return f"[Error] Gagal mengedit file: {e}" # ── bash ────────────────────────────────────────────────────────────────────── def bash(command: str, timeout: int = 30) -> str: try: result = subprocess.run( command, shell=True, capture_output=True, text=True, timeout=timeout, cwd=str(get_workspace()), ) out = result.stdout err = result.stderr combined = "" if out: combined += out if err: combined += ("\n[stderr]\n" + err) if out else err if not combined: combined = "(tidak ada output)" combined = _truncate(combined) if result.returncode != 0: combined += f"\n[exit code: {result.returncode}]" return combined except subprocess.TimeoutExpired: return f"[Error] Perintah melebihi batas waktu ({timeout} detik)" except Exception as e: return f"[Error] Gagal menjalankan perintah: {e}" # ── glob ────────────────────────────────────────────────────────────────────── def glob(pattern: str, path: str = ".") -> str: try: base = _safe_path(path) matches = sorted(base.glob(pattern)) if not matches: return f"(tidak ada file yang cocok dengan pola: {pattern})" ws = get_workspace() lines = [str(m.relative_to(ws)) for m in matches[:200]] result = "\n".join(lines) if len(matches) > 200: result += f"\n... dan {len(matches) - 200} file lainnya" return result except PermissionError as e: return f"[Error] {e}" except Exception as e: return f"[Error] Gagal melakukan glob: {e}" # ── grep ────────────────────────────────────────────────────────────────────── def grep(pattern: str, path: str = ".", file_glob: str = "*", context_lines: int = 0) -> str: try: base = _safe_path(path) ws = get_workspace() regex = re.compile(pattern, re.IGNORECASE) results: list[str] = [] for fp in sorted(base.rglob(file_glob)): if not fp.is_file(): continue if fp.stat().st_size > MAX_FILE_SIZE: continue try: lines = fp.read_text(encoding="utf-8", errors="replace").splitlines() except Exception: continue rel = str(fp.relative_to(ws)) for i, line in enumerate(lines): if regex.search(line): if context_lines: start = max(0, i - context_lines) end = min(len(lines), i + context_lines + 1) block = "\n".join( f" {rel}:{j+1}: {lines[j]}" for j in range(start, end) ) results.append(block) else: results.append(f"{rel}:{i+1}: {line}") if len(results) > 300: break if not results: return f"(tidak ada hasil untuk: {pattern!r})" return _truncate("\n".join(results)) except re.error as e: return f"[Error] Regex tidak valid: {e}" except PermissionError as e: return f"[Error] {e}" except Exception as e: return f"[Error] Gagal melakukan grep: {e}" # ── ls ──────────────────────────────────────────────────────────────────────── def ls(path: str = ".") -> str: try: p = _safe_path(path) if not p.exists(): return f"[Error] Direktori tidak ditemukan: {path}" if not p.is_dir(): return f"[Error] Bukan direktori: {path}" entries = sorted(p.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())) lines = [] for e in entries: kind = "📁" if e.is_dir() else "📄" size = "" if e.is_file(): s = e.stat().st_size size = f" ({s:,} B)" if s < 1024 else f" ({s//1024:,} KB)" lines.append(f"{kind} {e.name}{size}") return "\n".join(lines) if lines else "(direktori kosong)" except PermissionError as e: return f"[Error] {e}" except Exception as e: return f"[Error] Gagal listing direktori: {e}" # ── dispatcher ──────────────────────────────────────────────────────────────── TOOL_REGISTRY: dict[str, callable] = { "read_file": read_file, "write_file": write_file, "edit_file": edit_file, "bash": bash, "glob": glob, "grep": grep, "ls": ls, } def dispatch(tool_name: str, args: dict) -> str: fn = TOOL_REGISTRY.get(tool_name) if fn is None: return f"[Error] Tool tidak dikenal: {tool_name}" try: return fn(**args) except TypeError as e: return f"[Error] Parameter salah untuk {tool_name}: {e}"