gemini-claw / src /agent_loop.py
Ricky01anjay's picture
Upload files using @huggingface/hub
8c7fe70 verified
"""
agent_loop.py - Standalone agentic loop for Gemini Claw.

Prompt order for every turn:
    [system prompt] -> [last 20 history messages] -> [user input] -> [injection]

Injection:
    A short message the AI MUST obey:
    - Do not hallucinate (do not write 'User:', 'Assistant:', etc.)
    - Begin with [start_response], end with [end_response]

Error handling:
    - Tool output is checked against the error patterns.
    - On error: a correction message is injected and the AI must fix it first.
    - If consecutive errors >= MAX_CONSECUTIVE_ERRORS: the loop is stopped.
"""
from __future__ import annotations

import re
from collections.abc import Callable
from pathlib import Path

from .agent_tools import dispatch, TOOL_REGISTRY, set_workspace
from .gemini_client import GeminiAPIError, call_gemini, DEFAULT_MODEL
# Upper bound on model round-trips within a single AgentLoop.run() call.
MAX_TURNS = 30
# Number of back-to-back failing tool turns after which the loop gives up.
MAX_CONSECUTIVE_ERRORS = 4
# Prompt template, located one directory above the directory of this module.
_PROMPT_FILE = Path(__file__).parent.parent.joinpath("system_prompt.md")
def build_system_prompt(workspace: str | Path | None = None) -> str:
    """Render ``system_prompt.md`` with its ``{workspace}`` placeholder filled in.

    The workspace path is the ``workspace`` argument when it is truthy,
    otherwise the ``CLAW_WORKSPACE`` environment variable when that is
    non-empty, otherwise the current working directory.

    Args:
        workspace: Optional workspace directory.

    Returns:
        The template text with every ``{workspace}`` occurrence replaced by the
        chosen path.
    """
    import os

    # First truthy candidate wins; Path.cwd() is only evaluated as a last resort.
    root = Path(workspace or os.environ.get("CLAW_WORKSPACE", "") or Path.cwd())
    return _PROMPT_FILE.read_text(encoding="utf-8").replace("{workspace}", str(root))
# ── Injection anti-halusinasi ─────────────────────────────────────────────────
_INJECTION = (
"[SYSTEM INJECTION β€” IKUTI TANPA PENGECUALIAN]\n"
"1. Kamu HANYA menulis balasan kamu sendiri. "
"DILARANG menulis label seperti 'User:', 'Assistant:', 'History:', "
"'Agent:', atau melanjutkan percakapan seolah kamu adalah user.\n"
"2. Mulai responmu dengan token: [start_response]\n"
"3. Akhiri responmu dengan token: [end_response]\n"
"4. Jangan menulis apapun setelah [end_response].\n"
"Sekarang tulis responmu:"
)
# ── Regex tools_call ──────────────────────────────────────────────────────────
_TOOLS_CALL_BLOCK_RE = re.compile(r"```tools_call\n(.*?)```", re.DOTALL)
_PARAM_KEY_RE = re.compile(r"^\[(\w+)\]$")
# Regex untuk strip token halusinasi-guard
_RESPONSE_TOKEN_RE = re.compile(r"\[start_response\]|\[end_response\]", re.IGNORECASE)
def _parse_tools_call_block(block: str) -> dict | None:
    """Parse the inside of one ``tools_call`` fence into a call description.

    The block is read line by line. A line matching ``[key]`` starts a field;
    the line after it is the value. If that line is ``<<EOF``, the value is
    instead every following line up to (not including) a line reading ``EOF``,
    joined with newlines. Lines that are not part of a field are skipped.

    Args:
        block: Text captured between the fences.

    Returns:
        ``{"name", "reason", "params"}`` built from the ``nama`` field, the
        optional ``reason`` field and all remaining fields, or ``None`` when
        there is no non-empty ``nama`` field.
    """
    remaining = iter(block.splitlines())
    fields: dict[str, str] = {}

    for line in remaining:
        header = _PARAM_KEY_RE.match(line.strip())
        if header is None:
            continue
        key = header.group(1)

        value_line = next(remaining, None)
        if value_line is None:
            # A key on the very last line has no value and is dropped.
            break

        if value_line.strip() != "<<EOF":
            fields[key] = value_line.strip()
            continue

        # Heredoc value: keep lines verbatim; the terminating EOF line is
        # consumed by this inner loop and never stored.
        body: list[str] = []
        for heredoc_line in remaining:
            if heredoc_line.strip() == "EOF":
                break
            body.append(heredoc_line)
        fields[key] = "\n".join(body)

    name = fields.pop("nama", None)
    reason = fields.pop("reason", "") or ""
    if not name:
        return None
    return {"name": name, "reason": reason, "params": fields}
