gemini-claw / src /multi_agent.py
Ricky01anjay's picture
Upload files using @huggingface/hub
8c7fe70 verified
"""
multi_agent.py — Agentic loop untuk Gemini Claw.
Filosofi inti:
"Humans set direction; Claw executes and delivers."
Single agent (Executor) yang langsung mengeksekusi permintaan user
menggunakan tools. Tidak ada Architect atau Reviewer.
Urutan konteks setiap turn:
1. System prompt (dimuat dari system_prompt.md)
2. History percakapan sebelumnya (user & assistant)
3. Input user saat ini
Error handling:
- Output setiap tool diperiksa terhadap pola kegagalan.
- Jika error terdeteksi, pesan koreksi eksplisit diinjeksikan ke konteks
sehingga AI WAJIB memperbaiki sebelum lanjut.
- Jika consecutive error >= MAX_CONSECUTIVE_ERRORS, loop dihentikan paksa.
"""
from __future__ import annotations
import os
import re
from pathlib import Path
from .agent_tools import dispatch, set_workspace
from .gemini_client import GeminiAPIError, call_gemini, DEFAULT_MODEL
MAX_EXEC_TURNS = 25  # upper bound on model turns within one run() cycle
MAX_CONSECUTIVE_ERRORS = 4  # force-stop when the AI keeps failing without progress
# ── Load the system prompt from file ──
# Template path: system_prompt.md in the parent of the directory that holds this module.
_PROMPT_FILE = Path(__file__).parent.parent / "system_prompt.md"
def _load_system_prompt(workspace: Path) -> str:
    """Return the system prompt with the workspace placeholder filled in.

    The template at ``_PROMPT_FILE`` is read as UTF-8 on every call and each
    literal ``{workspace}`` occurrence is replaced by ``str(workspace)``.
    """
    return _PROMPT_FILE.read_text(encoding="utf-8").replace(
        "{workspace}", str(workspace)
    )
# ── Anti-hallucination injection (same as agent_loop) ──
# Instruction text that MultiAgentLoop._build_prompt appends as the final part
# of every prompt: the model must write only its own reply (no "User:" /
# "Assistant:" style labels) and wrap it in [start_response] ... [end_response].
# NOTE(review): the "β€”" sequence in the first line looks like a mis-decoded
# em dash; confirm against the original file before changing it, since this
# text is sent to the model verbatim.
_INJECTION = (
    "[SYSTEM INJECTION β€” IKUTI TANPA PENGECUALIAN]\n"
    "1. Kamu HANYA menulis balasan kamu sendiri. "
    "DILARANG menulis label seperti 'User:', 'Assistant:', 'History:', "
    "'Agent:', atau melanjutkan percakapan seolah kamu adalah user.\n"
    "2. Mulai responmu dengan token: [start_response]\n"
    "3. Akhiri responmu dengan token: [end_response]\n"
    "4. Jangan menulis apapun setelah [end_response].\n"
    "Sekarang tulis responmu:"
)
# Matches either delimiter token, case-insensitively; _strip_tool_calls uses it
# to remove the tokens from the reply text.
_RESPONSE_TOKEN_RE = re.compile(r"\[start_response\]|\[end_response\]", re.IGNORECASE)
# ── TOOL CALL PARSER (format markdown sesuai system_prompt.md) ───────────────
# Format:
# ```tools_call
# [nama]
# <nama_tool>
# [reason]
# <alasan>
# [param]
# <nilai>
# [param_multiline]
# <<EOF
# baris 1
# EOF
# ```
_TOOLS_CALL_BLOCK_RE = re.compile(r"```tools_call\n(.*?)```", re.DOTALL)
_PARAM_KEY_RE = re.compile(r"^\[(\w+)\]$")
_INT_PARAMS = {"offset", "limit", "timeout", "context_lines"}
def _parse_tools_call_block(block: str) -> dict | None:
"""Parse satu blok tools_call menjadi dict {name, params}."""
lines = block.splitlines()
fields: dict[str, str] = {}
i = 0
while i < len(lines):
key_match = _PARAM_KEY_RE.match(lines[i].strip())
if key_match:
key = key_match.group(1)
i += 1
if i >= len(lines):
break
if lines[i].strip() == "<<EOF":
i += 1
multi: list[str] = []
while i < len(lines) and lines[i].strip() != "EOF":
multi.append(lines[i])
i += 1
fields[key] = "\n".join(multi)
else:
fields[key] = lines[i].strip()
i += 1
name = fields.pop("nama", None)
reason = fields.pop("reason", "") or ""
if name:
return {"name": name, "reason": reason, "params": fields}
return None
def _parse_tool_calls(text: str) -> list[dict]:
    """Return every parseable tool call in ``text``, in order of appearance.

    Each ``tools_call`` fence matched by ``_TOOLS_CALL_BLOCK_RE`` is handed to
    ``_parse_tools_call_block``; fences that yield no call are skipped.
    """
    candidates = (
        _parse_tools_call_block(fence.group(1))
        for fence in _TOOLS_CALL_BLOCK_RE.finditer(text)
    )
    return [call for call in candidates if call]
