import os
import time
import json
import shutil
import asyncio
import zipfile
import re
import subprocess
from pathlib import Path
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, UploadFile, File, BackgroundTasks, Form
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import httpx
import uvicorn

# ==========================================
# CONFIGURATION & GLOBAL VARIABLES
# ==========================================
PORT = 7860
BASE_DIR = Path("/tmp/puruai_sessions")
SESSION_TIMEOUT = 259200  # 3 days
API_URL = 'https://puruboy-api.vercel.app/api/ai/gemini-v2'
BASE_DELAY = 3

# Main (executor) prompt with explanation (chain-of-thought) capability.
# NOTE(review): the prompt text below is reproduced exactly as it appears in
# this file. It talks about an "EXECUTION TAG" / "XML FORMAT" but contains no
# tags, so markup may have been stripped before this file was saved -- confirm
# against the original source before relying on it.
SYSTEM_PROMPT = (
    """YOU ARE AN ELITE AI CODE EDITOR AGENT DESIGNED TO OPERATE WITH MAXIMUM PRECISION IN A CONTROLLED FILE SYSTEM ENVIRONMENT. YOU MUST EXECUTE EXACTLY ONE ACTION PER LOOP AND STRICTLY FOLLOW THE PROVIDED EXECUTION COMMAND FORMAT. YOUR CORE OBJECTIVE IS TO ANALYZE, DECIDE, EXPLAIN YOUR LOGIC, AND EXECUTE FILE OPERATIONS STEP-BY-STEP. --- ### 🔥 CORE RULES - YOU MUST MAKE **ONLY ONE DECISION PER LOOP** - YOU MUST **EXPLAIN YOUR THINKING PROCESS FIRST**, THEN OUTPUT EXACTLY ONE COMMAND - YOU MUST OPERATE BASED ONLY ON AVAILABLE CONTEXT (NO INTERNET, NO ASSUMPTIONS) - YOU MUST FOLLOW STRICT PATH FORMAT: `#root/...` --- ### 📂 AVAILABLE ACTIONS 1. READ ALL FILE STRUCTURE all 2. READ FILE CONTENT read #root/filename.ext 3. WRITE / OVERWRITE FILE write #root/filename.extFILE_CONTENT_HERE 4. MOVE FILE move #root/source.ext#root/destination.ext 5. DELETE FILE delete #root/filename.ext 6. SEARCH (TRIMMED CONTEXT) SEARCH_QUERY 7. FINISH / STOP EXECUTION (WHEN TASK IS COMPLETE) stop --- ### 🧠 CHAIN OF THOUGHT PROCESS (MANDATORY) YOU MUST FOLLOW THIS THINKING SEQUENCE AND **WRITE IT DOWN** BEFORE EVERY ACTION: 1. UNDERSTAND: Identify the user's goal clearly. 2. ANALYZE: Check what information is missing based on current context. 3. \n"""
    """DECIDE: Decide the safest and most logical next step (read, write, search, etc). 4. EXECUTE: Output exactly one valid command. --- ### ⚠️ WHAT NOT TO DO (CRITICAL) - NEVER OUTPUT MULTIPLE COMMANDS ❌ WRONG: all read ... - NEVER PUT YOUR EXPLANATION INSIDE THE EXECUTION TAG ❌ WRONG: I will read the file now. read #root/file.js ✅ CORRECT: I need to check the contents of file.js to find the bug. read #root/file.js - NEVER GUESS FILE CONTENT ❌ WRONG: write ...> (without reading first) - NEVER SKIP STEPS (LIKE WRITING BEFORE READING STRUCTURE) - NEVER USE INVALID PATH FORMAT ❌ WRONG: `/home/user/file.js` ✅ CORRECT: `#root/file.js` - NEVER PERFORM MORE THAN ONE ACTION PER LOOP --- ### ✅ OPTIMAL STRATEGY - ALWAYS START WITH EXPLANATION + all - THEN: READ relevant files → ANALYZE → MODIFY step-by-step - WHEN FINISHED: EXPLAIN completion → stop --- ### 🎯 FEW-SHOT EXAMPLES #### Example 1: Initial Step User: "Fix bug in app.js" ✅ Correct AI Response: I need to understand the current project structure first before locating and modifying `app.js`. Let me read the directory structure. all --- #### Example 2: After Seeing Structure ✅ Correct AI Response: I see `app.js` in the root directory. I need to read its content to understand the bug before making any changes. read #root/app.js --- #### Example 3: After Analysis ✅ Correct AI Response: I found the issue. The variable `userData` is undefined. I will fix the function and write the updated code back to `app.js`. write #root/app.jsFIXED_CODE --- #### Example 4: Task Completed ✅ Correct AI Response: The bug in `app.js` has been successfully fixed and the changes are written. My task is complete. stop --- ### 🚀 FINAL DIRECTIVE YOU ARE A DETERMINISTIC FILE-SYSTEM AGENT. ALWAYS EXPLAIN YOUR LOGIC BRIEFLY FIRST. THEN EXECUTE EXACTLY ONE ACTION. \n"""
    """NEVER BREAK THE XML FORMAT FOR COMMANDS."""
)