def parse_tool_calls(text: str) -> list[dict]:
    """Return every successfully parsed ``tools_call`` block found in ``text``.

    Blocks for which ``_parse_tools_call_block`` yields nothing are left out;
    the remaining calls keep the order in which they appear.
    """
    candidates = (
        _parse_tools_call_block(match.group(1))
        for match in _TOOLS_CALL_BLOCK_RE.finditer(text)
    )
    return [call for call in candidates if call]
def has_tool_calls(text: str) -> bool:
    """Tell whether ``text`` contains at least one fenced ``tools_call`` block."""
    return _TOOLS_CALL_BLOCK_RE.search(text) is not None
def _cast_params(tool_name: str, raw: dict[str, str]) -> dict:
    """Convert the known integer parameters of a tool call to ``int``.

    Args:
        tool_name: Name of the tool; not used by the conversion.
        raw: Parameter values as parsed from the model output.

    Returns:
        A new dict with the same keys. ``offset``, ``limit``, ``timeout`` and
        ``context_lines`` become ``int`` when their value parses as one; all
        other values are passed through unchanged.
    """
    integer_keys = {"offset", "limit", "timeout", "context_lines"}

    def _coerce(key: str, value: str):
        if key not in integer_keys:
            return value
        try:
            return int(value)
        except ValueError:
            # Leave unparsable text as-is so the tool can report it itself.
            return value

    return {key: _coerce(key, value) for key, value in raw.items()}
def _render_result(name: str, params: dict, output: str) -> str:
    """Wrap a tool's output in a ``<tool_result>`` element.

    Args:
        name: Tool name, written into the ``name`` attribute.
        params: Parameters of the call; not used in the rendering.
        output: Text placed between the opening and closing tags.

    Returns:
        The element surrounded by a leading and a trailing newline.
    """
    opening = '<tool_result name="{}">'.format(name)
    closing = "</tool_result>"
    return "\n{}\n{}\n{}\n".format(opening, output, closing)
def strip_tool_calls(text: str) -> str:
    """Remove tools_call blocks and [start/end_response] tokens from ``text``.

    After the removals, any run of three or more newlines is collapsed to two
    and surrounding whitespace is trimmed.
    """
    for pattern in (_TOOLS_CALL_BLOCK_RE, _RESPONSE_TOKEN_RE):
        text = pattern.sub("", text)
    return re.sub(r"\n{3,}", "\n\n", text).strip()
# ── Error detection ───────────────────────────────────────────────────────────
_ERROR_PATTERNS = re.compile(
r"(\[Error\]"
r"|\[exit code:\s*[^0]\d*\]"
r"|Traceback \(most recent"
r"|FileNotFoundError"
r"|PermissionError"
r"|SyntaxError"
r"|tidak ditemukan"
r"|String tidak ditemukan"
r"|String ditemukan \d+ kali"
r"|Gagal)",
re.IGNORECASE,
)
def _is_error_output(output: str) -> bool:
    """Tell whether ``output`` contains any of the known error markers."""
    return _ERROR_PATTERNS.search(output) is not None
def _build_error_correction_message(tool_name: str, output: str) -> str:
    """Build the correction notice sent to the model after a failed tool call.

    Args:
        tool_name: Name of the tool that failed; quoted in the notice.
        output: Raw tool output; embedded with surrounding whitespace stripped.

    Returns:
        A header, the tool output, and a fixed list of recovery instructions.
    """
    # The em dash in the header is written as \u2014 so the emitted text does
    # not depend on how this file's bytes happen to be decoded.
    return (
        "[SYSTEM \u2014 ERROR DETECTED]\n"
        f"Tool `{tool_name}` GAGAL dengan pesan:\n\n"
        f"{output.strip()}\n\n"
        "INSTRUKSI WAJIB:\n"
        "1. Analisis pesan error di atas dengan seksama.\n"
        "2. JANGAN lanjut ke langkah berikutnya sebelum error ini diperbaiki.\n"
        "3. Jika error dari `edit_file`: gunakan `read_file` dulu untuk mendapat isi\n"
        " file terkini, lalu ulangi edit dengan `old_string` yang PERSIS sama.\n"
        "4. Jika error dari `bash`: periksa perintah, path, dan dependensi.\n"
        "5. Setelah memperbaiki, lanjutkan rencana semula."
    )
