File size: 2,717 Bytes
c6abe34 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | """
Security utilities for JWT token handling and password hashing.
"""
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any
from jose import jwt, JWTError
import bcrypt
from app.config import get_settings
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
try:
password_bytes = plain_password.encode('utf-8')
hash_bytes = hashed_password.encode('utf-8')
return bcrypt.checkpw(password_bytes, hash_bytes)
except Exception as e:
print(f"Error verifying password: {e}")
return False
def get_password_hash(password: str) -> str:
"""Generate a password hash."""
password_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password_bytes, salt)
return hashed.decode('utf-8')
def create_access_token(
data: Dict[str, Any],
expires_delta: Optional[timedelta] = None
) -> str:
"""
Create a JWT access token.
Args:
data: Payload data to encode in the token
expires_delta: Optional custom expiration time
Returns:
Encoded JWT token string
"""
settings = get_settings()
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=settings.jwt_expiration_minutes)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(
to_encode,
settings.jwt_secret,
algorithm=settings.jwt_algorithm
)
return encoded_jwt
def decode_access_token(token: str) -> Optional[Dict[str, Any]]:
"""
Decode and validate a JWT access token.
Args:
token: JWT token string
Returns:
Decoded payload if valid, None otherwise
"""
settings = get_settings()
try:
payload = jwt.decode(
token,
settings.jwt_secret,
algorithms=[settings.jwt_algorithm]
)
return payload
except JWTError:
return None
def create_refresh_token(user_id: str) -> str:
"""
Create a refresh token with extended expiration.
Args:
user_id: User ID to encode in the token
Returns:
Encoded JWT refresh token
"""
settings = get_settings()
expire = datetime.now(timezone.utc) + timedelta(days=7) # 7 days for refresh token
to_encode = {
"sub": user_id,
"type": "refresh",
"exp": expire
}
return jwt.encode(
to_encode,
settings.jwt_secret,
algorithm=settings.jwt_algorithm
)
|