Spaces:
Sleeping
Sleeping
Update main.py
Browse files
main.py
CHANGED
|
@@ -186,14 +186,12 @@ def verify_token(req):
|
|
| 186 |
"""
|
| 187 |
Verify Firebase ID token and check if user is admin.
|
| 188 |
Automatically creates admin entry in DB if email is in ADMIN_EMAILS but UID is not found.
|
| 189 |
-
|
| 190 |
Returns decoded token dict if valid admin, None otherwise.
|
| 191 |
"""
|
| 192 |
-
logger = logging.getLogger('guards_api') # Assuming logger is configured as in your full script
|
| 193 |
-
|
| 194 |
auth_header = req.headers.get("Authorization")
|
| 195 |
if not auth_header:
|
| 196 |
-
|
| 197 |
return None
|
| 198 |
|
| 199 |
try:
|
|
@@ -203,86 +201,84 @@ def verify_token(req):
|
|
| 203 |
else:
|
| 204 |
token = auth_header.strip()
|
| 205 |
|
| 206 |
-
# Verify the token
|
| 207 |
decoded = firebase_auth.verify_id_token(token)
|
| 208 |
-
|
| 209 |
|
| 210 |
# Get user UID from decoded token
|
| 211 |
-
uid = decoded.get('uid') or decoded.get('user_id') # Fallback
|
| 212 |
-
|
| 213 |
|
| 214 |
if not uid:
|
| 215 |
-
|
| 216 |
return None
|
| 217 |
|
| 218 |
-
#
|
| 219 |
-
|
| 220 |
-
|
| 221 |
-
|
| 222 |
-
# If email not directly available, try to get it from firebase identities
|
| 223 |
-
if not email and 'firebase' in decoded and 'identities' in decoded['firebase']:
|
| 224 |
-
identities = decoded['firebase']['identities']
|
| 225 |
-
logger.debug(f"Firebase identities found: {identities}")
|
| 226 |
-
# Common provider emails (Google, etc.)
|
| 227 |
-
google_emails = identities.get('google.com', [])
|
| 228 |
-
if google_emails and isinstance(google_emails, list):
|
| 229 |
-
email = google_emails[0] # Take the first one if multiple
|
| 230 |
-
logger.debug(f"Email extracted from google.com identity: {email}")
|
| 231 |
-
# Add checks for other providers if needed (e.g., 'email', etc.)
|
| 232 |
-
# Fallback: check if there's a generic 'email' list
|
| 233 |
-
if not email:
|
| 234 |
-
generic_emails = identities.get('email', [])
|
| 235 |
-
if generic_emails and isinstance(generic_emails, list):
|
| 236 |
-
email = generic_emails[0]
|
| 237 |
-
logger.debug(f"Email extracted from generic 'email' identity: {email}")
|
| 238 |
-
|
| 239 |
-
logger.debug(f"Final extracted email: {email}")
|
| 240 |
-
|
| 241 |
-
# Check if user is admin by querying Firebase Realtime Database using UID
|
| 242 |
admins_ref = db.reference("/admins")
|
| 243 |
-
|
| 244 |
-
|
| 245 |
|
| 246 |
-
if
|
| 247 |
# User is already an admin in the database
|
| 248 |
-
|
|
|
|
| 249 |
return decoded
|
| 250 |
|
| 251 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 252 |
# User's email is in the approved list, but UID not found in DB.
|
| 253 |
# This is likely their first time accessing the API.
|
| 254 |
-
# Automatically create their admin entry using their UID.
|
| 255 |
-
|
| 256 |
try:
|
| 257 |
admins_ref.child(uid).set({
|
| 258 |
"email": email,
|
| 259 |
"is_admin": True
|
| 260 |
-
# Removed first_seen to keep it simple like original, add back if needed
|
| 261 |
})
|
| 262 |
-
|
| 263 |
-
#
|
| 264 |
return decoded
|
| 265 |
except Exception as db_error:
|
| 266 |
-
|
| 267 |
-
#
|
| 268 |
return None
|
| 269 |
-
|
| 270 |
else:
|
| 271 |
-
# User is not in the approved
|
| 272 |
-
|
| 273 |
return None
|
| 274 |
|
|
|
|
|
|
|
| 275 |
except firebase_auth.InvalidIdTokenError as e:
|
| 276 |
-
|
| 277 |
return None
|
| 278 |
except firebase_auth.ExpiredIdTokenError as e:
|
| 279 |
-
|
| 280 |
return None
|
| 281 |
except firebase_auth.RevokedIdTokenError as e:
|
| 282 |
-
|
| 283 |
return None
|
| 284 |
except Exception as e:
|
| 285 |
-
|
|
|
|
|
|
|
|
|
|
| 286 |
return None
|
| 287 |
|
| 288 |
# === Admin Setup (Legacy - kept for compatibility) ===
|
|
|
|
| 186 |
"""
|
| 187 |
Verify Firebase ID token and check if user is admin.
|
| 188 |
Automatically creates admin entry in DB if email is in ADMIN_EMAILS but UID is not found.
|
| 189 |
+
Uses Firebase Admin SDK to get user email if not directly available in token or DB.
|
| 190 |
Returns decoded token dict if valid admin, None otherwise.
|
| 191 |
"""
|
|
|
|
|
|
|
| 192 |
auth_header = req.headers.get("Authorization")
|
| 193 |
if not auth_header:
|
| 194 |
+
print("Authorization header missing.")
|
| 195 |
return None
|
| 196 |
|
| 197 |
try:
|
|
|
|
| 201 |
else:
|
| 202 |
token = auth_header.strip()
|
| 203 |
|
| 204 |
+
# Verify the token to get the UID
|
| 205 |
decoded = firebase_auth.verify_id_token(token)
|
| 206 |
+
# print(f"DEBUG: Token verified successfully. Decoded token keys: {list(decoded.keys())}") # Optional debug
|
| 207 |
|
| 208 |
# Get user UID from decoded token
|
| 209 |
+
uid = decoded.get('uid') or decoded.get('user_id') # Fallback for robustness
|
| 210 |
+
# print(f"DEBUG: Extracted UID: {uid}") # Optional debug
|
| 211 |
|
| 212 |
if not uid:
|
| 213 |
+
print("Verified token does not contain a UID or user_id.")
|
| 214 |
return None
|
| 215 |
|
| 216 |
+
# --- Core Logic Change Starts Here ---
|
| 217 |
+
|
| 218 |
+
# 1. Check if user is admin by querying Firebase Realtime Database using UID first
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 219 |
admins_ref = db.reference("/admins")
|
| 220 |
+
admin_data_db = admins_ref.child(uid).get()
|
| 221 |
+
# print(f"DEBUG: Admin data retrieved for UID {uid}: {admin_data_db}") # Optional debug
|
| 222 |
|
| 223 |
+
if admin_data_db and admin_data_db.get("is_admin", False):
|
| 224 |
# User is already an admin in the database
|
| 225 |
+
email_for_logging = admin_data_db.get("email", "Unknown Email")
|
| 226 |
+
print(f"User {uid} ({email_for_logging}) is authorized as admin (found in DB).")
|
| 227 |
return decoded
|
| 228 |
|
| 229 |
+
# 2. If not found in DB, get the user's email using the Firebase Admin SDK
|
| 230 |
+
email = None
|
| 231 |
+
try:
|
| 232 |
+
user_record = firebase_auth.get_user(uid)
|
| 233 |
+
email = user_record.email
|
| 234 |
+
# print(f"DEBUG: Email retrieved via Firebase Admin SDK for UID {uid}: {email}") # Optional debug
|
| 235 |
+
except firebase_auth.UserNotFoundError:
|
| 236 |
+
print(f"User with UID {uid} not found in Firebase Authentication.")
|
| 237 |
+
return None
|
| 238 |
+
except Exception as e:
|
| 239 |
+
print(f"Error fetching user details for UID {uid} from Firebase Auth: {e}")
|
| 240 |
+
# Cannot reliably verify email, deny access
|
| 241 |
+
return None
|
| 242 |
+
|
| 243 |
+
# 3. Check if the retrieved email is in the approved ADMIN_EMAILS list
|
| 244 |
+
if email and email in ADMIN_EMAILS:
|
| 245 |
# User's email is in the approved list, but UID not found in DB.
|
| 246 |
# This is likely their first time accessing the API.
|
| 247 |
+
# Automatically create their admin entry in the Realtime Database using their UID.
|
| 248 |
+
print(f"First time admin access for {email} (UID: {uid}). Creating database entry.")
|
| 249 |
try:
|
| 250 |
admins_ref.child(uid).set({
|
| 251 |
"email": email,
|
| 252 |
"is_admin": True
|
|
|
|
| 253 |
})
|
| 254 |
+
print(f"Admin entry successfully created for UID {uid}.")
|
| 255 |
+
# User is now an admin, grant access
|
| 256 |
return decoded
|
| 257 |
except Exception as db_error:
|
| 258 |
+
print(f"Failed to create admin entry for UID {uid} in database: {db_error}")
|
| 259 |
+
# Failed to record admin status, deny access for security
|
| 260 |
return None
|
|
|
|
| 261 |
else:
|
| 262 |
+
# User's email is not in the approved list or email could not be retrieved/verified
|
| 263 |
+
print(f"Access denied for UID {uid}. Email '{email}' not in approved list or not retrievable.")
|
| 264 |
return None
|
| 265 |
|
| 266 |
+
# --- Core Logic Change Ends Here ---
|
| 267 |
+
|
| 268 |
except firebase_auth.InvalidIdTokenError as e:
|
| 269 |
+
print(f"Invalid Firebase ID token provided: {e}")
|
| 270 |
return None
|
| 271 |
except firebase_auth.ExpiredIdTokenError as e:
|
| 272 |
+
print(f"Firebase ID token has expired: {e}")
|
| 273 |
return None
|
| 274 |
except firebase_auth.RevokedIdTokenError as e:
|
| 275 |
+
print(f"Firebase ID token has been revoked: {e}")
|
| 276 |
return None
|
| 277 |
except Exception as e:
|
| 278 |
+
print(f"Unexpected error during token verification: {e}")
|
| 279 |
+
# Consider logging the full traceback in production
|
| 280 |
+
# import traceback
|
| 281 |
+
# print(traceback.format_exc())
|
| 282 |
return None
|
| 283 |
|
| 284 |
# === Admin Setup (Legacy - kept for compatibility) ===
|