gemini-claw / app.py
PurCLI Agent
Initial commit from PurCLI Coding Agent
ef271cc
"""
app.py — Gemini Claw Web UI
Flask backend dengan Job Polling untuk streaming respons agent.
Job disimpan server-side; client bisa tutup/refresh browser lalu lanjut polling.
Setiap sesi user mendapat workspace terisolasi di BASE_WORKSPACE/{session_id}/
"""
from __future__ import annotations
import io
import json
import os
import shutil
import threading
import time
import uuid
import zipfile
from pathlib import Path
from flask import Flask, jsonify, render_template, request, send_file, send_from_directory
# ── Workspace base ─────────────────────────────────────────────────────────────
# Default root: /home/runner/work when /home/runner exists, otherwise /tmp/workspace.
_default_base = "/home/runner/work" if os.path.isdir("/home/runner") else "/tmp/workspace"
# The CLAW_WORKSPACE environment variable, when set, overrides the default root.
BASE_WORKSPACE = Path(os.environ.get("CLAW_WORKSPACE", _default_base))
# Created at import time so per-session subdirectories can be made beneath it.
BASE_WORKSPACE.mkdir(parents=True, exist_ok=True)
from src.multi_agent import MultiAgentLoop
from src.gemini_client import DEFAULT_MODEL
app = Flask(__name__)
# A fresh random key is generated on every process start.
app.secret_key = os.urandom(24)
@app.after_request
def set_security_headers(response):
    """Attach the fixed set of security headers to an outgoing response.

    Registered as an ``after_request`` hook; the same response object is
    returned after its headers have been set.
    """
    # Joined with "; " and terminated with ";" to form the CSP header value.
    csp_directives = (
        "default-src 'self'",
        "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net",
        "style-src 'self' 'unsafe-inline'",
        "img-src 'self' data:",
        "frame-src 'self'",
        "connect-src 'self'",
    )
    hardening = {
        "X-Content-Type-Options": "nosniff",
        "X-Frame-Options": "SAMEORIGIN",
        "X-XSS-Protection": "1; mode=block",
        "Referrer-Policy": "strict-origin-when-cross-origin",
        "Content-Security-Policy": "; ".join(csp_directives) + ";",
    }
    for header_name, header_value in hardening.items():
        response.headers[header_name] = header_value
    return response
# ── Session storage (in-memory) ────────────────────────────────────────────────
# Maps session id -> session record; records are created on first use by get_session().
sessions: dict[str, dict] = {}
# ── Job storage (in-memory, polling-based) ─────────────────────────────────────
# Job structure:
# {
#   job_id: {
#     "sid": str,
#     "status": "running" | "done" | "error",
#     "events": [ {type, ...}, ... ],   <- append-only event log
#     "lock": threading.Lock(),
#     "created_at": float,
#   }
# }
jobs: dict[str, dict] = {}
# Held while a job is added to ``jobs`` and while expired jobs are removed.
_jobs_lock = threading.Lock()
JOB_TTL_SECONDS = 7200  # purge jobs older than 2 hours
def _cleanup_old_jobs():
    """Remove every job whose ``created_at`` is more than JOB_TTL_SECONDS ago."""
    oldest_allowed = time.time() - JOB_TTL_SECONDS
    with _jobs_lock:
        # Collect the ids first so the dict is not mutated while iterating it.
        stale_ids = tuple(
            job_id
            for job_id, record in jobs.items()
            if record["created_at"] < oldest_allowed
        )
        for job_id in stale_ids:
            jobs.pop(job_id)
