Spaces:
Paused
Paused
"""
multi_agent.py — Agentic loop untuk Gemini Claw.

Filosofi inti:
    "Humans set direction; Claw executes and delivers."

Single agent (Executor) yang langsung mengeksekusi permintaan user
menggunakan tools. Tidak ada Architect atau Reviewer.

Urutan konteks setiap turn:
    1. System prompt (dimuat dari system_prompt.md)
    2. History percakapan sebelumnya (user & assistant)
    3. Input user saat ini

Error handling:
    - Output setiap tool diperiksa terhadap pola kegagalan.
    - Jika error terdeteksi, pesan koreksi eksplisit diinjeksikan ke konteks
      sehingga AI WAJIB memperbaiki sebelum lanjut.
    - Jika consecutive error >= MAX_CONSECUTIVE_ERRORS, loop dihentikan paksa.
"""
from __future__ import annotations

import os
import re
from collections.abc import Callable
from pathlib import Path

from .agent_tools import dispatch, set_workspace
from .gemini_client import GeminiAPIError, call_gemini, DEFAULT_MODEL
# Upper bound on model round-trips within a single run() cycle.
MAX_EXEC_TURNS = 25

# Force-stop the loop once this many turns in a row contain a failing tool call.
MAX_CONSECUTIVE_ERRORS = 4
# ── Load system prompt dari file ──────────────────────────────────────────────
# system_prompt.md is looked up two directory levels above this module file.
_PROMPT_FILE = Path(__file__).parent.parent / "system_prompt.md"


def _load_system_prompt(workspace: Path) -> str:
    """Return the text of system_prompt.md with ``{workspace}`` filled in.

    Args:
        workspace: Path whose string form replaces every ``{workspace}``
            placeholder in the template.

    Returns:
        The prompt text after substitution.
    """
    workspace_text = str(workspace)
    raw_template = _PROMPT_FILE.read_text(encoding="utf-8")
    return raw_template.replace("{workspace}", workspace_text)
# ── Injection anti-halusinasi (sama seperti agent_loop) ──────────────────────
# Trailer appended to every prompt. It tells the model to write only its own
# reply (no role labels) and to wrap it in [start_response]/[end_response].
# The header separator is an em dash; the previous text carried a mis-decoded
# character at that position.
_INJECTION = (
    "[SYSTEM INJECTION — IKUTI TANPA PENGECUALIAN]\n"
    "1. Kamu HANYA menulis balasan kamu sendiri. "
    "DILARANG menulis label seperti 'User:', 'Assistant:', 'History:', "
    "'Agent:', atau melanjutkan percakapan seolah kamu adalah user.\n"
    "2. Mulai responmu dengan token: [start_response]\n"
    "3. Akhiri responmu dengan token: [end_response]\n"
    "4. Jangan menulis apapun setelah [end_response].\n"
    "Sekarang tulis responmu:"
)

# Matches either response-delimiter token, case-insensitively.
_RESPONSE_TOKEN_RE = re.compile(r"\[start_response\]|\[end_response\]", re.IGNORECASE)
# ── TOOL CALL PARSER (format markdown sesuai system_prompt.md) ───────────────
# Format:
# ```tools_call
# [nama]
# <nama_tool>
# [reason]
# <alasan>
# [param]
# <nilai>
# [param_multiline]
# <<EOF
# baris 1
# EOF
# ```
# Matches one fenced ``tools_call`` block and captures its body (group 1).
#
# * Trailing blanks and CRLF after ``tools_call`` are tolerated.
# * A ``<<EOF`` ... ``EOF`` heredoc is consumed as a unit, so a multi-line
#   parameter may itself contain triple backticks (e.g. Markdown content)
#   without ending the block early. The heredoc alternative is tried first;
#   an unterminated heredoc falls back to plain character-by-character
#   matching, which stops at the first closing fence.
_TOOLS_CALL_BLOCK_RE = re.compile(
    r"```tools_call[ \t]*\r?\n"
    r"((?:^[ \t]*<<EOF[ \t]*\r?\n.*?^[ \t]*EOF[ \t]*\r?$|.)*?)"
    r"```",
    re.DOTALL | re.MULTILINE,
)

