gemini-claw / src /agent_tools.py
PurCLI Agent
Initial commit from PurCLI Coding Agent
ef271cc
"""
agent_tools.py — Tool implementations for the Gemini coding agent.
Available tools: read_file, write_file, edit_file, bash, glob, grep, ls.
The workspace is per-thread so that each user session is isolated.
"""
from __future__ import annotations
import os
import re
import subprocess
import threading
from pathlib import Path
MAX_FILE_SIZE = 200_000 # size limit in bytes (~200 KB); larger files are refused by read_file and skipped by grep
MAX_OUTPUT = 20_000 # default character budget used by _truncate for long tool output
# ── Per-thread workspace ──────────────────────────────────────────────────────
# Thread-local storage: set_workspace() stores the calling thread's workspace
# root in its `root` attribute and get_workspace() reads it back.
_local = threading.local()
def get_workspace() -> Path:
    """Return the workspace root for the calling thread.

    A root stored on this thread's ``_local`` storage takes precedence.
    Otherwise the directory named by the ``CLAW_WORKSPACE`` environment
    variable is used, falling back to ``/tmp/workspace`` when the variable
    is unset or empty. The fallback directory is created if missing; it is
    not stored on the thread.
    """
    try:
        return _local.root
    except AttributeError:
        # No root has been set on this thread; fall through to the default.
        pass

    configured = os.environ.get("CLAW_WORKSPACE", "")
    root = Path(configured or "/tmp/workspace")
    root.mkdir(parents=True, exist_ok=True)
    return root
def set_workspace(path: str | Path) -> Path:
    """Make *path* the workspace root of the calling thread.

    The directory (including missing parents) is created before the root is
    stored, so nothing is recorded on the thread if creation fails.

    Returns:
        The workspace root as a ``Path``.
    """
    root = Path(path)
    root.mkdir(parents=True, exist_ok=True)
    setattr(_local, "root", root)
    return root
# ── helpers ───────────────────────────────────────────────────────────────────
def _truncate(text: str, limit: int = MAX_OUTPUT) -> str:
    """Shorten *text* to about *limit* characters, keeping its head and tail.

    Text that already fits is returned unchanged. Otherwise the first and
    last parts are kept (``limit`` characters in total) and joined by a
    marker stating how many characters were dropped. The marker itself is
    not counted against *limit*.

    Args:
        text: The text to shorten.
        limit: Number of original characters to keep; negative values are
            treated as 0.

    Returns:
        The original text, or head + marker + tail.
    """
    limit = max(limit, 0)
    if len(text) <= limit:
        return text
    # Give the extra character of an odd limit to the head so that exactly
    # `limit` characters survive and the reported count is accurate.
    head = (limit + 1) // 2
    tail = limit - head
    omitted = len(text) - limit
    marker = f"\n\n... [terpotong {omitted} karakter] ...\n\n"
    # text[-0:] would be the whole string, so a zero-length tail is explicit.
    suffix = text[-tail:] if tail else ""
    return text[:head] + marker + suffix
def _safe_path(raw: str) -> Path:
    """Resolve *raw* against the workspace and ensure it stays inside it.

    Args:
        raw: A path relative to the workspace root (an absolute path is
            accepted only if it resolves to a location inside the root).

    Returns:
        The fully resolved path.

    Raises:
        PermissionError: If the resolved path lies outside the workspace.
    """
    # Resolve the root as well: comparing a resolved candidate with an
    # unresolved (relative or symlinked) root would reject every path.
    ws = get_workspace().resolve()
    p = (ws / raw).resolve()
    try:
        # Component-wise containment check. A plain string prefix test would
        # accept siblings such as "/tmp/workspace-evil" for "/tmp/workspace".
        p.relative_to(ws)
    except ValueError:
        raise PermissionError(
            f"Akses ditolak: path di luar workspace ({raw})"
        ) from None
    return p