# In-memory session registry: session_id -> dict with the keys "chat_history",
# "ai_memory", "is_looping", "last_active", "ws" (and "task" while an agent
# loop is running).
SESSIONS = {}

app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])


# ==========================================
# SERVER FILE SYSTEM UTILITIES
# ==========================================
def get_session_dir(session_id: str) -> Path:
    """Return the on-disk directory that belongs to *session_id*.

    Raises:
        ValueError: if *session_id* is empty, is "." / "..", or contains a
            path separator or NUL. Such ids would otherwise resolve to a
            location outside (or equal to) BASE_DIR, and callers walk and
            delete whatever this function returns.
    """
    if (
        not session_id
        or session_id in (".", "..")
        or any(ch in session_id for ch in ("/", "\\", "\x00"))
    ):
        raise ValueError(f"Invalid session id: {session_id!r}")
    return BASE_DIR / session_id


def update_session_activity(session_id: str):
    """Record that *session_id* was just used.

    Refreshes the in-memory "last_active" timestamp (when the session is
    registered) and touches the session directory's mtime (when it exists).
    """
    if session_id in SESSIONS:
        SESSIONS[session_id]["last_active"] = time.time()
    session_dir = get_session_dir(session_id)
    if session_dir.exists():
        os.utime(session_dir, None)


def get_vfs_list(session_id: str) -> list:
    """Return the sorted POSIX-style relative paths of all files in the session directory.

    Returns an empty list when the session directory does not exist.
    """
    session_dir = get_session_dir(session_id)
    if not session_dir.exists():
        return []
    files = []
    for root, _, filenames in os.walk(session_dir):
        for filename in filenames:
            full_path = Path(root) / filename
            files.append(full_path.relative_to(session_dir).as_posix())
    return sorted(files)


