Spaces:
Running
Running
File size: 1,758 Bytes
cf25e9f 2d2110c cf25e9f 2d2110c cf25e9f 2d2110c cf25e9f 2d2110c cf25e9f 2d2110c cf25e9f 2d2110c cf25e9f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | from datetime import datetime, timedelta
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from app.cores.config import settings
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from app.cores.database import get_db
from app.models.user import User
from app.schemas.token import TokenData
from sqlalchemy.orm import Session
# Password-hashing context (passlib CryptContext) configured with the bcrypt
# scheme; used by hash_password() and verify_password() in this module.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 bearer scheme injected into get_current_user() through Depends to
# supply the token argument; tokenUrl is set to the "/auth/login" route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
def hash_password(password: str) -> str:
    """Hash *password* with the module-level ``pwd_context`` and return it.

    Args:
        password: The plain-text password to hash.

    Returns:
        The value produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check *plain_password* against *hashed_password* via ``pwd_context``.

    Args:
        plain_password: The candidate password as typed by the user.
        hashed_password: The stored hash to compare against.

    Returns:
        Whatever ``pwd_context.verify`` reports for the pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def create_access_token(data: dict) -> str:
    """Return a signed JWT carrying *data* plus an ``exp`` expiry claim.

    The input mapping is copied, so the caller's dict is never mutated.

    Args:
        data: Claims to embed in the token.

    Returns:
        The token encoded with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``.
    """
    # Local import keeps this function self-contained; the file-level import
    # only brings in ``datetime`` and ``timedelta``.
    from datetime import timezone

    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    # Use a timezone-aware UTC timestamp: datetime.utcnow() is deprecated
    # since Python 3.12 and yields a naive datetime that is easy to misread
    # as local time.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def decode_token(token: str) -> dict:
    """Decode *token* with the configured secret key and return its claims.

    Only the algorithm named by ``settings.ALGORITHM`` is passed as
    acceptable; any exception raised by ``jwt.decode`` propagates to the
    caller unchanged.

    Args:
        token: The encoded JWT string.

    Returns:
        The value returned by ``jwt.decode``.
    """
    secret = settings.SECRET_KEY
    allowed_algorithms = [settings.ALGORITHM]
    claims = jwt.decode(token, secret, algorithms=allowed_algorithms)
    return claims
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Resolve the bearer token to a ``User`` row or fail with HTTP 401.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``User`` whose ``id`` equals the token's ``user_id`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token cannot be decoded, has no ``user_id`` claim, or no
            matching user exists.
    """
    # Local import: the file-level import only brings in ``jwt`` from jose.
    from jose import JWTError

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Catch only token-decoding failures. A bare ``except:`` would also
    # swallow KeyboardInterrupt/SystemExit and hide unrelated bugs as 401s.
    try:
        payload = decode_token(token)
    except JWTError as err:
        raise credentials_exception from err

    user_id = payload.get("user_id")
    if user_id is None:
        raise credentials_exception

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise credentials_exception
    return user