File size: 2,873 Bytes
"""
JWT verification service for Better Auth integration.
Per @specs/001-auth-api-bridge/research.md
"""
from datetime import datetime, timedelta
from typing import Optional, Dict
from jose import JWTError, jwt
from config import settings
import uuid
# Shared secret used to sign and verify JWTs; read from the application settings.
SECRET_KEY = settings.better_auth_secret
# Algorithm name passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# In-memory store for password reset tokens (in production, use Redis or database).
# Maps each reset token string to a dict holding the "email" it was issued for
# and its "expires" timestamp.
PASSWORD_RESET_TOKENS: Dict[str, dict] = {}
def create_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a JWT token for a user.

    Args:
        user_id: User UUID to embed in the token as the "sub" claim
        expires_delta: Optional custom lifetime; when omitted (None) the
            token expires 24 hours after it is issued

    Returns:
        Encoded JWT token string
    """
    # Take a single timestamp so "iat" and "exp" are derived from the same instant.
    issued_at = datetime.utcnow()
    # Compare against None explicitly: timedelta(0) is falsy, so a plain
    # truthiness test would silently turn a zero lifetime into 24 hours.
    if expires_delta is None:
        expires_delta = timedelta(hours=24)
    payload = {
        "sub": user_id,
        "iat": issued_at,
        "exp": issued_at + expires_delta,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
async def verify_token(token: str) -> Optional[str]:
    """
    Decode a JWT and return the user ID stored in its "sub" claim.

    Args:
        token: JWT token string

    Returns:
        The "sub" claim (user UUID string) when the token decodes
        successfully and carries one, None otherwise
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        # Any decoding failure is reported to the caller as "no user"
        return None
    # A missing "sub" claim yields None, same as a token that failed to decode
    return claims.get("sub")
def create_password_reset_token(email: str) -> str:
    """
    Issue a password reset token for an email and record it in the store.

    Args:
        email: User's email address

    Returns:
        Password reset token (a random UUID4 string)
    """
    token = str(uuid.uuid4())
    # Token valid for 1 hour
    PASSWORD_RESET_TOKENS[token] = {
        "email": email,
        "expires": datetime.utcnow() + timedelta(hours=1),
    }
    return token
def verify_password_reset_token(token: str) -> Optional[str]:
    """
    Look up a password reset token and return the email it was issued for.

    An expired token is removed from the store as a side effect.

    Args:
        token: Password reset token

    Returns:
        Email if the token is known and has not expired, None otherwise
    """
    entry = PASSWORD_RESET_TOKENS.get(token)
    if entry is None:
        return None

    # Still within its lifetime: hand back the associated email
    if datetime.utcnow() <= entry["expires"]:
        return entry["email"]

    # Expired: drop it so it cannot be looked up again
    del PASSWORD_RESET_TOKENS[token]
    return None
def consume_password_reset_token(token: str) -> bool:
    """
    Consume (invalidate) a password reset token after use.

    The token is removed from the store whether or not it has expired, but
    only an unexpired token is reported as successfully consumed.

    Args:
        token: Password reset token to consume

    Returns:
        True if token was valid (known and not expired) and consumed,
        False otherwise
    """
    # pop() looks up and removes the entry in one step
    token_data = PASSWORD_RESET_TOKENS.pop(token, None)
    if token_data is None:
        return False
    # Same expiry rule as verify_password_reset_token: a token is expired
    # once the current time is past "expires", and must not count as valid.
    return datetime.utcnow() <= token_data["expires"]
|