be / app /security.py
Lucii1's picture
init prj
8ac4183
from datetime import datetime, timedelta, timezone
import jwt
from passlib.context import CryptContext
from app.config import settings
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from app.auth.models import User
# Password hashing context: bcrypt is the only configured scheme.
# deprecated="auto" is passlib's setting for automatically flagging
# non-default schemes as deprecated (only one scheme is configured here).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Bearer-token extractor used as a FastAPI dependency; tokenUrl is the login
# route that is advertised to clients for obtaining a token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/v1/auth/login")
# Signing algorithm passed to every jwt.encode / jwt.decode call in this module.
ALGORITHM = "HS256"
def hash_password(password: str) -> str:
    """Hash ``password`` with the module-level ``pwd_context``.

    Args:
        password: Plain-text password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored hash.

    Args:
        plain_password: Password as supplied by the caller.
        hashed_password: Previously stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the two values.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def create_access_token(subject: str) -> str:
    """Build a signed JWT access token for ``subject``.

    The token carries two claims: ``sub`` (the subject) and ``exp``, set to
    the current UTC time plus ``settings.JWT_ACCESS_TOKEN_EXPIRED_IN`` seconds.

    Args:
        subject: Value stored in the ``sub`` claim.

    Returns:
        The token encoded with the access-token secret and ``ALGORITHM``.
    """
    lifetime = timedelta(seconds=settings.JWT_ACCESS_TOKEN_EXPIRED_IN)
    claims = {
        "sub": subject,
        "exp": datetime.now(timezone.utc) + lifetime,
    }
    return jwt.encode(
        claims,
        settings.JWT_ACCESS_TOKEN_SECRET_KEY,
        algorithm=ALGORITHM,
    )
def create_refresh_token(subject: str, extended: bool = False) -> str:
    """Build a signed JWT refresh token for ``subject``.

    Args:
        subject: Value stored in the ``sub`` claim.
        extended: When true, the lifetime is taken from
            ``settings.JWT_REFRESH_TOKEN_EXTENDED_EXPIRED_IN`` instead of
            ``settings.JWT_REFRESH_TOKEN_EXPIRED_IN`` (both in seconds).

    Returns:
        The token encoded with the refresh-token secret and ``ALGORITHM``.
    """
    if extended:
        lifetime_seconds = settings.JWT_REFRESH_TOKEN_EXTENDED_EXPIRED_IN
    else:
        lifetime_seconds = settings.JWT_REFRESH_TOKEN_EXPIRED_IN

    expires_at = datetime.now(timezone.utc) + timedelta(seconds=lifetime_seconds)
    claims = {"sub": subject, "exp": expires_at}
    return jwt.encode(
        claims,
        settings.JWT_REFRESH_TOKEN_SECRET_KEY,
        algorithm=ALGORITHM,
    )
def decode_access_token(token: str):
    """Decode ``token`` using the access-token secret.

    Only ``ALGORITHM`` is accepted. Any exception raised by ``jwt.decode``
    propagates to the caller.

    Args:
        token: Encoded JWT access token.

    Returns:
        Whatever ``jwt.decode`` returns for the token (its claims).
    """
    secret = settings.JWT_ACCESS_TOKEN_SECRET_KEY
    claims = jwt.decode(token, secret, algorithms=[ALGORITHM])
    return claims
def decode_refresh_token(token: str):
    """Decode ``token`` using the refresh-token secret.

    Only ``ALGORITHM`` is accepted. Any exception raised by ``jwt.decode``
    propagates to the caller.

    Args:
        token: Encoded JWT refresh token.

    Returns:
        Whatever ``jwt.decode`` returns for the token (its claims).
    """
    secret = settings.JWT_REFRESH_TOKEN_SECRET_KEY
    claims = jwt.decode(token, secret, algorithms=[ALGORITHM])
    return claims
def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 response carrying the Bearer authentication challenge."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        # RFC 6750 expects a WWW-Authenticate challenge on bearer-token 401s.
        headers={"WWW-Authenticate": "Bearer"},
    )


async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """Resolve the bearer access token of the request to its ``User``.

    Written as a FastAPI dependency: ``token`` is supplied by
    ``oauth2_scheme``.

    Args:
        token: Encoded JWT access token.

    Returns:
        The object returned by ``User.get`` for the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with detail ``"Token expired"``,
            ``"Invalid token"``, ``"Invalid token: no subject"`` or
            ``"User not found"``. Every 401 includes a
            ``WWW-Authenticate: Bearer`` header.
    """
    # Only decoding can raise JWT errors, so keep the try body to that call.
    try:
        payload = decode_access_token(token)
    except jwt.ExpiredSignatureError as exc:
        # Must precede PyJWTError: the expired error is a subclass of it.
        raise _unauthorized("Token expired") from exc
    except jwt.PyJWTError as exc:
        raise _unauthorized("Invalid token") from exc

    user_id = payload.get("sub")
    if not user_id:
        raise _unauthorized("Invalid token: no subject")

    user = await User.get(user_id)
    if not user:
        raise _unauthorized("User not found")
    return user