| import jwt | |
| from fastapi import HTTPException, status | |
| from fastapi.security import SecurityScopes, HTTPBearer | |
| from fastapi import Depends, Request | |
| from app.config.env import env | |
| LOAN_OFFICER_PERMISSIONS = ["get:lead", "create:loan", "update:loan"] | |
| class UnauthorizedException(HTTPException): | |
| def __init__(self, detail: str, **kwargs): | |
| super().__init__(status.HTTP_403_FORBIDDEN, detail=detail) | |
| class UnauthenticatedException(HTTPException): | |
| def __init__(self): | |
| super().__init__( | |
| status_code=status.HTTP_401_UNAUTHORIZED, detail="Requires authentication" | |
| ) | |
| class Auth0HTTPBearer(HTTPBearer): | |
| async def __call__(self, request: Request): | |
| return await super().__call__(request) | |
| class VerifyToken: | |
| USER_PERMISSIONS = [] | |
| def __init__(self): | |
| jwks_url = f"https://{env.AUTH0_DOMAIN}/.well-known/jwks.json" | |
| self.jwks_client = jwt.PyJWKClient(jwks_url) | |
| async def verify( | |
| self, | |
| security_scopes: SecurityScopes, | |
| token=Depends(Auth0HTTPBearer(auto_error=False)), | |
| ): | |
| if token is None: | |
| raise UnauthenticatedException | |
| token = token.credentials | |
| try: | |
| signing_key = self.jwks_client.get_signing_key_from_jwt(token).key | |
| except jwt.exceptions.PyJWKClientError as error: | |
| raise UnauthorizedException(str(error)) | |
| except jwt.exceptions.DecodeError as error: | |
| raise UnauthorizedException(str(error)) | |
| try: | |
| payload = jwt.decode( | |
| token, | |
| signing_key, | |
| algorithms=env.AUTH0_ALGORITHMS, | |
| audience=env.AUTH0_API_AUDIENCE, | |
| issuer=env.AUTH0_ISSUER, | |
| ) | |
| except Exception as error: | |
| raise UnauthorizedException(str(error)) | |
| if len(security_scopes.scopes) > 0: | |
| self._check_claims(payload, "scope", security_scopes.scopes) | |
| self.USER_PERMISSIONS = payload.get("permissions") | |
| return payload | |
| def _check_claims(self, payload, claim_name, expected_value): | |
| if claim_name not in payload: | |
| raise UnauthorizedException( | |
| detail=f'No claim "{claim_name}" found in token' | |
| ) | |
| payload_claim = payload[claim_name] | |
| if claim_name == "scope": | |
| payload_claim = payload[claim_name].split(" ") | |
| for value in expected_value: | |
| if value not in payload_claim: | |
| raise UnauthorizedException(detail=f'Missing "{claim_name}" scope') | |
| def is_loan_officer(self): | |
| for permission in LOAN_OFFICER_PERMISSIONS: | |
| if permission not in self.USER_PERMISSIONS: | |
| raise UnauthorizedException(detail="Permission denied") | |