db2 / app.py
Dooratre's picture
Update app.py
ec9959a verified
"""
Main DB Server - Central Registry for all ServerClass instances.
Runs at: https://dooratre-db.hf.space
Responsibilities:
- Store user→server mapping
- Device fingerprint (token) β†’ user lookup
- Server load balancing (assign new users to least loaded server)
- GitHub backup of registry data
"""
from flask import Flask, request, jsonify
from datetime import datetime
from functools import wraps
from config import (
SECRET_KEY, PORT, ADMIN_SECRET, API_SECRET,
SERVER_URL_PATTERN, TOTAL_SERVERS, MAX_USERS_PER_SERVER,
ALLOWED_ORIGINS, WEBSITE_URL, LOGIN_WEBSITE_URL
)
from memory import get_db
app = Flask(__name__)
app.secret_key = SECRET_KEY
# ═══════════════════════════════════════════════════════════════
# CORS Middleware
# ═══════════════════════════════════════════════════════════════
@app.after_request
def add_cors_headers(response):
origin = request.headers.get('Origin', '')
if origin in ALLOWED_ORIGINS or not origin:
response.headers['Access-Control-Allow-Origin'] = origin if origin else '*'
else:
# Allow all serverclass origins dynamically
for i in range(1, TOTAL_SERVERS + 1):
if f"serverclass{i}" in origin:
response.headers['Access-Control-Allow-Origin'] = origin
break
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, X-API-Key, X-Admin-Secret'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS'
response.headers['Access-Control-Allow-Credentials'] = 'true'
return response
@app.before_request
def handle_preflight():
if request.method == 'OPTIONS':
response = jsonify({"ok": True})
return response
# ═══════════════════════════════════════════════════════════════
# Auth Helpers
# ═══════════════════════════════════════════════════════════════
def require_api_key(f):
"""Require X-API-Key header matching API_SECRET."""
@wraps(f)
def decorated(*args, **kwargs):
api_key = request.headers.get('X-API-Key', '')
if api_key != API_SECRET:
return jsonify({"ok": False, "error": "Unauthorized"}), 401
return f(*args, **kwargs)
return decorated
def require_admin(f):
"""Require X-Admin-Secret header."""
@wraps(f)
def decorated(*args, **kwargs):
secret = request.headers.get('X-Admin-Secret', '') or request.args.get('secret', '')
if secret != ADMIN_SECRET:
return jsonify({"ok": False, "error": "Unauthorized"}), 401
return f(*args, **kwargs)
return decorated
def _server_url(server_num):
"""Convert server number to full URL."""
return SERVER_URL_PATTERN.replace("{N}", str(server_num))
# ═══════════════════════════════════════════════════════════════
# API: Check Token (Auto-login)
# Website calls this on page load with device fingerprint
# ═══════════════════════════════════════════════════════════════
@app.route('/api/check-token', methods=['POST'])
@require_api_key
def check_token():
"""
Check if a device fingerprint token is linked to a user.
Request: {"token": "device_fingerprint_string"}
Response if found:
{"ok": true, "found": true, "username": "...", "server_num": N, "server_url": "..."}
Response if not found:
{"ok": true, "found": false}
"""
data = request.get_json()
if not data:
return jsonify({"ok": False, "error": "No JSON data"}), 400
token = (data.get('token', '') or '').strip()
if not token:
return jsonify({"ok": False, "error": "No token provided"}), 400
db = get_db()
user = db.get_user_by_token(token)
if user:
server_num = user.get('server_num')
return jsonify({
"ok": True,
"found": True,
"username": user.get('username'),
"server_num": server_num,
"server_url": _server_url(server_num),
})
else:
return jsonify({
"ok": True,
"found": False,
})
# ═══════════════════════════════════════════════════════════════
# API: Lookup User (Login flow - find which server)
# Website calls this when user types username in login
# ═══════════════════════════════════════════════════════════════
@app.route('/api/lookup-user', methods=['POST'])
@require_api_key
def lookup_user():
"""
Look up which server a username is on.
Request: {"username": "..."}
Response if found:
{"ok": true, "found": true, "server_num": N, "server_url": "..."}
Response if not found:
{"ok": true, "found": false}
"""
data = request.get_json()
if not data:
return jsonify({"ok": False, "error": "No JSON data"}), 400
username = (data.get('username', '') or '').strip()
if not username:
return jsonify({"ok": False, "error": "No username provided"}), 400
db = get_db()
user = db.get_user(username)
if user:
server_num = user.get('server_num')
return jsonify({
"ok": True,
"found": True,
"username": username,
"server_num": server_num,
"server_url": _server_url(server_num),
})
else:
return jsonify({
"ok": True,
"found": False,
})
# ═══════════════════════════════════════════════════════════════
# API: Get Best Server (Signup flow - find server with space)
# Website calls this before redirecting to signup
# ═══════════════════════════════════════════════════════════════
@app.route('/api/get-best-server', methods=['POST'])
@require_api_key
def get_best_server():
"""
Find the server with the lowest user count for new signup.
Request: {} (empty or with token)
Response:
{"ok": true, "server_num": N, "server_url": "..."}
"""
db = get_db()
best = db.get_best_server()
if best is None:
return jsonify({
"ok": False,
"error": "All servers are full",
}), 503
return jsonify({
"ok": True,
"server_num": best,
"server_url": _server_url(best),
})
# ═══════════════════════════════════════════════════════════════
# API: Register User (Server calls this after successful signup)
# The individual server calls this to register user in central DB
# ═══════════════════════════════════════════════════════════════
@app.route('/api/register-user', methods=['POST'])
@require_api_key
def register_user():
"""
Register a new user in the central registry.
Called by the server after successful signup+verification.
Request: {
"username": "...",
"telegram_id": "...",
"server_num": N,
"token": "device_fingerprint" (optional)
}
Response:
{"ok": true, "message": "User registered"}
"""
data = request.get_json()
if not data:
return jsonify({"ok": False, "error": "No JSON data"}), 400
username = (data.get('username', '') or '').strip()
telegram_id = (data.get('telegram_id', '') or '').strip()
server_num = data.get('server_num')
token = (data.get('token', '') or '').strip() or None
if not username:
return jsonify({"ok": False, "error": "No username"}), 400
if not server_num:
return jsonify({"ok": False, "error": "No server_num"}), 400
try:
server_num = int(server_num)
except (ValueError, TypeError):
return jsonify({"ok": False, "error": "Invalid server_num"}), 400
db = get_db()
success, error = db.register_user(username, telegram_id, server_num, token)
if not success:
return jsonify({"ok": False, "error": error}), 409
return jsonify({
"ok": True,
"message": f"User '{username}' registered on server {server_num}",
})
# ═══════════════════════════════════════════════════════════════
# API: Link Token (After login success, link device to user)
# Server or website calls this after successful login
# ═══════════════════════════════════════════════════════════════
@app.route('/api/link-token', methods=['POST'])
@require_api_key
def link_token():
"""
Link a device fingerprint token to a user.
Called after successful login.
Request: {"username": "...", "token": "device_fingerprint"}
Response:
{"ok": true}
"""
data = request.get_json()
if not data:
return jsonify({"ok": False, "error": "No JSON data"}), 400
username = (data.get('username', '') or '').strip()
token = (data.get('token', '') or '').strip()
if not username:
return jsonify({"ok": False, "error": "No username"}), 400
if not token:
return jsonify({"ok": False, "error": "No token"}), 400
db = get_db()
success, error = db.link_token(username, token)
if not success:
return jsonify({"ok": False, "error": error}), 404
return jsonify({"ok": True})
@app.route('/api/unlink-token', methods=['POST'])
@require_api_key
def unlink_token():
"""Remove a specific token from the registry."""
data = request.get_json()
if not data:
return jsonify({"ok": False, "error": "No JSON data"}), 400
token = (data.get('token', '') or '').strip()
if not token:
return jsonify({"ok": False, "error": "No token"}), 400
db = get_db()
db.unlink_token(token)
return jsonify({"ok": True})
@app.route('/api/unlink-user-tokens', methods=['POST'])
@require_api_key
def unlink_user_tokens():
"""Remove ALL tokens for a user. Returns count of tokens removed."""
data = request.get_json()
if not data:
return jsonify({"ok": False, "error": "No JSON data"}), 400
username = (data.get('username', '') or '').strip()
if not username:
return jsonify({"ok": False, "error": "No username"}), 400
db = get_db()
removed_count = 0
with db._locks['users_registry']:
user = db._data['users_registry'].get(username)
if user:
old_tokens = user.get('tokens', [])
for t in old_tokens:
token_val = t.get('token', '')
if token_val:
with db._locks['tokens_index']:
if token_val in db._data['tokens_index']:
del db._data['tokens_index'][token_val]
removed_count += 1
user['tokens'] = []
else:
# User not in registry - but token might still be in index!
# Search tokens_index for any token pointing to this username
with db._locks['tokens_index']:
tokens_to_remove = [
tok for tok, uname in db._data['tokens_index'].items()
if uname == username
]
for tok in tokens_to_remove:
del db._data['tokens_index'][tok]
removed_count += 1
db._save_to_shared()
print(f" βœ… Cleared {removed_count} tokens for '{username}'")
return jsonify({
"ok": True,
"removed": removed_count,
"username": username,
})
# ═══════════════════════════════════════════════════════════════
# API: Login Success Notification
# Server calls this after user successfully logs in
# ═══════════════════════════════════════════════════════════════
@app.route('/api/login-success', methods=['POST'])
@require_api_key
def login_success():
"""
Server notifies Main DB that a user logged in successfully.
Updates last_login and optionally links a new token.
Request: {
"username": "...",
"token": "device_fingerprint" (optional)
}
"""
data = request.get_json()
if not data:
return jsonify({"ok": False, "error": "No JSON data"}), 400
username = (data.get('username', '') or '').strip()
token = (data.get('token', '') or '').strip() or None
if not username:
return jsonify({"ok": False, "error": "No username"}), 400
db = get_db()
db.update_user_login(username)
if token:
db.link_token(username, token)
return jsonify({"ok": True})
# ═══════════════════════════════════════════════════════════════
# API: Check Username Exists
# Website calls this to check if username is taken before signup
# ═══════════════════════════════════════════════════════════════
@app.route('/api/check-username', methods=['POST'])
@require_api_key
def check_username():
"""
Check if a username already exists in any server.
Request: {"username": "..."}
Response: {"ok": true, "exists": true/false, "server_num": N or null}
"""
data = request.get_json()
if not data:
return jsonify({"ok": False, "error": "No JSON data"}), 400
username = (data.get('username', '') or '').strip()
if not username:
return jsonify({"ok": False, "error": "No username"}), 400
db = get_db()
user = db.get_user(username)
if user:
return jsonify({
"ok": True,
"exists": True,
"server_num": user.get('server_num'),
})
else:
return jsonify({
"ok": True,
"exists": False,
"server_num": None,
})
# ═══════════════════════════════════════════════════════════════
# API: Server Status
# ═══════════════════════════════════════════════════════════════
@app.route('/api/server-status', methods=['GET'])
@require_api_key
def server_status():
"""Get all server user counts."""
db = get_db()
counts = db.get_server_counts()
servers = []
for i in range(1, TOTAL_SERVERS + 1):
num_str = str(i)
count = counts.get(num_str, 0)
servers.append({
"server_num": i,
"users": count,
"max": MAX_USERS_PER_SERVER,
"available": count < MAX_USERS_PER_SERVER,
"url": _server_url(i),
})
return jsonify({
"ok": True,
"servers": servers,
"total_users": db.get_total_users(),
"total_servers": TOTAL_SERVERS,
"max_per_server": MAX_USERS_PER_SERVER,
})
# ═══════════════════════════════════════════════════════════════
# ADMIN: Backup to GitHub
# ═══════════════════════════════════════════════════════════════
@app.route('/admin/backup', methods=['POST'])
@require_admin
def admin_backup():
"""Push all registry data to GitHub."""
db = get_db()
success, errors = db.push_to_github()
if success:
return jsonify({
"ok": True,
"message": "All data pushed to GitHub",
"stats": db.get_stats(),
})
else:
return jsonify({
"ok": False,
"errors": errors,
"stats": db.get_stats(),
}), 500
# ═══════════════════════════════════════════════════════════════
# ADMIN: Stats
# ═══════════════════════════════════════════════════════════════
@app.route('/admin/stats', methods=['GET'])
@require_admin
def admin_stats():
db = get_db()
return jsonify({
"ok": True,
"stats": db.get_stats(),
})
# ═══════════════════════════════════════════════════════════════
# ADMIN: Manual register (for testing)
# ═══════════════════════════════════════════════════════════════
@app.route('/admin/register', methods=['POST'])
@require_admin
def admin_register():
"""Manually register a user (for testing/migration)."""
data = request.get_json()
if not data:
return jsonify({"ok": False, "error": "No JSON data"}), 400
username = (data.get('username', '') or '').strip()
telegram_id = (data.get('telegram_id', '') or '').strip()
server_num = data.get('server_num')
if not username or not server_num:
return jsonify({"ok": False, "error": "username and server_num required"}), 400
db = get_db()
success, error = db.register_user(username, telegram_id, int(server_num))
if not success:
return jsonify({"ok": False, "error": error}), 409
return jsonify({"ok": True, "message": f"Registered {username} on server {server_num}"})
# ═══════════════════════════════════════════════════════════════
# ADMIN: List all users
# ═══════════════════════════════════════════════════════════════
@app.route('/admin/users', methods=['GET'])
@require_admin
def admin_users():
"""List all registered users."""
db = get_db()
with db._locks['users_registry']:
all_users = []
for uname, udata in db._data['users_registry'].items():
all_users.append({
"username": uname,
"server_num": udata.get('server_num'),
"telegram_id": udata.get('telegram_id', ''),
"tokens_count": len(udata.get('tokens', [])),
"created_at": udata.get('created_at', ''),
"last_login": udata.get('last_login', ''),
})
return jsonify({
"ok": True,
"users": all_users,
"total": len(all_users),
})
# ═══════════════════════════════════════════════════════════════
# ADMIN: Delete user (for testing)
# ═══════════════════════════════════════════════════════════════
@app.route('/admin/delete-user', methods=['POST'])
@require_admin
def admin_delete_user():
data = request.get_json()
if not data:
return jsonify({"ok": False, "error": "No JSON data"}), 400
username = (data.get('username', '') or '').strip()
if not username:
return jsonify({"ok": False, "error": "No username"}), 400
db = get_db()
with db._locks['users_registry']:
user = db._data['users_registry'].pop(username, None)
if not user:
return jsonify({"ok": False, "error": "User not found"}), 404
# Remove tokens from index
for t in user.get('tokens', []):
token_val = t.get('token', '')
if token_val:
with db._locks['tokens_index']:
db._data['tokens_index'].pop(token_val, None)
# Decrement server count
server_num = str(user.get('server_num', 0))
with db._locks['server_counts']:
if server_num in db._data['server_counts']:
db._data['server_counts'][server_num] = max(
0, db._data['server_counts'][server_num] - 1
)
return jsonify({"ok": True, "message": f"Deleted user '{username}'"})
# ═══════════════════════════════════════════════════════════════
# Health Check
# ═══════════════════════════════════════════════════════════════
@app.route('/health', methods=['GET'])
def health():
db = get_db()
return jsonify({
"status": "healthy",
"service": "main-db-registry",
"total_users": db.get_total_users(),
"total_servers": TOTAL_SERVERS,
"version": "1.0.0",
})
@app.route('/', methods=['GET'])
def index():
return jsonify({
"service": "ServerClass Main DB Registry",
"status": "running",
"docs": {
"POST /api/check-token": "Check device fingerprint",
"POST /api/lookup-user": "Find user's server",
"POST /api/get-best-server": "Get server for new signup",
"POST /api/register-user": "Register new user",
"POST /api/link-token": "Link device to user",
"POST /api/login-success": "Notify login success",
"POST /api/check-username": "Check if username exists",
"GET /api/server-status": "Server load info",
"GET /health": "Health check",
}
})
# ═══════════════════════════════════════════════════════════════
# MAIN
# ═══════════════════════════════════════════════════════════════
if __name__ == '__main__':
print("\n" + "═" * 60)
print(" πŸ—„οΈ ServerClass Main DB Registry")
print("═" * 60)
print(f" 🌐 http://localhost:{PORT}")
print("═" * 60 + "\n")
app.run(debug=True, host='0.0.0.0', port=PORT)