| """ |
| Security utilities for JWT token handling and password hashing. |
| """ |
| from datetime import datetime, timedelta, timezone |
| from typing import Optional, Dict, Any |
| from jose import jwt, JWTError |
| import bcrypt |
| from app.config import get_settings |
|
|
|
|
| def verify_password(plain_password: str, hashed_password: str) -> bool: |
| """Verify a password against its hash.""" |
| try: |
| password_bytes = plain_password.encode('utf-8') |
| hash_bytes = hashed_password.encode('utf-8') |
| return bcrypt.checkpw(password_bytes, hash_bytes) |
| except Exception as e: |
| print(f"Error verifying password: {e}") |
| return False |
|
|
|
|
| def get_password_hash(password: str) -> str: |
| """Generate a password hash.""" |
| password_bytes = password.encode('utf-8') |
| salt = bcrypt.gensalt() |
| hashed = bcrypt.hashpw(password_bytes, salt) |
| return hashed.decode('utf-8') |
|
|
|
|
| def create_access_token( |
| data: Dict[str, Any], |
| expires_delta: Optional[timedelta] = None |
| ) -> str: |
| """ |
| Create a JWT access token. |
| |
| Args: |
| data: Payload data to encode in the token |
| expires_delta: Optional custom expiration time |
| |
| Returns: |
| Encoded JWT token string |
| """ |
| settings = get_settings() |
| to_encode = data.copy() |
| |
| if expires_delta: |
| expire = datetime.now(timezone.utc) + expires_delta |
| else: |
| expire = datetime.now(timezone.utc) + timedelta(minutes=settings.jwt_expiration_minutes) |
| |
| to_encode.update({"exp": expire}) |
| encoded_jwt = jwt.encode( |
| to_encode, |
| settings.jwt_secret, |
| algorithm=settings.jwt_algorithm |
| ) |
| return encoded_jwt |
|
|
|
|
| def decode_access_token(token: str) -> Optional[Dict[str, Any]]: |
| """ |
| Decode and validate a JWT access token. |
| |
| Args: |
| token: JWT token string |
| |
| Returns: |
| Decoded payload if valid, None otherwise |
| """ |
| settings = get_settings() |
| try: |
| payload = jwt.decode( |
| token, |
| settings.jwt_secret, |
| algorithms=[settings.jwt_algorithm] |
| ) |
| return payload |
| except JWTError: |
| return None |
|
|
|
|
| def create_refresh_token(user_id: str) -> str: |
| """ |
| Create a refresh token with extended expiration. |
| |
| Args: |
| user_id: User ID to encode in the token |
| |
| Returns: |
| Encoded JWT refresh token |
| """ |
| settings = get_settings() |
| expire = datetime.now(timezone.utc) + timedelta(days=7) |
| |
| to_encode = { |
| "sub": user_id, |
| "type": "refresh", |
| "exp": expire |
| } |
| |
| return jwt.encode( |
| to_encode, |
| settings.jwt_secret, |
| algorithm=settings.jwt_algorithm |
| ) |
|
|