Spaces:
Sleeping
Sleeping
import json
import os
from datetime import datetime, timedelta, timezone
from urllib.parse import quote, urlencode

import firebase_admin
import requests
import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import RedirectResponse
from firebase_admin import credentials, firestore
# --- CONFIGURATION ---
# Every setting is read from the process environment once, at import time.
# Unset variables resolve to None, except REDIRECT_URI which has a local default.
ROBLOX_CLIENT_ID = os.getenv("ROBLOX_CLIENT_ID")
ROBLOX_CLIENT_SECRET = os.getenv("ROBLOX_CLIENT_SECRET")
REDIRECT_URI = os.getenv("REDIRECT_URI", "http://localhost:8000/auth/callback")
# Raw JSON text of the Firebase service-account key (parsed during Firebase init).
FIREBASE_JSON = os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON")

app = FastAPI(title="Bloxbuddy Tokens Server")
# --- FIREBASE INIT ---
# Best-effort startup: any failure is reported on stdout and leaves `db` as
# None so the module still imports; code using `db` must check it first.
try:
    if not FIREBASE_JSON:
        raise ValueError("Missing FIREBASE_SERVICE_ACCOUNT_JSON")
    service_account_info = json.loads(FIREBASE_JSON)
    cred = credentials.Certificate(service_account_info)
    firebase_admin.initialize_app(cred)
    db = firestore.client()
    print("✅ Connected to Firestore")
except Exception as e:
    print(f"❌ Firebase Init Failed: {e}")
    db = None
# --- ROUTES ---
# NOTE(review): no route decorator is visible on this handler in this view —
# confirm it is registered on `app`.
def health():
    """Liveness probe: return a constant payload showing the server is up."""
    return dict(status="Running")
# NOTE(review): no route decorator is visible on this handler in this view —
# confirm it is registered on `app`.
def login_to_roblox():
    """Redirect the caller to the Roblox OAuth 2.0 authorization page.

    The authorization URL carries this app's client id, the configured
    redirect URI, the requested scopes and ``response_type=code``.

    Returns:
        RedirectResponse pointing at the Roblox ``/oauth/v1/authorize`` URL.
    """
    # Build the query with urlencode so reserved characters in the redirect
    # URI ("://", "/", a possible "?" or "&") and in the scope list (spaces,
    # ":") are percent-encoded instead of corrupting the query string.
    # quote_via=quote encodes spaces as %20 rather than "+".
    query = urlencode(
        {
            "client_id": ROBLOX_CLIENT_ID,
            "redirect_uri": REDIRECT_URI,
            "scope": "openid profile asset:write asset:read",
            "response_type": "code",
        },
        quote_via=quote,
    )
    return RedirectResponse(f"https://apis.roblox.com/oauth/v1/authorize?{query}")
# NOTE(review): no route decorator is visible on this handler in this view —
# confirm it is registered on `app` (REDIRECT_URI defaults to /auth/callback).
def roblox_callback(code: str = None, error: str = None):
    """Complete the OAuth flow: exchange the code and persist the tokens.

    Steps: exchange ``code`` for tokens at the Roblox token endpoint, fetch
    the user's profile from the userinfo endpoint, compute the access-token
    expiry, and upsert everything into the ``user_tokens`` collection keyed
    by the Roblox user id.

    Args:
        code: Authorization code. Optional so that an error-only callback
            (no ``code``) reaches the ``error`` branch instead of being
            rejected during parameter validation.
        error: Error string passed back instead of a code, if any.

    Returns:
        ``{"error": error}`` when ``error`` is set; otherwise a dict with
        ``status``, ``user`` (preferred username) and ``uid``.

    Raises:
        HTTPException: 400 when the code is missing, the token exchange
            fails, or the user info is unusable; 503 when Firestore is
            unavailable.
    """
    if error:
        return {"error": error}
    if not code:
        raise HTTPException(status_code=400, detail="Missing authorization code")
    # Fail before spending the single-use code: without a database the tokens
    # could not be stored, and reporting "Connected" would be misleading.
    if not db:
        raise HTTPException(status_code=503, detail="DB Unavailable")

    # 1. Exchange Code
    try:
        res = requests.post(
            "https://apis.roblox.com/oauth/v1/token",
            data={
                "client_id": ROBLOX_CLIENT_ID,
                "client_secret": ROBLOX_CLIENT_SECRET,
                "grant_type": "authorization_code",
                "code": code,
                "redirect_uri": REDIRECT_URI,
            },
            timeout=10,  # never let a stalled upstream hang the worker
        )
        res.raise_for_status()
        token_data = res.json()
        access_token = token_data["access_token"]
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        raise HTTPException(
            status_code=400, detail=f"Token Exchange Failed: {str(e)}"
        ) from e

    # 2. Get User ID
    try:
        user_res = requests.get(
            "https://apis.roblox.com/oauth/v1/userinfo",
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=10,
        )
        user_res.raise_for_status()
        user_info = user_res.json()
        subject = user_info.get("sub")
    except (requests.RequestException, ValueError, AttributeError) as e:
        raise HTTPException(
            status_code=400, detail=f"User Info Failed: {str(e)}"
        ) from e
    if subject is None:
        # str(None) would otherwise become the document id "None".
        raise HTTPException(
            status_code=400, detail="User Info Failed: response has no 'sub'"
        )
    roblox_user_id = str(subject)

    # 3. Calculate Expiration (falls back to 900 s when expires_in is absent)
    expires_in_seconds = token_data.get("expires_in", 900)
    expiration_time = datetime.now(timezone.utc) + timedelta(seconds=expires_in_seconds)

    # 4. Store in Firestore
    record = {
        "roblox_user_id": roblox_user_id,
        "roblox_username": user_info.get("preferred_username"),
        "access_token": access_token,
        "expires_at": expiration_time,
        "updated_at": firestore.SERVER_TIMESTAMP,
    }
    refresh_token = token_data.get("refresh_token")
    if refresh_token:
        # Written only when present: with merge=True an omitted field keeps
        # whatever refresh token the document already holds.
        record["refresh_token"] = refresh_token
    db.collection("user_tokens").document(roblox_user_id).set(record, merge=True)

    return {
        "status": "Connected",
        "user": user_info.get("preferred_username"),
        "uid": roblox_user_id,
    }