# A parameter header line: a single word in square brackets, e.g. ``[path]``.
_PARAM_KEY_RE = re.compile(r"^\[(\w+)\]$")

# Parameter names whose values are converted to int before dispatch.
_INT_PARAMS = {"offset", "limit", "timeout", "context_lines"}
def _parse_tools_call_block(block: str) -> dict | None:
    """Turn the body of one tools_call block into a call description.

    The body is a sequence of ``[key]`` header lines, each followed either by
    a single value line or by a ``<<EOF`` ... ``EOF`` heredoc for multi-line
    values. Lines that are not headers (and are not consumed as a value) are
    ignored.

    Args:
        block: Text found between the opening and closing fence.

    Returns:
        ``{"name", "reason", "params"}`` when a non-empty ``[nama]`` field is
        present, otherwise ``None``. ``params`` holds every remaining field as
        a string.
    """
    rows = block.splitlines()
    total = len(rows)
    parsed: dict[str, str] = {}
    pos = 0
    while pos < total:
        header = _PARAM_KEY_RE.match(rows[pos].strip())
        pos += 1
        if header is None:
            continue
        if pos >= total:
            # Header on the very last line: there is no value to read.
            break
        field = header.group(1)
        if rows[pos].strip() != "<<EOF":
            # Single-line value.
            parsed[field] = rows[pos].strip()
            pos += 1
            continue
        # Heredoc value: keep the lines verbatim (indentation included)
        # up to the EOF terminator or the end of the block.
        pos += 1
        body: list[str] = []
        while pos < total and rows[pos].strip() != "EOF":
            body.append(rows[pos])
            pos += 1
        parsed[field] = "\n".join(body)
        pos += 1  # step over the EOF terminator line

    tool = parsed.pop("nama", None)
    why = parsed.pop("reason", "") or ""
    if not tool:
        return None
    return {"name": tool, "reason": why, "params": parsed}
def _parse_tool_calls(text: str) -> list[dict]:
    """Extract every well-formed tool call from a model reply.

    Args:
        text: Raw model output.

    Returns:
        One dict per tools_call block that parsed successfully, in order of
        appearance; blocks without a tool name are skipped.
    """
    candidates = (
        _parse_tools_call_block(found.group(1))
        for found in _TOOLS_CALL_BLOCK_RE.finditer(text)
    )
    return [call for call in candidates if call]
def _strip_tool_calls(text: str) -> str:
    """Return the user-visible part of a model reply.

    Removes tools_call blocks and the [start_response]/[end_response] tokens,
    collapses runs of three or more newlines to a blank line, and trims
    surrounding whitespace.
    """
    visible = _RESPONSE_TOKEN_RE.sub("", _TOOLS_CALL_BLOCK_RE.sub("", text))
    return re.sub(r"\n{3,}", "\n\n", visible).strip()
def _render_tool_result(name: str, output: str) -> str:
    """Wrap a tool's output in a ``<tool_result name="...">`` element.

    The result starts and ends with a newline so consecutive results can be
    concatenated directly.
    """
    opening = f'<tool_result name="{name}">'
    return "\n".join(("", opening, f"{output}", "</tool_result>", ""))
def _cast_params(params: dict) -> dict:
    """Return a copy of ``params`` with integer-typed parameters converted.

    Values whose key is listed in ``_INT_PARAMS`` are passed through ``int()``;
    a value that is not a valid integer literal is kept unchanged. All other
    entries are copied as they are.
    """
    converted = {}
    for key, raw in params.items():
        value = raw
        if key in _INT_PARAMS:
            try:
                value = int(raw)
            except ValueError:
                value = raw  # not numeric: hand the original text to the tool
        converted[key] = value
    return converted
