File size: 10,886 Bytes
ef271cc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8c7fe70
ef271cc
 
 
8c7fe70
 
 
 
 
 
ef271cc
 
 
 
8c7fe70
 
ef271cc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
"""
agent_loop.py β€” Agentic loop standalone untuk Gemini Claw.

Urutan prompt setiap turn:
  [20 history terakhir] β†’ [system prompt] β†’ [input user] β†’ [injection]

Injection:
  Pesan singkat yang WAJIB dipatuhi AI:
  - Jangan halusinasi (jangan tulis 'User:', 'Assistant:', dll.)
  - Awali dengan [start_response], akhiri dengan [end_response]

Error handling:
  - Output tool diperiksa terhadap pola error.
  - Jika error: injeksi pesan koreksi, AI wajib perbaiki dulu.
  - Jika consecutive error >= MAX_CONSECUTIVE_ERRORS: loop dihentikan.
"""
from __future__ import annotations

import re
from pathlib import Path

from .agent_tools import dispatch, TOOL_REGISTRY, set_workspace
from .gemini_client import GeminiAPIError, call_gemini, DEFAULT_MODEL

# Upper bound on model round-trips within a single AgentLoop.run() call.
MAX_TURNS              = 30
# Number of consecutive turns containing a failed tool call after which
# AgentLoop.run() aborts.
MAX_CONSECUTIVE_ERRORS = 4

# System prompt template; lives in the parent of the directory that contains
# this module.
_PROMPT_FILE = Path(__file__).parent.parent / "system_prompt.md"


def build_system_prompt(workspace: str | Path | None = None) -> str:
    """Load the system prompt template and substitute ``{workspace}``.

    The workspace directory is resolved in this order:
      1. the ``workspace`` argument, when truthy;
      2. the ``CLAW_WORKSPACE`` environment variable, when non-empty;
      3. the current working directory.

    Returns:
        The template text with every ``{workspace}`` occurrence replaced by
        the resolved directory path.
    """
    import os

    if workspace:
        root = Path(workspace)
    else:
        # An unset or empty variable falls through to the working directory.
        root = Path(os.environ.get("CLAW_WORKSPACE", "") or Path.cwd())

    prompt_text = _PROMPT_FILE.read_text(encoding="utf-8")
    return prompt_text.replace("{workspace}", str(root))


# ── Injection anti-halusinasi ─────────────────────────────────────────────────

# Instruction block appended as the last part of every prompt (see
# AgentLoop._build_full_prompt). In Indonesian, it tells the model to write
# only its own reply, never role labels such as 'User:' / 'Assistant:', and to
# wrap the reply in [start_response] ... [end_response] with nothing after it.
# NOTE(review): the "β€”" sequence in the first line looks like a mis-decoded
# em dash — confirm the file's encoding before touching the literal.
_INJECTION = (
    "[SYSTEM INJECTION β€” IKUTI TANPA PENGECUALIAN]\n"
    "1. Kamu HANYA menulis balasan kamu sendiri. "
    "DILARANG menulis label seperti 'User:', 'Assistant:', 'History:', "
    "'Agent:', atau melanjutkan percakapan seolah kamu adalah user.\n"
    "2. Mulai responmu dengan token: [start_response]\n"
    "3. Akhiri responmu dengan token: [end_response]\n"
    "4. Jangan menulis apapun setelah [end_response].\n"
    "Sekarang tulis responmu:"
)


# ── Regex tools_call ──────────────────────────────────────────────────────────

# Matches one fenced ```tools_call block. Group 1 is everything between the
# newline after the opening fence and the next ``` (DOTALL lets it span lines,
# the lazy quantifier stops at the first closing fence).
_TOOLS_CALL_BLOCK_RE = re.compile(r"```tools_call\n(.*?)```", re.DOTALL)
# Matches a string that consists solely of a bracketed key, e.g. "[path]";
# group 1 is the key name.
_PARAM_KEY_RE        = re.compile(r"^\[(\w+)\]$")

# Regex used to strip the hallucination-guard tokens (case-insensitive)
_RESPONSE_TOKEN_RE   = re.compile(r"\[start_response\]|\[end_response\]", re.IGNORECASE)


