"""JWT access-token helpers and the FastAPI dependency that resolves the current user."""

from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError
from sqlalchemy.orm import Session

import schemas
import database.connection as connection
import models
from config import settings

SECRET_KEY = settings.secret_key
ALGORITHM = settings.algorithm
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")


def create_access_token(data: dict) -> str:
    """Return a signed JWT carrying *data* plus an ``exp`` claim.

    The input dict is copied, so the caller's mapping is not mutated.
    The token expires ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes from now.
    """
    to_encode = data.copy()
    # Use an aware UTC timestamp: PyJWT converts datetimes with
    # ``utctimetuple()``, which treats a naive value as already being UTC,
    # so a naive local time would shift the expiry by the host's UTC offset.
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=ACCESS_TOKEN_EXPIRE_MINUTES
    )
    to_encode["exp"] = expire
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def verify_access_token(token: str, credentials_exception: Exception):
    """Decode *token* and return a ``schemas.TokenData`` for its ``user_id``.

    Args:
        token: The encoded JWT to validate.
        credentials_exception: Exception raised when the token is invalid
            or does not carry a ``user_id`` claim.

    Raises:
        HTTPException: 401 "Token has expired" when the ``exp`` claim is
            in the past.
        credentials_exception: For any other invalid token, or when the
            ``user_id`` claim is missing.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError as exc:
        # Must be handled before InvalidTokenError, which is its base class.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
    except InvalidTokenError as exc:
        raise credentials_exception from exc

    user_id = payload.get("user_id")
    # Check for a missing claim *before* stringifying; str(None) is the
    # truthy string "None" and would slip past this guard.
    if user_id is None:
        raise credentials_exception

    return schemas.TokenData(id=str(user_id))


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(connection.get_db),
):
    """FastAPI dependency: return the ``models.User`` identified by the bearer token.

    Raises:
        HTTPException: 401 when the token is invalid or expired, or when no
            user row matches the id stored in the token.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    token_data = verify_access_token(token, credentials_exception)

    user = db.query(models.User).filter(models.User.id == token_data.id).first()
    if user is None:
        raise credentials_exception

    return user