elampt's picture
Add application file
a7323d0
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError
from sqlalchemy.orm import Session

import schemas
import database.connection as connection
import models
from config import settings

# JWT signing parameters, read from the application settings at import time.
SECRET_KEY = settings.secret_key
ALGORITHM = settings.algorithm
# Token lifetime in minutes; create_access_token adds it to the issue time.
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes
# Bearer-token dependency consumed by get_current_user; "/login" is declared
# as the token URL.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")
def create_access_token(data: dict) -> str:
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    The claims are copied into a new dict, so the caller's ``data`` is not
    modified.

    Args:
        data: Claims to embed in the token.

    Returns:
        The token produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Use an aware UTC timestamp: PyJWT treats a naive datetime as UTC, so a
    # naive local time would shift the expiry by the server's UTC offset.
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=ACCESS_TOKEN_EXPIRE_MINUTES
    )
    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_access_token(token: str, credentials_exception):
    """Decode ``token`` and return the token data for its ``user_id`` claim.

    Args:
        token: Encoded JWT to validate.
        credentials_exception: Exception to raise when the token is invalid
            or carries no ``user_id`` claim.

    Returns:
        A ``schemas.TokenData`` whose ``id`` is the claim converted to ``str``.

    Raises:
        HTTPException: 401 with detail "Token has expired" when decoding
            reports an expired signature.
        credentials_exception: For any other invalid token, or when the
            ``user_id`` claim is absent.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError as exc:
        # Handled before InvalidTokenError (its base class in PyJWT) so that
        # expired tokens get their own, more specific message.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
    except InvalidTokenError as exc:
        raise credentials_exception from exc

    user_id = payload.get("user_id")
    # Check before converting: str(None) is the non-None string "None", which
    # would let a token without the claim slip through.
    if user_id is None:
        raise credentials_exception
    return schemas.TokenData(id=str(user_id))
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(connection.get_db)):
    """Resolve the bearer token to the matching ``models.User`` row.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.
        db: Session supplied by the ``connection.get_db`` dependency.

    Returns:
        The first ``models.User`` whose ``id`` equals the id in the token.

    Raises:
        HTTPException: 401 when the token fails verification or no user
            matches its id.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    token_data = verify_access_token(token, credentials_exception)
    # NOTE(review): this looks like a synchronous query inside an async
    # function, which would block the event loop while it runs — confirm
    # whether `db` is a sync Session and whether that is acceptable here.
    user = db.query(models.User).filter(models.User.id == token_data.id).first()
    if user is None:
        raise credentials_exception
    return user