def _parse_tools_call_block(block: str) -> dict | None:
    """Parse the body of one ``tools_call`` block into a call description.

    The body is a sequence of ``[key]`` lines, each followed by its value:
    either a single line (stored stripped) or a heredoc that opens with
    ``<<EOF`` and runs until a line equal to ``EOF`` (stored verbatim, joined
    with newlines). Lines that are not a key and not consumed as a value are
    ignored. A key on the very last line has no value and is dropped.

    Returns:
        ``{"name": ..., "reason": ..., "params": {...}}`` where ``name`` comes
        from the ``nama`` key and ``params`` holds every remaining key, or
        ``None`` when ``nama`` is missing or empty.
    """
    fields: dict[str, str] = {}
    remaining = iter(block.splitlines())

    for line in remaining:
        key_match = _PARAM_KEY_RE.match(line.strip())
        if key_match is None:
            continue
        key = key_match.group(1)

        # The line right after a key is always consumed as (the start of) its value.
        value_line = next(remaining, None)
        if value_line is None:
            break

        if value_line.strip() != "<<EOF":
            fields[key] = value_line.strip()
            continue

        # Heredoc: collect raw lines up to the terminator (or the end of the block).
        heredoc: list[str] = []
        for body_line in remaining:
            if body_line.strip() == "EOF":
                break
            heredoc.append(body_line)
        fields[key] = "\n".join(heredoc)

    tool_name = fields.pop("nama", None)
    why = fields.pop("reason", "") or ""
    if not tool_name:
        return None
    return {"name": tool_name, "reason": why, "params": fields}


def parse_tool_calls(text: str) -> list[dict]:
    """Return every tool call found in *text*, in order of appearance.

    Each fenced ``tools_call`` block is parsed on its own; blocks for which
    the parser returns nothing are skipped.
    """
    candidates = (
        _parse_tools_call_block(found.group(1))
        for found in _TOOLS_CALL_BLOCK_RE.finditer(text)
    )
    return [call for call in candidates if call]


def has_tool_calls(text: str) -> bool:
    """Tell whether *text* contains at least one fenced ``tools_call`` block."""
    return _TOOLS_CALL_BLOCK_RE.search(text) is not None


def _cast_params(tool_name: str, raw: dict[str, str]) -> dict:
    """Convert the known integer parameters of a tool call to ``int``.

    Args:
        tool_name: Name of the tool being called (not consulted here).
        raw: Parameter values exactly as parsed from the tools_call block.

    Returns:
        A new dict with the same keys. Values of ``offset``, ``limit``,
        ``timeout`` and ``context_lines`` become ``int`` when they parse as
        one; anything else, including unparsable values, is passed through.
    """
    integer_keys = frozenset(("offset", "limit", "timeout", "context_lines"))

    def to_int_or_keep(text):
        # A non-numeric value is left as-is for the tool itself to deal with.
        try:
            return int(text)
        except ValueError:
            return text

    return {
        key: to_int_or_keep(value) if key in integer_keys else value
        for key, value in raw.items()
    }


def _render_result(name: str, params: dict, output: str) -> str:
    """Wrap a tool's output in a ``<tool_result>`` element for the model.

    Args:
        name: Tool name, written into the ``name`` attribute.
        params: Parameters of the call (not included in the rendering).
        output: Text placed between the opening and closing tags.

    Returns:
        The element surrounded by a leading and a trailing newline.
    """
    opening_tag = f'<tool_result name="{name}">'
    body = f"{output}"
    return "\n" + opening_tag + "\n" + body + "\n</tool_result>\n"


def strip_tool_calls(text: str) -> str:
    """Remove tools_call blocks and [start/end_response] tokens from *text*.

    Runs of three or more newlines left behind are collapsed to exactly two,
    and the result is stripped of leading/trailing whitespace.
    """
    without_blocks = _TOOLS_CALL_BLOCK_RE.sub("", text)
    without_tokens = _RESPONSE_TOKEN_RE.sub("", without_blocks)
    return re.sub(r"\n{3,}", "\n\n", without_tokens).strip()


# ── Error detection ───────────────────────────────────────────────────────────