def get_session(sid: str) -> dict:
    """Return the in-memory session record for ``sid``, creating it on first use.

    A new record gets an empty conversation, the default model, a workspace
    directory at ``BASE_WORKSPACE / sid`` (created on disk), and a lock that
    guards the conversation list.

    Args:
        sid: Session identifier supplied by the client. An empty string maps
            to ``BASE_WORKSPACE`` itself.

    Returns:
        The session dict with keys ``conversation``, ``model``, ``workspace``
        and ``conv_lock``.

    Raises:
        PermissionError: If ``BASE_WORKSPACE / sid`` resolves outside
            ``BASE_WORKSPACE`` (e.g. ``sid`` is absolute or contains ``..``).
    """
    if sid not in sessions:
        ws = BASE_WORKSPACE / sid
        # ``sid`` comes straight from request data; an absolute path or "../"
        # segments would otherwise place the "workspace" anywhere on disk.
        _assert_inside_workspace(BASE_WORKSPACE, ws)
        ws.mkdir(parents=True, exist_ok=True)
        # setdefault keeps the first record if another request thread
        # registered the same sid in the meantime (the lock must not be replaced).
        sessions.setdefault(
            sid,
            {
                "conversation": [],
                "model": DEFAULT_MODEL,
                "workspace": ws,
                "conv_lock": threading.Lock(),
            },
        )
    return sessions[sid]
def session_workspace(sid: str) -> Path:
    """Return the workspace directory stored in the session record for ``sid``."""
    record = get_session(sid)
    return record["workspace"]
# ── Path security helper ───────────────────────────────────────────────────────
def _assert_inside_workspace(ws: Path, target: Path) -> None:
    """Raise PermissionError unless ``target`` resolves to ``ws`` or a path below it.

    Both paths are resolved first, so ``..`` segments and symlinks are
    evaluated before the comparison.
    """
    root = ws.resolve()
    candidate = target.resolve()
    if candidate == root:
        return
    # The trailing separator stops "/base/ws_evil" from matching "/base/ws".
    required_prefix = str(root) + os.sep
    if str(candidate).startswith(required_prefix):
        return
    raise PermissionError(f"Akses ditolak: path di luar workspace ({target})")
# ── Routes ─────────────────────────────────────────────────────────────────────
@app.route("/favicon.ico")
def favicon():
    """Answer favicon requests with an empty body and status 204."""
    empty_body = ""
    return empty_body, 204
@app.route("/api/session", methods=["POST"])
def restore_session():
    """Restore (or create) a session from the id sent by the frontend.

    The JSON body may carry ``session_id``; when it is missing or blank a new
    UUID is generated. The session's workspace is created if needed and its
    path is returned together with the session id.
    """
    payload = request.get_json(force=True)
    requested = payload.get("session_id", "").strip()
    sid = requested if requested else str(uuid.uuid4())
    record = get_session(sid)
    return jsonify({"session_id": sid, "workspace": str(record["workspace"])})
@app.route("/")
def index():
    """Render the main page with a freshly generated session id and workspace."""
    fresh_sid = str(uuid.uuid4())
    workspace_path = str(session_workspace(fresh_sid))
    return render_template(
        "index.html",
        session_id=fresh_sid,
        model=DEFAULT_MODEL,
        workspace=workspace_path,
    )
