""" multi_agent.py — Agentic loop untuk Gemini Claw. Filosofi inti: "Humans set direction; Claw executes and delivers." Single agent (Executor) yang langsung mengeksekusi permintaan user menggunakan tools. Tidak ada Architect atau Reviewer. Urutan konteks setiap turn: 1. System prompt (dimuat dari system_prompt.md) 2. History percakapan sebelumnya (user & assistant) 3. Input user saat ini Error handling: - Output setiap tool diperiksa terhadap pola kegagalan. - Jika error terdeteksi, pesan koreksi eksplisit diinjeksikan ke konteks sehingga AI WAJIB memperbaiki sebelum lanjut. - Jika consecutive error >= MAX_CONSECUTIVE_ERRORS, loop dihentikan paksa. """ from __future__ import annotations import os import re from pathlib import Path from .agent_tools import dispatch, set_workspace from .gemini_client import GeminiAPIError, call_gemini, DEFAULT_MODEL MAX_EXEC_TURNS = 25 # Batas turn per siklus MAX_CONSECUTIVE_ERRORS = 4 # hentikan paksa jika AI terus gagal tanpa progres # ── Load system prompt dari file ────────────────────────────────────────────── _PROMPT_FILE = Path(__file__).parent.parent / "system_prompt.md" def _load_system_prompt(workspace: Path) -> str: """Baca system_prompt.md dan isi placeholder {workspace}.""" template = _PROMPT_FILE.read_text(encoding="utf-8") return template.replace("{workspace}", str(workspace)) # ── Injection anti-halusinasi (sama seperti agent_loop) ────────────────────── _INJECTION = ( "[SYSTEM INJECTION — IKUTI TANPA PENGECUALIAN]\n" "1. Kamu HANYA menulis balasan kamu sendiri. " "DILARANG menulis label seperti 'User:', 'Assistant:', 'History:', " "'Agent:', atau melanjutkan percakapan seolah kamu adalah user.\n" "2. Mulai responmu dengan token: [start_response]\n" "3. Akhiri responmu dengan token: [end_response]\n" "4. Jangan menulis apapun setelah [end_response].\n" "Sekarang tulis responmu:" ) _RESPONSE_TOKEN_RE = re.compile(r"\[start_response\]|\[end_response\]", re.IGNORECASE) # ── TOOL CALL PARSER (format markdown sesuai system_prompt.md) ─────────────── # Format: # ```tools_call # [nama] # # [reason] # # [param] # # [param_multiline] # < dict | None: """Parse satu blok tools_call menjadi dict {name, params}.""" lines = block.splitlines() fields: dict[str, str] = {} i = 0 while i < len(lines): key_match = _PARAM_KEY_RE.match(lines[i].strip()) if key_match: key = key_match.group(1) i += 1 if i >= len(lines): break if lines[i].strip() == "< list[dict]: calls = [] for m in _TOOLS_CALL_BLOCK_RE.finditer(text): parsed = _parse_tools_call_block(m.group(1)) if parsed: calls.append(parsed) return calls def _strip_tool_calls(text: str) -> str: """Hapus blok tools_call dan token [start/end_response] dari teks.""" cleaned = _TOOLS_CALL_BLOCK_RE.sub("", text) cleaned = _RESPONSE_TOKEN_RE.sub("", cleaned) cleaned = re.sub(r"\n{3,}", "\n\n", cleaned) return cleaned.strip() def _render_tool_result(name: str, output: str) -> str: return f'\n\n{output}\n\n' def _cast_params(params: dict) -> dict: result = {} for k, v in params.items(): if k in _INT_PARAMS: try: result[k] = int(v) except ValueError: result[k] = v else: result[k] = v return result # ── Error detection ─────────────────────────────────────────────────────────── _ERROR_PATTERNS = re.compile( r"(\[Error\]" r"|\[exit code:\s*[^0]\d*\]" r"|Traceback \(most recent" r"|FileNotFoundError" r"|PermissionError" r"|SyntaxError" r"|tidak ditemukan" r"|String tidak ditemukan" r"|String ditemukan \d+ kali" r"|Gagal)", re.IGNORECASE, ) def _is_error_output(output: str) -> bool: """Kembalikan True jika output tool mengindikasikan kegagalan.""" return bool(_ERROR_PATTERNS.search(output)) def _build_error_correction_message(tool_name: str, output: str) -> str: """ Buat pesan injeksi yang memaksa AI fokus memperbaiki error sebelum melanjutkan ke langkah berikutnya. """ return ( f"[SYSTEM — ERROR DETECTED]\n" f"Tool `{tool_name}` GAGAL dengan pesan:\n\n" f"{output.strip()}\n\n" f"INSTRUKSI WAJIB:\n" f"1. Analisis pesan error di atas dengan seksama.\n" f"2. JANGAN lanjut ke langkah berikutnya sebelum error ini diperbaiki.\n" f"3. Jika error berasal dari `edit_file`: baca ulang file terlebih dahulu\n" f" dengan `read_file` untuk mendapatkan konten terkini, lalu ulangi edit\n" f" dengan `old_string` yang PERSIS sesuai isi file.\n" f"4. Jika error berasal dari `bash`: periksa perintah, path, dan dependensi.\n" f"5. Setelah berhasil memperbaiki, lanjutkan rencana semula." ) # ── AGENT LOOP ──────────────────────────────────────────────────────────────── class MultiAgentLoop: """ Executor agent loop untuk Gemini Claw. Urutan konteks setiap API call: [system prompt] → [history user & assistant] → [input user] History hanya hidup selama sesi loop; input baru memulai history baru jika conversation dikosongkan dari luar. """ def __init__(self, model: str = DEFAULT_MODEL, workspace: str | Path | None = None): self.model = model self.workspace = Path(workspace) if workspace else Path("/tmp/workspace") self.workspace.mkdir(parents=True, exist_ok=True) def _build_prompt(self, conversation: list[dict], system: str) -> str: """ Susun prompt: [system prompt] → [20 history terakhir] → [input user saat ini] → [injection] """ lines = [] # ── 1. System prompt ────────────────────────────────────────────────── lines.append(system) lines.append("") # ── 2. History (20 pesan, kecuali input user saat ini) ─────────────── history = conversation[:-1] for msg in history[-20:]: role = "User" if msg["role"] == "user" else "Assistant" lines.append(f"{role}: {msg['content']}") if len(history) > 0: lines.append("") # ── 3. Input user saat ini ──────────────────────────────────────────── current_input = conversation[-1]["content"] if conversation else "" lines.append(f"User: {current_input}") lines.append("") # ── 4. Injection anti-halusinasi ────────────────────────────────────── lines.append(_INJECTION) return "\n".join(lines) def run( self, user_input: str, conversation: list[dict], on_phase: callable = None, on_text: callable = None, on_tool_start: callable = None, on_tool_result: callable = None, ) -> tuple[str, list[dict]]: """ Jalankan executor loop untuk satu giliran user. Returns: (final_reply_text, updated_conversation) """ if on_phase: on_phase("executor", "⚙️ Mengerjakan tugas...") set_workspace(self.workspace) system = _load_system_prompt(self.workspace) # Mulai dari snapshot history, tambah pesan user saat ini conv = list(conversation) conv.append({"role": "user", "content": user_input}) final_text = "" turn = 0 consecutive_errors = 0 # jumlah error beruntun tanpa progres while turn < MAX_EXEC_TURNS: turn += 1 full_prompt = self._build_prompt(conv, system) try: raw_reply = call_gemini(full_prompt, model=self.model) except GeminiAPIError as e: err = f"[Error API] {e}" if on_text: on_text(err) return err, conv tool_calls = _parse_tool_calls(raw_reply) visible_text = _strip_tool_calls(raw_reply) if visible_text and on_text: on_text(visible_text) if not tool_calls: # Tidak ada tool call → selesai conv.append({"role": "assistant", "content": raw_reply}) final_text = visible_text break # ── Eksekusi tool calls ─────────────────────────────────────────── tool_results_block = "" turn_had_error = False for tc in tool_calls: name = tc["name"] reason = tc.get("reason", "") params = _cast_params(tc["params"]) if on_tool_start: on_tool_start(name, params, reason) output = dispatch(name, params) if on_tool_result: on_tool_result(name, output) if _is_error_output(output): # Ganti output biasa dengan pesan koreksi yang eksplisit error_msg = _build_error_correction_message(name, output) tool_results_block += _render_tool_result(name, error_msg) turn_had_error = True else: tool_results_block += _render_tool_result(name, output) # ── Update consecutive error counter ────────────────────────────── if turn_had_error: consecutive_errors += 1 else: consecutive_errors = 0 # reset jika ada turn bersih if consecutive_errors >= MAX_CONSECUTIVE_ERRORS: abort_msg = ( f"[SYSTEM — LOOP DIHENTIKAN]\n" f"AI telah gagal {consecutive_errors} kali berturut-turut tanpa memperbaiki " f"error. Harap periksa error terakhir secara manual." ) if on_text: on_text(abort_msg) conv.append({"role": "assistant", "content": raw_reply}) return abort_msg, conv conv.append({"role": "assistant", "content": raw_reply}) conv.append({"role": "user", "content": tool_results_block.strip()}) else: final_text = "[Batas maksimum turn tercapai]" return final_text, conv