"""
FastAPI dependencies for authentication and authorization.
"""
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlmodel import select
from src.db.firebase import get_firebase_db
from src.db.models import User
from src.auth.security import decode_access_token
from src.utils.logger import setup_logger
# Logger for this module, obtained from the project's setup_logger helper
# and keyed by the module name.
logger = setup_logger(__name__)
# OAuth2 scheme for extracting bearer tokens from the Authorization header.
# "/auth/login" is the token URL passed to the scheme.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
from firebase_admin import auth as firebase_auth
from src.db.firebase import verify_token
import inspect
import time
def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 error that carries the ``WWW-Authenticate: Bearer`` challenge."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def _user_from_firebase_payload(db, firebase_payload: dict) -> User:
    """Turn a verified Firebase token payload into a ``User``.

    Args:
        db: Firestore client returned by ``get_firebase_db()``, or ``None``
            when Firestore is unavailable.
        firebase_payload: The ``"payload"`` entry of ``verify_token``'s result.

    Returns:
        The stored user document (with defaults filled in for missing
        ``email`` / ``username`` / ``role``), or a freshly built ``User`` when
        Firestore is unavailable or holds no document for this uid.

    Raises:
        HTTPException: 401 when the payload has no ``uid``.
    """
    uid = firebase_payload.get("uid")
    email = firebase_payload.get("email")
    if not uid:
        logger.error("Firebase payload missing 'uid' [CP1]")
        raise _unauthorized("Invalid Firebase token payload [CP1]")

    fallback_email = email or "unknown@example.com"
    # `or uid` (rather than a .get default) also covers a "name" claim that is
    # present but None/empty, which would otherwise become the username.
    fallback_username = firebase_payload.get("name") or uid

    if db is None:
        logger.warning(f"Firestore not available, returning transient user for {email} [CP2]")
        return User(id=uid, email=fallback_email, username=fallback_username, role="user")

    # Retrieve from Firestore.
    user_doc = db.collection("users").document(uid).get()
    if not user_doc.exists:
        logger.info(f"New Firebase user detected: {email or uid} [CP3]")
        # No password_hash is supplied here; the original author's note says the
        # User model tolerates that — confirm against src.db.models.
        return User(id=uid, email=fallback_email, username=fallback_username, role="user")

    user_data = user_doc.to_dict()
    user_data["id"] = user_doc.id
    # Fill in only the fields the stored document lacks.
    user_data.setdefault("email", fallback_email)
    user_data.setdefault("username", fallback_username)
    user_data.setdefault("role", "user")
    return User(**user_data)


def _user_from_jwt_payload(db, payload: dict) -> User:
    """Turn a decoded custom-JWT payload into a ``User`` looked up by username.

    Args:
        db: Firestore client returned by ``get_firebase_db()``, or ``None``
            when Firestore is unavailable.
        payload: Truthy result of ``decode_access_token``.

    Returns:
        The first ``users`` document whose ``username`` equals the token's
        ``sub`` claim, or a mock ``User`` when Firestore is unavailable.

    Raises:
        HTTPException: 401 when ``sub`` is missing or no matching user exists.
    """
    username: Optional[str] = payload.get("sub")
    if not username:
        raise _unauthorized("Token missing subject claim [CP5]")

    if db is None:
        # Without a database the user's existence cannot be verified; this
        # path should ideally not be hit if Firebase is the primary auth.
        logger.warning(f"Firestore not available, returning mock user for {username} [CP5a]")
        return User(id="mock_id", email="mock@example.com", username=username, role="user")

    matches = db.collection("users").where("username", "==", username).limit(1).stream()
    user_doc = next(matches, None)
    if not user_doc:
        logger.error(f"User {username} not found in database [CP6]")
        raise _unauthorized("User account not found [CP6]")

    user_data = user_doc.to_dict()
    user_data["id"] = user_doc.id
    return User(**user_data)


async def get_current_user(
    token: str = Depends(oauth2_scheme)
) -> User:
    """
    Get the currently authenticated user with support for Firebase and custom JWT.

    The bearer token is first passed to ``verify_token`` (Firebase). If that
    yields no payload, the token is decoded with ``decode_access_token`` and
    the user is looked up by the ``sub`` claim.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.

    Returns:
        The ``User`` resolved by whichever path succeeds first.

    Raises:
        HTTPException: 401 (with ``WWW-Authenticate: Bearer``) when the token
            is rejected by both paths, or when an accepted token carries no
            usable identity / matches no stored user.

    NOTE(review): the Firestore calls made from here are not awaited; if the
    client is the synchronous one they block the event loop — confirm.
    """
    db = get_firebase_db()
    firebase_error = None

    # Log token prefix for debugging.
    token_prefix = token[:10] if token else "None"
    logger.info(f"Authenticating token starting with: {token_prefix}...")

    # 1. Try Firebase verification.
    try:
        firebase_result = verify_token(token)
        if isinstance(firebase_result, dict):
            firebase_payload = firebase_result.get("payload")
            firebase_error = firebase_result.get("error")
        else:
            # Anything other than a dict is treated as an internal error.
            firebase_error = f"Internal Error [CP0]: verify_token returned {type(firebase_result)} (Expected dict)"
            firebase_payload = None

        if firebase_payload is not None:
            return _user_from_firebase_payload(db, firebase_payload)
    except HTTPException:
        # Deliberate 401s from the Firebase path must not fall through to JWT.
        raise
    except Exception as e:
        logger.error(f"Error in Firebase auth path: {repr(e)} [CP4]")
        firebase_error = str(e) or repr(e)

    # 2. Fall back to custom JWT decoding.
    try:
        payload = decode_access_token(token)
        if payload:
            return _user_from_jwt_payload(db, payload)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in custom JWT auth path: {repr(e)} [CP7]")

    # Both paths failed.
    # NOTE(review): firebase_error may contain internal exception text and is
    # returned to the client verbatim — consider a generic detail in production.
    error_detail = f"Authentication failed [CP8]: {firebase_error if firebase_error else 'Invalid credentials'}"
    logger.error(error_detail)
    raise _unauthorized(error_detail)
async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """
    Return the authenticated user, acting as the hook for "active user" checks.

    At present this is a pass-through: the user resolved by
    ``get_current_user`` is returned unchanged and nothing is raised here.
    It exists so that account-status checks (soft delete, email
    verification, bans, ...) can be added later in one place.

    Args:
        current_user: User supplied by the ``get_current_user`` dependency.

    Returns:
        The same ``User`` object that was passed in.

    Usage:
        @app.get("/protected")
        async def protected_route(user: User = Depends(get_current_active_user)):
            return {"message": f"Hello active user {user.username}"}
    """
    # Future: reject inactive/unverified accounts here, e.g.
    #     if not current_user.is_active:
    #         raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
|