"""agent_loop.py — Standalone agentic loop for Gemini Claw.

Prompt layout for every turn (see ``AgentLoop._build_full_prompt``):
    [system prompt] → [last 20 history messages] → [current user input]
    → [injection]

Injection:
    A short message the model MUST obey:
    - Do not hallucinate dialogue (never write 'User:', 'Assistant:', ...).
    - Start the reply with [start_response] and end it with [end_response].

Error handling:
    - Every tool output is checked against a set of error patterns.
    - On error, a correction message is injected and the model must fix the
      problem before moving on.
    - When the number of consecutive failing turns reaches
      MAX_CONSECUTIVE_ERRORS, the loop is aborted.
"""

from __future__ import annotations

import os
import re
from collections.abc import Callable
from pathlib import Path

from .agent_tools import dispatch, TOOL_REGISTRY, set_workspace
from .gemini_client import GeminiAPIError, call_gemini, DEFAULT_MODEL

MAX_TURNS = 30
MAX_CONSECUTIVE_ERRORS = 4

# Number of past messages replayed into every prompt.
HISTORY_WINDOW = 20

_PROMPT_FILE = Path(__file__).parent.parent / "system_prompt.md"


def build_system_prompt(workspace: str | Path | None = None) -> str:
    """Read system_prompt.md and fill in the ``{workspace}`` placeholder.

    Args:
        workspace: Directory to advertise to the model. When falsy, the
            ``CLAW_WORKSPACE`` environment variable is used, and when that is
            unset or empty, the current working directory.

    Returns:
        The template text with every ``{workspace}`` occurrence replaced.

    Raises:
        OSError: If the template file cannot be read.
    """
    if workspace:
        cwd = Path(workspace)
    else:
        ws_env = os.environ.get("CLAW_WORKSPACE", "")
        cwd = Path(ws_env) if ws_env else Path.cwd()

    template = _PROMPT_FILE.read_text(encoding="utf-8")
    return template.replace("{workspace}", str(cwd))


# ── Anti-hallucination injection ──────────────────────────────────────────────

_INJECTION = (
    "[SYSTEM INJECTION — IKUTI TANPA PENGECUALIAN]\n"
    "1. Kamu HANYA menulis balasan kamu sendiri. "
    "DILARANG menulis label seperti 'User:', 'Assistant:', 'History:', "
    "'Agent:', atau melanjutkan percakapan seolah kamu adalah user.\n"
    "2. Mulai responmu dengan token: [start_response]\n"
    "3. Akhiri responmu dengan token: [end_response]\n"
    "4. Jangan menulis apapun setelah [end_response].\n"
    "Sekarang tulis responmu:"
)

# ── tools_call parsing ────────────────────────────────────────────────────────

_TOOLS_CALL_BLOCK_RE = re.compile(r"```tools_call\n(.*?)```", re.DOTALL)
_PARAM_KEY_RE = re.compile(r"^\[(\w+)\]$")

# NOTE(review): the original compared the value line against one fixed literal
# that begins with "<<"; the rest of that literal was lost. Accepting any
# ``<<WORD`` opener closed by a line equal to ``WORD`` is an assumption —
# confirm against system_prompt.md.
_HEREDOC_OPEN_RE = re.compile(r"^<<(\w+)$")

# Strips the hallucination-guard tokens from model output.
_RESPONSE_TOKEN_RE = re.compile(r"\[start_response\]|\[end_response\]", re.IGNORECASE)


def _parse_tools_call_block(block: str) -> dict | None:
    """Parse the body of one ``tools_call`` fence into a tool-call record.

    The body is a sequence of ``[key]`` header lines, each followed by its
    value. Lines that are not a ``[key]`` header and do not belong to a value
    are skipped. A header that is the very last line (no value) is dropped.

    NOTE(review): everything after the key/heredoc detection was missing from
    the reviewed text and has been reconstructed from how ``AgentLoop.run``
    consumes the result (``name``, optional ``reason``, ``params``). Verify the
    value syntax and the treatment of ``name``/``reason`` against the original.

    Args:
        block: Text captured between the opening and closing fence.

    Returns:
        ``{"name": str, "reason": str, "params": dict[str, str]}``, or ``None``
        when the block does not define a non-empty ``name``.
    """
    lines = block.splitlines()
    fields: dict[str, str] = {}
    i = 0
    while i < len(lines):
        key_match = _PARAM_KEY_RE.match(lines[i].strip())
        if not key_match:
            i += 1
            continue

        key = key_match.group(1)
        i += 1
        if i >= len(lines):
            break

        value_lines: list[str] = []
        heredoc = _HEREDOC_OPEN_RE.match(lines[i].strip())
        if heredoc:
            # Multi-line value: keep lines verbatim until the terminator so
            # file contents retain their indentation.
            terminator = heredoc.group(1)
            i += 1
            while i < len(lines) and lines[i].strip() != terminator:
                value_lines.append(lines[i])
                i += 1
            i += 1  # step over the terminator (no-op if it was missing)
            fields[key] = "\n".join(value_lines)
        else:
            # Plain value: everything up to the next ``[key]`` header.
            while i < len(lines) and not _PARAM_KEY_RE.match(lines[i].strip()):
                value_lines.append(lines[i])
                i += 1
            fields[key] = "\n".join(value_lines).strip()

    name = fields.pop("name", "").strip()
    if not name:
        return None
    reason = fields.pop("reason", "").strip()
    return {"name": name, "reason": reason, "params": fields}


