"""
AI HTML Editor — Flask server.

Multi-file support, system prompt loaded from a file, Eruda-only preview.
Jobs are kept in memory with a TTL of one day.
"""

import os
import re
import threading
import time
import uuid

import requests
from flask import Flask, Response, jsonify, request

app = Flask(__name__)

# ─────────────────────────────────────────────
# LOAD SYSTEM PROMPT FROM FILE
# ─────────────────────────────────────────────
# The prompt file must sit next to this module; a missing file aborts startup.
_SP_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'system_prompt.md')
with open(_SP_PATH, 'r', encoding='utf-8') as _f:
    SYSTEM_PROMPT = _f.read()

# ─────────────────────────────────────────────
# JOB STORE (in-memory, TTL = 1 day)
# ─────────────────────────────────────────────
JOBS: dict[str, dict] = {}
JOB_TTL = 86_400  # seconds
_lock = threading.Lock()  # guards every read/write of JOBS


def _purge_expired():
    """Delete every job whose age exceeds JOB_TTL seconds."""
    now = time.time()
    with _lock:
        expired = [jid for jid, job in JOBS.items() if now - job["created_at"] > JOB_TTL]
        for jid in expired:
            del JOBS[jid]


def new_job(payload: dict) -> str:
    """Register a new job in the "pending" state and return its UUID string."""
    _purge_expired()
    jid = str(uuid.uuid4())
    with _lock:
        JOBS[jid] = {
            "status": "pending",
            "created_at": time.time(),
            "payload": payload,
            "result": None,
            "error": None,
        }
    return jid


def get_job(jid: str) -> dict | None:
    """Return the stored job dict for *jid*, or None if unknown or expired.

    The returned dict is the stored object itself, not a copy.
    """
    _purge_expired()
    with _lock:
        return JOBS.get(jid)


def update_job(jid: str, **kwargs):
    """Merge *kwargs* into the job *jid*; a no-op when the job does not exist."""
    with _lock:
        if jid in JOBS:
            JOBS[jid].update(kwargs)


# ─────────────────────────────────────────────
# FUZZY REPLACE (Levenshtein-based)
# ─────────────────────────────────────────────
_FUZZY_MAX_SEARCH_LEN = 3000  # longer searches skip the Levenshtein pass
_FUZZY_MIN_SCORE = 0.70       # similarity a window must exceed to be replaced


def levenshtein(a: str, b: str) -> int:
    """Return the Levenshtein edit distance between *a* and *b*.

    Only two rows of the dynamic-programming table are kept in memory.
    """
    if not a:
        return len(b)
    if not b:
        return len(a)

    previous = list(range(len(a) + 1))
    for i, ch_b in enumerate(b, start=1):
        current = [i]
        for j, ch_a in enumerate(a, start=1):
            if ch_b == ch_a:
                current.append(previous[j - 1])
            else:
                # substitution, insertion, deletion
                current.append(1 + min(previous[j - 1], current[j - 1], previous[j]))
        previous = current
    return previous[-1]


def calculate_similarity(s1: str, s2: str) -> float:
    """Return a similarity ratio in [0, 1]; 1.0 means the strings are equal.

    Computed as (len(longer) - distance) / len(longer). Two empty strings
    count as identical.
    """
    longer, shorter = (s1, s2) if len(s1) >= len(s2) else (s2, s1)
    if not longer:
        return 1.0
    return (len(longer) - levenshtein(longer, shorter)) / len(longer)