def _strip_tool_calls(text: str) -> str:
    """Return the human-visible part of a model reply.

    Removes ``tools_call`` fences and the ``[start_response]`` /
    ``[end_response]`` tokens, collapses runs of three or more newlines into a
    single blank line, and trims surrounding whitespace.
    """
    visible = text
    # Order matters: blank-line collapsing must see the gaps left by removals.
    for pattern, replacement in (
        (_TOOLS_CALL_BLOCK_RE, ""),
        (_RESPONSE_TOKEN_RE, ""),
        (re.compile(r"\n{3,}"), "\n\n"),
    ):
        visible = pattern.sub(replacement, visible)
    return visible.strip()
def _render_tool_result(name: str, output: str) -> str:
return f'\n<tool_result name="{name}">\n{output}\n</tool_result>\n'
def _cast_params(params: dict) -> dict:
    """Return a copy of ``params`` with the integer-typed parameters converted.

    Values whose key is listed in ``_INT_PARAMS`` are passed through ``int()``;
    if that raises ``ValueError`` the original value is kept. All other
    entries are copied unchanged, and key order is preserved.
    """

    def _coerce(key, value):
        # Only the known integer parameters are ever converted.
        if key not in _INT_PARAMS:
            return value
        try:
            return int(value)
        except ValueError:
            return value

    return {key: _coerce(key, value) for key, value in params.items()}
