banter-api / auth_utils.py
EbukaGaus's picture
push
d5a3ec4
# comp_eb/auth_utils.py
import os
from datetime import datetime, timedelta, timezone
from passlib.context import CryptContext
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status, Cookie
from fastapi.security import OAuth2PasswordBearer
import database
import schemas, crud, models # Make sure models is imported
from database import get_db
from sqlalchemy.orm import Session
from password_utils import verify_password, get_password_hash
# --- Configuration ---
# Generate a secret key with: openssl rand -hex 32
SECRET_KEY = os.getenv("SECRET_KEY", "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
print("AUTH_UTILS SECRET_KEY:", SECRET_KEY)
# --- JWT Token Handling ---
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# --- Current User Dependency ---
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get("sub")
if email is None:
raise credentials_exception
token_data = schemas.TokenData(email=email)
except JWTError:
raise credentials_exception
user = crud.get_user_by_email(db, email=token_data.email)
if user is None:
raise credentials_exception
return user
# --- Admin User Dependency ---
async def get_current_admin_user(
admin_access_token: str = Cookie(None),
db: Session = Depends(database.get_db),
#current_user: models.User = Depends(get_current_user)):
):
"""
Dependency to get the current user and verify they are an admin.
Raises a 403 Forbidden error if the user is not an admin.
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated as an admin",
headers={"WWW-Authenticate": "Bearer"},
)
if not admin_access_token:
raise credentials_exception
try:
payload = jwt.decode(admin_access_token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get("sub")
role: str = payload.get("role")
if email is None or role != "admin":
raise credentials_exception
token_data = schemas.TokenData(email=email)
except JWTError:
raise credentials_exception
user = crud.get_user_by_email(db, email=token_data.email)
if user is None or not user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="The user does not have admin type shit rights"
)
return user