# ── Job Polling API ────────────────────────────────────────────────────────────
@app.route("/api/chat", methods=["POST"])
def chat():
    """Create a new job that runs the agent and return its id.

    Request JSON: ``session_id``, ``prompt`` and optionally ``model``.
    Response JSON: ``{"job_id": str}``, or ``{"error": ...}`` with status 400
    when the prompt is empty.

    The agent runs in a background daemon thread and its progress is stored
    as an append-only event log on the job. Clients poll
    ``/api/job/<job_id>?offset=N`` to fetch events incrementally; because the
    log lives server-side, polling can resume after a page refresh as long as
    the client still knows the job id.
    """
    body = request.get_json(force=True)
    sid = body.get("session_id", "")
    prompt = body.get("prompt", "").strip()
    model = body.get("model", DEFAULT_MODEL)
    if not prompt:
        return jsonify({"error": "Prompt kosong"}), 400

    sess = get_session(sid)
    sess["model"] = model
    ws = sess["workspace"]

    # Register the new job.
    job_id = str(uuid.uuid4())
    job = {
        "sid": sid,
        "status": "running",
        "events": [],
        "lock": threading.Lock(),
        "created_at": time.time(),
    }
    with _jobs_lock:
        jobs[job_id] = job

    # Take a snapshot of the conversation under its lock so the worker thread
    # operates on a private copy.
    with sess["conv_lock"]:
        conv_snapshot = list(sess["conversation"])

    def _append(event: dict):
        """Append one event to the job's log under the job lock."""
        with job["lock"]:
            job["events"].append(event)

    def _finish(status: str, *leading_events: dict):
        """Publish the final status and the terminal "done" event atomically.

        Doing both under a single lock acquisition guarantees a poller never
        observes a finished status without the closing event in the log.
        (The job lock is not reentrant, so _append cannot be used here.)
        """
        with job["lock"]:
            job["events"].extend(leading_events)
            job["status"] = status
            job["events"].append({"type": "done"})

    def on_phase(phase: str, message: str):
        _append({"type": "phase", "phase": phase, "message": message})

    def on_text(text: str):
        # Whitespace-only chunks are not recorded.
        if text.strip():
            _append({"type": "text", "content": text})

    def on_tool_start(name: str, params: dict, reason: str = ""):
        # Each parameter's repr is truncated to 50 characters for the log.
        param_str = ", ".join(f"{k}={repr(v)[:50]}" for k, v in params.items())
        _append({"type": "tool_start", "name": name, "params": param_str, "reason": reason})

    def on_tool_result(name: str, output: str):
        # Only the first 12 lines of tool output are kept as a preview.
        lines = output.strip().splitlines()
        preview = "\n".join(lines[:12])
        if len(lines) > 12:
            preview += f"\n... ({len(lines) - 12} baris lebih)"
        _append({"type": "tool_result", "name": name, "output": preview})

    def run_agent():
        """Thread target: run the agent, store the result, finalize the job."""
        try:
            agent = MultiAgentLoop(model=model, workspace=ws)
            _, updated = agent.run(
                prompt,
                conv_snapshot,
                on_phase=on_phase,
                on_text=on_text,
                on_tool_start=on_tool_start,
                on_tool_result=on_tool_result,
            )
            with sess["conv_lock"]:
                sess["conversation"] = updated
        except Exception as e:
            # Top-level boundary of the worker thread: keep the traceback in
            # the server log and report the message to the client.
            app.logger.exception("Agent job %s failed", job_id)
            _finish("error", {"type": "error", "content": str(e)})
        else:
            _finish("done")
        finally:
            _cleanup_old_jobs()

    thread = threading.Thread(target=run_agent, daemon=True)
    thread.start()
    return jsonify({"job_id": job_id})
@app.route("/api/job/<job_id>", methods=["GET"])
def get_job(job_id: str):
    """Main polling endpoint: return the events of a job from ``offset`` onward.

    Query parameter ``offset`` (int, default 0; negative values are clamped
    to 0) selects ``events[offset:]``.

    Response JSON: ``{"status": "running"|"done"|"error", "events": [...],
    "total": int}``.
      - status "running"      -> the client keeps polling
      - status "done"/"error" -> the client can stop polling

    Returns 404 when the job id is unknown and 400 when ``offset`` is not an
    integer.
    """
    # Single lookup: a separate "in" check followed by indexing could raise
    # KeyError if the job is purged by _cleanup_old_jobs in between.
    job = jobs.get(job_id)
    if job is None:
        return jsonify({"error": "Job tidak ditemukan", "status": "not_found"}), 404

    try:
        offset = max(0, int(request.args.get("offset", 0)))
    except ValueError:
        # A malformed offset is a client error, not a server failure.
        return jsonify({"error": "Parameter offset harus berupa bilangan bulat"}), 400

    # Read the slice, status and total under one lock so they are consistent.
    with job["lock"]:
        events = list(job["events"][offset:])
        status = job["status"]
        total = len(job["events"])
    return jsonify({
        "status": status,
        "events": events,
        "total": total,
    })
@app.route("/api/job/<job_id>/status", methods=["GET"])
def job_status_check(job_id: str):
    """Lightweight status check: return status and event count, not the events.

    Returns 404 with ``{"status": "not_found"}`` when the job id is unknown.
    """
    # Single lookup avoids a KeyError if the job is purged between an
    # existence check and the subsequent indexing.
    job = jobs.get(job_id)
    if job is None:
        return jsonify({"status": "not_found"}), 404
    with job["lock"]:
        return jsonify({"status": job["status"], "total": len(job["events"])})
