Spaces:
Paused
Paused
| from authlib.consts import default_json_headers | |
| from ..requests import OAuth2Request | |
| from ..errors import InvalidRequestError | |
| class BaseGrant: | |
| #: Allowed client auth methods for token endpoint | |
| TOKEN_ENDPOINT_AUTH_METHODS = ['client_secret_basic'] | |
| #: Designed for which "grant_type" | |
| GRANT_TYPE = None | |
| # NOTE: there is no charset for application/json, since | |
| # application/json should always in UTF-8. | |
| # The example on RFC is incorrect. | |
| # https://tools.ietf.org/html/rfc4627 | |
| TOKEN_RESPONSE_HEADER = default_json_headers | |
| def __init__(self, request: OAuth2Request, server): | |
| self.prompt = None | |
| self.redirect_uri = None | |
| self.request = request | |
| self.server = server | |
| self._hooks = { | |
| 'after_validate_authorization_request': set(), | |
| 'after_validate_consent_request': set(), | |
| 'after_validate_token_request': set(), | |
| 'process_token': set(), | |
| } | |
| def client(self): | |
| return self.request.client | |
| def generate_token(self, user=None, scope=None, grant_type=None, | |
| expires_in=None, include_refresh_token=True): | |
| if grant_type is None: | |
| grant_type = self.GRANT_TYPE | |
| return self.server.generate_token( | |
| client=self.request.client, | |
| grant_type=grant_type, | |
| user=user, | |
| scope=scope, | |
| expires_in=expires_in, | |
| include_refresh_token=include_refresh_token, | |
| ) | |
| def authenticate_token_endpoint_client(self): | |
| """Authenticate client with the given methods for token endpoint. | |
| For example, the client makes the following HTTP request using TLS: | |
| .. code-block:: http | |
| POST /token HTTP/1.1 | |
| Host: server.example.com | |
| Authorization: Basic czZCaGRSa3F0MzpnWDFmQmF0M2JW | |
| Content-Type: application/x-www-form-urlencoded | |
| grant_type=authorization_code&code=SplxlOBeZQQYbYS6WxSbIA | |
| &redirect_uri=https%3A%2F%2Fclient%2Eexample%2Ecom%2Fcb | |
| Default available methods are: "none", "client_secret_basic" and | |
| "client_secret_post". | |
| :return: client | |
| """ | |
| client = self.server.authenticate_client( | |
| self.request, self.TOKEN_ENDPOINT_AUTH_METHODS) | |
| self.server.send_signal( | |
| 'after_authenticate_client', | |
| client=client, grant=self) | |
| return client | |
| def save_token(self, token): | |
| """A method to save token into database.""" | |
| return self.server.save_token(token, self.request) | |
| def validate_requested_scope(self): | |
| """Validate if requested scope is supported by Authorization Server.""" | |
| scope = self.request.scope | |
| state = self.request.state | |
| return self.server.validate_requested_scope(scope, state) | |
| def register_hook(self, hook_type, hook): | |
| if hook_type not in self._hooks: | |
| raise ValueError('Hook type %s is not in %s.', | |
| hook_type, self._hooks) | |
| self._hooks[hook_type].add(hook) | |
| def execute_hook(self, hook_type, *args, **kwargs): | |
| for hook in self._hooks[hook_type]: | |
| hook(self, *args, **kwargs) | |
| class TokenEndpointMixin: | |
| #: Allowed HTTP methods of this token endpoint | |
| TOKEN_ENDPOINT_HTTP_METHODS = ['POST'] | |
| #: Designed for which "grant_type" | |
| GRANT_TYPE = None | |
| def check_token_endpoint(cls, request: OAuth2Request): | |
| return request.grant_type == cls.GRANT_TYPE and \ | |
| request.method in cls.TOKEN_ENDPOINT_HTTP_METHODS | |
| def validate_token_request(self): | |
| raise NotImplementedError() | |
| def create_token_response(self): | |
| raise NotImplementedError() | |
| class AuthorizationEndpointMixin: | |
| RESPONSE_TYPES = set() | |
| ERROR_RESPONSE_FRAGMENT = False | |
| def check_authorization_endpoint(cls, request: OAuth2Request): | |
| return request.response_type in cls.RESPONSE_TYPES | |
| def validate_authorization_redirect_uri(request: OAuth2Request, client): | |
| if request.redirect_uri: | |
| if not client.check_redirect_uri(request.redirect_uri): | |
| raise InvalidRequestError( | |
| f'Redirect URI {request.redirect_uri} is not supported by client.', | |
| state=request.state) | |
| return request.redirect_uri | |
| else: | |
| redirect_uri = client.get_default_redirect_uri() | |
| if not redirect_uri: | |
| raise InvalidRequestError( | |
| 'Missing "redirect_uri" in request.', | |
| state=request.state) | |
| return redirect_uri | |
| def validate_consent_request(self): | |
| redirect_uri = self.validate_authorization_request() | |
| self.execute_hook('after_validate_consent_request', redirect_uri) | |
| self.redirect_uri = redirect_uri | |
| def validate_authorization_request(self): | |
| raise NotImplementedError() | |
| def create_authorization_response(self, redirect_uri: str, grant_user): | |
| raise NotImplementedError() | |