| from datetime import datetime, timedelta, timezone |
| from typing import Any, Dict, Optional |
| from jose import jwt, JWTError |
| from passlib.hash import argon2 |
| from fastapi import HTTPException, status |
| from app.config import settings |
|
|
| SECRET_KEY = settings.secret_key |
| ALGORITHM = settings.algorithm |
| ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes |
| REFRESH_TOKEN_EXPIRE_DAYS = settings.refresh_token_expire_days |
|
|
| def _now() -> datetime: |
| return datetime.now(timezone.utc) |
|
|
| def create_access_token(subject: str, role: Optional[str] = None) -> str: |
| expire = _now() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) |
| payload = {"sub": subject, "exp": expire, "iat": _now(), "type": "access"} |
| if role: |
| payload["role"] = role |
| return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) |
|
|
| def create_refresh_token(subject: str) -> str: |
| expire = _now() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) |
| payload = {"sub": subject, "exp": expire, "iat": _now(), "type": "refresh"} |
| return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) |
|
|
| def decode_token(token: str) -> Dict[str, Any]: |
| try: |
| return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) |
| except JWTError: |
| raise ValueError("Invalid token or signature") |
|
|
| def verify_access_token(token: str) -> str: |
| credentials_exception = HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="Could not validate credentials", |
| headers={"WWW-Authenticate": "Bearer"}, |
| ) |
| try: |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) |
| subject: Optional[str] = payload.get("sub") |
| token_type = payload.get("type") |
| if subject is None or token_type != "access": |
| raise credentials_exception |
| return subject |
| except JWTError: |
| raise credentials_exception |
|
|
| def hash_refresh_token(raw_refresh: str) -> str: |
| return argon2.hash(raw_refresh) |
|
|
| def verify_refresh_token(raw_refresh: str, hash_value: str) -> bool: |
| try: |
| return argon2.verify(raw_refresh, hash_value) |
| except Exception: |
| return False |