""" ServerClass DB Admin Dashboard Flask + Socket.IO backend with background polling and real-time broadcasts. """ import os import time import threading from typing import Dict, Any, Optional import requests from flask import Flask, render_template, request, jsonify from flask_socketio import SocketIO, emit, disconnect # ----------------------------------------------------------------------------- # Configuration # ----------------------------------------------------------------------------- UPSTREAM_API = os.environ.get("UPSTREAM_API", "https://dooratre-db.hf.space") POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "10")) # seconds SECRET_KEY = os.environ.get("FLASK_SECRET_KEY", os.urandom(24).hex()) app = Flask(__name__) app.config["SECRET_KEY"] = SECRET_KEY socketio = SocketIO(app, cors_allowed_origins="*", async_mode="eventlet") # ----------------------------------------------------------------------------- # Per-connection credential store (sid -> {admin_secret, api_key}) # ----------------------------------------------------------------------------- _sessions: Dict[str, Dict[str, str]] = {} _sessions_lock = threading.Lock() def get_session(sid: str) -> Optional[Dict[str, str]]: with _sessions_lock: return _sessions.get(sid) def set_session(sid: str, creds: Dict[str, str]) -> None: with _sessions_lock: _sessions[sid] = creds def drop_session(sid: str) -> None: with _sessions_lock: _sessions.pop(sid, None) # ----------------------------------------------------------------------------- # Upstream API client # ----------------------------------------------------------------------------- def fetch_upstream(creds: Dict[str, str]) -> Dict[str, Any]: """Fetch users + server status from upstream API using provided credentials.""" result: Dict[str, Any] = { "ok": True, "users": [], "total": 0, "status": None, "errors": {}, "timestamp": int(time.time()), } admin_secret = creds.get("admin_secret", "").strip() api_key = creds.get("api_key", "").strip() # Users (admin endpoint) try: r = requests.get( f"{UPSTREAM_API}/admin/users", headers={"X-Admin-Secret": admin_secret, "Content-Type": "application/json"}, timeout=15, ) if r.ok: data = r.json() result["users"] = data.get("users", []) result["total"] = data.get("total", len(result["users"])) else: result["errors"]["users"] = f"{r.status_code}: {r.text[:200]}" except Exception as e: result["errors"]["users"] = str(e) # Server status (API key) try: r = requests.get( f"{UPSTREAM_API}/api/server-status", headers={"X-API-Key": api_key, "Content-Type": "application/json"}, timeout=15, ) if r.ok: result["status"] = r.json() else: result["errors"]["status"] = f"{r.status_code}: {r.text[:200]}" except Exception as e: result["errors"]["status"] = str(e) if result["errors"] and not result["users"] and not result["status"]: result["ok"] = False return result # ----------------------------------------------------------------------------- # Background poller # ----------------------------------------------------------------------------- def background_poller(): """Periodically fetches data for each connected session and pushes updates.""" while True: socketio.sleep(POLL_INTERVAL) with _sessions_lock: sessions_snapshot = dict(_sessions) for sid, creds in sessions_snapshot.items(): if not creds.get("admin_secret") and not creds.get("api_key"): continue try: payload = fetch_upstream(creds) payload["source"] = "auto" socketio.emit("data_update", payload, to=sid) except Exception as e: socketio.emit("error", {"message": str(e)}, to=sid) # ----------------------------------------------------------------------------- # Routes # ----------------------------------------------------------------------------- @app.route("/") def index(): return render_template("index.html", poll_interval=POLL_INTERVAL) @app.route("/health") def health(): return jsonify({"status": "ok", "sessions": len(_sessions)}) # ----------------------------------------------------------------------------- # Socket.IO events # ----------------------------------------------------------------------------- @socketio.on("connect") def on_connect(): set_session(request.sid, {"admin_secret": "", "api_key": ""}) emit("connected", {"sid": request.sid, "poll_interval": POLL_INTERVAL}) @socketio.on("disconnect") def on_disconnect(): drop_session(request.sid) @socketio.on("authenticate") def on_authenticate(data): """Client sends credentials; server stores them and triggers immediate fetch.""" creds = { "admin_secret": (data or {}).get("admin_secret", "").strip(), "api_key": (data or {}).get("api_key", "").strip(), } set_session(request.sid, creds) payload = fetch_upstream(creds) payload["source"] = "manual" emit("data_update", payload) @socketio.on("refresh") def on_refresh(): """Manual refresh request.""" creds = get_session(request.sid) or {} payload = fetch_upstream(creds) payload["source"] = "manual" emit("data_update", payload) # ----------------------------------------------------------------------------- # Start background poller at import time (gunicorn-friendly) # ----------------------------------------------------------------------------- _poller_started = False _poller_lock = threading.Lock() def _ensure_poller(): global _poller_started with _poller_lock: if not _poller_started: socketio.start_background_task(background_poller) _poller_started = True _ensure_poller() # ----------------------------------------------------------------------------- # Entry point (only used for local `python app.py`) # ----------------------------------------------------------------------------- if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) socketio.run(app, host="0.0.0.0", port=port, debug=False)