def fuzzy_replace(source: str, search: str, replacement: str) -> dict:
    """Replace the region of *source* that best matches *search*.

    Two strategies are tried in order:

    1. Whitespace-agnostic match: every run of whitespace in *search* may
       match any run of whitespace in *source*.
    2. Sliding window over the source lines, scored with Levenshtein
       similarity on whitespace-stripped, concatenated lines. The best window
       is replaced when its score exceeds ``_FUZZY_MIN_SCORE``.

    Returns:
        ``{"success": bool, "result": str}``; on failure ``result`` is the
        unmodified *source*.
    """
    search_trimmed = search.strip()
    if not search_trimmed:
        return {"success": False, "result": source}

    # Level 1 — whitespace-agnostic regex.
    # Tokens are escaped individually and joined with \s+. Escaping the whole
    # string first does not work: re.escape() backslash-escapes whitespace, so
    # a later whitespace substitution produces a literal backslash followed by
    # "s+" instead of the \s+ class.
    flex = r'\s+'.join(re.escape(token) for token in search_trimmed.split())
    m = re.search(flex, source)
    if m:
        return {
            "success": True,
            "result": source[:m.start()] + replacement.strip() + source[m.end():],
        }

    # Level 2 — sliding window + Levenshtein (quadratic, so cap the input size).
    if len(search_trimmed) > _FUZZY_MAX_SEARCH_LEN:
        return {"success": False, "result": source}

    source_lines = source.split('\n')
    search_lines = [line.strip() for line in search_trimmed.split('\n') if line.strip()]
    if not search_lines:
        return {"success": False, "result": source}

    search_str = ''.join(search_lines)
    stripped_source = [line.strip() for line in source_lines]
    best = {"index": -1, "score": 0.0, "length": 0}
    n = len(search_lines)

    # Try windows one line shorter/longer than the search to tolerate a
    # dropped or added line; dict.fromkeys removes the duplicate when n == 1.
    for window_size in dict.fromkeys((max(1, n - 1), n, n + 1)):
        for start in range(len(source_lines) - window_size + 1):
            window_str = ''.join(stripped_source[start:start + window_size])
            # Cheap pre-filter: skip windows whose length is too far off.
            if abs(len(window_str) - len(search_str)) > len(search_str) * 0.5:
                continue
            score = calculate_similarity(window_str, search_str)
            if score > best["score"]:
                best = {"index": start, "score": score, "length": window_size}

    if best["score"] > _FUZZY_MIN_SCORE:
        result_lines = (
            source_lines[:best["index"]]
            + [replacement]
            + source_lines[best["index"] + best["length"]:]
        )
        return {"success": True, "result": '\n'.join(result_lines)}

    return {"success": False, "result": source}


# ─────────────────────────────────────────────
# PARSE tools_call BLOCKS
# ─────────────────────────────────────────────
def _parse_eof_content(block: str, tag: str) -> str | None:
    """Extract the multi-line payload that follows ``[tag]`` inside *block*.

    The expected layout is: a line starting with ``[tag]`` (any further text
    on that line is ignored), a line holding ``<`` optionally followed by
    whitespace, the content, and finally a line starting with ``EOF``.

    Returns:
        The captured content, or None when the layout is not found.
    """
    pattern = (
        rf'\[{re.escape(tag)}\][^\n]*\n'   # [tag] line
        r'<\s*\n'                           # < line
        r'([\s\S]*?)'                       # content (captured, lazy)
        r'\nEOF(?:\s|$)'                    # EOF terminator
    )
    m = re.search(pattern, block)
    return m.group(1) if m else None


def parse_tools_call_blocks(ai_response: str) -> list[dict]:
    """Parse every ```tools_call ... ``` fenced block in *ai_response*.

    Each returned dict may hold ``name``, ``reason``, ``path``,
    ``old_string``, ``new_string`` and ``content``. Blocks lacking ``name``
    or ``path`` are dropped.
    """
    raw_blocks = re.findall(r'```tools_call\s*\n([\s\S]*?)```', ai_response)
    results = []

    for block in raw_blocks:
        tool: dict = {}

        # Single-value tags: the value sits on the line after the tag.
        m = re.search(r'\[name\]\s*\n\s*(\S+)', block)
        if m:
            tool['name'] = m.group(1).strip()

        m = re.search(r'\[reason\]\s*\n\s*(.+)', block)
        if m:
            tool['reason'] = m.group(1).strip()

        m = re.search(r'\[path\]\s*\n\s*(\S+)', block)
        if m:
            tool['path'] = m.group(1).strip()

        # Multi-line payload tags.
        for tag in ('old_string', 'new_string', 'content'):
            val = _parse_eof_content(block, tag)
            if val is not None:
                tool[tag] = val

        if tool.get('name') and tool.get('path'):
            results.append(tool)

    return results


