Spaces:
Sleeping
Sleeping
| from datetime import timedelta, datetime | |
| import anyio | |
| from fastapi import Depends, HTTPException | |
| from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer | |
| from jose import jwt, JWTError | |
| from passlib.context import CryptContext | |
| from trauma.api.account.dto import AccountType | |
| from trauma.api.account.model import AccountModel | |
| from trauma.core.config import settings | |
| def verify_password(plain_password, hashed_password) -> bool: | |
| result = CryptContext(schemes=["bcrypt"], deprecated="auto").verify(plain_password, hashed_password) | |
| return result | |
| def create_access_token(email: str, account_id: str): | |
| payload = { | |
| "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name": email, | |
| "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": account_id, | |
| "accountId": account_id, | |
| "iss": settings.Issuer, | |
| "aud": settings.Audience, | |
| "exp": datetime.utcnow() + timedelta(days=30) | |
| } | |
| encoded_jwt = jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256") | |
| return encoded_jwt | |
| class PermissionDependency: | |
| def __init__(self, account_types: list[AccountType]): | |
| self.account_types = account_types | |
| def __call__( | |
| self, credentials: HTTPAuthorizationCredentials | None = Depends(HTTPBearer(auto_error=False)) | |
| ) -> AccountModel | None: | |
| try: | |
| account_id = self.authenticate_jwt_token(credentials.credentials) | |
| account_data = anyio.from_thread.run(self.get_account_by_id, account_id) | |
| self.check_account_health(account_data) | |
| return AccountModel.from_mongo(account_data) | |
| except JWTError: | |
| raise HTTPException(status_code=403, detail="Permission denied") | |
| except Exception: | |
| raise HTTPException(status_code=403, detail="Permission denied") | |
| async def get_account_by_id(account_id: str) -> dict: | |
| account = await settings.DB_CLIENT.accounts.find_one({"id": account_id}) | |
| return account | |
| def check_account_health(self, account: dict): | |
| if not account: | |
| raise HTTPException(status_code=403, detail="Permission denied") | |
| if account['accountType'] not in [i.value for i in self.account_types]: | |
| raise HTTPException(status_code=403, detail="Permission denied") | |
| def authenticate_jwt_token(token: str) -> str: | |
| payload = jwt.decode(token, | |
| settings.SECRET_KEY, | |
| algorithms="HS256", | |
| audience=settings.Audience) | |
| email: str = payload.get("http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name") | |
| account_id = payload.get("accountId") | |
| if email is None or account_id is None: | |
| raise HTTPException(status_code=403, detail="Permission denied") | |
| return account_id | |