import time from typing import Dict import jwt import secrets import logging from fastapi import Depends, HTTPException import base64 from datetime import datetime, timedelta # from repository import UserRepository, UserLoginRepository import string, random # def check_token_is_valid(token): # check = UserRepository.getEmailUserByAccessToken(token) # if check is None: # return False # return True def unique_string(byte: int = 8) -> str: return secrets.token_urlsafe(byte) JWT_SECRET = "404E635266556A586E3272357538782F413F4428472B4B6250645367566B5970" JWT_ALGORITHM = "HS256" SECRET_KEY="404E635266556A586E3272357538782F413F4428472B4B6250645367566B5970" def token_response(token: str): return { "access_token": token } def str_encode(string: str) -> str: return base64.b85encode(string.encode('ascii')).decode('ascii') def get_token_payload(token: str, secret: str, algo: str): try: payload = jwt.decode(token, secret, algorithms=algo) except Exception as jwt_exec: logging.debug(f"JWT Error: {str(jwt_exec)}") payload = None return payload from datetime import datetime def generate_token(payload: dict, secret: str, algo: str, expiry: timedelta): expire = datetime.now() + expiry payload.update({"exp": expire}) return jwt.encode(payload, secret, algorithm=algo) def str_decode(string: str) -> str: return base64.b85decode(string.encode('ascii')).decode('ascii') def generate_random_string(length=12): characters = string.ascii_letters + string.digits random_string = ''.join(random.choice(characters) for i in range(length)) return random_string import pytz from datetime import datetime # def signJWT(user_email: str) -> Dict[str, str]: # rt_expires = timedelta(days=3) # refresh_key = unique_string(100) # access_key = unique_string(50) # at_expires = timedelta(minutes=180) # at_payload = { # "sub": str_encode(str(user_email)), # 'a': access_key, # } # access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires) # rt_payload = {"sub": str_encode(str(user_email)), "t": refresh_key, 'a': access_key} # refresh_token = generate_token(rt_payload, SECRET_KEY,JWT_ALGORITHM, rt_expires) # expires_in = at_expires.seconds # vn_timezone = pytz.timezone('Asia/Ho_Chi_Minh') # current_time = datetime.now().replace(tzinfo=pytz.utc).astimezone(vn_timezone) + timedelta(seconds=expires_in) # formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S ') # existing_user = UserRepository.getUserByEmail(user_email) # if existing_user is None: # UserRepository.addUser(user_email, access_token, refresh_token, formatted_time) # else: # UserRepository.updateUserLogin(user_email, access_token, refresh_token, formatted_time) # user_record = UserRepository.getUserByEmail(user_email) # session_id = "" # if user_record: # session_id = generate_random_string() # existing_userlogin = UserLoginRepository.getUserLogin(user_email) # if existing_userlogin is None: # UserLoginRepository.addUserLogin(user_email,session_id=session_id) # else: # UserLoginRepository.updateUserLogin(user_email, session_id) # return { # "access_token": access_token, # "refresh_token": refresh_token, # "expires_in": at_expires.seconds, # "session_id": session_id # } # def returnAccessToken(user_email: str, refresh_token: str) -> Dict[str, str]: # access_key = unique_string(50) # at_expires = timedelta(minutes=180) # at_payload = { # "sub": str_encode(str(user_email)), # 'a': access_key, # } # access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires) # user_record = UserRepository.getUserByEmail(user_email) # session_id = "" # if user_record: # email1 = user_record.email # if email1: # session_id = generate_random_string() # existing_userlogin = UserLoginRepository.getUserLogin(user_email) # if existing_userlogin is None: # UserLoginRepository.addUserLogin(user_email,session_id=session_id) # else: # UserLoginRepository.updateUserLogin(user_email,session_id) # return { # "access_token": access_token, # "refresh_token": refresh_token, # "expires_in": at_expires.seconds, # "session_id": session_id # } def decodeJWT(token: str) -> dict: try: decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) return decoded_token if decoded_token["exp"] >= time.time() else None except: return {} # def get_refresh_token(refresh_token, email): # token_payload = get_token_payload(refresh_token, SECRET_KEY, JWT_ALGORITHM) # if not token_payload: # raise HTTPException(status_code=400, detail="Invalid Request.") # exp = token_payload.get('exp') # if exp >= time.time() and token_payload: # return returnAccessToken(email,refresh_token) # elif not token_payload: # return signJWT(email)