# ── read_file ─────────────────────────────────────────────────────────────────
def read_file(path: str, offset: int = 1, limit: int = 500) -> str:
    """Return a numbered excerpt of a text file inside the workspace.

    Args:
        path: File path relative to the workspace root.
        offset: 1-based number of the first line to return; values below 1
            are treated as 1.
        limit: Maximum number of lines to return.

    Returns:
        A header line giving the displayed range and total line count,
        followed by the selected lines prefixed with their line numbers.
        Any failure is reported as a string starting with ``[Error]``; an
        empty file yields a short notice instead of an excerpt.
    """
    try:
        p = _safe_path(path)
        if not p.exists():
            return f"[Error] File tidak ditemukan: {path}"
        if not p.is_file():
            return f"[Error] Bukan file: {path}"
        if p.stat().st_size > MAX_FILE_SIZE:
            return f"[Error] File terlalu besar (>{MAX_FILE_SIZE//1000}KB): {path}"
        lines = p.read_text(encoding="utf-8", errors="replace").splitlines()
        total = len(lines)
        if total == 0:
            return f"[File kosong: {path}]"
        start = max(0, offset - 1)
        end = min(total, start + max(0, limit))
        if start >= end:
            # Offset past the last line or a non-positive limit: report it
            # instead of printing an inverted range with no content.
            return (
                f"[Error] Tidak ada baris pada rentang yang diminta "
                f"(offset {offset}, limit {limit}); total {total} baris"
            )
        numbered = "\n".join(
            f"{number:6}→{line}"
            for number, line in enumerate(lines[start:end], start=start + 1)
        )
        note = f"[Baris {start+1}–{end} dari total {total} baris]"
        return f"{note}\n{numbered}"
    except PermissionError as e:
        return f"[Error] {e}"
    except Exception as e:
        return f"[Error] Gagal membaca file: {e}"
# ── write_file ────────────────────────────────────────────────────────────────
def write_file(path: str, content: str) -> str:
    """Write *content* to a file inside the workspace, replacing any old file.

    Missing parent directories are created.

    Args:
        path: File path relative to the workspace root.
        content: Text to write, encoded as UTF-8.

    Returns:
        An ``[OK]`` message with the number of lines written, or a string
        starting with ``[Error]`` on failure.
    """
    try:
        p = _safe_path(path)
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(content, encoding="utf-8")
        # Count lines the same way read_file does, so empty content is 0
        # lines and a trailing newline does not add a phantom line.
        lines = len(content.splitlines())
        return f"[OK] File ditulis: {path} ({lines} baris)"
    except PermissionError as e:
        return f"[Error] {e}"
    except Exception as e:
        return f"[Error] Gagal menulis file: {e}"
# ── edit_file ─────────────────────────────────────────────────────────────────
def edit_file(path: str, old_string: str, new_string: str) -> str:
    """Replace the single occurrence of *old_string* in a workspace file.

    The edit is applied only when *old_string* occurs exactly once, so the
    location of the change is unambiguous.

    Args:
        path: File path relative to the workspace root.
        old_string: Text to replace; must be non-empty and unique in the file.
        new_string: Replacement text.

    Returns:
        An ``[OK]`` message on success, or a string starting with
        ``[Error]`` describing why the edit was not applied.
    """
    if not old_string:
        # An empty needle "matches" between every character, which would
        # otherwise surface as a confusing duplicate-count error.
        return "[Error] old_string tidak boleh kosong."
    try:
        p = _safe_path(path)
        if not p.exists():
            return f"[Error] File tidak ditemukan: {path}"
        original = p.read_text(encoding="utf-8")
        count = original.count(old_string)
        if count == 0:
            return f"[Error] String tidak ditemukan dalam file:\n{old_string!r}"
        if count > 1:
            return f"[Error] String ditemukan {count} kali — harus unik agar edit aman."
        updated = original.replace(old_string, new_string, 1)
        p.write_text(updated, encoding="utf-8")
        return f"[OK] Edit berhasil pada {path}"
    except PermissionError as e:
        return f"[Error] {e}"
    except Exception as e:
        return f"[Error] Gagal mengedit file: {e}"