# ── Existing endpoints ─────────────────────────────────────────────────────────
@app.route("/api/clear", methods=["POST"])
def clear_session():
    """Reset ONLY the conversation history of a session; workspace files are untouched.

    Unknown session ids are ignored; the response is always ``{"ok": true}``.
    """
    payload = request.get_json(force=True)
    sid = payload.get("session_id", "")
    if sid in sessions:
        record = sessions[sid]
        with record["conv_lock"]:
            record["conversation"] = []
    return jsonify({"ok": True})
@app.route("/api/new-workspace", methods=["POST"])
def new_workspace():
    """Delete the old workspace (files + conversation) and start a clean session.

    The frontend sends the old ``session_id``; its workspace directory is
    removed, its session record dropped, and a new session id with a fresh
    workspace is returned.

    Returns 403 when the supplied id does not point at a per-session
    directory strictly below ``BASE_WORKSPACE``.
    """
    body = request.get_json(force=True)
    old_sid = body.get("session_id", "").strip()

    # Remove the old workspace.
    if old_sid:
        old_ws = BASE_WORKSPACE / old_sid
        # The id is client-supplied: refuse anything that would make rmtree
        # operate outside the workspace root ("../x", absolute paths) ...
        try:
            _assert_inside_workspace(BASE_WORKSPACE, old_ws)
        except PermissionError as e:
            return jsonify({"ok": False, "error": str(e)}), 403
        # ... or on the root itself (e.g. "."), which holds every session.
        if old_ws.resolve() == BASE_WORKSPACE.resolve():
            return jsonify({"ok": False, "error": "Akses ditolak: session_id tidak valid"}), 403

        if old_ws.exists():
            try:
                shutil.rmtree(old_ws)
            except OSError as e:
                # Best effort: a failed cleanup must not block the new
                # session, but it should be visible in the server log.
                app.logger.warning("Could not remove workspace %s: %s", old_ws, e)
        sessions.pop(old_sid, None)

    # Create the new session.
    new_sid = str(uuid.uuid4())
    sess = get_session(new_sid)
    return jsonify({"ok": True, "session_id": new_sid, "workspace": str(sess["workspace"])})
@app.route("/api/files/clear", methods=["POST"])
def clear_workspace_files():
    """Delete every top-level entry inside the workspace of the given session.

    Response JSON: ``{"ok": true, "deleted": [names], "errors": [messages]}``.
    Entries that cannot be removed are reported in ``errors`` and do not
    abort the operation.

    Returns 400 when ``session_id`` resolves to ``BASE_WORKSPACE`` itself
    (for example when it is empty), because clearing the root would delete
    the workspaces of all sessions.
    """
    body = request.get_json(force=True)
    sid = body.get("session_id", "")
    ws = session_workspace(sid)
    # An empty (or ".") session id maps to the shared root; never wipe that.
    if ws.resolve() == BASE_WORKSPACE.resolve():
        return jsonify({"ok": False, "error": "session_id tidak valid"}), 400

    deleted, errors = [], []
    if ws.exists():
        for entry in ws.iterdir():
            try:
                if entry.is_dir():
                    shutil.rmtree(entry)
                else:
                    entry.unlink()
                deleted.append(entry.name)
            except OSError as e:
                errors.append(f"{entry.name}: {e}")
    return jsonify({"ok": True, "deleted": deleted, "errors": errors})