# ── Error detection ───────────────────────────────────────────────────────────
_ERROR_PATTERNS = re.compile(
r"(\[Error\]"
r"|\[exit code:\s*[^0]\d*\]"
r"|Traceback \(most recent"
r"|FileNotFoundError"
r"|PermissionError"
r"|SyntaxError"
r"|tidak ditemukan"
r"|String tidak ditemukan"
r"|String ditemukan \d+ kali"
r"|Gagal)",
re.IGNORECASE,
)
def _is_error_output(output: str) -> bool:
"""Kembalikan True jika output tool mengindikasikan kegagalan."""
return bool(_ERROR_PATTERNS.search(output))
def _build_error_correction_message(tool_name: str, output: str) -> str:
"""
Buat pesan injeksi yang memaksa AI fokus memperbaiki error
sebelum melanjutkan ke langkah berikutnya.
"""
return (
f"[SYSTEM β€” ERROR DETECTED]\n"
f"Tool `{tool_name}` GAGAL dengan pesan:\n\n"
f"{output.strip()}\n\n"
f"INSTRUKSI WAJIB:\n"
f"1. Analisis pesan error di atas dengan seksama.\n"
f"2. JANGAN lanjut ke langkah berikutnya sebelum error ini diperbaiki.\n"
f"3. Jika error berasal dari `edit_file`: baca ulang file terlebih dahulu\n"
f" dengan `read_file` untuk mendapatkan konten terkini, lalu ulangi edit\n"
f" dengan `old_string` yang PERSIS sesuai isi file.\n"
f"4. Jika error berasal dari `bash`: periksa perintah, path, dan dependensi.\n"
f"5. Setelah berhasil memperbaiki, lanjutkan rencana semula."
)
# ── AGENT LOOP ──
class MultiAgentLoop:
    """Executor agent loop for Gemini Claw.

    Context order for every API call:
        [system prompt] -> [user & assistant history] -> [current user input]

    The history only lives for the duration of one loop session; a new input
    starts a fresh history when the caller passes in an emptied conversation.
    """

    def __init__(self, model: str = DEFAULT_MODEL, workspace: str | Path | None = None):
        """Store the model name and make sure the workspace directory exists.

        A falsy ``workspace`` (e.g. ``None`` or ``""``) falls back to
        ``/tmp/workspace``. The directory is created with its parents if needed.
        """
        self.model = model
        self.workspace = Path(workspace or "/tmp/workspace")
        self.workspace.mkdir(parents=True, exist_ok=True)

    def _build_prompt(self, conversation: list[dict], system: str) -> str:
        """Assemble the text prompt for one API call.

        Layout, joined with newlines:
            system prompt -> last 20 history messages -> current input -> injection

        The last element of ``conversation`` is treated as the current input
        and is always labelled ``User:``; everything before it is history.
        """
        history = conversation[:-1]

        # 1. System prompt, followed by a blank separator line.
        parts = [system, ""]

        # 2. History: at most the 20 most recent messages before the current input.
        parts.extend(
            f"{'User' if msg['role'] == 'user' else 'Assistant'}: {msg['content']}"
            for msg in history[-20:]
        )
        if history:
            parts.append("")

        # 3. Current input (empty when the conversation itself is empty).
        current_input = conversation[-1]["content"] if conversation else ""

        # 4. The anti-hallucination injection always comes last.
        parts += [f"User: {current_input}", "", _INJECTION]
        return "\n".join(parts)

    def run(
        self,
        user_input: str,
        conversation: list[dict],
        on_phase: callable = None,
        on_text: callable = None,
        on_tool_start: callable = None,
        on_tool_result: callable = None,
    ) -> tuple[str, list[dict]]:
        """Run the executor loop for a single user turn.

        Args:
            user_input: The new user message.
            conversation: Prior messages; copied, never mutated in place.
            on_phase: Optional callback, called once at the start as
                ``on_phase("executor", message)``.
            on_text: Optional callback receiving each turn's visible text, as
                well as API-error and abort messages.
            on_tool_start: Optional callback ``(name, params, reason)`` invoked
                before each tool is dispatched.
            on_tool_result: Optional callback ``(name, output)`` invoked with
                the raw tool output.

        Returns:
            ``(final_reply_text, updated_conversation)``. The text is the
            visible reply of the first turn without tool calls, an API error
            message, the abort message after ``MAX_CONSECUTIVE_ERRORS``
            failing turns in a row, or the turn-limit notice.
        """
        if on_phase:
            on_phase("executor", "βš™οΈ Mengerjakan tugas...")

        set_workspace(self.workspace)
        system = _load_system_prompt(self.workspace)

        # Work on a snapshot of the history plus the current user message.
        transcript = [*conversation, {"role": "user", "content": user_input}]
        consecutive_errors = 0  # failing turns in a row without a clean one

        for _ in range(MAX_EXEC_TURNS):
            prompt = self._build_prompt(transcript, system)
            try:
                raw_reply = call_gemini(prompt, model=self.model)
            except GeminiAPIError as e:
                api_error = f"[Error API] {e}"
                if on_text:
                    on_text(api_error)
                return api_error, transcript

            tool_calls = _parse_tool_calls(raw_reply)
            visible_text = _strip_tool_calls(raw_reply)
            if visible_text and on_text:
                on_text(visible_text)

            assistant_turn = {"role": "assistant", "content": raw_reply}
            if not tool_calls:
                # No tool call in the reply: this is the final answer.
                transcript.append(assistant_turn)
                return visible_text, transcript

            # ── Execute the requested tools ──
            rendered_results = []
            turn_had_error = False
            for call in tool_calls:
                name = call["name"]
                reason = call.get("reason", "")
                params = _cast_params(call["params"])

                if on_tool_start:
                    on_tool_start(name, params, reason)
                output = dispatch(name, params)
                if on_tool_result:
                    on_tool_result(name, output)

                failed = _is_error_output(output)
                # A failing output is replaced by an explicit correction message.
                shown = _build_error_correction_message(name, output) if failed else output
                rendered_results.append(_render_tool_result(name, shown))
                turn_had_error = turn_had_error or failed

            # ── Update the consecutive-error counter (a clean turn resets it) ──
            consecutive_errors = consecutive_errors + 1 if turn_had_error else 0

            transcript.append(assistant_turn)
            if consecutive_errors >= MAX_CONSECUTIVE_ERRORS:
                abort_msg = (
                    f"[SYSTEM β€” LOOP DIHENTIKAN]\n"
                    f"AI telah gagal {consecutive_errors} kali berturut-turut tanpa memperbaiki "
                    f"error. Harap periksa error terakhir secara manual."
                )
                if on_text:
                    on_text(abort_msg)
                return abort_msg, transcript

            # Tool results go back to the model as the next "user" message.
            transcript.append(
                {"role": "user", "content": "".join(rendered_results).strip()}
            )

        # Every allowed turn ended with tool calls: report the turn limit.
        return "[Batas maksimum turn tercapai]", transcript