File size: 2,873 Bytes
67f8819
 
 
 
 
 
 
 
425e1d7
67f8819
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""
JWT verification service for Better Auth integration.

Per @specs/001-auth-api-bridge/research.md
"""
from datetime import datetime, timedelta
from typing import Optional, Dict
from jose import JWTError, jwt
from config import settings
import uuid

SECRET_KEY = settings.better_auth_secret
ALGORITHM = "HS256"

# In-memory store for password reset tokens (in production, use Redis or database)
PASSWORD_RESET_TOKENS: Dict[str, dict] = {}


def create_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a JWT token for a user.

    Args:
        user_id: User UUID to embed in the token
        expires_delta: Optional custom expiration time

    Returns:
        Encoded JWT token string
    """
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(hours=24)

    payload = {
        "sub": user_id,
        "iat": datetime.utcnow(),
        "exp": expire
    }

    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)


async def verify_token(token: str) -> Optional[str]:
    """
    Verify JWT token and return user_id.

    Args:
        token: JWT token string

    Returns:
        User ID (UUID string) if token is valid, None otherwise
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")
        if user_id is None:
            return None
        return user_id
    except JWTError:
        return None


def create_password_reset_token(email: str) -> str:
    """
    Create a password reset token for an email.

    Args:
        email: User's email address

    Returns:
        Password reset token
    """
    reset_token = str(uuid.uuid4())
    expiry = datetime.utcnow() + timedelta(hours=1)  # Token valid for 1 hour

    PASSWORD_RESET_TOKENS[reset_token] = {
        "email": email,
        "expires": expiry
    }

    return reset_token


def verify_password_reset_token(token: str) -> Optional[str]:
    """
    Verify password reset token and return email.

    Args:
        token: Password reset token

    Returns:
        Email if token is valid, None otherwise
    """
    if token not in PASSWORD_RESET_TOKENS:
        return None

    token_data = PASSWORD_RESET_TOKENS[token]

    # Check if token has expired
    if datetime.utcnow() > token_data["expires"]:
        del PASSWORD_RESET_TOKENS[token]
        return None

    return token_data["email"]


def consume_password_reset_token(token: str) -> bool:
    """
    Consume (invalidate) a password reset token after use.

    Args:
        token: Password reset token to consume

    Returns:
        True if token was valid and consumed, False otherwise
    """
    if token in PASSWORD_RESET_TOKENS:
        del PASSWORD_RESET_TOKENS[token]
        return True
    return False