# Heuristic, case-insensitive markers that classify a tool output as a failure:
# an "[Error]" tag, a non-zero "[exit code: N]" tag, common Python exception
# text, and the Indonesian failure phrases "tidak ditemukan" (not found),
# "String ditemukan N kali" (string found N times) and "Gagal" (failed).
_ERROR_PATTERNS = re.compile(
    r"(\[Error\]"
    # Only a genuinely non-zero exit code counts. The earlier `\s*[^0]\d*` form
    # let `\s*` backtrack so that the space in "[exit code: 0]" satisfied
    # `[^0]`, which reported successful commands as failures.
    r"|\[exit code:\s*-?0*[1-9]\d*\]"
    r"|Traceback \(most recent"
    r"|FileNotFoundError"
    r"|PermissionError"
    r"|SyntaxError"
    r"|tidak ditemukan"
    r"|String tidak ditemukan"
    r"|String ditemukan \d+ kali"
    r"|Gagal)",
    re.IGNORECASE,
)


def _is_error_output(output: str) -> bool:
    """Tell whether a tool's output contains any known error marker."""
    return _ERROR_PATTERNS.search(output) is not None


def _build_error_correction_message(tool_name: str, output: str) -> str:
    """Compose the corrective instruction sent to the model after a tool failure.

    Args:
        tool_name: Name of the tool whose call failed.
        output: Raw tool output; it is embedded with surrounding whitespace
            stripped.

    Returns:
        A multi-line message (in Indonesian) that quotes the failure and lists
        the mandatory recovery steps.
    """
    message_lines = [
        "[SYSTEM β€” ERROR DETECTED]",
        f"Tool `{tool_name}` GAGAL dengan pesan:",
        "",
        output.strip(),
        "",
        "INSTRUKSI WAJIB:",
        "1. Analisis pesan error di atas dengan seksama.",
        "2. JANGAN lanjut ke langkah berikutnya sebelum error ini diperbaiki.",
        "3. Jika error dari `edit_file`: gunakan `read_file` dulu untuk mendapat isi",
        "   file terkini, lalu ulangi edit dengan `old_string` yang PERSIS sama.",
        "4. Jika error dari `bash`: periksa perintah, path, dan dependensi.",
        "5. Setelah memperbaiki, lanjutkan rencana semula.",
    ]
    return "\n".join(message_lines)


# ── Agentic loop ──────────────────────────────────────────────────────────────

