Spaces:
Sleeping
Sleeping
| import jwt | |
| from datetime import datetime, timedelta | |
| from typing import Optional, Dict, Any | |
| from fastapi import HTTPException, status | |
| from config.settings import settings | |
| from models.user import UserRead | |
| def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str: | |
| """ | |
| Create a new access token with the provided data. | |
| Args: | |
| data: Dictionary containing the data to encode in the token | |
| expires_delta: Optional timedelta for token expiration (defaults to settings value) | |
| Returns: | |
| Encoded JWT token as string | |
| """ | |
| to_encode = data.copy() | |
| if expires_delta: | |
| expire = datetime.utcnow() + expires_delta | |
| else: | |
| expire = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes) | |
| to_encode.update({"exp": expire, "iat": datetime.utcnow(), "type": "access"}) | |
| encoded_jwt = jwt.encode( | |
| to_encode, | |
| settings.jwt_secret, | |
| algorithm=settings.jwt_algorithm | |
| ) | |
| return encoded_jwt | |
| def create_refresh_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str: | |
| """ | |
| Create a new refresh token with the provided data. | |
| Args: | |
| data: Dictionary containing the data to encode in the token | |
| expires_delta: Optional timedelta for token expiration (defaults to settings value) | |
| Returns: | |
| Encoded JWT token as string | |
| """ | |
| to_encode = data.copy() | |
| if expires_delta: | |
| expire = datetime.utcnow() + expires_delta | |
| else: | |
| # Default refresh token expiration to 7 days | |
| expire = datetime.utcnow() + timedelta(days=settings.refresh_token_expire_days) | |
| to_encode.update({"exp": expire, "iat": datetime.utcnow(), "type": "refresh"}) | |
| encoded_jwt = jwt.encode( | |
| to_encode, | |
| settings.jwt_secret, | |
| algorithm=settings.jwt_algorithm | |
| ) | |
| return encoded_jwt | |
| def verify_token(token: str) -> Dict[str, Any]: | |
| """ | |
| Verify and decode a JWT token. | |
| Args: | |
| token: JWT token string to verify | |
| Returns: | |
| Decoded token payload as dictionary | |
| Raises: | |
| HTTPException: If token is invalid, expired, or cannot be decoded | |
| """ | |
| try: | |
| payload = jwt.decode( | |
| token, | |
| settings.jwt_secret, | |
| algorithms=[settings.jwt_algorithm] | |
| ) | |
| # Check if token is expired | |
| if "exp" in payload: | |
| exp_timestamp = payload["exp"] | |
| if datetime.fromtimestamp(exp_timestamp) < datetime.utcnow(): | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Token has expired", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| return payload | |
| except jwt.ExpiredSignatureError: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Token has expired", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| except jwt.InvalidTokenError: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Could not validate credentials", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| except Exception: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Could not validate credentials", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| def get_user_id_from_token(token: str) -> int: | |
| """ | |
| Extract user ID from JWT token. | |
| Args: | |
| token: JWT token string | |
| Returns: | |
| User ID as integer | |
| Raises: | |
| HTTPException: If token is invalid or user_id is not in token | |
| """ | |
| payload = verify_token(token) | |
| user_id = payload.get("sub") | |
| if user_id is None: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Could not validate credentials", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| # Ensure user_id is an integer | |
| try: | |
| user_id = int(user_id) | |
| except (ValueError, TypeError): | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid user ID in token", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| return user_id |