"""Password hashing, JWT creation/decoding, and the current-user dependency."""

from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext

from app.config import settings
from app.auth.models import User

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/v1/auth/login")

ALGORITHM = "HS256"


def hash_password(password: str) -> str:
    """Return the hash of *password* produced by the module's ``pwd_context``."""
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether *plain_password* verifies against *hashed_password*."""
    return pwd_context.verify(plain_password, hashed_password)


def _create_token(subject: str, expires_in_seconds: float, secret_key: Any) -> str:
    """Encode a JWT with ``sub`` and ``exp`` claims, signed with *secret_key*.

    ``exp`` is the current UTC time plus *expires_in_seconds*.
    """
    expire = datetime.now(timezone.utc) + timedelta(seconds=expires_in_seconds)
    to_encode = {"sub": subject, "exp": expire}
    return jwt.encode(to_encode, secret_key, algorithm=ALGORITHM)


def create_access_token(subject: str) -> str:
    """Create an access token for *subject*.

    Lifetime comes from ``settings.JWT_ACCESS_TOKEN_EXPIRED_IN`` (seconds) and
    the token is signed with ``settings.JWT_ACCESS_TOKEN_SECRET_KEY``.
    """
    return _create_token(
        subject,
        settings.JWT_ACCESS_TOKEN_EXPIRED_IN,
        settings.JWT_ACCESS_TOKEN_SECRET_KEY,
    )


def create_refresh_token(subject: str, extended: bool = False) -> str:
    """Create a refresh token for *subject*.

    Args:
        subject: Value stored in the ``sub`` claim.
        extended: When true, use
            ``settings.JWT_REFRESH_TOKEN_EXTENDED_EXPIRED_IN`` as the lifetime
            instead of ``settings.JWT_REFRESH_TOKEN_EXPIRED_IN`` (both seconds).

    The token is signed with ``settings.JWT_REFRESH_TOKEN_SECRET_KEY``.
    """
    expire_in = (
        settings.JWT_REFRESH_TOKEN_EXTENDED_EXPIRED_IN
        if extended
        else settings.JWT_REFRESH_TOKEN_EXPIRED_IN
    )
    return _create_token(subject, expire_in, settings.JWT_REFRESH_TOKEN_SECRET_KEY)


def decode_access_token(token: str) -> Dict[str, Any]:
    """Decode *token* with the access-token secret and return its claims.

    Errors raised by ``jwt.decode`` (e.g. ``jwt.ExpiredSignatureError`` and
    other ``jwt.PyJWTError`` subclasses) propagate to the caller.
    """
    return jwt.decode(
        token, settings.JWT_ACCESS_TOKEN_SECRET_KEY, algorithms=[ALGORITHM]
    )


def decode_refresh_token(token: str) -> Dict[str, Any]:
    """Decode *token* with the refresh-token secret and return its claims.

    Errors raised by ``jwt.decode`` propagate to the caller.
    """
    return jwt.decode(
        token, settings.JWT_REFRESH_TOKEN_SECRET_KEY, algorithms=[ALGORITHM]
    )


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 ``HTTPException`` carrying *detail* and a Bearer challenge."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        # A 401 for a Bearer-protected resource should advertise the scheme.
        headers={"WWW-Authenticate": "Bearer"},
    )


async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """FastAPI dependency: resolve the bearer access token to a ``User``.

    Raises:
        HTTPException: 401 with detail ``"Token expired"`` when the token's
            signature has expired, ``"Invalid token"`` for any other PyJWT
            error, ``"Invalid token: no subject"`` when the ``sub`` claim is
            missing or empty, and ``"User not found"`` when ``User.get``
            returns a falsy value.
    """
    # Only the decode step can raise PyJWT errors, so keep the try minimal.
    try:
        payload = decode_access_token(token)
    except jwt.ExpiredSignatureError as exc:
        # Must precede PyJWTError, of which this is a subclass.
        raise _unauthorized("Token expired") from exc
    except jwt.PyJWTError as exc:
        raise _unauthorized("Invalid token") from exc

    user_id: Optional[str] = payload.get("sub")
    if not user_id:
        raise _unauthorized("Invalid token: no subject")

    user = await User.get(user_id)
    if not user:
        raise _unauthorized("User not found")
    return user