class AgentLoop:
    """
    Agentic loop: call the model, run the tools it requests, feed the results
    back, and repeat until the model answers without a tool call.

    Prompt order on every turn:
      [system prompt] → [last 20 history messages] → [user input] → [injection]
    """

    # Number of most recent history messages replayed in each prompt.
    _HISTORY_WINDOW = 20

    def __init__(self, model: str = DEFAULT_MODEL, workspace: str | Path | None = None):
        """
        Args:
            model: Model identifier forwarded to ``call_gemini``.
            workspace: Optional workspace directory. When given it is applied
                via ``set_workspace`` at the start of every ``run`` and is
                used to fill the system prompt template.
        """
        self.model     = model
        self.workspace = Path(workspace) if workspace else None
        self.system    = build_system_prompt(workspace)

    def _build_full_prompt(self, conversation: list[dict]) -> str:
        """
        Assemble the prompt text for one model call:
          [system prompt] → [last 20 history messages] → [user input] → [injection]

        Args:
            conversation: Messages as ``{"role": ..., "content": ...}`` dicts.
                The last entry is treated as the current input; everything
                before it is history.

        Returns:
            The prompt parts joined with newlines.
        """
        # 1. System prompt, followed by a blank separator line.
        lines = [self.system, ""]

        # 2. History: the most recent messages, excluding the current input.
        #    Any role other than "user" is labelled "Assistant".
        history = conversation[:-1]
        for turn in history[-self._HISTORY_WINDOW:]:
            role = "User" if turn["role"] == "user" else "Assistant"
            lines.append(f"{role}: {turn['content']}")

        if history:
            lines.append("")

        # 3. Current input (empty when the conversation itself is empty).
        current_input = conversation[-1]["content"] if conversation else ""
        lines.append(f"User: {current_input}")
        lines.append("")

        # 4. Anti-hallucination injection always comes last.
        lines.append(_INJECTION)

        return "\n".join(lines)

    def _execute_tool_calls(
        self,
        tool_calls: list[dict],
        on_tool_start: callable = None,
        on_tool_result: callable = None,
    ) -> tuple[str, bool]:
        """
        Run every tool call of one model reply, in order.

        Args:
            tool_calls: Parsed calls as produced by ``parse_tool_calls``.
            on_tool_start: Optional callback, invoked as
                ``on_tool_start(name, params, reason)`` before each call.
            on_tool_result: Optional callback, invoked as
                ``on_tool_result(name, output)`` with the raw tool output.

        Returns:
            ``(results_block, had_error)``: the rendered ``<tool_result>``
            elements concatenated and stripped, and whether at least one
            output matched the error patterns.
        """
        rendered: list[str] = []
        had_error = False

        for tc in tool_calls:
            name   = tc["name"]
            reason = tc.get("reason", "")
            params = _cast_params(name, tc["params"])

            if on_tool_start:
                on_tool_start(name, params, reason)

            # NOTE(review): assumes dispatch returns the tool output as a str
            # and reports failures in that text — confirm in agent_tools.
            output = dispatch(name, params)

            if on_tool_result:
                on_tool_result(name, output)

            if _is_error_output(output):
                # The model receives the correction instructions (which quote
                # the raw output) instead of the bare output.
                had_error = True
                output = _build_error_correction_message(name, output)

            rendered.append(_render_result(name, params, output))

        return "".join(rendered).strip(), had_error

    def run(
        self,
        user_input: str,
        conversation: list[dict],
        on_text: callable = None,
        on_tool_start: callable = None,
        on_tool_result: callable = None,
    ) -> tuple[str, list[dict]]:
        """
        Run one agentic turn for a single user input.

        The caller's ``conversation`` list is copied, never mutated. Model
        replies and tool results are appended to the copy; tool results are
        stored under the "user" role.

        Args:
            user_input: The new user message.
            conversation: Prior messages as ``{"role", "content"}`` dicts.
            on_text: Optional callback, invoked as ``on_text(text)`` with the
                visible text of each model reply and with every terminal
                message (API error, abort, turn limit).
            on_tool_start: Optional callback, invoked as
                ``on_tool_start(name, params, reason)`` before each tool call.
            on_tool_result: Optional callback, invoked as
                ``on_tool_result(name, output)`` after each tool call.

        Returns:
            ``(final_reply_text, updated_conversation)``. The text is the
            model's final visible reply, or a bracketed status message when
            the loop ends on an API error, on too many consecutive failing
            turns, or on reaching ``MAX_TURNS``.
        """
        if self.workspace:
            set_workspace(self.workspace)

        conversation = list(conversation)
        conversation.append({"role": "user", "content": user_input})

        consecutive_errors = 0

        for _ in range(MAX_TURNS):
            prompt = self._build_full_prompt(conversation)

            try:
                raw_reply = call_gemini(prompt, model=self.model)
            except GeminiAPIError as e:
                err = f"[Error API] {e}"
                if on_text:
                    on_text(err)
                return err, conversation

            tool_calls   = parse_tool_calls(raw_reply)
            visible_text = strip_tool_calls(raw_reply)

            if visible_text and on_text:
                on_text(visible_text)

            # The raw reply (tokens and tools_call blocks included) is what
            # gets stored, on every path below.
            conversation.append({"role": "assistant", "content": raw_reply})

            if not tool_calls:
                # No tool requested: this reply is the final answer.
                return visible_text, conversation

            results_block, turn_had_error = self._execute_tool_calls(
                tool_calls, on_tool_start, on_tool_result
            )

            # Only an unbroken streak of failing turns counts towards the limit.
            consecutive_errors = consecutive_errors + 1 if turn_had_error else 0

            if consecutive_errors >= MAX_CONSECUTIVE_ERRORS:
                abort_msg = (
                    f"[SYSTEM β€” LOOP DIHENTIKAN]\n"
                    f"AI gagal {consecutive_errors}x berturut-turut. "
                    f"Periksa error terakhir secara manual."
                )
                if on_text:
                    on_text(abort_msg)
                return abort_msg, conversation

            conversation.append({"role": "user", "content": results_block})

        # Turn budget exhausted without a final answer. Report it through
        # on_text as well, like every other terminal message, so callers that
        # render only via the callback still see why the loop stopped.
        limit_msg = "[Batas maksimum turn tercapai]"
        if on_text:
            on_text(limit_msg)
        return limit_msg, conversation