"""FastAPI dependencies that resolve the authenticated user from a bearer JWT."""

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from ..core.config import settings
from ..db.database import get_db
from ..db.models import User

# Extracts the bearer token from the request; tokenUrl points at the login
# route under the configured API prefix.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Return the ``User`` identified by the ``sub`` claim of the bearer token.

    Args:
        token: Encoded JWT supplied by ``oauth2_scheme``.
        db: Async database session supplied by the ``get_db`` dependency.

    Returns:
        The matching ``User`` row, with its ``roles`` relationship loaded.

    Raises:
        HTTPException: 401 if the token cannot be decoded, has no ``sub``
            claim, carries a ``sub`` that is not an integer, or refers to a
            user that does not exist.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Keep the try body minimal: only the decode can raise JWTError.
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
    except JWTError as exc:
        raise credentials_exception from exc

    subject = payload.get("sub")
    if subject is None:
        raise credentials_exception

    # A validly signed token may still carry a non-numeric subject; treat that
    # as bad credentials (401) rather than letting int() escape as a 500.
    try:
        user_id = int(subject)
    except (TypeError, ValueError) as exc:
        raise credentials_exception from exc

    # Load roles in the same round of queries so they are available after the
    # awaited execute without triggering a lazy load.
    stmt = (
        select(User)
        .options(selectinload(User.roles))
        .where(User.id == user_id)
    )
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """Return the authenticated user, rejecting accounts flagged inactive.

    Raises:
        HTTPException: 400 if ``current_user.is_active`` is falsy.
    """
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


async def get_current_superuser(
    current_user: User = Depends(get_current_user),
) -> User:
    """Return the authenticated user, requiring the superuser flag.

    Raises:
        HTTPException: 403 if ``current_user.is_superuser`` is falsy.
    """
    # NOTE(review): this depends on get_current_user, so is_active is not
    # checked here; confirm inactive superusers are meant to pass.
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=403, detail="The user doesn't have enough privileges"
        )
    return current_user