AIdea-Server / src /auth /dependencies.py
Ahmed Mostafa
fix log v1.5.4
b17b103
"""
FastAPI dependencies for authentication and authorization.
"""
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlmodel import select
from src.db.firebase import get_firebase_db
from src.db.models import User
from src.auth.security import decode_access_token
from src.utils.logger import setup_logger
# Module-level logger obtained from the project's setup_logger helper,
# keyed by this module's import name.
logger = setup_logger(__name__)
# OAuth2 scheme for extracting bearer tokens from the Authorization header.
# It is used below as the dependency that supplies `token` to get_current_user;
# "/auth/login" is the token URL handed to OAuth2PasswordBearer.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
from firebase_admin import auth as firebase_auth
from src.db.firebase import verify_token
import inspect
import time
def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 response that always carries the Bearer challenge header."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def _user_from_firebase_payload(firebase_payload, db) -> User:
    """Resolve a User from a verified Firebase token payload.

    Args:
        firebase_payload: Mapping read with ``.get`` for the "uid", "email"
            and "name" claims.
        db: Firestore client as returned by ``get_firebase_db()``, or None
            when Firestore is unavailable.

    Returns:
        The stored user document as a User when one exists under
        ``users/<uid>``; otherwise a transient User built from the claims.

    Raises:
        HTTPException: 401 when the payload carries no "uid".
    """
    uid = firebase_payload.get("uid")
    email = firebase_payload.get("email")
    if not uid:
        logger.error("Firebase payload missing 'uid' [CP1]")
        raise _unauthorized("Invalid Firebase token payload [CP1]")

    # Fall back on empty/None claims too, not only on missing keys, so a
    # present-but-empty "name" does not produce an unusable username.
    fallback_email = email or "unknown@example.com"
    fallback_username = firebase_payload.get("name") or uid

    if db is None:
        logger.warning(f"Firestore not available, returning transient user for {email} [CP2]")
        return User(id=uid, email=fallback_email, username=fallback_username, role="user")

    # Retrieve from Firestore
    user_doc = db.collection("users").document(uid).get()
    if not user_doc.exists:
        logger.info(f"New Firebase user detected: {email or uid} [CP3]")
        # password_hash is not supplied here; the original code notes it is
        # optional on the User model.
        return User(id=uid, email=fallback_email, username=fallback_username, role="user")

    user_data = user_doc.to_dict()
    user_data["id"] = user_doc.id
    # Fill in compatibility fields only when the stored document lacks them.
    user_data.setdefault("email", fallback_email)
    user_data.setdefault("username", fallback_username)
    user_data.setdefault("role", "user")
    return User(**user_data)


def _user_from_jwt_payload(payload, db) -> User:
    """Resolve a User from a decoded custom-JWT payload.

    Args:
        payload: Mapping read with ``.get`` for the "sub" claim, which is
            treated as the username.
        db: Firestore client as returned by ``get_firebase_db()``, or None
            when Firestore is unavailable.

    Returns:
        The first user document whose "username" equals the subject, or a
        mock User when no database is available.

    Raises:
        HTTPException: 401 when the subject claim is missing or no matching
            user document exists.
    """
    username: Optional[str] = payload.get("sub")
    if not username:
        raise _unauthorized("Token missing subject claim [CP5]")

    if db is None:
        # Without a database the user's existence cannot be verified; this
        # path is not expected when Firebase is the primary auth mechanism.
        logger.warning(f"Firestore not available, returning mock user for {username} [CP5a]")
        return User(id="mock_id", email="mock@example.com", username=username, role="user")

    matches = db.collection("users").where("username", "==", username).limit(1).stream()
    user_doc = next(matches, None)
    if not user_doc:
        logger.error(f"User {username} not found in database [CP6]")
        raise _unauthorized("User account not found [CP6]")

    user_data = user_doc.to_dict()
    user_data["id"] = user_doc.id
    return User(**user_data)


async def get_current_user(
    token: str = Depends(oauth2_scheme)
) -> User:
    """
    Get the currently authenticated user with support for Firebase and custom JWT.

    The token is first passed to ``verify_token`` (Firebase). If that yields
    a payload, the user is resolved from it. Otherwise, or if the Firebase
    path raises a non-HTTP error, the token is decoded as a custom JWT via
    ``decode_access_token``.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.

    Returns:
        The authenticated User.

    Raises:
        HTTPException: 401 (with a ``WWW-Authenticate: Bearer`` header) when
            neither path authenticates the token or the resolved payload is
            invalid.
    """
    db = get_firebase_db()
    firebase_error = None

    # Log token prefix for debugging
    token_prefix = token[:10] if token else "None"
    logger.info(f"Authenticating token starting with: {token_prefix}...")

    # 1. Try Firebase Verification
    try:
        firebase_result = verify_token(token)
        if not isinstance(firebase_result, dict):
            # verify_token is expected to return a dict with "payload" and
            # "error" keys; anything else is treated as an internal error.
            firebase_error = f"Internal Error [CP0]: verify_token returned {type(firebase_result)} (Expected dict)"
        else:
            firebase_error = firebase_result.get("error")
            firebase_payload = firebase_result.get("payload")
            if firebase_payload is not None:
                return _user_from_firebase_payload(firebase_payload, db)
    except HTTPException:
        # Deliberate 401s from the helper must not be swallowed as fallbacks.
        raise
    except Exception as e:
        logger.error(f"Error in Firebase auth path: {repr(e)} [CP4]")
        firebase_error = str(e) or repr(e)

    # 2. Fallback to Custom JWT Decoding
    try:
        payload = decode_access_token(token)
        if payload:
            return _user_from_jwt_payload(payload, db)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in custom JWT auth path: {repr(e)} [CP7]")

    # If both failed, then it's a 401
    error_detail = f"Authentication failed [CP8]: {firebase_error if firebase_error else 'Invalid credentials'}"
    logger.error(error_detail)
    raise _unauthorized(error_detail)
async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """
    Return the authenticated user, as the hook for future "active" checks.

    At present this is a pass-through: whatever ``get_current_user``
    resolved is handed back unchanged and nothing is raised here. It exists
    so that account-status rules (soft delete, email verification, bans)
    can later be enforced in one place without touching route signatures.

    Args:
        current_user: User supplied by the ``get_current_user`` dependency.

    Returns:
        The same User object that was passed in.

    Usage:
        @app.get("/protected")
        async def protected_route(user: User = Depends(get_current_active_user)):
            return {"message": f"Hello active user {user.username}"}
    """
    # TODO: once the User model exposes status fields (e.g. is_active,
    # is_verified), reject inactive accounts here, for instance with
    # HTTPException(status_code=400, detail="Inactive user").
    return current_user