Spaces:
Sleeping
Sleeping
File size: 4,344 Bytes
697c967 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any

import jwt
from fastapi import HTTPException, status

from config.settings import settings
from models.user import UserRead
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a signed JWT access token carrying the provided claims.

    The input dictionary is copied, so the caller's ``data`` is not mutated.
    The ``exp``, ``iat`` and ``type`` (``"access"``) claims are written into
    the copy and overwrite any keys of the same name present in ``data``.

    Args:
        data: Dictionary containing the claims to encode in the token.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.access_token_expire_minutes`` minutes. Any explicit
            value, including ``timedelta(0)``, is honoured as given.

    Returns:
        Encoded JWT token, signed with ``settings.jwt_secret`` using
        ``settings.jwt_algorithm``.
    """
    # Compare against None rather than truthiness: timedelta(0) is falsy and
    # would otherwise be silently replaced by the configured default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.access_token_expire_minutes)

    # Take a single timezone-aware UTC timestamp so that "iat" and "exp" are
    # derived from the same instant (datetime.utcnow() is deprecated).
    issued_at = datetime.now(timezone.utc)

    to_encode = data.copy()
    to_encode.update(
        {
            "exp": issued_at + expires_delta,
            "iat": issued_at,
            "type": "access",
        }
    )
    return jwt.encode(
        to_encode,
        settings.jwt_secret,
        algorithm=settings.jwt_algorithm,
    )
def create_refresh_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a signed JWT refresh token carrying the provided claims.

    The input dictionary is copied, so the caller's ``data`` is not mutated.
    The ``exp``, ``iat`` and ``type`` (``"refresh"``) claims are written into
    the copy and overwrite any keys of the same name present in ``data``.

    Args:
        data: Dictionary containing the claims to encode in the token.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.refresh_token_expire_days`` days. Any explicit value,
            including ``timedelta(0)``, is honoured as given.

    Returns:
        Encoded JWT token, signed with ``settings.jwt_secret`` using
        ``settings.jwt_algorithm``.
    """
    # Compare against None rather than truthiness: timedelta(0) is falsy and
    # would otherwise be silently replaced by the configured default.
    if expires_delta is None:
        expires_delta = timedelta(days=settings.refresh_token_expire_days)

    # Take a single timezone-aware UTC timestamp so that "iat" and "exp" are
    # derived from the same instant (datetime.utcnow() is deprecated).
    issued_at = datetime.now(timezone.utc)

    to_encode = data.copy()
    to_encode.update(
        {
            "exp": issued_at + expires_delta,
            "iat": issued_at,
            "type": "refresh",
        }
    )
    return jwt.encode(
        to_encode,
        settings.jwt_secret,
        algorithm=settings.jwt_algorithm,
    )
def verify_token(token: str) -> Dict[str, Any]:
    """
    Verify and decode a JWT token.

    The token is decoded with ``settings.jwt_secret`` and restricted to
    ``settings.jwt_algorithm``. This function does not inspect the ``type``
    claim, so callers that must distinguish access tokens from refresh tokens
    need to check it on the returned payload.

    Args:
        token: JWT token string to verify

    Returns:
        Decoded token payload as dictionary

    Raises:
        HTTPException: 401 with detail "Token has expired" when the token's
            expiry has passed, or 401 with detail "Could not validate
            credentials" for any other decoding/validation failure.
    """
    auth_headers = {"WWW-Authenticate": "Bearer"}
    try:
        # jwt.decode() checks the signature and, by default, the "exp" claim
        # (raising ExpiredSignatureError). No manual expiry comparison is done
        # here: comparing datetime.fromtimestamp() (local time) with UTC "now"
        # gives wrong answers on hosts whose timezone is not UTC.
        return jwt.decode(
            token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
        )
    except jwt.ExpiredSignatureError as exc:
        # Must be handled before the generic clause below so the specific
        # "expired" message reaches the client.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
            headers=auth_headers,
        ) from exc
    except Exception as exc:
        # Covers jwt.InvalidTokenError (bad signature, malformed token, ...)
        # as well as any unexpected failure: the caller always gets a 401
        # instead of an unhandled error.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers=auth_headers,
        ) from exc
def get_user_id_from_token(token: str) -> int:
    """
    Return the user ID stored in the ``sub`` claim of a JWT token.

    The token is first validated through ``verify_token``; the ``sub`` claim
    of the resulting payload is then converted with ``int()``.

    Args:
        token: JWT token string

    Returns:
        User ID as integer

    Raises:
        HTTPException: 401 if ``verify_token`` rejects the token, if the
            payload has no ``sub`` claim (or it is ``None``), or if the claim
            cannot be converted to an integer.
    """
    bearer_headers = {"WWW-Authenticate": "Bearer"}

    claims = verify_token(token)
    subject = claims.get("sub")

    # A missing subject is reported with the generic credentials message.
    if subject is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers=bearer_headers,
        )

    # The subject must be convertible to an integer user ID.
    try:
        return int(subject)
    except (ValueError, TypeError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid user ID in token",
            headers=bearer_headers,
        )