# ── Error detection ───────────────────────────────────────────────────────────
# Failure markers looked for in tool output (case-insensitive).
#
# The exit-code alternative must only fire for non-zero codes. The negative
# lookahead rejects an all-zero code explicitly; the earlier ``[^0]\d*`` form
# let ``[^0]`` match the blank after the colon, so "[exit code: 0]" was
# reported as a failure.
_ERROR_PATTERNS = re.compile(
    r"(\[Error\]"
    r"|\[exit code:\s*(?!0+\])-?\d+\]"
    r"|Traceback \(most recent"
    r"|FileNotFoundError"
    r"|PermissionError"
    r"|SyntaxError"
    r"|tidak ditemukan"
    r"|String tidak ditemukan"
    r"|String ditemukan \d+ kali"
    r"|Gagal)",
    re.IGNORECASE,
)
def _is_error_output(output: str) -> bool:
    """Report whether a tool's output contains any known failure marker."""
    return _ERROR_PATTERNS.search(output) is not None
def _build_error_correction_message(tool_name: str, output: str) -> str:
    """Build the corrective message injected after a failed tool call.

    The message quotes the tool's (whitespace-trimmed) output and lists
    mandatory recovery steps so the model addresses the error before moving
    on. The header separator is an em dash; the previous literal carried a
    mis-decoded character there.

    Args:
        tool_name: Name of the tool that failed.
        output: Raw output returned by that tool.

    Returns:
        The complete instruction text, without a trailing newline.
    """
    header = (
        "[SYSTEM — ERROR DETECTED]\n"
        f"Tool `{tool_name}` GAGAL dengan pesan:\n\n"
        f"{output.strip()}\n\n"
    )
    instructions = (
        "INSTRUKSI WAJIB:\n"
        "1. Analisis pesan error di atas dengan seksama.\n"
        "2. JANGAN lanjut ke langkah berikutnya sebelum error ini diperbaiki.\n"
        "3. Jika error berasal dari `edit_file`: baca ulang file terlebih dahulu\n"
        " dengan `read_file` untuk mendapatkan konten terkini, lalu ulangi edit\n"
        " dengan `old_string` yang PERSIS sesuai isi file.\n"
        "4. Jika error berasal dari `bash`: periksa perintah, path, dan dependensi.\n"
        "5. Setelah berhasil memperbaiki, lanjutkan rencana semula."
    )
    return header + instructions
