chatbot_server / auth /authentication.py
kltn21110's picture
Upload 14 files
5db8fd5 verified
import time
from typing import Dict
import jwt
import secrets
import logging
from fastapi import Depends, HTTPException
import base64
from datetime import datetime, timedelta
# from repository import UserRepository, UserLoginRepository
import string, random
# def check_token_is_valid(token):
# check = UserRepository.getEmailUserByAccessToken(token)
# if check is None:
# return False
# return True
def unique_string(byte: int = 8) -> str:
    """Return a freshly generated random, URL-safe text token.

    Args:
        byte: Number of random bytes to draw. The returned text is longer
            than this because the bytes are Base64-encoded.

    Returns:
        The token produced by ``secrets.token_urlsafe``.
    """
    token = secrets.token_urlsafe(byte)
    return token
import os

# NOTE(review): signing secrets should not live in source control. Each secret
# can be overridden through an environment variable of the same name; the
# literal below is kept only as a backward-compatible fallback and should be
# rotated, because it has been published together with this file.
_DEFAULT_SIGNING_SECRET = "404E635266556A586E3272357538782F413F4428472B4B6250645367566B5970"

# Key used by decodeJWT to verify token signatures.
JWT_SECRET = os.environ.get("JWT_SECRET") or _DEFAULT_SIGNING_SECRET
# HMAC-SHA256 signing algorithm name passed to PyJWT.
JWT_ALGORITHM = "HS256"
# Second signing key; only referenced by the commented-out refresh-token code
# in this file -- presumably the refresh-token secret, confirm before relying
# on that.
SECRET_KEY = os.environ.get("SECRET_KEY") or _DEFAULT_SIGNING_SECRET
def token_response(token: str):
    """Wrap *token* in the response body shape ``{"access_token": token}``."""
    body = dict(access_token=token)
    return body
def str_encode(string: str) -> str:
    """Encode ASCII text as Base85 text.

    Args:
        string: ASCII-only text to encode.

    Returns:
        The Base85 representation of *string* as an ASCII ``str``.

    Raises:
        UnicodeEncodeError: If *string* contains non-ASCII characters.
    """
    raw_bytes = string.encode('ascii')
    encoded_bytes = base64.b85encode(raw_bytes)
    return encoded_bytes.decode('ascii')
def get_token_payload(token: str, secret: str, algo: str):
    """Decode and verify a JWT, returning its claims or ``None`` on failure.

    Args:
        token: The encoded JWT.
        secret: Key used to verify the signature.
        algo: Accepted signing algorithm name (e.g. ``"HS256"``). A
            collection of names is also accepted.

    Returns:
        The decoded claims dict, or ``None`` if decoding raised for any
        reason (the error is logged at DEBUG level).
    """
    try:
        # PyJWT expects a collection of algorithm names. A bare string would
        # be checked by substring containment, so wrap it in a list -- the
        # same form decodeJWT passes.
        allowed_algorithms = [algo] if isinstance(algo, str) else list(algo)
        return jwt.decode(token, secret, algorithms=allowed_algorithms)
    except Exception as jwt_exec:
        # Best-effort by design: callers treat None as "token not usable".
        logging.debug("JWT Error: %s", jwt_exec)
        return None
