backend / app /auth /jwt_handler.py
precison9's picture
deploy FastAPI backend
62a1756
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
from jose import jwt, JWTError
from passlib.hash import argon2
from fastapi import HTTPException, status
from app.config import settings
SECRET_KEY = settings.secret_key
ALGORITHM = settings.algorithm
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes
REFRESH_TOKEN_EXPIRE_DAYS = settings.refresh_token_expire_days
def _now() -> datetime:
return datetime.now(timezone.utc)
def create_access_token(subject: str, role: Optional[str] = None) -> str:
expire = _now() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {"sub": subject, "exp": expire, "iat": _now(), "type": "access"}
if role:
payload["role"] = role
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(subject: str) -> str:
expire = _now() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
payload = {"sub": subject, "exp": expire, "iat": _now(), "type": "refresh"}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> Dict[str, Any]:
try:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
except JWTError:
raise ValueError("Invalid token or signature")
def verify_access_token(token: str) -> str:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
subject: Optional[str] = payload.get("sub")
token_type = payload.get("type")
if subject is None or token_type != "access":
raise credentials_exception
return subject
except JWTError:
raise credentials_exception
def hash_refresh_token(raw_refresh: str) -> str:
return argon2.hash(raw_refresh)
def verify_refresh_token(raw_refresh: str, hash_value: str) -> bool:
try:
return argon2.verify(raw_refresh, hash_value)
except Exception:
return False