Spaces:
Paused
Paused
| """ | |
| agent_loop.py — Agentic loop standalone untuk Gemini Claw. | |
| Urutan prompt setiap turn: | |
| [system prompt] → [20 history terakhir] → [input user] → [injection] | |
| Injection: | |
| Pesan singkat yang WAJIB dipatuhi AI: | |
| - Jangan halusinasi (jangan tulis 'User:', 'Assistant:', dll.) | |
| - Awali dengan [start_response], akhiri dengan [end_response] | |
| Error handling: | |
| - Output tool diperiksa terhadap pola error. | |
| - Jika error: injeksi pesan koreksi, AI wajib perbaiki dulu. | |
| - Jika consecutive error >= MAX_CONSECUTIVE_ERRORS: loop dihentikan. | |
| """ | |
from __future__ import annotations

import re
from collections.abc import Callable
from pathlib import Path

from .agent_tools import dispatch, TOOL_REGISTRY, set_workspace
from .gemini_client import GeminiAPIError, call_gemini, DEFAULT_MODEL
# Upper bound on model round-trips inside a single AgentLoop.run() call.
MAX_TURNS = 30
# Number of consecutive turns containing a failed tool call after which
# AgentLoop.run() aborts.
MAX_CONSECUTIVE_ERRORS = 4
# Prompt template location: two directory levels up from this module file.
_PROMPT_FILE = Path(__file__).parent.parent / "system_prompt.md"
def build_system_prompt(workspace: str | Path | None = None) -> str:
    """Load system_prompt.md and substitute its ``{workspace}`` placeholder.

    The workspace directory is resolved in this order: the ``workspace``
    argument when truthy, then the ``CLAW_WORKSPACE`` environment variable
    when non-empty, then the current working directory.

    Args:
        workspace: Optional workspace directory.

    Returns:
        The template text with every ``{workspace}`` occurrence replaced by
        the resolved directory.
    """
    import os

    # `or` short-circuits, so the environment is consulted only when no
    # explicit workspace was supplied.
    candidate = workspace or os.environ.get("CLAW_WORKSPACE", "")
    root = Path(candidate) if candidate else Path.cwd()
    prompt_text = _PROMPT_FILE.read_text(encoding="utf-8")
    return prompt_text.replace("{workspace}", str(root))
# -- Anti-hallucination injection --------------------------------------------
# Fixed instruction text appended as the last segment of every prompt. It
# tells the model to write only its own reply (no 'User:'/'Assistant:' style
# labels) and to wrap the reply in [start_response] / [end_response] tokens.
# NOTE(review): the "β" in the first line looks like a mis-decoded dash;
# confirm against the original file before changing this runtime string.
_INJECTION = (
    "[SYSTEM INJECTION β IKUTI TANPA PENGECUALIAN]\n"
    "1. Kamu HANYA menulis balasan kamu sendiri. "
    "DILARANG menulis label seperti 'User:', 'Assistant:', 'History:', "
    "'Agent:', atau melanjutkan percakapan seolah kamu adalah user.\n"
    "2. Mulai responmu dengan token: [start_response]\n"
    "3. Akhiri responmu dengan token: [end_response]\n"
    "4. Jangan menulis apapun setelah [end_response].\n"
    "Sekarang tulis responmu:"
)
| # ββ Regex tools_call ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _TOOLS_CALL_BLOCK_RE = re.compile(r"```tools_call\n(.*?)```", re.DOTALL) | |
| _PARAM_KEY_RE = re.compile(r"^\[(\w+)\]$") | |
| # Regex untuk strip token halusinasi-guard | |
| _RESPONSE_TOKEN_RE = re.compile(r"\[start_response\]|\[end_response\]", re.IGNORECASE) | |
| def _parse_tools_call_block(block: str) -> dict | None: | |
| lines = block.splitlines() | |
| fields: dict[str, str] = {} | |
| i = 0 | |
| while i < len(lines): | |
| key_match = _PARAM_KEY_RE.match(lines[i].strip()) | |
| if key_match: | |
| key = key_match.group(1) | |
| i += 1 | |
| if i >= len(lines): | |
| break | |
| if lines[i].strip() == "<<EOF": | |
| i += 1 | |
| multi: list[str] = [] | |
| while i < len(lines) and lines[i].strip() != "EOF": | |
| multi.append(lines[i]) | |
| i += 1 | |
| fields[key] = "\n".join(multi) | |
| else: | |
| fields[key] = lines[i].strip() | |
| i += 1 | |
| name = fields.pop("nama", None) | |
| reason = fields.pop("reason", "") or "" | |
| if name: | |
| return {"name": name, "reason": reason, "params": fields} | |
| return None | |
| def parse_tool_calls(text: str) -> list[dict]: | |
| calls = [] | |
| for m in _TOOLS_CALL_BLOCK_RE.finditer(text): | |
| parsed = _parse_tools_call_block(m.group(1)) | |
| if parsed: | |
| calls.append(parsed) | |
| return calls | |
| def has_tool_calls(text: str) -> bool: | |
| return bool(_TOOLS_CALL_BLOCK_RE.search(text)) | |
| def _cast_params(tool_name: str, raw: dict[str, str]) -> dict: | |
| INT_PARAMS = {"offset", "limit", "timeout", "context_lines"} | |
| result = {} | |
| for k, v in raw.items(): | |
| if k in INT_PARAMS: | |
| try: | |
| result[k] = int(v) | |
| except ValueError: | |
| result[k] = v | |
| else: | |
| result[k] = v | |
| return result | |
| def _render_result(name: str, params: dict, output: str) -> str: | |
| return f'\n<tool_result name="{name}">\n{output}\n</tool_result>\n' | |
def strip_tool_calls(text: str) -> str:
    """Return ``text`` without tools_call blocks and response-guard tokens.

    After removal, runs of three or more newlines are collapsed to exactly
    two and the result is stripped of leading/trailing whitespace.
    """
    result = text
    # Order matters: drop whole tool blocks first, then the guard tokens.
    for pattern in (_TOOLS_CALL_BLOCK_RE, _RESPONSE_TOKEN_RE):
        result = pattern.sub("", result)
    collapsed = re.sub(r"\n{3,}", "\n\n", result)
    return collapsed.strip()