def is_binary(file_path: Path) -> bool:
    """Return True when the first 1024 characters of *file_path* are not valid UTF-8.

    The probe decodes as UTF-8 explicitly because every reader in this module
    opens files as UTF-8; relying on the locale encoding could classify a file
    as text here and then fail to decode it later.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as check_file:
            check_file.read(1024)
        return False
    except UnicodeDecodeError:
        return True


async def broadcast_ws(session_id: str, message: dict):
    """Send *message* as JSON to the session's WebSocket, if one is attached.

    Delivery is best-effort: send failures are ignored. Only ``Exception`` is
    caught so that task cancellation and interpreter shutdown still propagate.
    """
    if session_id in SESSIONS and SESSIONS[session_id].get("ws"):
        try:
            await SESSIONS[session_id]["ws"].send_json(message)
        except Exception:
            pass


async def send_vfs_update(session_id: str):
    """Push the current file listing of the session to its WebSocket client."""
    await broadcast_ws(session_id, {"type": "vfs", "data": get_vfs_list(session_id)})


# ==========================================
# EXECUTOR AI AGENT LOGIC
# ==========================================
def _resolve_in_session(session_dir: Path, raw_path: str) -> tuple:
    """Translate an agent-supplied path into ``(clean_path, absolute_path)``.

    ``#root/`` is removed and surrounding whitespace stripped, as before. The
    result is resolved and must stay inside *session_dir*; the path text comes
    from the remote model and is therefore untrusted.

    Raises:
        ValueError: if the path resolves outside the session directory
            (``..`` components or an absolute path).
    """
    clean_path = raw_path.replace('#root/', '').strip()
    root = session_dir.resolve()
    target = (root / clean_path).resolve()
    try:
        target.relative_to(root)
    except ValueError:
        raise ValueError(f"Path escapes the session directory: {clean_path}") from None
    return clean_path, target


def execute_command(session_id: str, command_str: str) -> dict:
    """Parse one agent reply and perform the single file operation it requests.

    Args:
        session_id: Session whose directory the command operates on. The
            directory is created if missing.
        command_str: Raw text returned by the model.

    Returns:
        A dict with "action" (one of "stop", "all", "read", "write",
        "delete", "move", "search", "error") and "log" (human-readable
        result). Failures inside the command handling are reported as an
        "error" action rather than raised.

    NOTE(review): the regular expressions below are reproduced exactly as
    found. They contain no tag delimiters even though the system prompt refers
    to an execution tag, so markup looks to have been stripped from this file.
    As written, the first pattern always captures an empty body -- restore the
    original patterns from a pristine copy.
    """
    session_dir = get_session_dir(session_id)
    os.makedirs(session_dir, exist_ok=True)
    try:
        exec_match = re.search(r'\s*([\s\S]*?)\s*', command_str, re.IGNORECASE)
        if not exec_match:
            return {"action": "stop", "log": "Operasi selesai atau tag tidak ditemukan. Berhenti."}
        cmd_body = exec_match.group(1).strip()

        # 0. STOP
        if cmd_body.lower() == 'stop':
            return {"action": "stop", "log": "Task Completed. Agent dihentikan secara eksplisit oleh AI."}

        # 1. ALL
        if cmd_body == 'all':
            files = get_vfs_list(session_id)
            if files:
                listing = "VFS Structure:\n" + "\n".join(files)
            else:
                listing = 'Directory is empty.'
            return {"action": "all", "log": listing}

        # 2. READ
        read_match = re.search(r'^read\s*(.*?)', cmd_body, re.IGNORECASE)
        if read_match:
            clean_path, file_path = _resolve_in_session(session_dir, read_match.group(1))
            if file_path.is_file():
                if is_binary(file_path):
                    return {"action": "read", "log": "[Binary File Unreadable]"}
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                return {"action": "read", "log": f"Content of {clean_path}:\n{content}"}
            raise Exception("File not found.")

        # 3. WRITE
        write_match = re.search(r'^write\s*(.*?)\s*([\s\S]*?)', cmd_body, re.IGNORECASE)
        if write_match:
            clean_path, file_path = _resolve_in_session(session_dir, write_match.group(1))
            content = write_match.group(2)
            os.makedirs(file_path.parent, exist_ok=True)
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)
            return {"action": "write", "log": f"Successfully written to {clean_path}."}

        # 4. DELETE
        delete_match = re.search(r'^delete\s*(.*?)', cmd_body, re.IGNORECASE)
        if delete_match:
            clean_path, file_path = _resolve_in_session(session_dir, delete_match.group(1))
            if file_path.exists():
                if file_path.is_dir():
                    shutil.rmtree(file_path)
                else:
                    os.remove(file_path)
                return {"action": "delete", "log": f"Deleted: {clean_path}"}
            raise Exception("File not found.")

        # 5. MOVE
        move_match = re.search(r'^move\s*(.*?)\s*(.*?)', cmd_body, re.IGNORECASE)
        if move_match:
            src_path, src_full = _resolve_in_session(session_dir, move_match.group(1))
            dst_path, dst_full = _resolve_in_session(session_dir, move_match.group(2))
            if src_full.exists():
                os.makedirs(dst_full.parent, exist_ok=True)
                shutil.move(str(src_full), str(dst_full))
                return {"action": "move", "log": f"Moved {src_path} to {dst_path}"}
            raise Exception("Source file not found.")

        # 6. SEARCH / TRIM
        trim_match = re.search(r'^(.*?)', cmd_body, re.IGNORECASE)
        if trim_match:
            query = trim_match.group(1).strip()
            needle = query.lower()
            results = []
            for root, _, files in os.walk(session_dir):
                for file in files:
                    fp = Path(root) / file
                    try:
                        if is_binary(fp):
                            continue
                        rel_path = fp.relative_to(session_dir).as_posix()
                        with open(fp, 'r', encoding='utf-8') as f:
                            for i, line in enumerate(f):
                                if needle in line.lower():
                                    results.append(f"{rel_path}:{i+1}: {line.strip()[:120]}")
                    except (OSError, UnicodeDecodeError):
                        # Unreadable or undecodable files are skipped so one bad
                        # file does not abort the whole search.
                        continue
            res_str = "\n".join(results) if results else "No matches found."
            return {"action": "search", "log": f"Search Results for '{query}':\n{res_str}"}

        raise Exception("Command structure unrecognized or missing required tags.")
    except Exception as e:
        return {"action": "error", "log": f"ERROR: {str(e)}"}


async def fetch_ai(prompt_text: str) -> str:
    """POST *prompt_text* to API_URL and return ``result.answer`` from the JSON reply.

    Raises:
        httpx.HTTPStatusError: on a non-success HTTP status.
        KeyError: if the reply lacks the ``result`` / ``answer`` keys.
    """
    async with httpx.AsyncClient(timeout=90.0) as client:
        response = await client.post(API_URL, json={"prompt": prompt_text})
        response.raise_for_status()
        return response.json()['result']['answer']


async def agent_loop(session_id: str, initial_prompt: str):
    """Run the ask-model / execute-command cycle for one user prompt.

    Each iteration sends the system prompt, the current file listing and the
    last 12 memory entries to the model, executes the reply through
    ``execute_command``, and streams chat/status updates to the WebSocket.
    The loop ends when the command result is "stop", when ``is_looping`` is
    cleared externally, or when any exception occurs (reported to the UI).
    """
    session = SESSIONS[session_id]
    session["is_looping"] = True
    session["chat_history"].append({"role": "user", "text": initial_prompt})
    session["ai_memory"].append({"role": "user", "text": initial_prompt})
    await broadcast_ws(session_id, {"type": "chat_update", "data": session["chat_history"]})

    while session["is_looping"]:
        try:
            update_session_activity(session_id)
            vfs_list = get_vfs_list(session_id)
            vfs_context = ', '.join(vfs_list) if vfs_list else 'Empty'
            base_payload = f"{SYSTEM_PROMPT}\n\n[Context VFS: {vfs_context}]\n\n"
            history_log = "\n".join(
                f"{m['role'].upper()}: {m['text']}" for m in session["ai_memory"][-12:]
            )
            ai_response = await fetch_ai(f"{base_payload}History:\n{history_log}\n\nExecute NEXT STEP:")

            session["chat_history"].append({"role": "ai", "text": ai_response})
            session["ai_memory"].append({"role": "ai", "text": ai_response})
            await broadcast_ws(session_id, {"type": "chat_update", "data": session["chat_history"]})

            exec_result = execute_command(session_id, ai_response)
            # execute_command always returns a non-empty dict, so the listing
            # is refreshed after every command.
            await send_vfs_update(session_id)

            if exec_result["action"] == "stop":
                session["is_looping"] = False
                session["ai_memory"] = []
                # Push the final log entry to the UI before stopping.
                sys_msg = f"SystemLog (STOP):\n{exec_result['log']}"
                session["chat_history"].append({"role": "system", "text": sys_msg})
                await broadcast_ws(session_id, {"type": "chat_update", "data": session["chat_history"]})
                await broadcast_ws(session_id, {"type": "status", "text": "Selesai", "statusType": "done", "isLooping": False})
                break

            sys_msg = f"SystemLog ({exec_result['action']}):\n{exec_result['log']}"
            session["chat_history"].append({"role": "system", "text": sys_msg})
            session["ai_memory"].append({"role": "system", "text": sys_msg})
            await broadcast_ws(session_id, {"type": "chat_update", "data": session["chat_history"]})
            await broadcast_ws(session_id, {"type": "status", "text": f"Jeda {BASE_DELAY}s...", "statusType": "idle"})
            await asyncio.sleep(BASE_DELAY)
        except Exception as e:
            session["is_looping"] = False
            session["chat_history"].append({"role": "system", "text": f"SYSTEM ERROR: {str(e)}"})
            await broadcast_ws(session_id, {"type": "chat_update", "data": session["chat_history"]})
            await broadcast_ws(session_id, {"type": "status", "text": "Error", "statusType": "error", "isLooping": False})
            break


# ==========================================
# REST & WEBSOCKET ENDPOINTS
# ==========================================
@app.get("/")
async def get_ui():
    """Serve the single-page frontend."""
    return HTMLResponse(HTML_CONTENT)


@app.get("/api/check_session/{session_id}")
async def check_session(session_id: str):
    """Report whether the session is known in memory or has a directory on disk."""
    if session_id in SESSIONS:
        return {"valid": True}
    try:
        return {"valid": get_session_dir(session_id).exists()}
    except ValueError:
        # A malformed id can never name a real session.
        return {"valid": False}


@app.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
    """Attach a client to a session and start agent loops on "prompt" messages.

    On connect the current chat history and file listing are sent. Each
    incoming text frame is expected to be a JSON object; a frame with
    ``action == "prompt"`` starts ``agent_loop`` unless one is already
    running. Frames that are not valid JSON objects are ignored.
    """
    await websocket.accept()
    try:
        get_session_dir(session_id)
    except ValueError:
        # 1008 = policy violation; refuse ids that cannot map to a session directory.
        await websocket.close(code=1008)
        return

    if session_id not in SESSIONS:
        SESSIONS[session_id] = {
            "chat_history": [],
            "ai_memory": [],
            "is_looping": False,
            "last_active": time.time(),
            "ws": websocket,
        }
    else:
        SESSIONS[session_id]["ws"] = websocket
    session = SESSIONS[session_id]

    try:
        await websocket.send_json({"type": "chat_update", "data": session["chat_history"]})
        await websocket.send_json({"type": "vfs", "data": get_vfs_list(session_id)})
        while True:
            data = await websocket.receive_text()
            try:
                payload = json.loads(data)
            except json.JSONDecodeError:
                continue
            if not isinstance(payload, dict):
                continue
            if payload.get("action") == "prompt" and not session["is_looping"]:
                text = payload.get("text")
                if text is None:
                    continue
                # Mark the session busy before yielding so a second prompt
                # cannot start a parallel loop.
                session["is_looping"] = True
                # Keep a reference: the event loop only holds weak references
                # to tasks, so an unreferenced task may be garbage-collected.
                session["task"] = asyncio.create_task(agent_loop(session_id, text))
    except WebSocketDisconnect:
        pass
    finally:
        # Only detach our own socket; a reconnect may already have replaced it.
        current = SESSIONS.get(session_id)
        if current is not None and current.get("ws") is websocket:
            current["ws"] = None
def _checked_session_dir(session_id: str):
    """Return the session directory only if it is a direct child of BASE_DIR.

    Returns None for ids that resolve anywhere else (empty id, ".", "..",
    ids containing separators). The endpoints below delete and overwrite the
    returned directory, and the id comes straight from the client.
    """
    try:
        candidate = get_session_dir(session_id)
    except ValueError:
        # Tolerate a get_session_dir that validates ids by raising.
        return None
    if candidate.resolve().parent != BASE_DIR.resolve():
        return None
    return candidate


def _invalid_session_response():
    """Build the 400 response used when a session id is rejected."""
    return JSONResponse(status_code=400, content={"status": "error", "message": "Invalid session id."})


@app.post("/api/upload_zip")
async def upload_zip(session_id: str = Form(...), file: UploadFile = File(...)):
    """Replace the session's files with the contents of an uploaded ZIP archive.

    The upload is spooled to a temporary file outside the session directory
    and validated first; the existing session directory is only wiped once the
    archive is known to be a ZIP. The temporary file is always removed.

    Returns ``{"status": "success"}`` on success, or a 400 JSON error for an
    invalid session id or a file that is not a readable ZIP archive.
    """
    import tempfile

    s_dir = _checked_session_dir(session_id)
    if s_dir is None:
        return _invalid_session_response()

    fd, tmp_name = tempfile.mkstemp(suffix=".zip")
    try:
        with os.fdopen(fd, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        if not zipfile.is_zipfile(tmp_name):
            return JSONResponse(
                status_code=400,
                content={"status": "error", "message": "Uploaded file is not a valid ZIP archive."},
            )
        if s_dir.exists():
            shutil.rmtree(s_dir)
        os.makedirs(s_dir, exist_ok=True)
        try:
            with zipfile.ZipFile(tmp_name, 'r') as z:
                z.extractall(s_dir)
        except zipfile.BadZipFile:
            return JSONResponse(
                status_code=400,
                content={"status": "error", "message": "Uploaded file is not a valid ZIP archive."},
            )
    finally:
        os.remove(tmp_name)

    await send_vfs_update(session_id)
    return {"status": "success"}


@app.get("/api/download_zip/{session_id}")
async def download_zip(session_id: str):
    """Bundle the session directory into a ZIP and return it as ``Project.zip``.

    Directories whose path relative to the session root contains
    ``_context_upload`` are skipped. Responds with 400 for an invalid session
    id and 404 when the session has no directory.
    """
    s_dir = _checked_session_dir(session_id)
    if s_dir is None:
        return _invalid_session_response()
    if not s_dir.is_dir():
        return JSONResponse(status_code=404, content={"status": "error", "message": "Session not found."})

    # NOTE(review): the archive is left in /tmp after the response is sent;
    # nothing visible in this file removes it.
    z_name = f"/tmp/PuruAI_{session_id}.zip"
    with zipfile.ZipFile(z_name, 'w', zipfile.ZIP_DEFLATED) as z:
        for root, _, files in os.walk(s_dir):
            # Match against the path inside the session so that the session id
            # or BASE_DIR itself can never trigger the exclusion.
            if "_context_upload" in os.path.relpath(root, s_dir):
                continue
            for f in files:
                full_path = os.path.join(root, f)
                z.write(full_path, os.path.relpath(full_path, s_dir))
    return FileResponse(z_name, filename="Project.zip")


@app.post("/api/clear_session")
async def clear_session(session_id: str = Form(...)):
    """Reset the session's chat history and AI memory and ask any running loop to stop."""
    if session_id in SESSIONS:
        SESSIONS[session_id]["chat_history"] = []
        SESSIONS[session_id]["ai_memory"] = []
        SESSIONS[session_id]["is_looping"] = False
    return {"status": "success"}


@app.post("/api/delete_session")
async def delete_session(session_id: str = Form(...)):
    """Forget the session in memory and remove its directory from disk.

    Responds with 400 (after dropping the in-memory entry) when the id does
    not map to a directory directly under BASE_DIR, so nothing outside the
    sessions folder can be deleted.
    """
    session = SESSIONS.pop(session_id, None)
    if session is not None:
        # A running agent loop keeps its own reference to this dict; clear the
        # flag so it stops instead of recreating the directory.
        session["is_looping"] = False

    s_dir = _checked_session_dir(session_id)
    if s_dir is None:
        return _invalid_session_response()
    if s_dir.exists():
        shutil.rmtree(s_dir)
    return {"status": "success"}


# ==========================================
# FRONTEND UI (HTML)
# ==========================================
# NOTE(review): the text below is reproduced exactly as found and contains no
# HTML tags, so the markup may have been stripped from this file -- restore it
# from a pristine copy.
HTML_CONTENT = """ PuruAI - Autonomous Agent
P

PuruAI

Mulai Project

PROJECT FILES Download ZIP
"""

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=PORT)