admin / app.py
CORVO-AI's picture
Upload 6 files
30a02c3 verified
"""
ServerClass DB Admin Dashboard
Flask + Socket.IO backend with background polling and real-time broadcasts.
"""
import os
import time
import threading
from typing import Dict, Any, Optional
import requests
from flask import Flask, render_template, request, jsonify
from flask_socketio import SocketIO, emit, disconnect
# -----------------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------------
UPSTREAM_API = os.environ.get("UPSTREAM_API", "https://dooratre-db.hf.space")
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "10")) # seconds
SECRET_KEY = os.environ.get("FLASK_SECRET_KEY", os.urandom(24).hex())
app = Flask(__name__)
app.config["SECRET_KEY"] = SECRET_KEY
socketio = SocketIO(app, cors_allowed_origins="*", async_mode="eventlet")
# -----------------------------------------------------------------------------
# Per-connection credential store (sid -> {admin_secret, api_key})
# -----------------------------------------------------------------------------
_sessions: Dict[str, Dict[str, str]] = {}
_sessions_lock = threading.Lock()
def get_session(sid: str) -> Optional[Dict[str, str]]:
with _sessions_lock:
return _sessions.get(sid)
def set_session(sid: str, creds: Dict[str, str]) -> None:
with _sessions_lock:
_sessions[sid] = creds
def drop_session(sid: str) -> None:
with _sessions_lock:
_sessions.pop(sid, None)
# -----------------------------------------------------------------------------
# Upstream API client
# -----------------------------------------------------------------------------
def fetch_upstream(creds: Dict[str, str]) -> Dict[str, Any]:
"""Fetch users + server status from upstream API using provided credentials."""
result: Dict[str, Any] = {
"ok": True,
"users": [],
"total": 0,
"status": None,
"errors": {},
"timestamp": int(time.time()),
}
admin_secret = creds.get("admin_secret", "").strip()
api_key = creds.get("api_key", "").strip()
# Users (admin endpoint)
try:
r = requests.get(
f"{UPSTREAM_API}/admin/users",
headers={"X-Admin-Secret": admin_secret, "Content-Type": "application/json"},
timeout=15,
)
if r.ok:
data = r.json()
result["users"] = data.get("users", [])
result["total"] = data.get("total", len(result["users"]))
else:
result["errors"]["users"] = f"{r.status_code}: {r.text[:200]}"
except Exception as e:
result["errors"]["users"] = str(e)
# Server status (API key)
try:
r = requests.get(
f"{UPSTREAM_API}/api/server-status",
headers={"X-API-Key": api_key, "Content-Type": "application/json"},
timeout=15,
)
if r.ok:
result["status"] = r.json()
else:
result["errors"]["status"] = f"{r.status_code}: {r.text[:200]}"
except Exception as e:
result["errors"]["status"] = str(e)
if result["errors"] and not result["users"] and not result["status"]:
result["ok"] = False
return result
# -----------------------------------------------------------------------------
# Background poller
# -----------------------------------------------------------------------------
def background_poller():
"""Periodically fetches data for each connected session and pushes updates."""
while True:
socketio.sleep(POLL_INTERVAL)
with _sessions_lock:
sessions_snapshot = dict(_sessions)
for sid, creds in sessions_snapshot.items():
if not creds.get("admin_secret") and not creds.get("api_key"):
continue
try:
payload = fetch_upstream(creds)
payload["source"] = "auto"
socketio.emit("data_update", payload, to=sid)
except Exception as e:
socketio.emit("error", {"message": str(e)}, to=sid)
# -----------------------------------------------------------------------------
# Routes
# -----------------------------------------------------------------------------
@app.route("/")
def index():
return render_template("index.html", poll_interval=POLL_INTERVAL)
@app.route("/health")
def health():
return jsonify({"status": "ok", "sessions": len(_sessions)})
# -----------------------------------------------------------------------------
# Socket.IO events
# -----------------------------------------------------------------------------
@socketio.on("connect")
def on_connect():
set_session(request.sid, {"admin_secret": "", "api_key": ""})
emit("connected", {"sid": request.sid, "poll_interval": POLL_INTERVAL})
@socketio.on("disconnect")
def on_disconnect():
drop_session(request.sid)
@socketio.on("authenticate")
def on_authenticate(data):
"""Client sends credentials; server stores them and triggers immediate fetch."""
creds = {
"admin_secret": (data or {}).get("admin_secret", "").strip(),
"api_key": (data or {}).get("api_key", "").strip(),
}
set_session(request.sid, creds)
payload = fetch_upstream(creds)
payload["source"] = "manual"
emit("data_update", payload)
@socketio.on("refresh")
def on_refresh():
"""Manual refresh request."""
creds = get_session(request.sid) or {}
payload = fetch_upstream(creds)
payload["source"] = "manual"
emit("data_update", payload)
# -----------------------------------------------------------------------------
# Start background poller at import time (gunicorn-friendly)
# -----------------------------------------------------------------------------
_poller_started = False
_poller_lock = threading.Lock()
def _ensure_poller():
global _poller_started
with _poller_lock:
if not _poller_started:
socketio.start_background_task(background_poller)
_poller_started = True
_ensure_poller()
# -----------------------------------------------------------------------------
# Entry point (only used for local `python app.py`)
# -----------------------------------------------------------------------------
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
socketio.run(app, host="0.0.0.0", port=port, debug=False)