# NOTE: scraped file-viewer header kept as a comment (status: Running; file size: 3,343 bytes).
# Commit refs shown by the viewer: 24a9e77 bbf5d46 b7adbf8 b9f1c36 94158f8 6379af5
import os
import json
import sys
from typing import Dict
# --- Firebase Imports ---
# Optional dependency guard: when the Firebase packages cannot be imported,
# every Firebase name is bound to None and FIREBASE_AVAILABLE is False so the
# rest of the module can check the flag instead of failing at import time.
try:
    import firebase_admin
    from firebase_admin import credentials, firestore, auth
except ImportError:
    firebase_admin = credentials = firestore = auth = None
    FIREBASE_AVAILABLE = False
    print("❌ Firebase libraries not available - This version requires Firebase for Hugging Face")
else:
    FIREBASE_AVAILABLE = True
# --- Constants ---
# Ticker symbols treated as "stablecoins" by this module, kept in
# alphabetical order.
# NOTE(review): several entries look like wrapped assets rather than
# USD-pegged coins (e.g. WBTC, WETH, WBNB) - confirm that is intended.
STABLECOINS = {
    "BSC-USD", "BUSD", "BVUSDC",
    "CBBTC", "CRVUSD",
    "DAI",
    "FDUSD", "FRAX", "FXUSD",
    "GUSD",
    "JUPUSD",
    "LUSD",
    "MSETH", "MSUSD",
    "PYUSD",
    "REUR", "RLUST", "RUSD",
    "SBUSDT",
    "TUSD",
    "USD1", "USDC", "USDC.E", "USDCV", "USDD", "USDE", "USDH", "USDON",
    "USDP", "USDT", "USDT0",
    "VBUSDC",
    "WAETHUSDC", "WAETHUSDT", "WBNB", "WBTC", "WETH",
}

# Read once at import time from the FIREBASE_API_KEY environment variable;
# None when the variable is not set.
FIREBASE_WEB_API_KEY = os.environ.get("FIREBASE_API_KEY")

# --- Database Initialization ---
# Module-wide Firestore client; stays None until init_firebase() assigns it.
db = None
def init_firebase():
    """Initialize the Firebase app and the module-level Firestore client.

    The service-account credentials are read as a JSON string from the
    ``FIREBASE_CONFIG`` environment variable. On success the Firestore
    client is stored in the global ``db`` and also returned.

    Returns:
        The Firestore client, or ``None`` when ``FIREBASE_CONFIG`` is not
        set (a warning is printed and ``db`` is left unchanged).

    Raises:
        ImportError: if the Firebase libraries could not be imported.
        Exception: if parsing the config or connecting to Firebase fails.
            The underlying error is chained as ``__cause__`` so the
            original traceback is preserved.
    """
    global db
    if not FIREBASE_AVAILABLE:
        raise ImportError("Firebase libraries not installed.")

    firebase_config_str = os.environ.get("FIREBASE_CONFIG")
    if not firebase_config_str:
        # In development, you might want to skip this or warn
        print("⚠️ FIREBASE_CONFIG not set")
        return None

    try:
        # Only create the app when none is registered yet, so calling this
        # function more than once does not try to initialize it again.
        if not firebase_admin._apps:
            cred = credentials.Certificate(json.loads(firebase_config_str))
            firebase_admin.initialize_app(cred)
        db = firestore.client()
        print("✅ Firebase Connected Successfully")
        return db
    except Exception as e:
        # Chain the cause so the real failure (bad JSON, bad credentials,
        # network error, ...) stays visible in the traceback.
        raise Exception(f"Firebase initialization failed: {e}") from e
# --- User Management Helpers ---
def get_user_keys(uid) -> Dict:
    """Return the fields stored in the ``users/<uid>`` Firestore document.

    An empty dict is returned when no Firestore client has been set, when
    the document does not exist, or when the read raises (the error is
    printed rather than propagated).
    """
    stored = {}
    if db:
        try:
            snapshot = db.collection('users').document(uid).get()
            if snapshot.exists:
                stored = snapshot.to_dict()
        except Exception as err:
            print(f"Firestore Error: {err}")
    return stored
def update_user_keys(uid, data):
    """Merge ``data`` into the ``users/<uid>`` Firestore document.

    Args:
        uid: Identifier of the user document to write.
        data: Mapping of fields to write; written with ``merge=True``.

    Returns:
        True when the write call completed, False when no Firestore client
        is set or the write raised.
    """
    if not db:
        return False
    try:
        db.collection('users').document(uid).set(data, merge=True)
    except Exception as e:
        # Report the failure instead of swallowing it silently, matching
        # how get_user_keys reports Firestore errors.
        print(f"Firestore Error: {e}")
        return False
    return True
def is_user_setup_complete(uid):
    """Return True when every required key for ``uid`` has a usable value.

    A key counts as unusable when it is absent, falsy, or its string form
    contains the marker text ``CONFIG_``.
    """
    stored = get_user_keys(uid)

    def _is_configured(name):
        # Present, non-empty, and not containing the "CONFIG_" marker.
        if name not in stored:
            return False
        value = stored[name]
        return bool(value) and "CONFIG_" not in str(value)

    required_names = (
        "CMC_API_KEY",
        "COINGECKO_API_KEY",
        "LIVECOINWATCH_API_KEY",
        "COINALYZE_VTMR_URL",
    )
    return all(_is_configured(name) for name in required_names)
# Admin dashboard stats
def increment_global_stat(field: str):
    """Add 1 to ``field`` on the ``stats/global`` Firestore document.

    Does nothing when no Firestore client is set. Errors from the write are
    printed and not propagated.
    """
    if db:
        try:
            # Collection 'stats', document 'global'.
            global_stats_doc = db.collection('stats').document('global')
            # merge=True so the write also works when the document does not
            # exist yet.
            global_stats_doc.set({field: firestore.Increment(1)}, merge=True)
        except Exception as err:
            print(f"⚠️ Stats Increment Error: {err}")
def get_global_stats() -> Dict:
    """Return the contents of the ``stats/global`` Firestore document.

    An empty dict is returned when no Firestore client is set, when the
    document does not exist, or when the read raises (the error is printed).
    """
    if not db:
        return {}
    try:
        snapshot = db.collection('stats').document('global').get()
        if snapshot.exists:
            return snapshot.to_dict()
        return {}
    except Exception as err:
        print(f"⚠️ Stats Fetch Error: {err}")
    return {}