# ─────────────────────────────────────────────
# APPLY tools_call TO FILES
# ─────────────────────────────────────────────
def apply_tools_call(files: dict, ai_response: str) -> dict:
    """Parse the AI response (tools_call format) and apply it to *files*.

    *files* itself is never mutated; all operations run on a shallow copy, so
    a failure part-way through leaves the caller's project untouched.

    Returns:
        ``{"files", "message", "action_label", "reasons"}``.

    Raises:
        ValueError: no valid tools_call block, unknown tool name, edit_file on
            a missing path, empty ``old_string``, or ``old_string`` that
            cannot be located even with the fuzzy fallback.
    """
    tools = parse_tools_call_blocks(ai_response)

    if not tools:
        raise ValueError(
            "Tidak ada blok ```tools_call``` yang valid ditemukan dalam respons AI. "
            "Gunakan format write_file atau edit_file sesuai instruksi sistem."
        )

    # Use the opening "Saya akan..." sentence as the reply message.
    respond_m = re.search(r'Saya akan[^\n]+', ai_response)
    respond_text = respond_m.group(0).strip() if respond_m else "Kode berhasil diperbarui."

    temp_files = dict(files)
    success_count = 0
    is_fuzzy = False

    for position, tool in enumerate(tools, start=1):
        name = tool.get('name', '')
        path = tool.get('path', '')

        if name == 'write_file':
            temp_files[path] = tool.get('content', '')
            success_count += 1

        elif name == 'edit_file':
            old_string = tool.get('old_string', '')
            new_string = tool.get('new_string', '')

            if path not in temp_files:
                raise ValueError(
                    f"[edit_file ke-{position}] File '{path}' tidak ditemukan dalam project."
                )

            # An empty string is "in" every string, so without this guard a
            # missing or blank old_string would silently prepend new_string.
            if not old_string.strip():
                raise ValueError(
                    f"[edit_file ke-{position}, file '{path}'] [old_string] kosong "
                    "atau tidak terbaca. Sertakan teks yang akan diganti."
                )

            current = temp_files[path]
            trimmed_old = old_string.strip()

            if old_string in current:
                # Exact match
                temp_files[path] = current.replace(old_string, new_string, 1)
                success_count += 1
            elif trimmed_old in current:
                # Trimmed exact match
                temp_files[path] = current.replace(trimmed_old, new_string.strip(), 1)
                success_count += 1
            else:
                # Similarity fallback
                res = fuzzy_replace(current, old_string, new_string)
                if not res['success']:
                    raise ValueError(
                        f"[edit_file ke-{position}, file '{path}'] old_string:\n"
                        f"{old_string}\n\n"
                        "TIDAK DITEMUKAN meskipun Similarity Fallback sudah aktif. "
                        "Salin [old_string] dengan lebih teliti dari isi file yang diberikan!"
                    )
                temp_files[path] = res['result']
                success_count += 1
                is_fuzzy = True

        else:
            raise ValueError(
                f"Tool tidak dikenal '{name}' pada blok ke-{position}. "
                "Gunakan write_file atau edit_file."
            )

    action_label = f"Diperbarui ({success_count} operasi){' ✨ Auto-Fix' if is_fuzzy else ''}"
    reasons = [t.get('reason', '') for t in tools if t.get('reason')]

    return {
        "files": temp_files,
        "message": respond_text,
        "action_label": action_label,
        "reasons": reasons,
    }


# ─────────────────────────────────────────────
# BUILD PROMPT
# Context: system_prompt.md + all files + user instruction + error log (on retry)
# ─────────────────────────────────────────────
def build_prompt(files: dict, user_instruction: str, error_feedback: str | None = None) -> str:
    """Assemble the full prompt sent to the AI.

    Args:
        files: mapping of path -> file content, each rendered as a fenced block.
        user_instruction: the user's request, appended after the files.
        error_feedback: error text from the previous attempt; when truthy a
            correction section is appended.
    """
    files_context = "".join(
        f"\n[File: {path}]\n```\n{content}\n```\n" for path, content in files.items()
    )

    prompt = (
        f"{SYSTEM_PROMPT}\n\n"
        f"--- PROJECT FILES ---\n{files_context}"
        f"--- END OF FILES ---\n\n"
        f"Instruksi User: {user_instruction}"
    )

    if error_feedback:
        prompt += (
            f"\n\n[ERROR PADA JAWABANMU SEBELUMNYA]:\n{error_feedback}\n\n"
            "Perbaiki [old_string] agar identik persis dengan isi file di PROJECT FILES. "
            "Jika perlu, salin ulang teks tersebut langsung dari sana."
        )

    return prompt


# ─────────────────────────────────────────────
# AI CALL
# ─────────────────────────────────────────────
GEMINI_API = "https://puruboy-api.vercel.app/api/ai/gemini-v2"


