Todo_App / auth /jwt_handler.py
Abdullahcoder54's picture
Push the app
697c967
raw
history blame
4.34 kB
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

import jwt
from fastapi import HTTPException, status

from config.settings import settings
from models.user import UserRead
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a signed access token carrying the provided claims.

    The caller's dictionary is copied, then the ``exp``, ``iat`` and ``type``
    claims are set (overwriting any values supplied under those keys) before
    the payload is signed with ``settings.jwt_secret`` using
    ``settings.jwt_algorithm``.

    Args:
        data: Claims to embed in the token. The dictionary is not modified.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.access_token_expire_minutes`` minutes. An explicit
            zero-length delta is honoured rather than replaced by the default.

    Returns:
        Encoded JWT token as string
    """
    # Compare against None: a zero timedelta is falsy, so a plain truthiness
    # test would silently swap it for the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.access_token_expire_minutes)

    # A single timezone-aware clock reading keeps "iat" and "exp" exactly
    # expires_delta apart and avoids datetime.utcnow(), which is deprecated
    # since Python 3.12.
    issued_at = datetime.now(timezone.utc)

    to_encode = data.copy()
    to_encode.update(
        {
            "exp": issued_at + expires_delta,
            "iat": issued_at,
            "type": "access",
        }
    )
    return jwt.encode(
        to_encode,
        settings.jwt_secret,
        algorithm=settings.jwt_algorithm,
    )
def create_refresh_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a signed refresh token carrying the provided claims.

    The caller's dictionary is copied, then the ``exp``, ``iat`` and ``type``
    claims are set (overwriting any values supplied under those keys) before
    the payload is signed with ``settings.jwt_secret`` using
    ``settings.jwt_algorithm``.

    Args:
        data: Claims to embed in the token. The dictionary is not modified.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.refresh_token_expire_days`` days. An explicit
            zero-length delta is honoured rather than replaced by the default.

    Returns:
        Encoded JWT token as string
    """
    # Compare against None: a zero timedelta is falsy, so a plain truthiness
    # test would silently swap it for the default lifetime.
    if expires_delta is None:
        # The default lifetime is configuration-driven, not a fixed number of days.
        expires_delta = timedelta(days=settings.refresh_token_expire_days)

    # A single timezone-aware clock reading keeps "iat" and "exp" exactly
    # expires_delta apart and avoids datetime.utcnow(), which is deprecated
    # since Python 3.12.
    issued_at = datetime.now(timezone.utc)

    to_encode = data.copy()
    to_encode.update(
        {
            "exp": issued_at + expires_delta,
            "iat": issued_at,
            "type": "refresh",
        }
    )
    return jwt.encode(
        to_encode,
        settings.jwt_secret,
        algorithm=settings.jwt_algorithm,
    )
def verify_token(token: str) -> Dict[str, Any]:
    """
    Verify and decode a JWT token.

    The token is decoded with ``settings.jwt_secret`` and restricted to
    ``settings.jwt_algorithm``. Every failure is reported to the caller as an
    HTTP 401 response carrying a ``WWW-Authenticate: Bearer`` header.

    Args:
        token: JWT token string to verify

    Returns:
        Decoded token payload as dictionary

    Raises:
        HTTPException: 401 with detail "Token has expired" when the token's
            expiry has passed, or 401 with detail "Could not validate
            credentials" for any other decoding or validation failure.
    """
    auth_headers = {"WWW-Authenticate": "Bearer"}
    try:
        # jwt.decode validates the signature and the "exp" claim itself and
        # raises ExpiredSignatureError for an expired token. No manual expiry
        # comparison is done here: mixing datetime.fromtimestamp() (local
        # time) with datetime.utcnow() (UTC) rejects valid tokens on hosts
        # whose local time zone is behind UTC.
        return jwt.decode(
            token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
        )
    except jwt.ExpiredSignatureError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
            headers=auth_headers,
        ) from exc
    except Exception as exc:
        # Covers jwt.InvalidTokenError (bad signature, malformed token, ...)
        # as well as any unexpected error: the caller always receives the
        # same generic 401 so no detail about the failure is leaked.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers=auth_headers,
        ) from exc
def get_user_id_from_token(token: str) -> int:
    """
    Return the user ID stored in the ``sub`` claim of a JWT token.

    The token is first validated with ``verify_token``; any HTTPException it
    raises propagates unchanged.

    Args:
        token: JWT token string

    Returns:
        User ID as integer

    Raises:
        HTTPException: 401 if the token fails verification, has no ``sub``
            claim, or the ``sub`` claim cannot be converted to an integer.
    """
    claims = verify_token(token)
    subject = claims.get("sub")

    # A missing subject is reported with the generic credentials message.
    if subject is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # The subject may arrive as a string; coerce it to an integer.
    try:
        return int(subject)
    except (ValueError, TypeError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid user ID in token",
            headers={"WWW-Authenticate": "Bearer"},
        )