from datetime import datetime, timedelta, timezone from typing import Any, Dict, Optional from jose import jwt, JWTError from passlib.hash import argon2 from fastapi import HTTPException, status from app.config import settings SECRET_KEY = settings.secret_key ALGORITHM = settings.algorithm ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes REFRESH_TOKEN_EXPIRE_DAYS = settings.refresh_token_expire_days def _now() -> datetime: return datetime.now(timezone.utc) def create_access_token(subject: str, role: Optional[str] = None) -> str: expire = _now() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) payload = {"sub": subject, "exp": expire, "iat": _now(), "type": "access"} if role: payload["role"] = role return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) def create_refresh_token(subject: str) -> str: expire = _now() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) payload = {"sub": subject, "exp": expire, "iat": _now(), "type": "refresh"} return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) def decode_token(token: str) -> Dict[str, Any]: try: return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) except JWTError: raise ValueError("Invalid token or signature") def verify_access_token(token: str) -> str: credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) subject: Optional[str] = payload.get("sub") token_type = payload.get("type") if subject is None or token_type != "access": raise credentials_exception return subject except JWTError: raise credentials_exception def hash_refresh_token(raw_refresh: str) -> str: return argon2.hash(raw_refresh) def verify_refresh_token(raw_refresh: str, hash_value: str) -> bool: try: return argon2.verify(raw_refresh, hash_value) except Exception: return False