Spaces:
Sleeping
Sleeping
| """Security utilities for authentication and authorization.""" | |
| import jwt | |
| from datetime import datetime, timedelta | |
| from passlib.context import CryptContext | |
| from fastapi import HTTPException, status | |
| from typing import Optional | |
| # Password hashing context | |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| def hash_password(password: str) -> str: | |
| """ | |
| Hash a password using bcrypt. | |
| Args: | |
| password: Plain text password | |
| Returns: | |
| Hashed password string | |
| """ | |
| return pwd_context.hash(password) | |
| def verify_password(plain_password: str, hashed_password: str) -> bool: | |
| """ | |
| Verify a password against its hash. | |
| Args: | |
| plain_password: Plain text password to verify | |
| hashed_password: Hashed password to compare against | |
| Returns: | |
| True if password matches, False otherwise | |
| """ | |
| return pwd_context.verify(plain_password, hashed_password) | |
| def create_jwt_token(user_id: int, email: str, secret: str, expiration_days: int = 7) -> str: | |
| """ | |
| Create a JWT token for a user. | |
| Args: | |
| user_id: User's unique identifier | |
| email: User's email address | |
| secret: Secret key for signing the token | |
| expiration_days: Number of days until token expires (default: 7) | |
| Returns: | |
| Encoded JWT token string | |
| """ | |
| now = datetime.utcnow() | |
| payload = { | |
| "sub": str(user_id), | |
| "email": email, | |
| "iat": now, | |
| "exp": now + timedelta(days=expiration_days), | |
| "iss": "better-auth" | |
| } | |
| return jwt.encode(payload, secret, algorithm="HS256") | |
| def verify_jwt_token(token: str, secret: str) -> dict: | |
| """ | |
| Verify and decode a JWT token. | |
| Args: | |
| token: JWT token string to verify | |
| secret: Secret key used to sign the token | |
| Returns: | |
| Decoded token payload as dictionary | |
| Raises: | |
| HTTPException: 401 if token is expired or invalid | |
| """ | |
| try: | |
| payload = jwt.decode( | |
| token, | |
| secret, | |
| algorithms=["HS256"], | |
| options={ | |
| "verify_signature": True, | |
| "verify_exp": True, | |
| "require": ["sub", "email", "iat", "exp", "iss"] | |
| } | |
| ) | |
| # Validate issuer | |
| if payload.get("iss") != "better-auth": | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid token issuer", | |
| headers={"WWW-Authenticate": "Bearer"} | |
| ) | |
| return payload | |
| except jwt.ExpiredSignatureError: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Token has expired", | |
| headers={"WWW-Authenticate": "Bearer"} | |
| ) | |
| except jwt.InvalidTokenError: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid token", | |
| headers={"WWW-Authenticate": "Bearer"} | |
| ) | |