File size: 2,257 Bytes
2cd3d4e
 
1a3a466
adb1c6b
 
1a3a466
 
 
adb1c6b
 
 
2cd3d4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
adb1c6b
 
41c0b3a
adb1c6b
 
 
41c0b3a
 
 
 
 
 
 
 
adb1c6b
 
 
 
 
1a3a466
adb1c6b
1a3a466
adb1c6b
 
1a3a466
adb1c6b
 
 
1a3a466
 
 
adb1c6b
 
1a3a466
 
adb1c6b
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import logging
import os
import re
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import HTTPException, Security  # Import Security here
from fastapi.security import HTTPBearer
from jose import JWTError, jwt

# Bearer-token extractor; verify_token receives its result via Security().
security = HTTPBearer()

# JWT signing configuration.
# The key is taken from the JWT_SECRET_KEY environment variable when it is set
# to a non-empty value; the literal is kept only as a fallback so existing
# deployments keep producing/accepting the same tokens.
# NOTE(review): a signing key committed to source control should be treated as
# compromised -- set JWT_SECRET_KEY in every real environment and rotate.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY") or "B@@kMy$er^!(e"
ALGORITHM = "HS256"

def validate_email(email: str) -> bool:
    """Validate the email address format.

    This is a purely syntactic check: one or more allowed characters, an
    ``@``, a domain label, a dot, and a trailing domain part. The whole
    string must match, so surrounding whitespace or a trailing newline
    makes the address invalid.

    Args:
        email (str): The email address to validate.

    Returns:
        bool: True if valid, False otherwise (including non-string input).
    """
    if not isinstance(email, str):
        return False
    # fullmatch instead of match + '$': '$' also matches just before a
    # trailing newline, which would let "user@example.com\n" through.
    email_regex = r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9.-]+'
    return re.fullmatch(email_regex, email) is not None

def validate_mobile(mobile: str) -> bool:
    """Validate the mobile number format.

    A valid number is exactly 10 ASCII digits (``0``-``9``) with no
    separators, sign, or whitespace.

    Args:
        mobile (str): The mobile number to validate.

    Returns:
        bool: True if valid, False otherwise (including non-string input).
    """
    if not isinstance(mobile, str):
        return False
    # str.isdigit() alone is also true for non-ASCII digits (e.g. Arabic-Indic
    # numerals or superscripts), so require ASCII as well.
    return len(mobile) == 10 and mobile.isascii() and mobile.isdigit()

def generate_tokens(identifier: str, merchant_id: Optional[str] = None, role: Optional[str] = None) -> dict:
    """Create a signed access/refresh token pair for ``identifier``.

    The access token expires 30 minutes after issue and carries the optional
    ``merchant_id`` and ``role`` claims; the refresh token expires 1 day after
    issue and carries only ``sub`` and ``exp``.

    Args:
        identifier (str): Value stored in the ``sub`` claim of both tokens.
        merchant_id (Optional[str]): Added to the access token as
            ``merchant_id`` when truthy.
        role (Optional[str]): Added to the access token as ``role`` when
            truthy.

    Returns:
        dict: ``access_token``, ``refresh_token`` and ``token_type``
        (always ``"bearer"``).
    """
    # One timezone-aware timestamp for both expiries: datetime.utcnow() is
    # deprecated and returns a naive value, and calling it twice gives the two
    # tokens slightly different reference instants.
    issued_at = datetime.now(timezone.utc)

    access_claims = {"sub": identifier, "exp": issued_at + timedelta(minutes=30)}
    if merchant_id:
        access_claims["merchant_id"] = merchant_id
    if role:
        access_claims["role"] = role

    # NOTE(review): the refresh token has no claim distinguishing it from an
    # access token, so anything that only checks signature and expiry will
    # accept it as one -- confirm whether that is intended.
    refresh_claims = {"sub": identifier, "exp": issued_at + timedelta(days=1)}

    return {
        "access_token": jwt.encode(access_claims, SECRET_KEY, algorithm=ALGORITHM),
        "refresh_token": jwt.encode(refresh_claims, SECRET_KEY, algorithm=ALGORITHM),
        "token_type": "bearer",
    }


def verify_token(token: str = Security(security)) -> dict:
    """Verify the access token.

    Args:
        token: Value injected by ``Security(security)``. Despite the ``str``
            annotation, the code reads the raw JWT from its ``credentials``
            attribute.

    Returns:
        dict: Decoded token data.

    Raises:
        HTTPException: 401 with "Token has expired" when the token's expiry
            has passed, or 401 with "Invalid token" for any other JWT
            decoding/validation failure.
    """
    try:
        return jwt.decode(token.credentials, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError as exc:
        # Must come first: ExpiredSignatureError is a subclass of JWTError.
        raise HTTPException(status_code=401, detail="Token has expired") from exc
    except JWTError as exc:
        # python-jose has no jwt.InvalidTokenError (that is PyJWT's name);
        # referencing it raised AttributeError here and turned every bad
        # token into a 500. JWTError is jose's base error for decode failures.
        raise HTTPException(status_code=401, detail="Invalid token") from exc