# NOTE(review): no route decorator is visible on this handler in this view —
# confirm it is registered on `app`.
def get_or_refresh_token(roblox_user_id: str, force_refresh: bool = False):
    """Return a usable access token for a user, refreshing it when needed.

    The cached token from the ``user_tokens`` document is returned while it
    is still more than 60 seconds from expiry. Otherwise, or when
    ``force_refresh`` is true, the stored refresh token is exchanged at the
    Roblox token endpoint and the document is updated with the new values.

    NOTE(review): nothing in this function authenticates the caller; whoever
    can invoke it with a user id receives that user's access token. Confirm
    access is restricted elsewhere.

    Args:
        roblox_user_id: Document id in the ``user_tokens`` collection.
        force_refresh: Skip the cache check and always refresh.

    Returns:
        ``{"access_token": <token>}``.

    Raises:
        HTTPException: 503 when Firestore is unavailable; 404 when the user
            has no stored document; 400 when no refresh token is stored;
            401 when Roblox rejects the refresh; 502 when Roblox cannot be
            reached or answers with an unusable body.
    """
    if not db:
        raise HTTPException(status_code=503, detail="DB Unavailable")

    doc_ref = db.collection("user_tokens").document(roblox_user_id)
    doc = doc_ref.get()
    if not doc.exists:
        raise HTTPException(status_code=404, detail="User not connected")
    data = doc.to_dict() or {}

    # --- CACHE CHECK (Skip if forcing refresh) ---
    if not force_refresh:
        cached_token = data.get("access_token")
        stored_expiry = data.get("expires_at")
        # A document missing either field falls through to a refresh instead
        # of raising KeyError.
        if cached_token and stored_expiry:
            # Check validity with 60s buffer
            now_utc = datetime.now(timezone.utc)
            if now_utc < (stored_expiry - timedelta(seconds=60)):
                return {"access_token": cached_token}

    # --- REFRESH LOGIC ---
    print(f"🔄 Refreshing token for {roblox_user_id} (Force: {force_refresh})...")
    refresh_token = data.get("refresh_token")
    if not refresh_token:
        raise HTTPException(
            status_code=400, detail="No refresh token found. Re-login required."
        )

    try:
        res = requests.post(
            "https://apis.roblox.com/oauth/v1/token",
            data={
                "client_id": ROBLOX_CLIENT_ID,
                "client_secret": ROBLOX_CLIENT_SECRET,
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
            },
            timeout=10,  # never let a stalled upstream hang the worker
        )
    except requests.RequestException as e:
        # Transport failure (DNS, connect, timeout): an upstream problem,
        # not a rejected credential.
        print(f"❌ Refresh failed: {e}")
        raise HTTPException(status_code=502, detail=f"Refresh failed: {e}") from e

    if res.status_code != 200:
        print(f"❌ Refresh failed: {res.text}")
        raise HTTPException(status_code=401, detail=f"Refresh failed: {res.text}")

    try:
        new_tokens = res.json()
        new_access_token = new_tokens["access_token"]
    except (ValueError, KeyError, TypeError) as e:
        print(f"❌ Refresh failed: {res.text}")
        raise HTTPException(
            status_code=502, detail="Refresh failed: malformed token response"
        ) from e

    new_expires_in = new_tokens.get("expires_in", 900)
    new_expiry = datetime.now(timezone.utc) + timedelta(seconds=new_expires_in)

    # Update DB; keep the old refresh token when the response omits a new one.
    doc_ref.update({
        "access_token": new_access_token,
        "refresh_token": new_tokens.get("refresh_token", refresh_token),
        "expires_at": new_expiry,
        "updated_at": firestore.SERVER_TIMESTAMP,
    })
    return {"access_token": new_access_token}
if __name__ == "__main__":
    # Script entry point: serve `app` on all interfaces. The port comes from
    # the PORT environment variable and falls back to 7860.
    port = int(os.getenv("PORT", "7860"))
    uvicorn.run(app, host="0.0.0.0", port=port)