Spaces:
Sleeping
Sleeping
| # """ | |
| # Security Utilities | |
| # Password hashing and JWT token management | |
| # """ | |
| # from datetime import datetime, timedelta | |
| # from typing import Optional | |
| # from jose import JWTError, jwt | |
| # from passlib.context import CryptContext | |
| # from app.config import settings | |
| # # Password hashing context | |
| # pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| # # ============================================================================ | |
| # # PASSWORD HASHING | |
| # # ============================================================================ | |
| # def hash_password(password: str) -> str: | |
| # """ | |
| # Hash a password using bcrypt. | |
| # Args: | |
| # password: Plain text password | |
| # Returns: | |
| # str: Hashed password | |
| # """ | |
| # return pwd_context.hash(password) | |
| # def verify_password(plain_password: str, hashed_password: str) -> bool: | |
| # """ | |
| # Verify a password against a hash. | |
| # Args: | |
| # plain_password: Plain text password to verify | |
| # hashed_password: Hashed password from database | |
| # Returns: | |
| # bool: True if password matches, False otherwise | |
| # """ | |
| # return pwd_context.verify(plain_password, hashed_password) | |
| # # ============================================================================ | |
| # # JWT TOKEN MANAGEMENT | |
| # # ============================================================================ | |
| # def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: | |
| # """ | |
| # Create a JWT access token. | |
| # Args: | |
| # data: Data to encode in token (user_id, email, etc.) | |
| # expires_delta: Optional custom expiration time | |
| # Returns: | |
| # str: Encoded JWT token | |
| # """ | |
| # to_encode = data.copy() | |
| # if expires_delta: | |
| # expire = datetime.utcnow() + expires_delta | |
| # else: | |
| # expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) | |
| # to_encode.update({"exp": expire}) | |
| # encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) | |
| # return encoded_jwt | |
| # def decode_access_token(token: str) -> Optional[dict]: | |
| # """ | |
| # Decode and verify a JWT token. | |
| # Args: | |
| # token: JWT token to decode | |
| # Returns: | |
| # dict: Decoded token data or None if invalid | |
| # """ | |
| # try: | |
| # payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) | |
| # return payload | |
| # except JWTError: | |
| # return None | |
| """ | |
| Security utilities for password hashing and JWT tokens | |
| """ | |
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt
from passlib.context import CryptContext

from app.config import settings
# Shared passlib context used by hash_password() and verify_password().
# bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
    """
    Hash a password using bcrypt

    Bcrypt has a 72 byte limit, so only the first 72 bytes of the
    password's UTF-8 encoding are hashed.

    Args:
        password: Plain text password

    Returns:
        Hashed password
    """
    # Truncate the encoded bytes rather than the characters: slicing the
    # string to 72 characters can still leave more than 72 bytes when the
    # password contains multi-byte characters. The bytes are handed to
    # passlib directly because the cut may fall inside a multi-byte
    # character, which could not be decoded back to str.
    secret = password.encode("utf-8")[:72]
    return pwd_context.hash(secret)
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Verify a password against its hash

    Bcrypt has a 72 byte limit, so only the first 72 bytes of the
    password's UTF-8 encoding take part in the comparison.

    Args:
        plain_password: Plain text password to verify
        hashed_password: Stored hashed password

    Returns:
        True if password matches, False otherwise
    """
    # Truncate the encoded bytes rather than the characters: slicing the
    # string to 72 characters can still leave more than 72 bytes when the
    # password contains multi-byte characters. The bytes are handed to
    # passlib directly because the cut may fall inside a multi-byte
    # character, which could not be decoded back to str.
    secret = plain_password.encode("utf-8")[:72]
    return pwd_context.verify(secret, hashed_password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a JWT access token

    Args:
        data: Data to encode in the token (usually user_id, email).
            The dict is copied, so the caller's object is not modified.
        expires_delta: Token lifetime. When None, the lifetime is
            settings.ACCESS_TOKEN_EXPIRE_MINUTES minutes.

    Returns:
        Encoded JWT token string
    """
    to_encode = data.copy()

    # Compare against None explicitly: timedelta(0) is falsy, so a plain
    # truthiness test would silently replace an explicit zero lifetime
    # with the configured default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware UTC "now"; datetime.utcnow() is deprecated and returns
    # a naive datetime.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta

    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def decode_access_token(token: str) -> Optional[dict]:
    """
    Decode and verify a JWT token

    Args:
        token: JWT token string

    Returns:
        Decoded token data (dict), or None if the token is missing,
        invalid or expired. JWTError is handled here and is not
        propagated to the caller.
    """
    # A missing or empty token can never be valid; answer without
    # involving the JWT library at all.
    if not token:
        return None

    try:
        return jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError:
        return None