File size: 2,891 Bytes
50553ea
 
 
 
 
 
 
 
c6cc0f2
50553ea
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c6cc0f2
 
50553ea
 
 
 
 
 
 
 
c6cc0f2
50553ea
 
 
8e611c3
 
50553ea
c6cc0f2
 
50553ea
c6cc0f2
50553ea
c6cc0f2
50553ea
 
c6cc0f2
 
50553ea
c6cc0f2
 
50553ea
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from datetime import timedelta, datetime

import anyio
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import jwt, JWTError
from passlib.context import CryptContext

from trauma.api.account.dto import AccountType
from trauma.api.account.model import AccountModel
from trauma.core.config import settings


def verify_password(plain_password, hashed_password) -> bool:
    result = CryptContext(schemes=["bcrypt"], deprecated="auto").verify(plain_password, hashed_password)
    return result


def create_access_token(email: str, account_id: str):
    payload = {
        "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name": email,
        "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": account_id,
        "accountId": account_id,
        "iss": settings.Issuer,
        "aud": settings.Audience,
        "exp": datetime.utcnow() + timedelta(days=30)
    }
    encoded_jwt = jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")
    return encoded_jwt


class PermissionDependency:
    def __init__(self, account_types: list[AccountType]):
        self.account_types = account_types

    def __call__(
            self, credentials: HTTPAuthorizationCredentials | None = Depends(HTTPBearer(auto_error=False))
    ) -> AccountModel | None:
        try:
            account_id = self.authenticate_jwt_token(credentials.credentials)
            account_data = anyio.from_thread.run(self.get_account_by_id, account_id)
            self.check_account_health(account_data)
            return AccountModel.from_mongo(account_data)

        except JWTError:
            raise HTTPException(status_code=403, detail="Permission denied")
        except Exception:
            raise HTTPException(status_code=403, detail="Permission denied")

    @staticmethod
    async def get_account_by_id(account_id: str) -> dict:
        account = await settings.DB_CLIENT.accounts.find_one({"id": account_id})
        return account

    def check_account_health(self, account: dict):
        if not account:
            raise HTTPException(status_code=403, detail="Permission denied")
        if account['accountType'] not in [i.value for i in self.account_types]:
            raise HTTPException(status_code=403, detail="Permission denied")

    @staticmethod
    def authenticate_jwt_token(token: str) -> str:
        payload = jwt.decode(token,
                             settings.SECRET_KEY,
                             algorithms="HS256",
                             audience=settings.Audience)
        email: str = payload.get("http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name")
        account_id = payload.get("accountId")

        if email is None or account_id is None:
            raise HTTPException(status_code=403, detail="Permission denied")

        return account_id