def parse_tool_calls(text: str) -> list[dict]:
    """Return every parseable ``tools_call`` block found in *text*, in order.

    Blocks that ``_parse_tools_call_block`` rejects are silently omitted.
    """
    calls = []
    for m in _TOOLS_CALL_BLOCK_RE.finditer(text):
        parsed = _parse_tools_call_block(m.group(1))
        if parsed:
            calls.append(parsed)
    return calls


def has_tool_calls(text: str) -> bool:
    """Return True if *text* contains at least one ``tools_call`` fence.

    This only checks for the fence; the block inside may still fail to parse.
    """
    return bool(_TOOLS_CALL_BLOCK_RE.search(text))


# Parameters whose string values are converted to ``int`` before dispatch.
_INT_PARAMS = frozenset({"offset", "limit", "timeout", "context_lines"})


def _cast_params(tool_name: str, raw: dict[str, str]) -> dict:
    """Convert known integer parameters from text to ``int``.

    Args:
        tool_name: Name of the tool being called (currently unused; kept for
            interface compatibility).
        raw: Parameter mapping as parsed from the model output.

    Returns:
        A new dict. Values of integer parameters are cast with ``int``; a value
        that is not a valid integer is passed through unchanged so the tool can
        report the problem itself.
    """
    result = {}
    for k, v in raw.items():
        if k in _INT_PARAMS:
            try:
                result[k] = int(v)
            except ValueError:
                result[k] = v
        else:
            result[k] = v
    return result


def _render_result(name: str, params: dict, output: str) -> str:
    """Format one tool output for inclusion in the next prompt.

    NOTE(review): ``name`` and ``params`` are unused and the literal looks as
    if wrapping markup was lost from the reviewed text; the visible string is
    reproduced unchanged. Confirm the intended format.
    """
    return f"\n\n{output}\n\n"


def strip_tool_calls(text: str) -> str:
    """Remove ``tools_call`` blocks and [start/end_response] tokens from *text*.

    Runs of three or more newlines left behind are collapsed to a single blank
    line, and surrounding whitespace is trimmed.
    """
    cleaned = _TOOLS_CALL_BLOCK_RE.sub("", text)
    cleaned = _RESPONSE_TOKEN_RE.sub("", cleaned)
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    return cleaned.strip()


# ── Error detection ───────────────────────────────────────────────────────────

# The exit-code alternative uses a negative lookahead instead of ``[^0]\d*``:
# with the old form ``\s*`` could backtrack to zero width, ``[^0]`` would then
# consume the space, and a successful "[exit code: 0]" was flagged as an error.
# NOTE(review): the remaining alternatives are plain substrings, so tool output
# that merely quotes them (e.g. reading a file containing "SyntaxError") is
# also classified as an error.
_ERROR_PATTERNS = re.compile(
    r"(\[Error\]"
    r"|\[exit code:\s*(?!0+\])-?\d+\]"
    r"|Traceback \(most recent"
    r"|FileNotFoundError"
    r"|PermissionError"
    r"|SyntaxError"
    r"|tidak ditemukan"
    r"|String tidak ditemukan"
    r"|String ditemukan \d+ kali"
    r"|Gagal)",
    re.IGNORECASE,
)


def _is_error_output(output: str) -> bool:
    """Return True if *output* matches any of the known error patterns."""
    return bool(_ERROR_PATTERNS.search(output))


def _build_error_correction_message(tool_name: str, output: str) -> str:
    """Build the instruction block sent to the model after a failed tool call.

    Args:
        tool_name: Tool that produced the error.
        output: Raw tool output; surrounding whitespace is trimmed.

    Returns:
        The error output followed by mandatory recovery instructions.
    """
    return (
        "[SYSTEM — ERROR DETECTED]\n"
        f"Tool `{tool_name}` GAGAL dengan pesan:\n\n"
        f"{output.strip()}\n\n"
        "INSTRUKSI WAJIB:\n"
        "1. Analisis pesan error di atas dengan seksama.\n"
        "2. JANGAN lanjut ke langkah berikutnya sebelum error ini diperbaiki.\n"
        "3. Jika error dari `edit_file`: gunakan `read_file` dulu untuk mendapat isi\n"
        " file terkini, lalu ulangi edit dengan `old_string` yang PERSIS sama.\n"
        "4. Jika error dari `bash`: periksa perintah, path, dan dependensi.\n"
        "5. Setelah memperbaiki, lanjutkan rencana semula."
    )


# ── Agentic loop ──────────────────────────────────────────────────────────────


