from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from jose import JWTError, jwt from ..db.database import get_db from ..db.models import User from ..core.config import settings oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login") async def get_current_user( token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db) ): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) user_id: str = payload.get("sub") if user_id is None: raise credentials_exception except JWTError: raise credentials_exception stmt = select(User).where(User.id == int(user_id)) result = await db.execute(stmt) user = result.scalar_one_or_none() if user is None: raise credentials_exception return user async def get_current_active_user( current_user: User = Depends(get_current_user) ): if not current_user.is_active: raise HTTPException(status_code=400, detail="Inactive user") return current_user async def get_current_superuser( current_user: User = Depends(get_current_user) ): if not current_user.is_superuser: raise HTTPException( status_code=403, detail="The user doesn't have enough privileges" ) return current_user