Spaces:
Sleeping
Sleeping
| import jwt | |
| import base64 | |
| from fastapi import FastAPI, HTTPException, Depends, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| import os, sys | |
| BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) | |
| sys.path.insert(0, BASE_DIR) | |
| from repository.MySQL import UserRepository | |
def verify_accesstoken(token: str) -> bool:
    """Report whether *token* is known to the user repository.

    The token is looked up with ``UserRepository.getUserIdByAccessToken``;
    the result is ``True`` when that lookup returns anything other than
    ``None`` and ``False`` otherwise.
    """
    return UserRepository.getUserIdByAccessToken(token) is not None
class JwtService:
    """Decode signed JWT access tokens and read claims from their payload.

    Every member is a static method or class constant, so the service can be
    used either through the class (``JwtService.extract_user_id(token)``) or
    through an instance; no instance state is kept.
    """

    # NOTE(review): the signing secret is hard-coded in the source file.
    # Consider loading it from configuration or the environment instead.
    SECRET_KEY = "404E635266556A586E3272357538782F413F4428472B4B6250645367566B5970"
    ALGORITHM = "HS256"

    @staticmethod
    def get_secret_key() -> bytes:
        """Return the signing key as bytes.

        ``SECRET_KEY`` is Base64-decoded; if decoding fails, the raw string
        is encoded to bytes instead. The current value contains only hex
        digits, which are also valid Base64 characters, so decoding succeeds
        and yields a 48-byte key.
        """
        try:
            return base64.b64decode(JwtService.SECRET_KEY)
        except ValueError:
            # binascii.Error (malformed Base64) is a ValueError subclass.
            return JwtService.SECRET_KEY.encode()

    @staticmethod
    def _decode_verified_payload(token: str) -> dict:
        """Decode *token* and check it with ``verify_accesstoken``.

        Returns:
            The decoded JWT payload.

        Raises:
            HTTPException: 401 with "Token Expired" when the token has
                expired, 401 with "Invalid token." when it cannot be decoded,
                404 with "Not found Token" when ``verify_accesstoken``
                reports the token as unknown.
        """
        try:
            payload = jwt.decode(
                token,
                JwtService.get_secret_key(),
                algorithms=[JwtService.ALGORITHM],
            )
            # The signature is checked first; only then is the token looked
            # up. The HTTPException below is not a jwt error, so it passes
            # through the handlers that follow unchanged.
            if not verify_accesstoken(token):
                raise HTTPException(status_code=404, detail="Not found Token")
            return payload
        except jwt.ExpiredSignatureError:
            # Must come before InvalidTokenError, which is its base class.
            raise HTTPException(status_code=401, detail="Token Expired")
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail="Invalid token.")

    @staticmethod
    def extract_user_id(token: str):
        """Decode *token* and return its ``"UserId"`` claim.

        Returns:
            The value stored under ``"UserId"`` in the payload, or ``None``
            when the claim is absent.

        Raises:
            HTTPException: 401 for an expired or invalid token, 404 when the
                token is not accepted by ``verify_accesstoken``.
        """
        return JwtService._decode_verified_payload(token).get("UserId", None)

    @staticmethod
    def extract_user_role(token: str):
        """Decode *token* and return its ``"Roles"`` claim.

        Returns:
            The value stored under ``"Roles"`` in the payload, or ``None``
            when the claim is absent.

        Raises:
            HTTPException: 401 for an expired or invalid token, 404 when the
                token is not accepted by ``verify_accesstoken``.
        """
        return JwtService._decode_verified_payload(token).get("Roles", None)