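import jwt
import base64
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import os, sys

# Make the project root importable so that `repository.MySQL` resolves.
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
sys.path.insert(0, BASE_DIR)
from repository.MySQL import UserRepository


def verify_accesstoken(token: str) -> bool:
    """Return True when the access token is known to the user repository.

    The repository lookup is what lets a token be rejected after issuance:
    a correctly signed token is still refused if the lookup yields None.
    """
    return UserRepository.getUserIdByAccessToken(token) is not None


class JwtService:
    """Decode HS256-signed JWTs and expose the claims the API relies on."""

    # SECURITY: the fallback literal is committed to source control, so anyone
    # who can read the repository can sign tokens this service will accept.
    # Provide the real key through the JWT_SECRET_KEY environment variable and
    # rotate the embedded value. `or` (not a .get() default) is deliberate: an
    # empty variable must never result in an empty HMAC key.
    SECRET_KEY = (
        os.environ.get("JWT_SECRET_KEY")
        or "404E635266556A586E3272357538782F413F4428472B4B6250645367566B5970"
    )
    # Pinned algorithm; it is passed to jwt.decode as the only accepted value
    # so the token header cannot select a different (or no) algorithm.
    ALGORITHM = "HS256"

    @staticmethod
    def get_secret_key() -> bytes:
        """Return the signing key as bytes.

        The configured key is treated as base64 first; if it cannot be
        decoded, its raw text encoding is used instead.
        """
        try:
            # NOTE: without validate=True, b64decode silently drops characters
            # outside the base64 alphabet, so a malformed key may decode to
            # shorter key material instead of reaching the fallback below.
            return base64.b64decode(JwtService.SECRET_KEY)
        except ValueError:  # binascii.Error is a ValueError subclass
            return JwtService.SECRET_KEY.encode()

    @staticmethod
    def _decode_verified(token: str) -> dict:
        """Verify the signature and expiry, then confirm the token is stored.

        Raises:
            HTTPException: 401 for an expired or otherwise invalid token,
                404 when the token is not found in the user repository.
        """
        try:
            # NOTE(review): PyJWT enforces `exp` only when the claim is
            # present. If every issued token carries one, consider
            # options={"require": ["exp"]} so a token without it is rejected.
            payload = jwt.decode(
                token,
                JwtService.get_secret_key(),
                algorithms=[JwtService.ALGORITHM],
            )
        except jwt.ExpiredSignatureError:
            # Must precede InvalidTokenError, which is its base class.
            raise HTTPException(status_code=401, detail="Token Expired")
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail="Invalid token.")

        if not verify_accesstoken(token):
            raise HTTPException(status_code=404, detail="Not found Token")
        return payload

    @staticmethod
    def extract_user_id(token: str):
        """Decode the token and return its "UserId" claim.

        Returns None when the claim is absent; callers must treat that as
        "no authenticated user" rather than as a valid identity.

        Raises:
            HTTPException: see `_decode_verified`.
        """
        return JwtService._decode_verified(token).get("UserId", None)

    @staticmethod
    def extract_user_role(token: str):
        """Decode the token and return its "Roles" claim.

        Returns None when the claim is absent; callers making authorization
        decisions must treat that as "no roles granted".

        Raises:
            HTTPException: see `_decode_verified`.
        """
        return JwtService._decode_verified(token).get("Roles", None)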