vietqa-api / src /utils /firebase.py
quanho114
Update delete_user_permanently to include Firestore deletion
a78ed19
import firebase_admin
from firebase_admin import credentials, auth
from src.config import settings
_firebase_app = None
def get_firebase_app():
"""Initialize or return existing Firebase Admin app."""
global _firebase_app
if _firebase_app:
return _firebase_app
if _firebase_app:
return _firebase_app
try:
# Priority 1: Check environment variable (for Cloud Deployment)
import os
import json
env_creds = os.getenv("FIREBASE_CREDENTIALS_JSON")
if env_creds:
try:
cred_dict = json.loads(env_creds)
cred = credentials.Certificate(cred_dict)
_firebase_app = firebase_admin.initialize_app(cred)
print("[Firebase] Admin SDK initialized from environment variable")
return _firebase_app
except Exception as e:
print(f"[Firebase] Failed to parse FIREBASE_CREDENTIALS_JSON: {e}")
# Priority 2: Check file path
cred_path = settings.firebase_service_account_path
if not cred_path:
print("[Firebase] No service account path configured.")
return None
if not os.path.exists(cred_path):
print(f"[Firebase] Service account file not found at {cred_path}")
return None
cred = credentials.Certificate(cred_path)
_firebase_app = firebase_admin.initialize_app(cred)
print(f"[Firebase] Admin SDK initialized with {cred_path}")
return _firebase_app
except Exception as e:
print(f"[Firebase] Failed to initialize Admin SDK: {e}")
return None
def delete_user_permanently(uid: str) -> bool:
"""
Permanently delete a user from Firebase Authentication.
"""
app = get_firebase_app()
if not app:
raise Exception("Firebase Admin SDK not initialized (missing serviceAccountKey.json?)")
try:
# Delete from Auth
auth.delete_user(uid)
print(f"[Firebase] Deleted user {uid} from Auth")
# Delete from Firestore
from firebase_admin import firestore
db = firestore.client()
db.collection('users').document(uid).delete()
print(f"[Firebase] Deleted user {uid} from Firestore")
return True
except Exception as e:
print(f"[Firebase] Error deleting user {uid}: {e}")
raise e
def verify_id_token(id_token: str):
"""Verify ID token and return decoded token."""
app = get_firebase_app()
if not app:
raise Exception("Firebase Admin SDK not initialized")
return auth.verify_id_token(id_token)