# ── Agentic loop ──────────────────────────────────────────────────────────────
class AgentLoop:
"""
Loop agentic.
Urutan prompt setiap turn:
[20 history terakhir] β†’ [system prompt] β†’ [input user] β†’ [injection]
"""
def __init__(self, model: str = DEFAULT_MODEL, workspace: str | Path | None = None):
self.model = model
self.workspace = Path(workspace) if workspace else None
self.system = build_system_prompt(workspace)
def _build_full_prompt(self, conversation: list[dict]) -> str:
"""
Susun prompt:
[system prompt] β†’ [20 history terakhir] β†’ [input user] β†’ [injection]
"""
lines = []
# ── 1. System prompt ──────────────────────────────────────────────────
lines.append(self.system)
lines.append("")
# ── 2. History (20 pesan terakhir, kecuali input user saat ini) ──────
history = conversation[:-1]
for turn in history[-20:]:
role = "User" if turn["role"] == "user" else "Assistant"
lines.append(f"{role}: {turn['content']}")
if len(history) > 0:
lines.append("")
# ── 3. Input user saat ini ────────────────────────────────────────────
current_input = conversation[-1]["content"] if conversation else ""
lines.append(f"User: {current_input}")
lines.append("")
# ── 4. Injection anti-halusinasi ──────────────────────────────────────
lines.append(_INJECTION)
return "\n".join(lines)
def run(
self,
user_input: str,
conversation: list[dict],
on_text: callable = None,
on_tool_start: callable = None,
on_tool_result: callable = None,
) -> tuple[str, list[dict]]:
"""
Jalankan satu giliran agentic.
Returns: (final_reply_text, updated_conversation)
"""
if self.workspace:
set_workspace(self.workspace)
conversation = list(conversation)
conversation.append({"role": "user", "content": user_input})
final_text = ""
turn_count = 0
consecutive_errors = 0
while turn_count < MAX_TURNS:
turn_count += 1
prompt = self._build_full_prompt(conversation)
try:
raw_reply = call_gemini(prompt, model=self.model)
except GeminiAPIError as e:
err = f"[Error API] {e}"
if on_text:
on_text(err)
return err, conversation
tool_calls = parse_tool_calls(raw_reply)
visible_text = strip_tool_calls(raw_reply)
if visible_text and on_text:
on_text(visible_text)
if not tool_calls:
conversation.append({"role": "assistant", "content": raw_reply})
final_text = visible_text
break
tool_results_block = ""
turn_had_error = False
for tc in tool_calls:
name = tc["name"]
reason = tc.get("reason", "")
params = _cast_params(name, tc["params"])
if on_tool_start:
on_tool_start(name, params, reason)
output = dispatch(name, params)
if on_tool_result:
on_tool_result(name, output)
if _is_error_output(output):
error_msg = _build_error_correction_message(name, output)
tool_results_block += _render_result(name, params, error_msg)
turn_had_error = True
else:
tool_results_block += _render_result(name, params, output)
if turn_had_error:
consecutive_errors += 1
else:
consecutive_errors = 0
if consecutive_errors >= MAX_CONSECUTIVE_ERRORS:
abort_msg = (
f"[SYSTEM β€” LOOP DIHENTIKAN]\n"
f"AI gagal {consecutive_errors}x berturut-turut. "
f"Periksa error terakhir secara manual."
)
if on_text:
on_text(abort_msg)
conversation.append({"role": "assistant", "content": raw_reply})
return abort_msg, conversation
conversation.append({"role": "assistant", "content": raw_reply})
conversation.append({"role": "user", "content": tool_results_block.strip()})
else:
final_text = "[Batas maksimum turn tercapai]"
return final_text, conversation