# ── bash ──────────────────────────────────────────────────────────────────────
def bash(command: str, timeout: int = 30) -> str:
    """Run a shell command in the workspace directory and return its output.

    Args:
        command: Command line passed to the shell.
        timeout: Maximum run time in seconds.

    Returns:
        stdout, followed by stderr under a ``[stderr]`` heading when both
        are present, truncated by ``_truncate``. A non-zero exit status is
        appended as ``[exit code: N]``. Timeouts and launch failures are
        reported as strings starting with ``[Error]``.
    """
    try:
        # NOTE(review): shell=True runs the string through the shell. This
        # tool executes arbitrary commands by design; only the working
        # directory is set to the workspace here.
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            # Undecodable bytes must not discard the output of a command
            # that has already run; substitute them instead of raising.
            errors="replace",
            timeout=timeout,
            cwd=str(get_workspace()),
        )
        out = result.stdout
        err = result.stderr
        combined = ""
        if out:
            combined += out
        if err:
            combined += ("\n[stderr]\n" + err) if out else err
        if not combined:
            combined = "(tidak ada output)"
        combined = _truncate(combined)
        # Appended after truncation so the exit status is never cut off.
        if result.returncode != 0:
            combined += f"\n[exit code: {result.returncode}]"
        return combined
    except subprocess.TimeoutExpired:
        return f"[Error] Perintah melebihi batas waktu ({timeout} detik)"
    except Exception as e:
        return f"[Error] Gagal menjalankan perintah: {e}"
# ── glob ──────────────────────────────────────────────────────────────────────
def glob(pattern: str, path: str = ".") -> str:
    """List workspace paths matching a glob pattern.

    Args:
        pattern: Glob pattern, evaluated relative to *path*.
        path: Directory to search, relative to the workspace root.

    Returns:
        Up to 200 matching paths (relative to the workspace root, sorted),
        one per line, plus a note with the number of further matches. A
        notice is returned when nothing matches; failures are reported as
        strings starting with ``[Error]``.
    """
    max_listed = 200
    try:
        base = _safe_path(path)
        # Resolved so it is comparable with `base`, which is resolved too.
        ws = get_workspace().resolve()
        matches = []
        for candidate in sorted(base.glob(pattern)):
            try:
                # A pattern containing ".." or a symlink can lead outside
                # the workspace; such matches are dropped.
                candidate.resolve().relative_to(ws)
            except (OSError, RuntimeError, ValueError):
                continue
            matches.append(candidate)
        if not matches:
            return f"(tidak ada file yang cocok dengan pola: {pattern})"
        lines = [str(m.relative_to(ws)) for m in matches[:max_listed]]
        result = "\n".join(lines)
        if len(matches) > max_listed:
            result += f"\n... dan {len(matches) - max_listed} file lainnya"
        return result
    except PermissionError as e:
        return f"[Error] {e}"
    except Exception as e:
        return f"[Error] Gagal melakukan glob: {e}"