from datetime import datetime
def generate_token(payload: dict, secret: str, algo: str, expiry: timedelta):
    """Create a signed JWT containing *payload* plus an ``exp`` claim.

    Args:
        payload: Claims to embed. The dict is copied, not modified.
        secret: Key used to sign the token.
        algo: Signing algorithm name passed to ``jwt.encode``.
        expiry: How long from now the token should remain valid.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    # Local import: ``timezone`` is not among the file-level imports.
    from datetime import timezone

    # PyJWT interprets a naive datetime as UTC, so a naive local-time value
    # would skew the expiry by the server's UTC offset. Use an aware UTC time.
    expires_at = datetime.now(timezone.utc) + expiry
    # Build a new dict so the caller's payload is not mutated as a side effect.
    claims = {**payload, "exp": expires_at}
    return jwt.encode(claims, secret, algorithm=algo)
def str_decode(string: str) -> str:
    """Decode Base85 text back into the ASCII text it represents.

    Args:
        string: ASCII text holding Base85-encoded data.

    Returns:
        The decoded bytes interpreted as ASCII text.

    Raises:
        UnicodeEncodeError: If *string* contains non-ASCII characters.
        UnicodeDecodeError: If the decoded bytes are not ASCII.
        ValueError: If *string* is not valid Base85 data.
    """
    encoded_bytes = string.encode('ascii')
    raw_bytes = base64.b85decode(encoded_bytes)
    return raw_bytes.decode('ascii')
def generate_random_string(length=12):
    """Return a random string of ASCII letters and digits.

    Args:
        length: Number of characters to generate. Zero or a negative value
            yields an empty string.

    Returns:
        A string of *length* characters drawn from ``[A-Za-z0-9]``.
    """
    characters = string.ascii_letters + string.digits
    # Use the CSPRNG-backed ``secrets`` module rather than ``random``, since
    # the result may serve as a session identifier and must be unpredictable.
    return ''.join(secrets.choice(characters) for _ in range(length))
import pytz
from datetime import datetime
# def signJWT(user_email: str) -> Dict[str, str]:
# rt_expires = timedelta(days=3)
# refresh_key = unique_string(100)
# access_key = unique_string(50)
# at_expires = timedelta(minutes=180)
# at_payload = {
# "sub": str_encode(str(user_email)),
# 'a': access_key,
# }
# access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
# rt_payload = {"sub": str_encode(str(user_email)), "t": refresh_key, 'a': access_key}
# refresh_token = generate_token(rt_payload, SECRET_KEY,JWT_ALGORITHM, rt_expires)
# expires_in = at_expires.seconds
# vn_timezone = pytz.timezone('Asia/Ho_Chi_Minh')
# current_time = datetime.now().replace(tzinfo=pytz.utc).astimezone(vn_timezone) + timedelta(seconds=expires_in)
# formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S ')
# existing_user = UserRepository.getUserByEmail(user_email)
# if existing_user is None:
# UserRepository.addUser(user_email, access_token, refresh_token, formatted_time)
# else:
# UserRepository.updateUserLogin(user_email, access_token, refresh_token, formatted_time)
# user_record = UserRepository.getUserByEmail(user_email)
# session_id = ""
# if user_record:
# session_id = generate_random_string()
# existing_userlogin = UserLoginRepository.getUserLogin(user_email)
# if existing_userlogin is None:
# UserLoginRepository.addUserLogin(user_email,session_id=session_id)
# else:
# UserLoginRepository.updateUserLogin(user_email, session_id)
# return {
# "access_token": access_token,
# "refresh_token": refresh_token,
# "expires_in": at_expires.seconds,
# "session_id": session_id
# }
# def returnAccessToken(user_email: str, refresh_token: str) -> Dict[str, str]:
# access_key = unique_string(50)
# at_expires = timedelta(minutes=180)
# at_payload = {
# "sub": str_encode(str(user_email)),
# 'a': access_key,
# }
# access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
# user_record = UserRepository.getUserByEmail(user_email)
# session_id = ""
# if user_record:
# email1 = user_record.email
# if email1:
# session_id = generate_random_string()
# existing_userlogin = UserLoginRepository.getUserLogin(user_email)
# if existing_userlogin is None:
# UserLoginRepository.addUserLogin(user_email,session_id=session_id)
# else:
# UserLoginRepository.updateUserLogin(user_email,session_id)
# return {
# "access_token": access_token,
# "refresh_token": refresh_token,
# "expires_in": at_expires.seconds,
# "session_id": session_id
# }
def decodeJWT(token: str) -> dict:
    """Decode and verify an access token signed with ``JWT_SECRET``.

    Args:
        token: The encoded JWT.

    Returns:
        The decoded claims when the token verifies and its ``exp`` claim is
        not in the past; otherwise an empty dict. The result is always a
        dict, so callers can rely on truthiness to test validity.
    """
    try:
        decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        if decoded_token["exp"] >= time.time():
            return decoded_token
    except Exception as exc:
        # Covers malformed or expired tokens and a missing "exp" claim.
        # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt and
        # SystemExit propagate.
        logging.debug("JWT decode failed: %s", exc)
    return {}
# def get_refresh_token(refresh_token, email):
# token_payload = get_token_payload(refresh_token, SECRET_KEY, JWT_ALGORITHM)
# if not token_payload:
# raise HTTPException(status_code=400, detail="Invalid Request.")
# exp = token_payload.get('exp')
# if exp >= time.time() and token_payload:
# return returnAccessToken(email,refresh_token)
# elif not token_payload:
# return signJWT(email)