class AgentLoop:
    """Drive the model/tool loop for one user request.

    Prompt layout for every turn:
        [system prompt] → [last 20 history messages] → [current user input]
        → [injection]
    """

    def __init__(self, model: str = DEFAULT_MODEL, workspace: str | Path | None = None):
        """Store the model name and workspace, and load the system prompt.

        Args:
            model: Model identifier passed to ``call_gemini``.
            workspace: Optional workspace directory; forwarded to
                ``build_system_prompt`` and, on each ``run``, to
                ``set_workspace``.
        """
        self.model = model
        self.workspace = Path(workspace) if workspace else None
        self.system = build_system_prompt(workspace)

    def _build_full_prompt(self, conversation: list[dict]) -> str:
        """Assemble the text prompt for the next model call.

        Order: system prompt → last ``HISTORY_WINDOW`` history messages →
        current input → injection. The last element of *conversation* is
        treated as the current input and rendered with a ``User:`` label,
        whatever its role; all earlier elements are history.

        Args:
            conversation: Messages as ``{"role": ..., "content": ...}`` dicts.

        Returns:
            The prompt as a single newline-joined string.
        """
        lines = []

        # 1. System prompt.
        lines.append(self.system)
        lines.append("")

        # 2. History: most recent messages, excluding the current input.
        history = conversation[:-1]
        for turn in history[-HISTORY_WINDOW:]:
            role = "User" if turn["role"] == "user" else "Assistant"
            lines.append(f"{role}: {turn['content']}")
        if history:
            lines.append("")

        # 3. Current input.
        current_input = conversation[-1]["content"] if conversation else ""
        lines.append(f"User: {current_input}")
        lines.append("")

        # 4. Anti-hallucination injection.
        lines.append(_INJECTION)

        return "\n".join(lines)

    def run(
        self,
        user_input: str,
        conversation: list[dict],
        on_text: Callable[[str], None] | None = None,
        on_tool_start: Callable[[str, dict, str], None] | None = None,
        on_tool_result: Callable[[str, str], None] | None = None,
    ) -> tuple[str, list[dict]]:
        """Run one agentic exchange: call the model, execute tools, repeat.

        The loop ends when the model replies without tool calls, when the API
        call fails, when ``MAX_CONSECUTIVE_ERRORS`` consecutive turns contained
        a failing tool call, or after ``MAX_TURNS`` model calls.

        Args:
            user_input: The new user message.
            conversation: Prior messages. The list is copied, not mutated.
            on_text: Called with visible model text, API error text, or the
                abort message.
            on_tool_start: Called as ``(name, params, reason)`` before each
                tool is dispatched.
            on_tool_result: Called as ``(name, output)`` after each tool
                returns, with the raw (uncorrected) output.

        Returns:
            ``(final_reply_text, updated_conversation)``.
        """
        if self.workspace:
            set_workspace(self.workspace)

        conversation = list(conversation)
        conversation.append({"role": "user", "content": user_input})

        final_text = ""
        consecutive_errors = 0

        for _ in range(MAX_TURNS):
            prompt = self._build_full_prompt(conversation)

            try:
                raw_reply = call_gemini(prompt, model=self.model)
            except GeminiAPIError as e:
                err = f"[Error API] {e}"
                if on_text:
                    on_text(err)
                return err, conversation

            tool_calls = parse_tool_calls(raw_reply)
            visible_text = strip_tool_calls(raw_reply)

            if visible_text and on_text:
                on_text(visible_text)

            if not tool_calls:
                # No tools requested: this is the final answer.
                conversation.append({"role": "assistant", "content": raw_reply})
                final_text = visible_text
                break

            rendered_results = []
            turn_had_error = False

            for tc in tool_calls:
                name = tc["name"]
                reason = tc.get("reason", "")
                params = _cast_params(name, tc["params"])

                if on_tool_start:
                    on_tool_start(name, params, reason)

                output = dispatch(name, params)

                if on_tool_result:
                    on_tool_result(name, output)

                if _is_error_output(output):
                    # Replace the raw output with a correction prompt so the
                    # model addresses the failure first.
                    error_msg = _build_error_correction_message(name, output)
                    rendered_results.append(_render_result(name, params, error_msg))
                    turn_had_error = True
                else:
                    rendered_results.append(_render_result(name, params, output))

            # The counter tracks failing turns, not failing calls: one clean
            # turn resets it.
            if turn_had_error:
                consecutive_errors += 1
            else:
                consecutive_errors = 0

            if consecutive_errors >= MAX_CONSECUTIVE_ERRORS:
                abort_msg = (
                    "[SYSTEM — LOOP DIHENTIKAN]\n"
                    f"AI gagal {consecutive_errors}x berturut-turut. "
                    "Periksa error terakhir secara manual."
                )
                if on_text:
                    on_text(abort_msg)
                conversation.append({"role": "assistant", "content": raw_reply})
                return abort_msg, conversation

            # Tool results are fed back as a user-role message.
            conversation.append({"role": "assistant", "content": raw_reply})
            conversation.append(
                {"role": "user", "content": "".join(rendered_results).strip()}
            )
        else:
            # MAX_TURNS model calls were made and every one requested tools.
            final_text = "[Batas maksimum turn tercapai]"

        return final_text, conversation