def call_ai(prompt: str) -> str:
    """POST *prompt* to GEMINI_API and return the answer text.

    Raises:
        requests.HTTPError: on a non-2xx response (via ``raise_for_status``).
        RuntimeError: when the JSON body has a falsy ``success`` field.
    """
    resp = requests.post(
        GEMINI_API,
        json={"prompt": prompt},
        timeout=60,
        headers={"Content-Type": "application/json"},
    )
    resp.raise_for_status()
    data = resp.json()
    if not data.get("success"):
        raise RuntimeError("Gagal terhubung ke API / Server Error.")
    return data["result"]["answer"]


def run_ai_loop(jid: str, user_instruction: str, files: dict,
                loop: int = 0, error_feedback: str | None = None, max_loops: int = 10):
    """Self-healing AI loop: retry until the AI output applies cleanly.

    Each failed attempt feeds its error message into the next prompt. Meant
    to run in a background thread; all retries happen in that same thread.

    Args:
        jid: job to report progress and the final outcome to.
        user_instruction: the user's request.
        files: project files (path -> content); never mutated here.
        loop: number of attempts already made.
        error_feedback: error text from a previous attempt, if any.
        max_loops: total attempts allowed before the job is marked "error".
    """
    attempt = loop
    while attempt < max_loops:
        # Stop spending API calls once the job was deleted or has expired.
        if get_job(jid) is None:
            return

        try:
            prompt = build_prompt(files, user_instruction, error_feedback)
            ai_text = call_ai(prompt)
            result = apply_tools_call(files, ai_text)
        except Exception as exc:  # thread boundary: any failure triggers a retry
            error_feedback = str(exc)
            attempt += 1
            # Publish progress so pollers can see the retry count.
            update_job(jid, loop_count=attempt)
            continue

        update_job(jid, status="done", result=result, loop_count=attempt)
        return

    update_job(
        jid,
        status="error",
        error=(
            f"Gagal merubah kode setelah {max_loops}x loop perbaikan. "
            "AI terus-menerus gagal mencocokkan kode. "
            "Coba beri instruksi yang lebih spesifik."
        ),
        loop_count=attempt,
    )


# ─────────────────────────────────────────────
# ROUTES
# ─────────────────────────────────────────────
@app.route("/api/chat", methods=["POST"])
def api_chat():
    """Body: { instruction: str, files: {path: content} } → { job_id: str }"""
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        # Missing, invalid or non-object JSON is treated as an empty request.
        body = {}

    instruction = body.get("instruction", "")
    files = body.get("files", {})

    # A non-string instruction is rejected the same way as a missing one.
    if not isinstance(instruction, str) or not instruction.strip():
        return jsonify({"error": "instruction wajib diisi"}), 400
    instruction = instruction.strip()

    if not isinstance(files, dict):
        return jsonify({"error": "files harus berupa object {path: content}"}), 400

    jid = new_job({"instruction": instruction})
    update_job(jid, status="running", loop_count=0)

    threading.Thread(
        target=run_ai_loop,
        args=(jid, instruction, files),
        daemon=True,
    ).start()

    return jsonify({"job_id": jid}), 202


# The <jid> URL variable is what supplies the view's `jid` argument.
@app.route("/api/job/<jid>", methods=["GET"])
def api_job_status(jid):
    """Return status and loop count of a job, plus its result or error when finished."""
    job = get_job(jid)
    if not job:
        return jsonify({"error": "Job tidak ditemukan atau sudah expired (> 1 hari)"}), 404

    resp = {
        "job_id": jid,
        "status": job["status"],
        "loop_count": job.get("loop_count", 0),
    }
    if job["status"] == "done":
        resp["result"] = job["result"]
    if job["status"] == "error":
        resp["error"] = job["error"]

    return jsonify(resp)


@app.route("/api/job/<jid>", methods=["DELETE"])
def api_delete_job(jid):
    """Remove a job from the store; 404 when it does not exist."""
    with _lock:
        if jid in JOBS:
            del JOBS[jid]
            return jsonify({"deleted": True})
    return jsonify({"error": "Job tidak ditemukan"}), 404


# ─────────────────────────────────────────────
# UI (embedded HTML)
# ─────────────────────────────────────────────
# NOTE(review): this literal holds only visible text and no markup — confirm
# the template was not stripped before it reached this file.
HTML = r""" AI HTML Editor AI Editor Download ZIP Simpan File Upload File AI sedang bekerja... 
(Self-Healing: /10) Salin Project Files New File Tambah Eruda Chat Code Preview """


@app.route("/")
def index():
    """Serve the embedded single-page UI."""
    return Response(HTML, mimetype="text/html")


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)