chatbot_server / decode_token.py
kltn21110's picture
Upload 12 files
b76157c verified
import jwt
import base64
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import os, sys
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
sys.path.insert(0, BASE_DIR)
from repository.MySQL import UserRepository
def verify_accesstoken(token:str):
user_id = UserRepository.getUserIdByAccessToken(token)
if user_id is None:
return False
else:
return True
class JwtService:
SECRET_KEY = "404E635266556A586E3272357538782F413F4428472B4B6250645367566B5970"
ALGORITHM = "HS256"
@staticmethod
def get_secret_key():
"""Chuyển đổi secret key thành dạng bytes nếu cần"""
try:
return base64.b64decode(JwtService.SECRET_KEY)
except Exception:
return JwtService.SECRET_KEY.encode()
@staticmethod
def extract_user_id(token: str):
"""
Giải mã token và lấy UserId
"""
try:
payload = jwt.decode(token, JwtService.get_secret_key(), algorithms=[JwtService.ALGORITHM])
check = verify_accesstoken(token)
if check is False:
raise HTTPException(status_code=404, detail="Not found Token")
return payload.get("UserId", None) # Lấy giá trị "UserId" từ payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token Expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token.")
@staticmethod
def extract_user_role(token: str):
"""
Giải mã token và lấy UserId
"""
try:
payload = jwt.decode(token, JwtService.get_secret_key(), algorithms=[JwtService.ALGORITHM])
check = verify_accesstoken(token)
if check is False:
raise HTTPException(status_code=404, detail="Not found Token")
return payload.get("Roles", None)
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token Expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token.")