""" FastAPI dependencies for authentication and authorization. """ from typing import Optional from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from sqlmodel import select from src.db.firebase import get_firebase_db from src.db.models import User from src.auth.security import decode_access_token from src.utils.logger import setup_logger logger = setup_logger(__name__) # OAuth2 scheme for extracting bearer tokens from Authorization header oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login") from firebase_admin import auth as firebase_auth from src.db.firebase import verify_token import inspect import time async def get_current_user( token: str = Depends(oauth2_scheme) ) -> User: """ Get the currently authenticated user with support for Firebase and custom JWT. """ db = get_firebase_db() firebase_error = None # Log token prefix for debugging token_prefix = token[:10] if token else "None" logger.info(f"Authenticating token starting with: {token_prefix}...") # 1. Try Firebase Verification try: firebase_result = verify_token(token) # Check if verify_token returned a dict, otherwise it's an internal error if not isinstance(firebase_result, dict): firebase_error = f"Internal Error [CP0]: verify_token returned {type(firebase_result)} (Expected dict)" firebase_payload = None else: firebase_payload = firebase_result.get("payload") firebase_error = firebase_result.get("error") if firebase_payload is not None: uid = firebase_payload.get("uid") email = firebase_payload.get("email") if not uid: logger.error("Firebase payload missing 'uid' [CP1]") raise HTTPException(status_code=401, detail="Invalid Firebase token payload [CP1]") if db is None: logger.warning(f"Firestore not available, returning transient user for {email} [CP2]") return User(id=uid, email=email or "unknown@example.com", username=firebase_payload.get("name", uid), role="user") # Retrieve from Firestore user_doc = db.collection("users").document(uid).get() if user_doc.exists: user_data = user_doc.to_dict() user_data["id"] = user_doc.id # Ensure compatibility fields are set for User model instantiation user_data.setdefault("email", email or "unknown@example.com") user_data.setdefault("username", firebase_payload.get("name", uid)) user_data.setdefault("role", user_data.get("role", "user")) # Pydantic model instantiation (Resilient to missing password_hash due to model defaults) return User(**user_data) else: logger.info(f"New Firebase user detected: {email or uid} [CP3]") return User( id=uid, email=email or "unknown@example.com", username=firebase_payload.get("name", uid), # password_hash is optional in User model, or can be set to "" role="user" ) except HTTPException: raise except Exception as e: logger.error(f"Error in Firebase auth path: {repr(e)} [CP4]") firebase_error = str(e) or repr(e) # 2. Fallback to Custom JWT Decoding try: payload = decode_access_token(token) if payload: username: Optional[str] = payload.get("sub") if not username: raise HTTPException(status_code=401, detail="Token missing subject claim [CP5]") if db is None: # For custom JWT, if DB is not available, we can't verify user existence # This path should ideally not be hit if Firebase is the primary auth logger.warning(f"Firestore not available, returning mock user for {username} [CP5a]") return User(id="mock_id", email="mock@example.com", username=username, role="user") users_ref = db.collection("users") query = users_ref.where("username", "==", username).limit(1).stream() user_doc = next(query, None) if user_doc: user_data = user_doc.to_dict() user_data["id"] = user_doc.id return User(**user_data) else: logger.error(f"User {username} not found in database [CP6]") raise HTTPException(status_code=401, detail="User account not found [CP6]") except HTTPException: raise except Exception as e: logger.error(f"Error in custom JWT auth path: {repr(e)} [CP7]") # If both failed, then it's a 401 error_detail = f"Authentication failed [CP8]: {firebase_error if firebase_error else 'Invalid credentials'}" logger.error(error_detail) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=error_detail, headers={"WWW-Authenticate": "Bearer"}, ) async def get_current_active_user( current_user: User = Depends(get_current_user), ) -> User: """ Get the current active user (for future soft-delete support). Currently returns the user as-is, but can be extended to check for account status, email verification, banned users, etc. Args: current_user: User from get_current_user dependency Returns: User object if user is active Raises: HTTPException: 400 Bad Request if user is inactive Usage: @app.get("/protected") async def protected_route(user: User = Depends(get_current_active_user)): return {"message": f"Hello active user {user.username}"} """ # Future: Check if user.is_active, user.is_verified, etc. # if not current_user.is_active: # raise HTTPException(status_code=400, detail="Inactive user") return current_user