@app.route("/api/file/delete", methods=["POST"])
def delete_file():
    """Delete a single file, given by a workspace-relative ``path``, from a session.

    Responds 400 when the path is missing or does not name a regular file,
    403 when it resolves outside the workspace, 404 when it does not exist,
    and ``{"ok": true}`` after the file has been removed.
    """

    def _fail(message, status_code):
        return jsonify({"ok": False, "error": message}), status_code

    payload = request.get_json(force=True)
    sid = payload.get("session_id", "")
    rel_path = payload.get("path", "").strip()
    if not rel_path:
        return _fail("Path diperlukan", 400)

    ws = session_workspace(sid)
    candidate = ws / rel_path
    try:
        _assert_inside_workspace(ws, candidate)
    except PermissionError as denied:
        return _fail(str(denied), 403)

    resolved = candidate.resolve()
    if not resolved.exists():
        return _fail("File tidak ditemukan", 404)
    if not resolved.is_file():
        return _fail("Bukan file", 400)
    resolved.unlink()
    return jsonify({"ok": True})
@app.route("/api/workspace", methods=["GET"])
def workspace_info():
    """Return the workspace path and up to 30 of its top-level entries.

    Entries are ordered directories first, then case-insensitively by name.
    ``size`` is only filled in for regular files. Any error while listing
    yields an empty ``files`` list.
    """
    sid = request.args.get("sid", "")
    if sid:
        ws = session_workspace(sid)
    else:
        ws = BASE_WORKSPACE

    def _dirs_first(entry):
        return (not entry.is_dir(), entry.name.lower())

    try:
        listing = []
        for entry in sorted(ws.iterdir(), key=_dirs_first)[:30]:
            size = entry.stat().st_size if entry.is_file() else None
            listing.append({"name": entry.name, "is_dir": entry.is_dir(), "size": size})
    except Exception:
        # Discard any partial result, matching an all-or-nothing listing.
        listing = []
    return jsonify({"path": str(ws), "files": listing})
# Directory names left out of the recursive file listing and the ZIP download.
SKIP_DIRS = {"node_modules", ".git", ".cache", "__pycache__", ".next", "dist", ".venv", "venv"}
# Entry names skipped by the recursive file listing.
SKIP_NAMES = {".DS_Store", "Thumbs.db"}
def _is_hidden(name: str) -> bool:
    """Return True when ``name`` begins with a dot."""
    return name[:1] == "."
def _list_files_recursive(workspace: Path) -> list[dict]:
    """Return a flat list describing every visible file below ``workspace``.

    Hidden entries (leading dot), names in SKIP_NAMES and directories in
    SKIP_DIRS are skipped. Within each directory, sub-directories are visited
    before files, each group sorted case-insensitively by name.

    Each item is a dict with ``path`` (workspace-relative, "/"-separated),
    ``name``, ``size`` in bytes and ``is_html``.

    Directories that cannot be read and entries that cannot be stat'ed
    (e.g. broken symlinks) are skipped instead of aborting the whole listing.
    """
    results = []

    def walk(directory: Path, prefix: str = ""):
        try:
            entries = sorted(directory.iterdir(), key=lambda x: (x.is_file(), x.name.lower()))
        except OSError:
            # Unreadable or vanished directory: skip this subtree.
            return
        for entry in entries:
            name = entry.name
            if _is_hidden(name) or name in SKIP_NAMES:
                continue
            rel = f"{prefix}/{name}" if prefix else name
            if entry.is_dir():
                if name not in SKIP_DIRS:
                    walk(entry, rel)
                continue
            try:
                size = entry.stat().st_size
            except OSError:
                # Broken symlink or file removed while listing.
                continue
            results.append({
                "path": rel,
                "name": name,
                "size": size,
                "is_html": name.lower().endswith((".html", ".htm")),
            })

    walk(workspace)
    return results
@app.route("/api/files", methods=["GET"])
def list_files():
    """Return the recursive file listing of a session workspace.

    Without a ``sid`` query parameter the listing covers BASE_WORKSPACE.
    """
    sid = request.args.get("sid", "")
    if sid:
        root = session_workspace(sid)
    else:
        root = BASE_WORKSPACE
    listing = _list_files_recursive(root)
    return jsonify({"files": listing})
