Spaces:
Sleeping
Sleeping
| import time | |
| from typing import Dict | |
| import jwt | |
| import secrets | |
| import logging | |
| from fastapi import Depends, HTTPException | |
| import base64 | |
| from datetime import datetime, timedelta | |
| # from repository import UserRepository, UserLoginRepository | |
| import string, random | |
| # def check_token_is_valid(token): | |
| # check = UserRepository.getEmailUserByAccessToken(token) | |
| # if check is None: | |
| # return False | |
| # return True | |
| def unique_string(byte: int = 8) -> str: | |
| return secrets.token_urlsafe(byte) | |
| JWT_SECRET = "404E635266556A586E3272357538782F413F4428472B4B6250645367566B5970" | |
| JWT_ALGORITHM = "HS256" | |
| SECRET_KEY="404E635266556A586E3272357538782F413F4428472B4B6250645367566B5970" | |
| def token_response(token: str): | |
| return { | |
| "access_token": token | |
| } | |
| def str_encode(string: str) -> str: | |
| return base64.b85encode(string.encode('ascii')).decode('ascii') | |
| def get_token_payload(token: str, secret: str, algo: str): | |
| try: | |
| payload = jwt.decode(token, secret, algorithms=algo) | |
| except Exception as jwt_exec: | |
| logging.debug(f"JWT Error: {str(jwt_exec)}") | |
| payload = None | |
| return payload | |
| from datetime import datetime | |
| def generate_token(payload: dict, secret: str, algo: str, expiry: timedelta): | |
| expire = datetime.now() + expiry | |
| payload.update({"exp": expire}) | |
| return jwt.encode(payload, secret, algorithm=algo) | |
| def str_decode(string: str) -> str: | |
| return base64.b85decode(string.encode('ascii')).decode('ascii') | |
| def generate_random_string(length=12): | |
| characters = string.ascii_letters + string.digits | |
| random_string = ''.join(random.choice(characters) for i in range(length)) | |
| return random_string | |
| import pytz | |
| from datetime import datetime | |
| # def signJWT(user_email: str) -> Dict[str, str]: | |
| # rt_expires = timedelta(days=3) | |
| # refresh_key = unique_string(100) | |
| # access_key = unique_string(50) | |
| # at_expires = timedelta(minutes=180) | |
| # at_payload = { | |
| # "sub": str_encode(str(user_email)), | |
| # 'a': access_key, | |
| # } | |
| # access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires) | |
| # rt_payload = {"sub": str_encode(str(user_email)), "t": refresh_key, 'a': access_key} | |
| # refresh_token = generate_token(rt_payload, SECRET_KEY,JWT_ALGORITHM, rt_expires) | |
| # expires_in = at_expires.seconds | |
| # vn_timezone = pytz.timezone('Asia/Ho_Chi_Minh') | |
| # current_time = datetime.now().replace(tzinfo=pytz.utc).astimezone(vn_timezone) + timedelta(seconds=expires_in) | |
| # formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S ') | |
| # existing_user = UserRepository.getUserByEmail(user_email) | |
| # if existing_user is None: | |
| # UserRepository.addUser(user_email, access_token, refresh_token, formatted_time) | |
| # else: | |
| # UserRepository.updateUserLogin(user_email, access_token, refresh_token, formatted_time) | |
| # user_record = UserRepository.getUserByEmail(user_email) | |
| # session_id = "" | |
| # if user_record: | |
| # session_id = generate_random_string() | |
| # existing_userlogin = UserLoginRepository.getUserLogin(user_email) | |
| # if existing_userlogin is None: | |
| # UserLoginRepository.addUserLogin(user_email,session_id=session_id) | |
| # else: | |
| # UserLoginRepository.updateUserLogin(user_email, session_id) | |
| # return { | |
| # "access_token": access_token, | |
| # "refresh_token": refresh_token, | |
| # "expires_in": at_expires.seconds, | |
| # "session_id": session_id | |
| # } | |
| # def returnAccessToken(user_email: str, refresh_token: str) -> Dict[str, str]: | |
| # access_key = unique_string(50) | |
| # at_expires = timedelta(minutes=180) | |
| # at_payload = { | |
| # "sub": str_encode(str(user_email)), | |
| # 'a': access_key, | |
| # } | |
| # access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires) | |
| # user_record = UserRepository.getUserByEmail(user_email) | |
| # session_id = "" | |
| # if user_record: | |
| # email1 = user_record.email | |
| # if email1: | |
| # session_id = generate_random_string() | |
| # existing_userlogin = UserLoginRepository.getUserLogin(user_email) | |
| # if existing_userlogin is None: | |
| # UserLoginRepository.addUserLogin(user_email,session_id=session_id) | |
| # else: | |
| # UserLoginRepository.updateUserLogin(user_email,session_id) | |
| # return { | |
| # "access_token": access_token, | |
| # "refresh_token": refresh_token, | |
| # "expires_in": at_expires.seconds, | |
| # "session_id": session_id | |
| # } | |
| def decodeJWT(token: str) -> dict: | |
| try: | |
| decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) | |
| return decoded_token if decoded_token["exp"] >= time.time() else None | |
| except: | |
| return {} | |
| # def get_refresh_token(refresh_token, email): | |
| # token_payload = get_token_payload(refresh_token, SECRET_KEY, JWT_ALGORITHM) | |
| # if not token_payload: | |
| # raise HTTPException(status_code=400, detail="Invalid Request.") | |
| # exp = token_payload.get('exp') | |
| # if exp >= time.time() and token_payload: | |
| # return returnAccessToken(email,refresh_token) | |
| # elif not token_payload: | |
| # return signJWT(email) |