| # ββ Error detection βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _ERROR_PATTERNS = re.compile( | |
| r"(\[Error\]" | |
| r"|\[exit code:\s*[^0]\d*\]" | |
| r"|Traceback \(most recent" | |
| r"|FileNotFoundError" | |
| r"|PermissionError" | |
| r"|SyntaxError" | |
| r"|tidak ditemukan" | |
| r"|String tidak ditemukan" | |
| r"|String ditemukan \d+ kali" | |
| r"|Gagal)", | |
| re.IGNORECASE, | |
| ) | |
| def _is_error_output(output: str) -> bool: | |
| return bool(_ERROR_PATTERNS.search(output)) | |
| def _build_error_correction_message(tool_name: str, output: str) -> str: | |
| return ( | |
| f"[SYSTEM β ERROR DETECTED]\n" | |
| f"Tool `{tool_name}` GAGAL dengan pesan:\n\n" | |
| f"{output.strip()}\n\n" | |
| f"INSTRUKSI WAJIB:\n" | |
| f"1. Analisis pesan error di atas dengan seksama.\n" | |
| f"2. JANGAN lanjut ke langkah berikutnya sebelum error ini diperbaiki.\n" | |
| f"3. Jika error dari `edit_file`: gunakan `read_file` dulu untuk mendapat isi\n" | |
| f" file terkini, lalu ulangi edit dengan `old_string` yang PERSIS sama.\n" | |
| f"4. Jika error dari `bash`: periksa perintah, path, dan dependensi.\n" | |
| f"5. Setelah memperbaiki, lanjutkan rencana semula." | |
| ) | |
# -- Agentic loop -------------------------------------------------------------
class AgentLoop:
    """Run a multi-turn, tool-using exchange with the Gemini model.

    Every turn sends one flat text prompt assembled in this order:
        [system prompt] -> [recent history] -> [current input] -> [injection]
    """

    def __init__(
        self,
        model: str = DEFAULT_MODEL,
        workspace: str | Path | None = None,
        history_limit: int = 20,
    ):
        """Configure the loop.

        Args:
            model: Model identifier forwarded to ``call_gemini``.
            workspace: Optional workspace directory; when given it is passed
                to ``set_workspace`` at the start of every ``run``.
            history_limit: Maximum number of earlier messages included in
                each prompt. Zero or a negative value includes none.
        """
        self.model = model
        self.workspace = Path(workspace) if workspace else None
        self.system = build_system_prompt(workspace)
        self.history_limit = history_limit

    def _build_full_prompt(self, conversation: list[dict]) -> str:
        """Assemble the prompt text for the next model call.

        Order: system prompt, the last ``history_limit`` messages before the
        final one (labelled ``User:`` / ``Assistant:``), the final message as
        the current ``User:`` input, then the injection text.

        NOTE(review): ``run`` appends two messages per tool turn and the
        "current input" is then the tool result, so with the default window
        the original user request leaves the prompt after about ten tool
        turns. Confirm whether that is intended.
        """
        lines = [self.system, ""]

        history = conversation[:-1]
        # history[-0:] would return the whole list, so a non-positive limit
        # is handled explicitly.
        recent = history[-self.history_limit:] if self.history_limit > 0 else []
        for turn in recent:
            role = "User" if turn["role"] == "user" else "Assistant"
            lines.append(f"{role}: {turn['content']}")
        if recent:
            lines.append("")

        current_input = conversation[-1]["content"] if conversation else ""
        lines.append(f"User: {current_input}")
        lines.append("")

        lines.append(_INJECTION)
        return "\n".join(lines)

    def _execute_tool_calls(
        self,
        tool_calls: list[dict],
        on_tool_start: Callable[[str, dict, str], object] | None,
        on_tool_result: Callable[[str, str], object] | None,
    ) -> tuple[str, bool]:
        """Dispatch each parsed tool call and render the combined results.

        Returns:
            ``(results_block, had_error)``: the concatenated
            ``<tool_result>`` elements, and whether any output matched the
            error patterns. For a failing tool the rendered body is the
            correction message rather than the raw output.
        """
        rendered: list[str] = []
        had_error = False
        for tc in tool_calls:
            name = tc["name"]
            reason = tc.get("reason", "")
            params = _cast_params(name, tc["params"])
            if on_tool_start:
                on_tool_start(name, params, reason)
            output = dispatch(name, params)
            # The callback always receives the raw output, not the
            # correction message.
            if on_tool_result:
                on_tool_result(name, output)
            if _is_error_output(output):
                had_error = True
                output = _build_error_correction_message(name, output)
            rendered.append(_render_result(name, params, output))
        return "".join(rendered), had_error

    def run(
        self,
        user_input: str,
        conversation: list[dict],
        on_text: Callable[[str], object] | None = None,
        on_tool_start: Callable[[str, dict, str], object] | None = None,
        on_tool_result: Callable[[str, str], object] | None = None,
    ) -> tuple[str, list[dict]]:
        """Process one user request, looping while the model calls tools.

        Args:
            user_input: The new user message.
            conversation: Prior messages as ``{"role", "content"}`` dicts.
                The list is copied; the caller's list is not modified.
            on_text: Called with each piece of visible text, including the
                API-error, abort and turn-limit messages.
            on_tool_start: Called as ``(name, params, reason)`` before each
                tool is dispatched.
            on_tool_result: Called as ``(name, output)`` after each tool
                returns.

        Returns:
            ``(final_reply_text, updated_conversation)``. The text is the
            model's visible reply, or a bracketed status message when the
            loop ends on an API error, repeated tool errors, or the turn
            limit.
        """
        if self.workspace:
            set_workspace(self.workspace)

        conversation = list(conversation)
        conversation.append({"role": "user", "content": user_input})
        consecutive_errors = 0

        for _ in range(MAX_TURNS):
            prompt = self._build_full_prompt(conversation)
            try:
                raw_reply = call_gemini(prompt, model=self.model)
            except GeminiAPIError as e:
                err = f"[Error API] {e}"
                if on_text:
                    on_text(err)
                return err, conversation

            tool_calls = parse_tool_calls(raw_reply)
            visible_text = strip_tool_calls(raw_reply)
            if visible_text and on_text:
                on_text(visible_text)

            # The raw reply is recorded on every path that got a response.
            conversation.append({"role": "assistant", "content": raw_reply})
            if not tool_calls:
                return visible_text, conversation

            results_block, turn_had_error = self._execute_tool_calls(
                tool_calls, on_tool_start, on_tool_result
            )
            consecutive_errors = consecutive_errors + 1 if turn_had_error else 0

            if consecutive_errors >= MAX_CONSECUTIVE_ERRORS:
                abort_msg = (
                    f"[SYSTEM β LOOP DIHENTIKAN]\n"
                    f"AI gagal {consecutive_errors}x berturut-turut. "
                    f"Periksa error terakhir secara manual."
                )
                if on_text:
                    on_text(abort_msg)
                return abort_msg, conversation

            # Tool results go back to the model as the next "user" message.
            conversation.append(
                {"role": "user", "content": results_block.strip()}
            )

        final_text = "[Batas maksimum turn tercapai]"
        # Notify the listener here too, as the other terminal paths do.
        if on_text:
            on_text(final_text)
        return final_text, conversation