norae / docker /deezer-spleeter-env /lib /python3.10 /site-packages /google /oauth2 /gdch_credentials.py
| # Copyright 2022 Google LLC | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| """Experimental GDCH credentials support. | |
| """ | |
| import datetime | |
| from google.auth import _helpers | |
| from google.auth import _service_account_info | |
| from google.auth import credentials | |
| from google.auth import exceptions | |
| from google.auth import jwt | |
| from google.oauth2 import _client | |
# OAuth 2.0 token-exchange identifiers placed in the request body that
# ServiceAccountCredentials.refresh sends to the token server.
# Sent as "grant_type".
TOKEN_EXCHANGE_TYPE = "urn:ietf:params:oauth:token-type:token-exchange"
# Sent as "requested_token_type".
ACCESS_TOKEN_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
# Sent as "subject_token_type" (describes the self signed JWT).
SERVICE_ACCOUNT_TOKEN_TYPE = "urn:k8s:params:oauth:token-type:serviceaccount"

# Validity window of the self signed JWT: "exp" is "iat" plus this value.
JWT_LIFETIME = datetime.timedelta(hours=1)
class ServiceAccountCredentials(credentials.Credentials):
    """Credentials for GDCH (`Google Distributed Cloud Hosted`_) for service
    account users.

    .. _Google Distributed Cloud Hosted:
        https://cloud.google.com/blog/topics/hybrid-cloud/\
