Spaces:
Running
Running
| """ | |
| FastAPI dependencies for authentication and authorization. | |
| """ | |
| from typing import Optional | |
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import OAuth2PasswordBearer | |
| from sqlmodel import select | |
| from src.db.firebase import get_firebase_db | |
| from src.db.models import User | |
| from src.auth.security import decode_access_token | |
| from src.utils.logger import setup_logger | |
# Module-level logger, created by the project's setup_logger helper and named
# after this module.
logger = setup_logger(__name__)

# OAuth2 scheme used as a FastAPI dependency: it pulls the bearer token out of
# the Authorization header. tokenUrl is the login route advertised to clients
# in the generated OpenAPI docs.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
| from firebase_admin import auth as firebase_auth | |
| from src.db.firebase import verify_token | |
| import inspect | |
| import time | |
async def get_current_user(
    token: str = Depends(oauth2_scheme)
) -> User:
    """
    Resolve a bearer token to a ``User``, trying Firebase first and the
    custom JWT second.

    Flow:
        1. ``verify_token(token)`` is expected to return a dict with
           ``"payload"`` and ``"error"`` keys. When a payload is present the
           user is loaded from the Firestore ``users`` collection by uid; if
           no document exists (or Firestore is unavailable) a transient
           ``User`` is built from the token claims.
        2. Otherwise ``decode_access_token(token)`` is tried and the user is
           looked up in Firestore by the ``sub`` claim (username).

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.

    Returns:
        The authenticated ``User``.

    Raises:
        HTTPException: 401 when the Firebase payload has no ``uid``, when the
            custom JWT has no ``sub`` claim, when the JWT user is not found in
            Firestore, or when neither verification path yields a user.
    """
    # Every 401 carries the same challenge header so clients get a consistent
    # Bearer challenge regardless of which checkpoint rejected the token.
    bearer_headers = {"WWW-Authenticate": "Bearer"}

    db = get_firebase_db()
    firebase_error = None

    # Log token prefix for debugging
    token_prefix = token[:10] if token else "None"
    logger.info(f"Authenticating token starting with: {token_prefix}...")

    # 1. Try Firebase Verification
    try:
        firebase_result = verify_token(token)

        # Check if verify_token returned a dict, otherwise it's an internal error
        if not isinstance(firebase_result, dict):
            firebase_error = f"Internal Error [CP0]: verify_token returned {type(firebase_result)} (Expected dict)"
            firebase_payload = None
        else:
            firebase_payload = firebase_result.get("payload")
            firebase_error = firebase_result.get("error")

        if firebase_payload is not None:
            uid = firebase_payload.get("uid")
            email = firebase_payload.get("email")

            if not uid:
                logger.error("Firebase payload missing 'uid' [CP1]")
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Invalid Firebase token payload [CP1]",
                    headers=bearer_headers,
                )

            # Fallback identity values taken from the token claims. `or uid`
            # (rather than a .get default) also covers a "name" claim that is
            # present but empty/None, which would otherwise become the username.
            claim_email = email or "unknown@example.com"
            claim_username = firebase_payload.get("name") or uid

            if db is None:
                logger.warning(f"Firestore not available, returning transient user for {email} [CP2]")
                return User(id=uid, email=claim_email, username=claim_username, role="user")

            # Retrieve from Firestore
            # NOTE(review): this call is not awaited; if `db` is the synchronous
            # Firestore client it blocks the event loop — confirm and consider
            # offloading to a threadpool.
            user_doc = db.collection("users").document(uid).get()

            if user_doc.exists:
                user_data = user_doc.to_dict()
                user_data["id"] = user_doc.id
                # Ensure compatibility fields are set for User model instantiation
                user_data.setdefault("email", claim_email)
                user_data.setdefault("username", claim_username)
                user_data.setdefault("role", "user")
                # Pydantic model instantiation (Resilient to missing password_hash due to model defaults)
                return User(**user_data)

            logger.info(f"New Firebase user detected: {email or uid} [CP3]")
            return User(
                id=uid,
                email=claim_email,
                username=claim_username,
                # password_hash is optional in User model, or can be set to ""
                role="user",
            )
    except HTTPException:
        # Deliberate rejections must not be swallowed by the fallback below.
        raise
    except Exception as e:
        # Any other failure (verification, Firestore, model validation) is
        # remembered and the custom JWT path gets a chance.
        logger.error(f"Error in Firebase auth path: {repr(e)} [CP4]")
        firebase_error = str(e) or repr(e)

    # 2. Fallback to Custom JWT Decoding
    try:
        payload = decode_access_token(token)
        if payload:
            username: Optional[str] = payload.get("sub")
            if not username:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token missing subject claim [CP5]",
                    headers=bearer_headers,
                )

            if db is None:
                # For custom JWT, if DB is not available, we can't verify user existence
                # This path should ideally not be hit if Firebase is the primary auth
                logger.warning(f"Firestore not available, returning mock user for {username} [CP5a]")
                return User(id="mock_id", email="mock@example.com", username=username, role="user")

            users_ref = db.collection("users")
            query = users_ref.where("username", "==", username).limit(1).stream()
            user_doc = next(query, None)

            if user_doc:
                user_data = user_doc.to_dict()
                user_data["id"] = user_doc.id
                return User(**user_data)

            logger.error(f"User {username} not found in database [CP6]")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User account not found [CP6]",
                headers=bearer_headers,
            )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in custom JWT auth path: {repr(e)} [CP7]")

    # If both failed, then it's a 401
    # NOTE(review): firebase_error may contain internal exception text and is
    # returned to the client in `detail` — consider logging it only.
    error_detail = f"Authentication failed [CP8]: {firebase_error if firebase_error else 'Invalid credentials'}"
    logger.error(error_detail)
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=error_detail,
        headers=bearer_headers,
    )
async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
    """
    Return the authenticated user, acting as the hook for "active account" checks.

    At present this is a pass-through: whatever ``get_current_user`` resolved
    is handed back unchanged and nothing is raised here. It exists so routes
    can already depend on it and later gain account-status checks (soft
    delete, email verification, bans, ...) without changing their signatures.

    Args:
        current_user: User supplied by the ``get_current_user`` dependency.

    Returns:
        The same ``User`` object that was passed in.

    Usage:
        @app.get("/protected")
        async def protected_route(user: User = Depends(get_current_active_user)):
            return {"message": f"Hello active user {user.username}"}
    """
    # TODO: once the User model exposes flags such as is_active / is_verified,
    # reject inactive accounts here (e.g. HTTPException 400 "Inactive user").
    return current_user