# ── AGENT LOOP ────────────────────────────────────────────────────────────────
class MultiAgentLoop:
    """Executor agent loop for Gemini Claw.

    Every API call is assembled in this order:
    system prompt -> prior user/assistant history -> current user input.

    History only lives for the duration of a loop session; a new input starts
    a fresh history if the caller clears the conversation it passes in.
    """

    # Number of most recent history messages included in each prompt.
    HISTORY_WINDOW = 20

    def __init__(self, model: str = DEFAULT_MODEL, workspace: str | Path | None = None):
        """Store the model name and make sure the workspace directory exists.

        Args:
            model: Model identifier passed to ``call_gemini``.
            workspace: Working directory for tools; ``/tmp/workspace`` is used
                when this is ``None`` or empty.
        """
        self.model = model
        self.workspace = Path(workspace) if workspace else Path("/tmp/workspace")
        self.workspace.mkdir(parents=True, exist_ok=True)

    def _build_prompt(self, conversation: list[dict], system: str) -> str:
        """Flatten the conversation into one prompt string.

        Layout: system prompt, the last ``HISTORY_WINDOW`` messages before the
        final one (labelled ``User:`` / ``Assistant:``), the final message as
        the current ``User:`` input, then the anti-hallucination injection.

        Args:
            conversation: Messages with ``role`` and ``content`` keys; the
                last entry is treated as the current input.
            system: Already-rendered system prompt text.

        Returns:
            The prompt, with sections joined by newlines.
        """
        history = conversation[:-1]
        current_input = conversation[-1]["content"] if conversation else ""

        parts = [system, ""]
        for msg in history[-self.HISTORY_WINDOW:]:
            role = "User" if msg["role"] == "user" else "Assistant"
            parts.append(f"{role}: {msg['content']}")
        if history:
            parts.append("")
        parts += [f"User: {current_input}", "", _INJECTION]
        return "\n".join(parts)

    def _execute_tool_calls(
        self,
        tool_calls: list[dict],
        on_tool_start: Callable[[str, dict, str], object] | None,
        on_tool_result: Callable[[str, str], object] | None,
    ) -> tuple[str, bool]:
        """Run each tool call in order and render the results for the model.

        Returns:
            ``(results_text, had_error)`` where ``results_text`` is the
            concatenation of all rendered tool results and ``had_error`` is
            True when at least one output matched a failure pattern.
        """
        rendered: list[str] = []
        had_error = False
        for tc in tool_calls:
            name = tc["name"]
            reason = tc.get("reason", "")
            params = _cast_params(tc["params"])
            if on_tool_start:
                on_tool_start(name, params, reason)
            output = dispatch(name, params)
            if on_tool_result:
                # The callback always receives the raw tool output.
                on_tool_result(name, output)
            if _is_error_output(output):
                # The model sees an explicit correction message instead of
                # the bare output.
                had_error = True
                output = _build_error_correction_message(name, output)
            rendered.append(_render_tool_result(name, output))
        return "".join(rendered), had_error

    def run(
        self,
        user_input: str,
        conversation: list[dict],
        on_phase: Callable[[str, str], object] | None = None,
        on_text: Callable[[str], object] | None = None,
        on_tool_start: Callable[[str, dict, str], object] | None = None,
        on_tool_result: Callable[[str, str], object] | None = None,
    ) -> tuple[str, list[dict]]:
        """Run the executor loop for one user turn.

        The model is called repeatedly; each reply's tool calls are executed
        and their results fed back as a user message. The loop ends when a
        reply has no tool calls, the API raises ``GeminiAPIError``,
        ``MAX_CONSECUTIVE_ERRORS`` turns in a row contained a failing tool
        call, or ``MAX_EXEC_TURNS`` is reached. Every terminal message is
        also delivered through ``on_text`` when that callback is given.

        Args:
            user_input: The new user message.
            conversation: Prior messages; this list is copied, not mutated.
            on_phase: Called once at start with ``(phase_name, label)``.
            on_text: Called with each piece of user-visible text.
            on_tool_start: Called with ``(name, params, reason)`` before a
                tool is dispatched.
            on_tool_result: Called with ``(name, output)`` after it returns.

        Returns:
            ``(final_reply_text, updated_conversation)``.
        """
        if on_phase:
            on_phase("executor", "⚙️ Mengerjakan tugas...")

        set_workspace(self.workspace)
        system = _load_system_prompt(self.workspace)

        # Work on a snapshot of the history plus the current user message.
        conv = list(conversation)
        conv.append({"role": "user", "content": user_input})

        consecutive_errors = 0  # failing turns in a row without a clean one
        for _ in range(MAX_EXEC_TURNS):
            full_prompt = self._build_prompt(conv, system)
            try:
                raw_reply = call_gemini(full_prompt, model=self.model)
            except GeminiAPIError as e:
                err = f"[Error API] {e}"
                if on_text:
                    on_text(err)
                return err, conv

            tool_calls = _parse_tool_calls(raw_reply)
            visible_text = _strip_tool_calls(raw_reply)
            if visible_text and on_text:
                on_text(visible_text)

            # The raw reply is recorded on every path below.
            conv.append({"role": "assistant", "content": raw_reply})

            if not tool_calls:
                # No tool call means the model is done.
                return visible_text, conv

            results_text, turn_had_error = self._execute_tool_calls(
                tool_calls, on_tool_start, on_tool_result
            )

            # A clean turn resets the streak.
            consecutive_errors = (consecutive_errors + 1) if turn_had_error else 0
            if consecutive_errors >= MAX_CONSECUTIVE_ERRORS:
                abort_msg = (
                    f"[SYSTEM — LOOP DIHENTIKAN]\n"
                    f"AI telah gagal {consecutive_errors} kali berturut-turut tanpa memperbaiki "
                    f"error. Harap periksa error terakhir secara manual."
                )
                if on_text:
                    on_text(abort_msg)
                return abort_msg, conv

            conv.append({"role": "user", "content": results_text.strip()})

        # Turn budget exhausted: report it like the other terminal messages.
        limit_msg = "[Batas maksimum turn tercapai]"
        if on_text:
            on_text(limit_msg)
        return limit_msg, conv