# ── grep ──────────────────────────────────────────────────────────────────────
def grep(pattern: str, path: str = ".", file_glob: str = "*", context_lines: int = 0) -> str:
    """Search workspace files for lines matching a regular expression.

    Matching is case-insensitive. Files larger than ``MAX_FILE_SIZE`` and
    files that cannot be read are skipped.

    Args:
        pattern: Regular expression to search for.
        path: Directory to search recursively, or a single file, relative
            to the workspace root.
        file_glob: Glob selecting which files under a directory are
            searched; ignored when *path* is a file.
        context_lines: Number of lines to show before and after each match;
            values below 0 are treated as 0.

    Returns:
        ``file:line: text`` entries (or context blocks), at most 300,
        truncated by ``_truncate``. A note is appended when the cap was
        hit. A notice is returned when nothing matches; failures are
        reported as strings starting with ``[Error]``.
    """
    max_results = 300
    try:
        base = _safe_path(path)
        # Resolved so it is comparable with `base`, which is resolved too.
        ws = get_workspace().resolve()
        regex = re.compile(pattern, re.IGNORECASE)
        context = max(0, context_lines or 0)
        candidates = [base] if base.is_file() else sorted(base.rglob(file_glob))
        results: list[str] = []
        capped = False
        for fp in candidates:
            try:
                if not fp.is_file() or fp.stat().st_size > MAX_FILE_SIZE:
                    continue
                # Skip files that a ".." glob component or a symlink places
                # outside the workspace.
                fp.resolve().relative_to(ws)
                lines = fp.read_text(encoding="utf-8", errors="replace").splitlines()
            except (OSError, RuntimeError, ValueError):
                continue
            rel = str(fp.relative_to(ws))
            for i, line in enumerate(lines):
                if not regex.search(line):
                    continue
                if len(results) >= max_results:
                    capped = True
                    break
                if context:
                    start = max(0, i - context)
                    end = min(len(lines), i + context + 1)
                    results.append(
                        "\n".join(f" {rel}:{j+1}: {lines[j]}" for j in range(start, end))
                    )
                else:
                    results.append(f"{rel}:{i+1}: {line}")
            if capped:
                # Stop scanning further files once the cap is reached; the
                # inner break alone would let every later file add results.
                break
        if not results:
            return f"(tidak ada hasil untuk: {pattern!r})"
        if capped:
            results.append(f"... [hasil dibatasi {max_results} entri pertama]")
        return _truncate("\n".join(results))
    except re.error as e:
        return f"[Error] Regex tidak valid: {e}"
    except PermissionError as e:
        return f"[Error] {e}"
    except Exception as e:
        return f"[Error] Gagal melakukan grep: {e}"
# ── ls ────────────────────────────────────────────────────────────────────────
def ls(path: str = ".") -> str:
    """List the entries of a workspace directory.

    Directories are listed first, then files, each group sorted by name
    without regard to case. Files show their size in bytes (below 1024) or
    in whole KB.

    Args:
        path: Directory path relative to the workspace root.

    Returns:
        One line per entry, a notice for an empty directory, or a string
        starting with ``[Error]`` on failure.
    """
    try:
        p = _safe_path(path)
        if not p.exists():
            return f"[Error] Direktori tidak ditemukan: {path}"
        if not p.is_dir():
            return f"[Error] Bukan direktori: {path}"
        entries = sorted(p.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower()))
        lines = []
        for e in entries:
            # Icons restored from their mis-decoded (mojibake) form.
            kind = "📁" if e.is_dir() else "📄"
            size = ""
            if e.is_file():
                s = e.stat().st_size
                size = f" ({s:,} B)" if s < 1024 else f" ({s//1024:,} KB)"
            lines.append(f"{kind} {e.name}{size}")
        return "\n".join(lines) if lines else "(direktori kosong)"
    except PermissionError as e:
        return f"[Error] {e}"
    except Exception as e:
        return f"[Error] Gagal listing direktori: {e}"
# ── dispatcher ────────────────────────────────────────────────────────────────
# Maps each tool name accepted by dispatch() to the function implementing it.
TOOL_REGISTRY: dict[str, callable] = {
    "read_file": read_file,
    "write_file": write_file,
    "edit_file": edit_file,
    "bash": bash,
    "glob": glob,
    "grep": grep,
    "ls": ls,
}
def dispatch(tool_name: str, args: dict) -> str:
    """Invoke the registered tool *tool_name* with keyword arguments *args*.

    Returns:
        The tool's result string, or an ``[Error]`` string when the tool is
        not registered or the call raises ``TypeError`` (for example when
        *args* does not fit the tool's parameters).
    """
    try:
        handler = TOOL_REGISTRY[tool_name]
    except KeyError:
        return f"[Error] Tool tidak dikenal: {tool_name}"

    try:
        outcome = handler(**args)
    except TypeError as exc:
        return f"[Error] Parameter salah untuk {tool_name}: {exc}"
    return outcome