File size: 6,586 Bytes
30a02c3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 | """
ServerClass DB Admin Dashboard
Flask + Socket.IO backend with background polling and real-time broadcasts.
"""
import os
import time
import threading
from typing import Dict, Any, Optional
import requests
from flask import Flask, render_template, request, jsonify
from flask_socketio import SocketIO, emit, disconnect
# -----------------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------------
UPSTREAM_API = os.environ.get("UPSTREAM_API", "https://dooratre-db.hf.space")
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "10")) # seconds
SECRET_KEY = os.environ.get("FLASK_SECRET_KEY", os.urandom(24).hex())
app = Flask(__name__)
app.config["SECRET_KEY"] = SECRET_KEY
socketio = SocketIO(app, cors_allowed_origins="*", async_mode="eventlet")
# -----------------------------------------------------------------------------
# Per-connection credential store (sid -> {admin_secret, api_key})
# -----------------------------------------------------------------------------
_sessions: Dict[str, Dict[str, str]] = {}
_sessions_lock = threading.Lock()
def get_session(sid: str) -> Optional[Dict[str, str]]:
with _sessions_lock:
return _sessions.get(sid)
def set_session(sid: str, creds: Dict[str, str]) -> None:
with _sessions_lock:
_sessions[sid] = creds
def drop_session(sid: str) -> None:
with _sessions_lock:
_sessions.pop(sid, None)
# -----------------------------------------------------------------------------
# Upstream API client
# -----------------------------------------------------------------------------
def fetch_upstream(creds: Dict[str, str]) -> Dict[str, Any]:
"""Fetch users + server status from upstream API using provided credentials."""
result: Dict[str, Any] = {
"ok": True,
"users": [],
"total": 0,
"status": None,
"errors": {},
"timestamp": int(time.time()),
}
admin_secret = creds.get("admin_secret", "").strip()
api_key = creds.get("api_key", "").strip()
# Users (admin endpoint)
try:
r = requests.get(
f"{UPSTREAM_API}/admin/users",
headers={"X-Admin-Secret": admin_secret, "Content-Type": "application/json"},
timeout=15,
)
if r.ok:
data = r.json()
result["users"] = data.get("users", [])
result["total"] = data.get("total", len(result["users"]))
else:
result["errors"]["users"] = f"{r.status_code}: {r.text[:200]}"
except Exception as e:
result["errors"]["users"] = str(e)
# Server status (API key)
try:
r = requests.get(
f"{UPSTREAM_API}/api/server-status",
headers={"X-API-Key": api_key, "Content-Type": "application/json"},
timeout=15,
)
if r.ok:
result["status"] = r.json()
else:
result["errors"]["status"] = f"{r.status_code}: {r.text[:200]}"
except Exception as e:
result["errors"]["status"] = str(e)
if result["errors"] and not result["users"] and not result["status"]:
result["ok"] = False
return result
# -----------------------------------------------------------------------------
# Background poller
# -----------------------------------------------------------------------------
def background_poller():
"""Periodically fetches data for each connected session and pushes updates."""
while True:
socketio.sleep(POLL_INTERVAL)
with _sessions_lock:
sessions_snapshot = dict(_sessions)
for sid, creds in sessions_snapshot.items():
if not creds.get("admin_secret") and not creds.get("api_key"):
continue
try:
payload = fetch_upstream(creds)
payload["source"] = "auto"
socketio.emit("data_update", payload, to=sid)
except Exception as e:
socketio.emit("error", {"message": str(e)}, to=sid)
# -----------------------------------------------------------------------------
# Routes
# -----------------------------------------------------------------------------
@app.route("/")
def index():
return render_template("index.html", poll_interval=POLL_INTERVAL)
@app.route("/health")
def health():
return jsonify({"status": "ok", "sessions": len(_sessions)})
# -----------------------------------------------------------------------------
# Socket.IO events
# -----------------------------------------------------------------------------
@socketio.on("connect")
def on_connect():
set_session(request.sid, {"admin_secret": "", "api_key": ""})
emit("connected", {"sid": request.sid, "poll_interval": POLL_INTERVAL})
@socketio.on("disconnect")
def on_disconnect():
drop_session(request.sid)
@socketio.on("authenticate")
def on_authenticate(data):
"""Client sends credentials; server stores them and triggers immediate fetch."""
creds = {
"admin_secret": (data or {}).get("admin_secret", "").strip(),
"api_key": (data or {}).get("api_key", "").strip(),
}
set_session(request.sid, creds)
payload = fetch_upstream(creds)
payload["source"] = "manual"
emit("data_update", payload)
@socketio.on("refresh")
def on_refresh():
"""Manual refresh request."""
creds = get_session(request.sid) or {}
payload = fetch_upstream(creds)
payload["source"] = "manual"
emit("data_update", payload)
# -----------------------------------------------------------------------------
# Start background poller at import time (gunicorn-friendly)
# -----------------------------------------------------------------------------
_poller_started = False
_poller_lock = threading.Lock()
def _ensure_poller():
global _poller_started
with _poller_lock:
if not _poller_started:
socketio.start_background_task(background_poller)
_poller_started = True
_ensure_poller()
# -----------------------------------------------------------------------------
# Entry point (only used for local `python app.py`)
# -----------------------------------------------------------------------------
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
socketio.run(app, host="0.0.0.0", port=port, debug=False) |