announcing-google-distributed-cloud-edge-and-hosted

    To create a GDCH service account credential, first create a JSON file of
    the following format::

        {
            "type": "gdch_service_account",
            "format_version": "1",
            "project": "<project name>",
            "private_key_id": "<key id>",
            "private_key": "-----BEGIN EC PRIVATE KEY-----\\n<key bytes>\\n-----END EC PRIVATE KEY-----\\n",
            "name": "<service identity name>",
            "ca_cert_path": "<CA cert path>",
            "token_uri": "https://service-identity.<Domain>/authenticate"
        }

    The "format_version" field stands for the format of the JSON file. For now
    it is always "1". The `private_key_id` and `private_key` are used for
    signing. The `ca_cert_path` is used for token server TLS certificate
    verification.

    After the JSON file is created, set `GOOGLE_APPLICATION_CREDENTIALS`
    environment variable to the JSON file path, then use the following code to
    create the credential::

        import google.auth

        credential, _ = google.auth.default()
        credential = credential.with_gdch_audience("<the audience>")

    We can also create the credential directly::

        from google.oauth2 import gdch_credentials

        credential = gdch_credentials.ServiceAccountCredentials.from_service_account_file("<the json file path>")
        credential = credential.with_gdch_audience("<the audience>")

    The token is obtained in the following way. This class first creates a
    self signed JWT. It uses ``system:serviceaccount:<project>:<name>`` as the
    `iss` and `sub` claim, and the `token_uri` as the `aud` claim, and signs
    the JWT with the `private_key`. It then sends the JWT to the `token_uri`
    to exchange a final token for `audience`.
    """

    # Keys that must be present in the service account info; shared by both
    # public alternate constructors so the two cannot drift apart.
    _REQUIRED_INFO_FIELDS = (
        "format_version",
        "private_key_id",
        "private_key",
        "name",
        "project",
        "token_uri",
    )

    def __init__(
        self, signer, service_identity_name, project, audience, token_uri, ca_cert_path
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_identity_name (str): The service identity name. Together
                with `project` it forms the `iss` and `sub` claim in the self
                signed JWT.
            project (str): The project.
            audience (str): The audience for the final token.
            token_uri (str): The token server uri.
            ca_cert_path (str): The CA cert path for token server side TLS
                certificate verification. If the token server uses well known
                CA, then this parameter can be `None`.
        """
        super(ServiceAccountCredentials, self).__init__()
        self._signer = signer
        self._service_identity_name = service_identity_name
        self._project = project
        self._audience = audience
        self._token_uri = token_uri
        self._ca_cert_path = ca_cert_path

    def _create_jwt(self):
        """Builds and signs the self signed JWT used as the subject token.

        Returns:
            str: The encoded JWT, valid for `JWT_LIFETIME` from now.
        """
        now = _helpers.utcnow()
        expiry = now + JWT_LIFETIME
        # The same identity string is used for both the issuer and subject.
        iss_sub_value = "system:serviceaccount:{}:{}".format(
            self._project, self._service_identity_name
        )

        payload = {
            "iss": iss_sub_value,
            "sub": iss_sub_value,
            "aud": self._token_uri,
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
        }

        return _helpers.from_bytes(jwt.encode(self._signer, payload))

    def refresh(self, request):
        """Exchanges a fresh self signed JWT for a token and stores it.

        On success `self.token` and `self.expiry` are updated.

        Args:
            request (google.auth.transport.requests.Request): The object used
                to make the HTTP request to the token server. Other transport
                types are rejected.

        Raises:
            google.auth.exceptions.RefreshError: If `request` is not a
                `google.auth.transport.requests.Request`. The token endpoint
                helpers may raise as well; see `google.oauth2._client`.
        """
        # NOTE(review): imported here rather than at module level, presumably
        # so this module can be imported without the requests transport
        # installed -- confirm before moving it.
        import google.auth.transport.requests

        if not isinstance(request, google.auth.transport.requests.Request):
            raise exceptions.RefreshError(
                "For GDCH service account credentials, request must be a google.auth.transport.requests.Request object"
            )

        # Create a self signed JWT, and do token exchange.
        jwt_token = self._create_jwt()
        request_body = {
            "grant_type": TOKEN_EXCHANGE_TYPE,
            "audience": self._audience,
            "requested_token_type": ACCESS_TOKEN_TOKEN_TYPE,
            "subject_token": jwt_token,
            "subject_token_type": SERVICE_ACCOUNT_TOKEN_TYPE,
        }
        response_data = _client._token_endpoint_request(
            request,
            self._token_uri,
            request_body,
            access_token=None,
            use_json=True,
            # Forwarded as the `verify` option; may be None (see __init__).
            verify=self._ca_cert_path,
        )

        # Only the token and its expiry are kept; the other two values
        # returned by the helper are not needed here.
        self.token, _, self.expiry, _ = _client._handle_refresh_grant_response(
            response_data, None
        )

    def with_gdch_audience(self, audience):
        """Create a copy of GDCH credentials with the specified audience.

        Args:
            audience (str): The intended audience for GDCH credentials.

        Returns:
            google.oauth2.gdch_credentials.ServiceAccountCredentials: A new
                instance sharing this instance's signer, identity, project,
                token uri and CA cert path, with `audience` replaced.
        """
        return self.__class__(
            self._signer,
            self._service_identity_name,
            self._project,
            audience,
            self._token_uri,
            self._ca_cert_path,
        )

    @classmethod
    def _from_signer_and_info(cls, signer, info):
        """Creates a Credentials instance from a signer and service account
        info.

        The returned credentials have no audience; call `with_gdch_audience`
        to obtain a copy with one set.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.

        Returns:
            google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        if info["format_version"] != "1":
            raise ValueError("Only format version 1 is supported")

        return cls(
            signer,
            info["name"],  # service_identity_name
            info["project"],
            None,  # audience
            info["token_uri"],
            info.get("ca_cert_path", None),  # optional
        )

    @classmethod
    def from_service_account_info(cls, info):
        """Creates a Credentials instance from parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.

        Returns:
            google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(
            info,
            require=list(cls._REQUIRED_INFO_FIELDS),
            use_rsa_signer=False,
        )
        return cls._from_signer_and_info(signer, info)

    @classmethod
    def from_service_account_file(cls, filename):
        """Creates a Credentials instance from a service account json file.

        Args:
            filename (str): The path to the service account json file.

        Returns:
            google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed
                credentials.

        Raises:
            ValueError: If the file content is not in the expected format.
        """
        info, signer = _service_account_info.from_filename(
            filename,
            require=list(cls._REQUIRED_INFO_FIELDS),
            use_rsa_signer=False,
        )
        return cls._from_signer_and_info(signer, info)