"""Security utilities for authentication and authorization.""" import jwt from datetime import datetime, timedelta from passlib.context import CryptContext from fastapi import HTTPException, status from typing import Optional # Password hashing context pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def hash_password(password: str) -> str: """ Hash a password using bcrypt. Args: password: Plain text password Returns: Hashed password string """ return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """ Verify a password against its hash. Args: plain_password: Plain text password to verify hashed_password: Hashed password to compare against Returns: True if password matches, False otherwise """ return pwd_context.verify(plain_password, hashed_password) def create_jwt_token(user_id: int, email: str, secret: str, expiration_days: int = 7) -> str: """ Create a JWT token for a user. Args: user_id: User's unique identifier email: User's email address secret: Secret key for signing the token expiration_days: Number of days until token expires (default: 7) Returns: Encoded JWT token string """ now = datetime.utcnow() payload = { "sub": str(user_id), "email": email, "iat": now, "exp": now + timedelta(days=expiration_days), "iss": "better-auth" } return jwt.encode(payload, secret, algorithm="HS256") def verify_jwt_token(token: str, secret: str) -> dict: """ Verify and decode a JWT token. Args: token: JWT token string to verify secret: Secret key used to sign the token Returns: Decoded token payload as dictionary Raises: HTTPException: 401 if token is expired or invalid """ try: payload = jwt.decode( token, secret, algorithms=["HS256"], options={ "verify_signature": True, "verify_exp": True, "require": ["sub", "email", "iat", "exp", "iss"] } ) # Validate issuer if payload.get("iss") != "better-auth": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token issuer", headers={"WWW-Authenticate": "Bearer"} ) return payload except jwt.ExpiredSignatureError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired", headers={"WWW-Authenticate": "Bearer"} ) except jwt.InvalidTokenError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token", headers={"WWW-Authenticate": "Bearer"} )