# comp_eb/auth_utils.py
"""JWT helpers and FastAPI dependencies for user and admin authentication.

Exposes:

* ``create_access_token`` -- sign a JWT carrying the given claims.
* ``get_current_user`` -- dependency resolving the bearer token to a user.
* ``get_current_admin_user`` -- dependency resolving the
  ``admin_access_token`` cookie to an admin user.
"""
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from passlib.context import CryptContext
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status, Cookie
from fastapi.security import OAuth2PasswordBearer

import database
import schemas
import crud
import models  # Make sure models is imported
from database import get_db
from sqlalchemy.orm import Session
from password_utils import verify_password, get_password_hash

logger = logging.getLogger(__name__)

# --- Configuration ---
# Generate a secret key with: openssl rand -hex 32
# The fallback below is committed to source control and therefore public;
# it exists only so local development works without extra setup.
_DEV_FALLBACK_SECRET_KEY = (
    "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
)
SECRET_KEY = os.getenv("SECRET_KEY", _DEV_FALLBACK_SECRET_KEY)
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")

# Never print or log the key itself: anyone who can read the logs could
# forge tokens. Only report that the insecure fallback is in use.
if SECRET_KEY == _DEV_FALLBACK_SECRET_KEY:
    logger.warning(
        "SECRET_KEY environment variable is not set; using the built-in "
        "development key. Set SECRET_KEY before deploying."
    )


# --- JWT Token Handling ---
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Return a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: Token lifetime. When omitted, the token expires
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes from now.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


# --- Current User Dependency ---
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
):
    """Resolve the bearer token to the matching user record.

    Args:
        token: Bearer token supplied by ``oauth2_scheme``.
        db: Database session supplied by ``get_db``.

    Returns:
        The user returned by ``crud.get_user_by_email`` for the token's
        ``sub`` claim.

    Raises:
        HTTPException: 401 if the token cannot be decoded, has no ``sub``
            claim, or no user exists for that email.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        email: str = payload.get("sub")
        if email is None:
            raise credentials_exception
        token_data = schemas.TokenData(email=email)
    except JWTError:
        raise credentials_exception

    user = crud.get_user_by_email(db, email=token_data.email)
    if user is None:
        raise credentials_exception
    return user


# --- Admin User Dependency ---
async def get_current_admin_user(
    admin_access_token: Optional[str] = Cookie(None),
    db: Session = Depends(get_db),
):
    """Resolve the ``admin_access_token`` cookie to an admin user.

    Args:
        admin_access_token: JWT read from the cookie of the same name.
        db: Database session supplied by ``get_db``.

    Returns:
        The user returned by ``crud.get_user_by_email`` for the token's
        ``sub`` claim, once confirmed to be an admin.

    Raises:
        HTTPException: 401 if the cookie is missing, the token cannot be
            decoded, it has no ``sub`` claim, or its ``role`` claim is not
            ``"admin"``.
        HTTPException: 403 if the token is valid but the user does not
            exist or ``user.is_admin`` is falsy.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Not authenticated as an admin",
        headers={"WWW-Authenticate": "Bearer"},
    )

    if not admin_access_token:
        raise credentials_exception

    try:
        payload = jwt.decode(admin_access_token, SECRET_KEY, algorithms=[ALGORITHM])
        email: str = payload.get("sub")
        role: str = payload.get("role")
        if email is None or role != "admin":
            raise credentials_exception
        token_data = schemas.TokenData(email=email)
    except JWTError:
        raise credentials_exception

    # The role claim alone is not trusted: the database record must also
    # mark the user as an admin.
    user = crud.get_user_by_email(db, email=token_data.email)
    if user is None or not user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="The user does not have admin rights",
        )
    return user