File size: 5,312 Bytes
5db8fd5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import time
from typing import Dict
import jwt
import secrets
import logging
from fastapi import Depends, HTTPException
import base64
from datetime import datetime, timedelta
# from repository import UserRepository, UserLoginRepository
import string, random

# def check_token_is_valid(token):
#     check = UserRepository.getEmailUserByAccessToken(token)
#     if check is None:
#         return False
#     return True

def unique_string(byte: int = 8) -> str:
    return secrets.token_urlsafe(byte)
JWT_SECRET = "404E635266556A586E3272357538782F413F4428472B4B6250645367566B5970"
JWT_ALGORITHM = "HS256"
SECRET_KEY="404E635266556A586E3272357538782F413F4428472B4B6250645367566B5970"

def token_response(token: str):
    return {
        "access_token": token
    }
def str_encode(string: str) -> str:
    return base64.b85encode(string.encode('ascii')).decode('ascii')

def get_token_payload(token: str, secret: str, algo: str):
    try:
        payload = jwt.decode(token, secret, algorithms=algo)
    except Exception as jwt_exec:
        logging.debug(f"JWT Error: {str(jwt_exec)}")
        payload = None
    return payload

from datetime import datetime
def generate_token(payload: dict, secret: str, algo: str, expiry: timedelta):
    expire = datetime.now() + expiry
    payload.update({"exp": expire})
    return jwt.encode(payload, secret, algorithm=algo)

def str_decode(string: str) -> str:
    return base64.b85decode(string.encode('ascii')).decode('ascii')

def generate_random_string(length=12):
    characters = string.ascii_letters + string.digits
    random_string = ''.join(random.choice(characters) for i in range(length))
    return random_string

import pytz
from datetime import datetime
# def signJWT(user_email: str) -> Dict[str, str]:
#     rt_expires = timedelta(days=3)
#     refresh_key = unique_string(100)
#     access_key = unique_string(50)
#     at_expires = timedelta(minutes=180)
#     at_payload = {
#         "sub": str_encode(str(user_email)),
#         'a': access_key,
#     }
#     access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
#     rt_payload = {"sub": str_encode(str(user_email)), "t": refresh_key, 'a': access_key}
#     refresh_token = generate_token(rt_payload, SECRET_KEY,JWT_ALGORITHM, rt_expires)
#     expires_in = at_expires.seconds
#     vn_timezone = pytz.timezone('Asia/Ho_Chi_Minh')
#     current_time = datetime.now().replace(tzinfo=pytz.utc).astimezone(vn_timezone) + timedelta(seconds=expires_in)
#     formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S ')
#     existing_user = UserRepository.getUserByEmail(user_email)
#     if existing_user is None:
#         UserRepository.addUser(user_email, access_token, refresh_token, formatted_time)
#     else:
#         UserRepository.updateUserLogin(user_email, access_token, refresh_token, formatted_time)
#     user_record = UserRepository.getUserByEmail(user_email)
#     session_id = ""
#     if user_record:
#         session_id = generate_random_string()
#         existing_userlogin = UserLoginRepository.getUserLogin(user_email)
#         if existing_userlogin is None:
#             UserLoginRepository.addUserLogin(user_email,session_id=session_id)
#         else:
#             UserLoginRepository.updateUserLogin(user_email, session_id)
#     return {
#         "access_token": access_token,
#         "refresh_token": refresh_token,
#         "expires_in": at_expires.seconds,
#         "session_id": session_id
#      }

# def returnAccessToken(user_email: str, refresh_token: str) -> Dict[str, str]:
#     access_key = unique_string(50)
#     at_expires = timedelta(minutes=180)
#     at_payload = {
#         "sub": str_encode(str(user_email)),
#         'a': access_key,
#     }
#     access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
#     user_record = UserRepository.getUserByEmail(user_email)
#     session_id = ""
#     if user_record:
#         email1 = user_record.email
#     if email1:
#         session_id = generate_random_string()
#         existing_userlogin = UserLoginRepository.getUserLogin(user_email)
#         if existing_userlogin is None:
#             UserLoginRepository.addUserLogin(user_email,session_id=session_id)
#         else:
#             UserLoginRepository.updateUserLogin(user_email,session_id)
#     return {
#         "access_token": access_token,
#         "refresh_token": refresh_token,
#         "expires_in": at_expires.seconds,
#         "session_id": session_id
#     }

def decodeJWT(token: str) -> dict:
    try:
        decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        return decoded_token if decoded_token["exp"] >= time.time() else None
    except:
        return {}
    
# def get_refresh_token(refresh_token, email):
#     token_payload = get_token_payload(refresh_token, SECRET_KEY, JWT_ALGORITHM)
#     if not token_payload:
#         raise HTTPException(status_code=400, detail="Invalid Request.")
#     exp = token_payload.get('exp')
#     if exp >= time.time() and token_payload:
#        return returnAccessToken(email,refresh_token)
#     elif not token_payload:
#        return signJWT(email)