| | """ |
| | JWT verification service for Better Auth integration. |
| | |
| | Per @specs/001-auth-api-bridge/research.md |
| | """ |
| | from datetime import datetime, timedelta |
| | from typing import Optional, Dict |
| | from jose import JWTError, jwt |
| | from config import settings |
| | import uuid |
| |
|
| | SECRET_KEY = settings.better_auth_secret |
| | ALGORITHM = "HS256" |
| |
|
| | |
| | PASSWORD_RESET_TOKENS: Dict[str, dict] = {} |
| |
|
| |
|
| | def create_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str: |
| | """ |
| | Create a JWT token for a user. |
| | |
| | Args: |
| | user_id: User UUID to embed in the token |
| | expires_delta: Optional custom expiration time |
| | |
| | Returns: |
| | Encoded JWT token string |
| | """ |
| | if expires_delta: |
| | expire = datetime.utcnow() + expires_delta |
| | else: |
| | expire = datetime.utcnow() + timedelta(hours=24) |
| |
|
| | payload = { |
| | "sub": user_id, |
| | "iat": datetime.utcnow(), |
| | "exp": expire |
| | } |
| |
|
| | return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) |
| |
|
| |
|
| | async def verify_token(token: str) -> Optional[str]: |
| | """ |
| | Verify JWT token and return user_id. |
| | |
| | Args: |
| | token: JWT token string |
| | |
| | Returns: |
| | User ID (UUID string) if token is valid, None otherwise |
| | """ |
| | try: |
| | payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) |
| | user_id: str = payload.get("sub") |
| | if user_id is None: |
| | return None |
| | return user_id |
| | except JWTError: |
| | return None |
| |
|
| |
|
| | def create_password_reset_token(email: str) -> str: |
| | """ |
| | Create a password reset token for an email. |
| | |
| | Args: |
| | email: User's email address |
| | |
| | Returns: |
| | Password reset token |
| | """ |
| | reset_token = str(uuid.uuid4()) |
| | expiry = datetime.utcnow() + timedelta(hours=1) |
| |
|
| | PASSWORD_RESET_TOKENS[reset_token] = { |
| | "email": email, |
| | "expires": expiry |
| | } |
| |
|
| | return reset_token |
| |
|
| |
|
| | def verify_password_reset_token(token: str) -> Optional[str]: |
| | """ |
| | Verify password reset token and return email. |
| | |
| | Args: |
| | token: Password reset token |
| | |
| | Returns: |
| | Email if token is valid, None otherwise |
| | """ |
| | if token not in PASSWORD_RESET_TOKENS: |
| | return None |
| |
|
| | token_data = PASSWORD_RESET_TOKENS[token] |
| |
|
| | |
| | if datetime.utcnow() > token_data["expires"]: |
| | del PASSWORD_RESET_TOKENS[token] |
| | return None |
| |
|
| | return token_data["email"] |
| |
|
| |
|
| | def consume_password_reset_token(token: str) -> bool: |
| | """ |
| | Consume (invalidate) a password reset token after use. |
| | |
| | Args: |
| | token: Password reset token to consume |
| | |
| | Returns: |
| | True if token was valid and consumed, False otherwise |
| | """ |
| | if token in PASSWORD_RESET_TOKENS: |
| | del PASSWORD_RESET_TOKENS[token] |
| | return True |
| | return False |
| |
|