@app.route("/api/file", methods=["GET"])
def download_file():
    """Send one workspace file as an attachment.

    Query parameters: ``sid`` (optional; BASE_WORKSPACE is used without it)
    and ``path`` (required, relative to the workspace). Responds 400 when
    ``path`` is missing and 403 when it resolves outside the workspace.
    """
    sid = request.args.get("sid", "")
    rel_path = request.args.get("path", "")
    if not rel_path:
        return "Path diperlukan", 400

    if sid:
        ws = session_workspace(sid)
    else:
        ws = BASE_WORKSPACE

    try:
        _assert_inside_workspace(ws, ws / rel_path)
    except PermissionError:
        return "Akses ditolak: path traversal terdeteksi", 403
    serve_root = str(ws.resolve())
    return send_from_directory(serve_root, rel_path, as_attachment=True)
@app.route("/workspace/<sid>/<path:filepath>")
def serve_workspace(sid: str, filepath: str):
    """Serve a file from the workspace of session ``sid`` (not as an attachment).

    Responds 403 when ``filepath`` resolves outside that workspace.
    """
    ws = session_workspace(sid)
    try:
        _assert_inside_workspace(ws, ws / filepath)
    except PermissionError:
        return "Akses ditolak: path traversal terdeteksi", 403
    serve_root = str(ws.resolve())
    return send_from_directory(serve_root, filepath)
@app.route("/api/download-zip", methods=["GET"])
def download_zip():
    """Build an in-memory ZIP of the workspace and send it as ``workspace.zip``.

    Files are skipped when any component of their workspace-relative path
    starts with a dot or is listed in SKIP_DIRS. Without a ``sid`` query
    parameter the archive covers BASE_WORKSPACE.
    """
    sid = request.args.get("sid", "")
    if sid:
        ws = session_workspace(sid)
    else:
        ws = BASE_WORKSPACE

    def _excluded(parts):
        for part in parts:
            if part.startswith(".") or part in SKIP_DIRS:
                return True
        return False

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as bundle:
        for path in sorted(ws.rglob("*")):
            if not path.is_file():
                continue
            arcname = path.relative_to(ws)
            if _excluded(arcname.parts):
                continue
            bundle.write(path, arcname)
    # Rewind so send_file reads the archive from the beginning.
    archive.seek(0)
    return send_file(
        archive,
        mimetype="application/zip",
        as_attachment=True,
        download_name="workspace.zip",
    )
@app.route("/api/upload", methods=["POST"])
def upload_files():
    """Store uploaded files in the workspace; ``.zip`` uploads are extracted.

    Form fields: ``session_id`` (optional; BASE_WORKSPACE is used without it)
    and one or more ``files``. Each upload produces one entry in ``results``
    with ``ok`` true or false; a failure of one upload does not stop the others.
    """
    sid = request.form.get("session_id", "")
    ws = session_workspace(sid) if sid else BASE_WORKSPACE
    ws.mkdir(parents=True, exist_ok=True)

    def _unpack(upload):
        """Extract a ZIP upload into the workspace; return the member count."""
        count = 0
        with zipfile.ZipFile(io.BytesIO(upload.read())) as archive:
            for member in archive.infolist():
                try:
                    _assert_inside_workspace(ws, ws / member.filename)
                except PermissionError:
                    # Members that would land outside the workspace are skipped.
                    continue
                archive.extract(member, ws)
                count += 1
        return count

    def _store(upload, name):
        """Save a plain upload under ``name``; return its size on disk."""
        destination = ws / name
        upload.save(str(destination))
        return destination.stat().st_size

    results = []
    for upload in request.files.getlist("files"):
        # Keep only the final path component of the client-supplied name.
        safe_name = Path(upload.filename or "upload").name
        kind = "zip" if safe_name.lower().endswith(".zip") else "file"
        try:
            if kind == "zip":
                outcome = {"name": safe_name, "type": "zip", "extracted": _unpack(upload), "ok": True}
            else:
                outcome = {"name": safe_name, "type": "file", "size": _store(upload, safe_name), "ok": True}
        except Exception as exc:
            outcome = {"name": safe_name, "type": kind, "ok": False, "error": str(exc)}
        results.append(outcome)
    return jsonify({"ok": True, "results": results})
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from $PORT, falling back to 5000.
    listen_port = int(os.environ.get("PORT", 5000))
    app.run(debug=False, threaded=True, host="0.0.0.0", port=listen_port)