Spaces:
Paused
Paused
| """ | |
| agent_tools.py β Implementasi tools untuk Gemini coding agent. | |
| Tools yang tersedia: read_file, write_file, edit_file, bash, glob, grep, ls. | |
| Workspace bersifat per-thread agar setiap sesi user terisolasi. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import re | |
| import subprocess | |
| import threading | |
| from pathlib import Path | |
| MAX_FILE_SIZE = 200_000 # ~200KB batas baca file | |
| MAX_OUTPUT = 20_000 # potong output bash jika terlalu panjang | |
| # ββ Per-thread workspace βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _local = threading.local() | |
| def get_workspace() -> Path: | |
| """Kembalikan workspace root untuk thread saat ini.""" | |
| if hasattr(_local, "root"): | |
| return _local.root | |
| ws = os.environ.get("CLAW_WORKSPACE", "") | |
| base = Path(ws) if ws else Path("/tmp/workspace") | |
| base.mkdir(parents=True, exist_ok=True) | |
| return base | |
| def set_workspace(path: str | Path) -> Path: | |
| """Set workspace root untuk thread saat ini dan buat direktorinya.""" | |
| p = Path(path) | |
| p.mkdir(parents=True, exist_ok=True) | |
| _local.root = p | |
| return p | |
| # ββ helpers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _truncate(text: str, limit: int = MAX_OUTPUT) -> str: | |
| if len(text) <= limit: | |
| return text | |
| half = limit // 2 | |
| return text[:half] + f"\n\n... [terpotong {len(text) - limit} karakter] ...\n\n" + text[-half:] | |
| def _safe_path(raw: str) -> Path: | |
| """Pastikan path berada di dalam workspace.""" | |
| ws = get_workspace() | |
| p = (ws / raw).resolve() | |
| if not str(p).startswith(str(ws)): | |
| raise PermissionError(f"Akses ditolak: path di luar workspace ({raw})") | |
| return p | |
| # ββ read_file βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def read_file(path: str, offset: int = 1, limit: int = 500) -> str: | |
| try: | |
| p = _safe_path(path) | |
| if not p.exists(): | |
| return f"[Error] File tidak ditemukan: {path}" | |
| if p.stat().st_size > MAX_FILE_SIZE: | |
| return f"[Error] File terlalu besar (>{MAX_FILE_SIZE//1000}KB): {path}" | |
| lines = p.read_text(encoding="utf-8", errors="replace").splitlines() | |
| total = len(lines) | |
| start = max(0, offset - 1) | |
| end = min(total, start + limit) | |
| selected = lines[start:end] | |
| numbered = "\n".join(f"{start + i + 1:6}β{line}" for i, line in enumerate(selected)) | |
| note = f"[Baris {start+1}β{end} dari total {total} baris]" | |
| return f"{note}\n{numbered}" | |
| except PermissionError as e: | |
| return f"[Error] {e}" | |
| except Exception as e: | |
| return f"[Error] Gagal membaca file: {e}" | |
| # ββ write_file ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def write_file(path: str, content: str) -> str: | |
| try: | |
| p = _safe_path(path) | |
| p.parent.mkdir(parents=True, exist_ok=True) | |
| p.write_text(content, encoding="utf-8") | |
| lines = content.count("\n") + 1 | |
| return f"[OK] File ditulis: {path} ({lines} baris)" | |
| except PermissionError as e: | |
| return f"[Error] {e}" | |
| except Exception as e: | |
| return f"[Error] Gagal menulis file: {e}" | |
| # ββ edit_file βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def edit_file(path: str, old_string: str, new_string: str) -> str: | |
| try: | |
| p = _safe_path(path) | |
| if not p.exists(): | |
| return f"[Error] File tidak ditemukan: {path}" | |
| original = p.read_text(encoding="utf-8") | |
| count = original.count(old_string) | |
| if count == 0: | |
| return f"[Error] String tidak ditemukan dalam file:\n{old_string!r}" | |
| if count > 1: | |
| return f"[Error] String ditemukan {count} kali β harus unik agar edit aman." | |
| updated = original.replace(old_string, new_string, 1) | |
| p.write_text(updated, encoding="utf-8") | |
| return f"[OK] Edit berhasil pada {path}" | |
| except PermissionError as e: | |
| return f"[Error] {e}" | |
| except Exception as e: | |
| return f"[Error] Gagal mengedit file: {e}" | |
| # ββ bash ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def bash(command: str, timeout: int = 30) -> str: | |
| try: | |
| result = subprocess.run( | |
| command, | |
| shell=True, | |
| capture_output=True, | |
| text=True, | |
| timeout=timeout, | |
| cwd=str(get_workspace()), | |
| ) | |
| out = result.stdout | |
| err = result.stderr | |
| combined = "" | |
| if out: | |
| combined += out | |
| if err: | |
| combined += ("\n[stderr]\n" + err) if out else err | |
| if not combined: | |
| combined = "(tidak ada output)" | |
| combined = _truncate(combined) | |
| if result.returncode != 0: | |
| combined += f"\n[exit code: {result.returncode}]" | |
| return combined | |
| except subprocess.TimeoutExpired: | |
| return f"[Error] Perintah melebihi batas waktu ({timeout} detik)" | |
| except Exception as e: | |
| return f"[Error] Gagal menjalankan perintah: {e}" | |
| # ββ glob ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def glob(pattern: str, path: str = ".") -> str: | |
| try: | |
| base = _safe_path(path) | |
| matches = sorted(base.glob(pattern)) | |
| if not matches: | |
| return f"(tidak ada file yang cocok dengan pola: {pattern})" | |
| ws = get_workspace() | |
| lines = [str(m.relative_to(ws)) for m in matches[:200]] | |
| result = "\n".join(lines) | |
| if len(matches) > 200: | |
| result += f"\n... dan {len(matches) - 200} file lainnya" | |
| return result | |
| except PermissionError as e: | |
| return f"[Error] {e}" | |
| except Exception as e: | |
| return f"[Error] Gagal melakukan glob: {e}" | |
| # ββ grep ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def grep(pattern: str, path: str = ".", file_glob: str = "*", context_lines: int = 0) -> str: | |
| try: | |
| base = _safe_path(path) | |
| ws = get_workspace() | |
| regex = re.compile(pattern, re.IGNORECASE) | |
| results: list[str] = [] | |
| for fp in sorted(base.rglob(file_glob)): | |
| if not fp.is_file(): | |
| continue | |
| if fp.stat().st_size > MAX_FILE_SIZE: | |
| continue | |
| try: | |
| lines = fp.read_text(encoding="utf-8", errors="replace").splitlines() | |
| except Exception: | |
| continue | |
| rel = str(fp.relative_to(ws)) | |
| for i, line in enumerate(lines): | |
| if regex.search(line): | |
| if context_lines: | |
| start = max(0, i - context_lines) | |
| end = min(len(lines), i + context_lines + 1) | |
| block = "\n".join( | |
| f" {rel}:{j+1}: {lines[j]}" for j in range(start, end) | |
| ) | |
| results.append(block) | |
| else: | |
| results.append(f"{rel}:{i+1}: {line}") | |
| if len(results) > 300: | |
| break | |
| if not results: | |
| return f"(tidak ada hasil untuk: {pattern!r})" | |
| return _truncate("\n".join(results)) | |
| except re.error as e: | |
| return f"[Error] Regex tidak valid: {e}" | |
| except PermissionError as e: | |
| return f"[Error] {e}" | |
| except Exception as e: | |
| return f"[Error] Gagal melakukan grep: {e}" | |
| # ββ ls ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def ls(path: str = ".") -> str: | |
| try: | |
| p = _safe_path(path) | |
| if not p.exists(): | |
| return f"[Error] Direktori tidak ditemukan: {path}" | |
| if not p.is_dir(): | |
| return f"[Error] Bukan direktori: {path}" | |
| entries = sorted(p.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())) | |
| lines = [] | |
| for e in entries: | |
| kind = "π" if e.is_dir() else "π" | |
| size = "" | |
| if e.is_file(): | |
| s = e.stat().st_size | |
| size = f" ({s:,} B)" if s < 1024 else f" ({s//1024:,} KB)" | |
| lines.append(f"{kind} {e.name}{size}") | |
| return "\n".join(lines) if lines else "(direktori kosong)" | |
| except PermissionError as e: | |
| return f"[Error] {e}" | |
| except Exception as e: | |
| return f"[Error] Gagal listing direktori: {e}" | |
| # ββ dispatcher ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| TOOL_REGISTRY: dict[str, callable] = { | |
| "read_file": read_file, | |
| "write_file": write_file, | |
| "edit_file": edit_file, | |
| "bash": bash, | |
| "glob": glob, | |
| "grep": grep, | |
| "ls": ls, | |
| } | |
| def dispatch(tool_name: str, args: dict) -> str: | |
| fn = TOOL_REGISTRY.get(tool_name) | |
| if fn is None: | |
| return f"[Error] Tool tidak dikenal: {tool_name}" | |
| try: | |
| return fn(**args) | |
| except TypeError as e: | |
| return f"[Error] Parameter salah untuk {tool_name}: {e}" | |