| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import base64 |
| import datetime |
| import getpass |
| import json |
| import logging |
| import os |
| import subprocess |
| import threading |
| import time |
| import uuid |
| from collections import namedtuple |
| from copy import deepcopy |
| from hashlib import sha1, sha256 |
|
|
| import dateutil.parser |
| from dateutil.parser import parse |
| from dateutil.tz import tzlocal, tzutc |
|
|
| import botocore.compat |
| import botocore.configloader |
| from botocore import UNSIGNED |
| from botocore.compat import ( |
| EC, |
| compat_shell_split, |
| total_seconds, |
| ) |
| from botocore.config import Config |
| from botocore.exceptions import ( |
| ConfigNotFound, |
| CredentialRetrievalError, |
| InfiniteLoopConfigError, |
| InvalidConfigError, |
| LoginError, |
| LoginInsufficientPermissions, |
| LoginRefreshRequired, |
| LoginTokenLoadError, |
| MetadataRetrievalError, |
| MissingDependencyException, |
| PartialCredentialsError, |
| RefreshWithMFAUnsupportedError, |
| UnauthorizedSSOTokenError, |
| UnknownCredentialError, |
| ) |
| from botocore.tokens import SSOTokenProvider |
| from botocore.useragent import register_feature_id, register_feature_ids |
| from botocore.utils import ( |
| ArnParser, |
| ContainerMetadataFetcher, |
| FileWebIdentityTokenLoader, |
| InstanceMetadataFetcher, |
| JSONFileCache, |
| LoginTokenLoader, |
| SSOTokenLoader, |
| create_nested_client, |
| get_login_token_cache_directory, |
| parse_key_val_file, |
| resolve_imds_endpoint_mode, |
| ) |
|
|
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)

# Immutable snapshot of one credential set. ``account_id`` is the only
# optional field and defaults to ``None``.
ReadOnlyCredentials = namedtuple(
    'ReadOnlyCredentials',
    ('access_key', 'secret_key', 'token', 'account_id'),
    defaults=(None,),
)

# Default refresh windows, in seconds before expiry (10 and 15 minutes).
_DEFAULT_MANDATORY_REFRESH_TIMEOUT = 60 * 10
_DEFAULT_ADVISORY_REFRESH_TIMEOUT = 60 * 15
|
|
|
|
def create_credential_resolver(session, cache=None, region_name=None):
    """Build a credential resolver wired with the default provider chain.

    The chain is, in order: environment variables, assume-role, the
    profile based providers produced by ``ProfileProviderBuilder``, and
    finally the ``OriginalEC2Provider``, ``BotoProvider``, container and
    instance-metadata providers.

    :param session: Session queried for configuration values and used to
        create nested clients.
    :param cache: Optional cache handed to the providers that cache
        credentials; a fresh ``dict`` is used when omitted.
    :param region_name: Optional region forwarded to created clients.
    :return: A ``CredentialResolver`` holding the ordered providers.
    """
    profile_name = session.get_config_variable('profile') or 'default'
    metadata_timeout = session.get_config_variable('metadata_service_timeout')
    num_attempts = session.get_config_variable('metadata_service_num_attempts')
    # A profile present in the session's instance variables was set
    # explicitly, so environment variable credentials must not win over it.
    explicit_profile = session.instance_variables().get('profile')
    disable_env_vars = explicit_profile is not None

    endpoint = session.get_config_variable('ec2_metadata_service_endpoint')
    endpoint_mode = resolve_imds_endpoint_mode(session)
    v1_disabled = session.get_config_variable('ec2_metadata_v1_disabled')
    imds_config = {
        'ec2_metadata_service_endpoint': endpoint,
        'ec2_metadata_service_endpoint_mode': endpoint_mode,
        'ec2_credential_refresh_window': _DEFAULT_ADVISORY_REFRESH_TIMEOUT,
        'ec2_metadata_v1_disabled': v1_disabled,
    }

    if cache is None:
        cache = {}

    env_provider = EnvProvider()
    container_provider = ContainerProvider()
    metadata_fetcher = InstanceMetadataFetcher(
        timeout=metadata_timeout,
        num_attempts=num_attempts,
        user_agent=session.user_agent(),
        config=imds_config,
    )
    instance_metadata_provider = InstanceMetadataProvider(
        iam_role_fetcher=metadata_fetcher
    )

    profile_provider_builder = ProfileProviderBuilder(
        session, cache=cache, region_name=region_name
    )
    # Providers the assume-role provider may use as a ``credential_source``.
    credential_sourcer = CanonicalNameCredentialSourcer(
        [env_provider, container_provider, instance_metadata_provider]
    )
    assume_role_provider = AssumeRoleProvider(
        load_config=lambda: session.full_config,
        client_creator=_get_client_creator(session, region_name),
        cache=cache,
        profile_name=profile_name,
        credential_sourcer=credential_sourcer,
        profile_provider_builder=profile_provider_builder,
    )

    providers = [env_provider, assume_role_provider]
    providers.extend(
        profile_provider_builder.providers(
            profile_name=profile_name,
            disable_env_vars=disable_env_vars,
        )
    )
    providers.extend(
        [
            OriginalEC2Provider(),
            BotoProvider(),
            container_provider,
            instance_metadata_provider,
        ]
    )

    if disable_env_vars:
        # Only the top-level chain drops the env provider; it stays
        # available to the assume-role provider through the sourcer above.
        providers.remove(env_provider)
        logger.debug(
            'Skipping environment variable credential check'
            ' because profile name was explicitly set.'
        )

    return CredentialResolver(providers=providers)
|
|
|
|
class ProfileProviderBuilder:
    """Factory for the credential providers driven by the shared config.

    NOTE: This class is only intended for internal use.

    It creates, in a fixed order, the providers that primarily take their
    configuration from the shared config. Having one builder lets the
    default credential chain and the source profile chain created by the
    assume role provider share the same set of providers.
    """

    def __init__(
        self,
        session,
        cache=None,
        region_name=None,
        sso_token_cache=None,
        login_token_cache=None,
    ):
        """Remember the session and the caches handed to each provider.

        :param session: Session used for config lookups and client creation.
        :param cache: Credential cache passed to the web identity and SSO
            providers.
        :param region_name: Region used by the web identity client creator.
        :param sso_token_cache: Token cache passed to the SSO provider and
            its token provider.
        :param login_token_cache: Token cache passed to the login provider.
        """
        self._session = session
        self._cache = cache
        self._region_name = region_name
        self._sso_token_cache = sso_token_cache
        self._login_token_cache = login_token_cache

    def _load_full_config(self):
        """Return the session's full config; evaluated lazily on each call."""
        return self._session.full_config

    def providers(self, profile_name, disable_env_vars=False):
        """Return the profile based providers for ``profile_name`` in order.

        :param profile_name: Name of the profile the providers read.
        :param disable_env_vars: Forwarded to the web identity provider.
        :return: List of provider instances: web identity, SSO, shared
            credentials file, login, process and config file.
        """
        web_identity = self._create_web_identity_provider(
            profile_name,
            disable_env_vars,
        )
        sso = self._create_sso_provider(profile_name)
        shared_credentials = self._create_shared_credential_provider(
            profile_name
        )
        login = self._create_login_provider(profile_name)
        process = self._create_process_provider(profile_name)
        config = self._create_config_provider(profile_name)
        return [web_identity, sso, shared_credentials, login, process, config]

    def _create_process_provider(self, profile_name):
        """Create the ``credential_process`` based provider."""
        return ProcessProvider(
            profile_name=profile_name,
            load_config=self._load_full_config,
        )

    def _create_shared_credential_provider(self, profile_name):
        """Create the provider reading the shared credentials file."""
        creds_filename = self._session.get_config_variable('credentials_file')
        return SharedCredentialProvider(
            profile_name=profile_name,
            creds_filename=creds_filename,
        )

    def _create_config_provider(self, profile_name):
        """Create the provider reading the shared config file."""
        config_filename = self._session.get_config_variable('config_file')
        return ConfigProvider(
            profile_name=profile_name,
            config_filename=config_filename,
        )

    def _create_web_identity_provider(self, profile_name, disable_env_vars):
        """Create the assume-role-with-web-identity provider."""
        client_creator = _get_client_creator(self._session, self._region_name)
        return AssumeRoleWithWebIdentityProvider(
            load_config=self._load_full_config,
            client_creator=client_creator,
            cache=self._cache,
            profile_name=profile_name,
            disable_env_vars=disable_env_vars,
        )

    def _create_sso_provider(self, profile_name):
        """Create the SSO provider together with its token provider."""
        token_provider = SSOTokenProvider(
            self._session,
            cache=self._sso_token_cache,
            profile_name=profile_name,
        )
        return SSOProvider(
            load_config=self._load_full_config,
            client_creator=self._session.create_client,
            profile_name=profile_name,
            cache=self._cache,
            token_cache=self._sso_token_cache,
            token_provider=token_provider,
        )

    def _create_login_provider(self, profile_name):
        """Create the login provider."""
        return LoginProvider(
            load_config=self._load_full_config,
            client_creator=self._session.create_client,
            profile_name=profile_name,
            token_cache=self._login_token_cache,
        )
|
|
|
|
def get_credentials(session):
    """Resolve credentials for ``session`` through the default chain.

    :param session: Session passed to ``create_credential_resolver``.
    :return: Whatever the resolver's ``load_credentials()`` returns.
    """
    return create_credential_resolver(session).load_credentials()
|
|
|
|
def _local_now():
    """Return the current time as a datetime in the local time zone."""
    local_zone = tzlocal()
    return datetime.datetime.now(local_zone)
|
|
|
|
def _parse_if_needed(value):
    """Return ``value`` as a datetime, parsing it unless it already is one.

    :param value: A ``datetime.datetime`` or something ``parse`` accepts.
    :return: The same object when it is already a datetime, otherwise
        the result of ``parse(value)``.
    """
    already_datetime = isinstance(value, datetime.datetime)
    return value if already_datetime else parse(value)
|
|
|
|
def _serialize_if_needed(value, iso=False):
    """Turn a datetime into a string; pass anything else through untouched.

    :param value: Value to serialize.
    :param iso: When true, use ``datetime.isoformat()``; otherwise format
        as ``%Y-%m-%dT%H:%M:%S%Z``.
    :return: The formatted string for datetimes, else ``value`` itself.
    """
    if not isinstance(value, datetime.datetime):
        return value
    if not iso:
        return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
    return value.isoformat()
|
|
|
|
def _get_client_creator(session, region_name):
    """Return a callable that creates nested clients bound to ``session``.

    :param session: Session passed to ``create_nested_client``.
    :param region_name: Default ``region_name`` for created clients.
    :return: ``client_creator(service_name, **kwargs)``; keyword arguments
        given at call time override the default ``region_name``.
    """

    def client_creator(service_name, **kwargs):
        # Later keys win, so an explicit region_name in kwargs replaces
        # the default captured from the enclosing call.
        merged_kwargs = {'region_name': region_name, **kwargs}
        return create_nested_client(session, service_name, **merged_kwargs)

    return client_creator
|
|
|
|
def create_assume_role_refresher(client, params):
    """Return a zero-argument callable that calls ``client.assume_role``.

    :param client: Object exposing ``assume_role(**params)``.
    :param params: Keyword arguments sent on every refresh.
    :return: ``refresh()``, which returns a dict with ``access_key``,
        ``secret_key``, ``token`` and ``expiry_time`` taken from the
        response's ``Credentials`` entry.
    """

    def refresh():
        creds = client.assume_role(**params)['Credentials']
        # The response's expiration may be a datetime; the refreshed
        # metadata carries it as a string.
        return {
            'access_key': creds['AccessKeyId'],
            'secret_key': creds['SecretAccessKey'],
            'token': creds['SessionToken'],
            'expiry_time': _serialize_if_needed(creds['Expiration']),
        }

    return refresh
|
|
|
|
def create_mfa_serial_refresher(actual_refresh):
    """Wrap ``actual_refresh`` so that it can only be invoked once.

    :param actual_refresh: Zero-argument callable performing the refresh.
    :return: A callable object that delegates to ``actual_refresh`` on its
        first call and raises ``RefreshWithMFAUnsupportedError`` on every
        later call.
    """

    class _Refresher:
        def __init__(self, refresh):
            self._refresh = refresh
            self._has_been_called = False

        def __call__(self):
            if not self._has_been_called:
                # Mark before delegating so a failed first attempt still
                # counts as the single permitted call.
                self._has_been_called = True
                return self._refresh()
            # A second call means the first credentials need replacing,
            # which this wrapper does not support.
            raise RefreshWithMFAUnsupportedError()

    return _Refresher(actual_refresh)
|
|
|
|
class Credentials:
    """
    Plain holder for the values needed to authenticate requests.

    :param str access_key: The access key part of the credentials.
    :param str secret_key: The secret key part of the credentials.
    :param str token: The security token, valid only for session credentials.
    :param str method: A string which identifies where the credentials
        were found. Defaults to ``'explicit'`` when not given.
    :param str account_id: (optional) An account ID associated with the credentials.
    """

    def __init__(
        self, access_key, secret_key, token=None, method=None, account_id=None
    ):
        self.access_key = access_key
        self.secret_key = secret_key
        self.token = token
        self.method = 'explicit' if method is None else method
        self.account_id = account_id
        self._normalize()

    def _normalize(self):
        """Pass both keys through ``botocore.compat.ensure_unicode``."""
        ensure_unicode = botocore.compat.ensure_unicode
        self.access_key = ensure_unicode(self.access_key)
        self.secret_key = ensure_unicode(self.secret_key)

    def get_frozen_credentials(self):
        """Return the current values as a ``ReadOnlyCredentials`` tuple."""
        return ReadOnlyCredentials(
            self.access_key, self.secret_key, self.token, self.account_id
        )

    def get_deferred_property(self, property_name):
        """Return a callable that looks up ``property_name`` when invoked.

        The attribute is read at call time, not when this method runs;
        a missing attribute yields ``None``.
        """

        def get_property():
            return getattr(self, property_name, None)

        return get_property
|
|
|
|
class RefreshableCredentials(Credentials):
    """
    Credentials that carry an expiry time and can renew themselves.

    Reading ``access_key``, ``secret_key``, ``token`` or ``account_id``
    first checks whether a refresh is due and, if so, invokes the
    ``refresh_using`` callback before the value is returned.

    :param str access_key: The access key part of the credentials.
    :param str secret_key: The secret key part of the credentials.
    :param str token: The security token, valid only for session credentials.
    :param datetime expiry_time: The expiration time of the credentials.
    :param function refresh_using: Callback function to refresh the
        credentials. It must return a mapping containing ``access_key``,
        ``secret_key``, ``token`` and ``expiry_time``; ``account_id`` is
        optional.
    :param str method: A string which identifies where the credentials
        were found.
    :param function time_fetcher: Callback function to retrieve current time.
    :param int advisory_timeout: Optional per-instance override, in
        seconds, of the advisory refresh window.
    :param int mandatory_timeout: Optional per-instance override, in
        seconds, of the mandatory refresh window.
    :param str account_id: (optional) An account ID associated with the
        credentials.
    """

    # Seconds before expiry at which a refresh is attempted; a caller
    # that cannot get the lock keeps using the current credentials.
    _advisory_refresh_timeout = _DEFAULT_ADVISORY_REFRESH_TIMEOUT
    # Seconds before expiry at which every caller blocks on the lock
    # until refreshed credentials are available.
    _mandatory_refresh_timeout = _DEFAULT_MANDATORY_REFRESH_TIMEOUT

    def __init__(
        self,
        access_key,
        secret_key,
        token,
        expiry_time,
        refresh_using,
        method,
        time_fetcher=_local_now,
        advisory_timeout=None,
        mandatory_timeout=None,
        account_id=None,
    ):
        self._refresh_using = refresh_using
        self._time_fetcher = time_fetcher
        self._refresh_lock = threading.Lock()
        self.method = method

        self._access_key = access_key
        self._secret_key = secret_key
        self._token = token
        self._account_id = account_id
        self._expiry_time = expiry_time

        # The snapshot is taken from the values exactly as passed in,
        # before ``_normalize`` touches the stored keys.
        self._frozen_credentials = ReadOnlyCredentials(
            access_key, secret_key, token, account_id
        )
        self._normalize()

        # Only shadow the class-level defaults when an override is given.
        if advisory_timeout is not None:
            self._advisory_refresh_timeout = advisory_timeout
        if mandatory_timeout is not None:
            self._mandatory_refresh_timeout = mandatory_timeout

    def _normalize(self):
        """Pass the stored keys through ``botocore.compat.ensure_unicode``."""
        ensure_unicode = botocore.compat.ensure_unicode
        self._access_key = ensure_unicode(self._access_key)
        self._secret_key = ensure_unicode(self._secret_key)

    @classmethod
    def create_from_metadata(
        cls,
        metadata,
        refresh_using,
        method,
        advisory_timeout=None,
        mandatory_timeout=None,
    ):
        """Alternate constructor taking a metadata mapping.

        :param metadata: Mapping with ``access_key``, ``secret_key``,
            ``token`` and ``expiry_time`` (a string that is parsed into a
            datetime), plus an optional ``account_id``.
        :param refresh_using: Callback used for later refreshes.
        :param method: Identifies where the credentials were found.
        :param advisory_timeout: Optional advisory window override.
        :param mandatory_timeout: Optional mandatory window override.
        :return: A new instance of ``cls``.
        """
        timeout_overrides = {
            'advisory_timeout': advisory_timeout,
            'mandatory_timeout': mandatory_timeout,
        }
        # Omit unset overrides entirely so ``cls`` applies its defaults.
        kwargs = {
            name: seconds
            for name, seconds in timeout_overrides.items()
            if seconds is not None
        }
        return cls(
            access_key=metadata['access_key'],
            secret_key=metadata['secret_key'],
            token=metadata['token'],
            expiry_time=cls._expiry_datetime(metadata['expiry_time']),
            method=method,
            refresh_using=refresh_using,
            account_id=metadata.get('account_id'),
            **kwargs,
        )

    @property
    def access_key(self):
        """Warning: reading this and then another property can straddle a
        refresh and yield values from two different credential sets.
        Prefer get_frozen_credentials.
        """
        self._refresh()
        return self._access_key

    @access_key.setter
    def access_key(self, value):
        self._access_key = value

    @property
    def secret_key(self):
        """Warning: reading this and then another property can straddle a
        refresh and yield values from two different credential sets.
        Prefer get_frozen_credentials.
        """
        self._refresh()
        return self._secret_key

    @secret_key.setter
    def secret_key(self, value):
        self._secret_key = value

    @property
    def token(self):
        """Warning: reading this and then another property can straddle a
        refresh and yield values from two different credential sets.
        Prefer get_frozen_credentials.
        """
        self._refresh()
        return self._token

    @token.setter
    def token(self, value):
        self._token = value

    @property
    def account_id(self):
        """Warning: reading this and then another property can straddle a
        refresh and yield values from two different credential sets.
        Prefer get_frozen_credentials.
        """
        self._refresh()
        return self._account_id

    @account_id.setter
    def account_id(self, value):
        self._account_id = value

    def _seconds_remaining(self):
        """Return the number of seconds until the stored expiry time."""
        return total_seconds(self._expiry_time - self._time_fetcher())

    def refresh_needed(self, refresh_in=None):
        """Report whether the credentials should be refreshed.

        A refresh is needed when the time left until expiry is less than
        ``refresh_in`` seconds. When ``refresh_in`` is not provided the
        advisory refresh timeout of this object is used.

        For example, if the credentials expire in 10 minutes and
        ``refresh_in`` is ``15 * 60``, this returns ``True``.

        :type refresh_in: int
        :param refresh_in: The number of seconds before the
            credentials expire in which refresh attempts should
            be made.

        :return: True if refresh needed, False otherwise. Credentials
            without an expiry time never need a refresh.

        """
        if self._expiry_time is None:
            # Nothing to compare against, so there is nothing to refresh.
            return False

        if refresh_in is None:
            refresh_in = self._advisory_refresh_timeout

        if self._seconds_remaining() < refresh_in:
            logger.debug("Credentials need to be refreshed.")
            return True
        # Still outside the requested window.
        return False

    def _is_expired(self):
        """Return True when no time at all is left before expiry."""
        return self.refresh_needed(refresh_in=0)

    def _refresh(self):
        """Refresh the credentials if they are inside a refresh window.

        Inside the advisory window a single caller refreshes while the
        others continue with the current credentials; inside the
        mandatory window callers wait for the lock instead.
        """
        # Cheap check without the lock: the common case is "not yet".
        if not self.refresh_needed(self._advisory_refresh_timeout):
            return

        # Non-blocking acquire: if another thread is already refreshing,
        # do not queue up behind it just for an advisory refresh.
        if self._refresh_lock.acquire(False):
            try:
                # Re-check now that the lock is held; another thread may
                # have completed a refresh in the meantime.
                if self.refresh_needed(self._advisory_refresh_timeout):
                    is_mandatory_refresh = self.refresh_needed(
                        self._mandatory_refresh_timeout
                    )
                    self._protected_refresh(is_mandatory=is_mandatory_refresh)
            finally:
                self._refresh_lock.release()
            return

        if self.refresh_needed(self._mandatory_refresh_timeout):
            # Someone else holds the lock and the credentials are inside
            # the mandatory window: block until that refresh finishes,
            # then refresh ourselves only if it is still required.
            with self._refresh_lock:
                if self.refresh_needed(self._mandatory_refresh_timeout):
                    self._protected_refresh(is_mandatory=True)

    def _protected_refresh(self, is_mandatory):
        """Run the refresh callback; the caller must hold the refresh lock.

        :param is_mandatory: Whether a failure of the callback must be
            propagated (mandatory window) or may be tolerated (advisory).
        :raises RuntimeError: If the refreshed credentials are already
            expired.
        """
        try:
            metadata = self._refresh_using()
        except Exception:
            period_name = 'advisory'
            if is_mandatory:
                period_name = 'mandatory'
            logger.warning(
                "Refreshing temporary credentials failed "
                "during %s refresh period.",
                period_name,
                exc_info=True,
            )
            if not is_mandatory:
                # Advisory window: keep going with the credentials we
                # already have and try again on a later access.
                return
            # Mandatory window: surface the failure to the caller.
            raise

        self._set_from_data(metadata)
        self._frozen_credentials = ReadOnlyCredentials(
            self._access_key, self._secret_key, self._token, self._account_id
        )
        if self._is_expired():
            # The callback succeeded but handed back credentials whose
            # expiry has already passed; fail loudly instead of
            # returning them.
            msg = (
                "Credentials were refreshed, but the "
                "refreshed credentials are still expired."
            )
            logger.warning(msg)
            raise RuntimeError(msg)

    @staticmethod
    def _expiry_datetime(time_str):
        """Parse an expiry time string into a datetime."""
        return parse(time_str)

    def _set_from_data(self, data):
        """Replace the stored credentials with the refreshed ``data``.

        :param data: Mapping returned by the refresh callback.
        :raises CredentialRetrievalError: If ``data`` is empty or lacks
            any of the required keys.
        """
        expected_keys = ['access_key', 'secret_key', 'token', 'expiry_time']
        # An empty or missing response counts as every key being absent.
        missing_keys = [
            key for key in expected_keys if not data or key not in data
        ]

        if missing_keys:
            message = "Credential refresh failed, response did not contain: %s"
            raise CredentialRetrievalError(
                provider=self.method,
                error_msg=message % ', '.join(missing_keys),
            )

        # These assignments go through the property setters, which only
        # store the value and never trigger another refresh.
        self.access_key = data['access_key']
        self.secret_key = data['secret_key']
        self.token = data['token']
        self._expiry_time = parse(data['expiry_time'])
        self.account_id = data.get('account_id')
        logger.debug(
            "Retrieved credentials will expire at: %s", self._expiry_time
        )
        self._normalize()

    def get_frozen_credentials(self):
        """Return an immutable snapshot of the current credentials.

        The ``access_key``, ``secret_key``, and ``token`` properties each
        check for, and possibly perform, a refresh before returning.
        Reading them one after another can therefore mix two credential
        sets:

            # Current creds are "t1"
            tmp.access_key ---> refresh due? no, so return t1.access_key
            # ---- the refresh window is reached, creds become "t2" ----
            tmp.secret_key ---> refresh due? yes, refresh and return t2.secret_key

        The access key would come from t1 and the secret key from t2.
        A frozen credential object avoids this because it is built once
        and does not change afterwards.

        Use the returned object immediately and then discard it. The
        typical usage pattern would be::

            creds = RefreshableCredentials(...)
            some_code = SomeSignerObject()
            # I'm about to sign the request.
            # The frozen credentials are only used for the
            # duration of generate_presigned_url and will be
            # immediately thrown away.
            request = some_code.sign_some_request(
                with_credentials=creds.get_frozen_credentials())
            print("Signed request:", request)

        """
        self._refresh()
        return self._frozen_credentials
|
|
|
|
class DeferredRefreshableCredentials(RefreshableCredentials):
    """Refreshable credentials that start out empty.

    No initial credentials are required: ``refresh_using`` is invoked the
    first time the credentials are accessed.
    """

    def __init__(self, refresh_using, method, time_fetcher=_local_now):
        self._refresh_using = refresh_using
        self._time_fetcher = time_fetcher
        self._refresh_lock = threading.Lock()
        self.method = method
        # Every credential field is unset until the first refresh runs.
        self._access_key = None
        self._secret_key = None
        self._token = None
        self._account_id = None
        self._expiry_time = None
        self._frozen_credentials = None

    def refresh_needed(self, refresh_in=None):
        """Always require a refresh until a first snapshot exists."""
        never_loaded = self._frozen_credentials is None
        return never_loaded or super().refresh_needed(refresh_in)
|
|
|
|
class CachedCredentialFetcher:
    """Base class for fetchers that keep their responses in a cache.

    Subclasses supply ``_create_cache_key`` and ``_get_credentials``.
    """

    # Cached credentials closer than this many seconds to their
    # expiration are treated as expired.
    DEFAULT_EXPIRY_WINDOW_SECONDS = 60 * 15

    def __init__(self, cache=None, expiry_window_seconds=None):
        """
        :param cache: Object supporting ``__getitem__``, ``__setitem__``
            and ``__contains__``; a new ``dict`` when omitted.
        :param expiry_window_seconds: Override for
            ``DEFAULT_EXPIRY_WINDOW_SECONDS``.
        """
        self._cache = {} if cache is None else cache
        self._cache_key = self._create_cache_key()
        if expiry_window_seconds is None:
            expiry_window_seconds = self.DEFAULT_EXPIRY_WINDOW_SECONDS
        self._expiry_window_seconds = expiry_window_seconds
        self.feature_ids = set()

    def _create_cache_key(self):
        """Return the key under which responses are cached (abstract)."""
        raise NotImplementedError('_create_cache_key()')

    def _make_file_safe(self, filename):
        """Replace ``:``, the OS path separator and ``/`` with ``_``."""
        for unsafe in (':', os.sep, '/'):
            filename = filename.replace(unsafe, '_')
        return filename

    def _get_credentials(self):
        """Fetch a fresh response holding a ``Credentials`` entry (abstract)."""
        raise NotImplementedError('_get_credentials()')

    def fetch_credentials(self):
        """Return credentials, served from the cache when still valid."""
        return self._get_cached_credentials()

    def _get_cached_credentials(self):
        """Get up-to-date credentials.

        The cache is consulted first; when it holds nothing usable,
        ``_get_credentials`` is called and its response is cached.

        :return: dict with ``access_key``, ``secret_key``, ``token``,
            ``expiry_time`` (ISO formatted when the source value is a
            datetime) and ``account_id``.
        """
        response = self._load_from_cache()
        if response is not None:
            logger.debug("Credentials for role retrieved from cache.")
        else:
            response = self._get_credentials()
            self._write_to_cache(response)

        raw = response['Credentials']
        expiry_time = _serialize_if_needed(raw['Expiration'], iso=True)
        return {
            'access_key': raw['AccessKeyId'],
            'secret_key': raw['SecretAccessKey'],
            'token': raw['SessionToken'],
            'expiry_time': expiry_time,
            'account_id': raw.get('AccountId'),
        }

    def _load_from_cache(self):
        """Return a copy of the cached response, or None if absent or expired."""
        if self._cache_key not in self._cache:
            return None
        # Copy so callers can mutate the result without touching the cache.
        cached = deepcopy(self._cache[self._cache_key])
        if self._is_expired(cached):
            logger.debug(
                "Credentials were found in cache, but they are expired."
            )
            return None
        return cached

    def _write_to_cache(self, response):
        """Store a copy of ``response`` under this fetcher's cache key."""
        self._cache[self._cache_key] = deepcopy(response)

    def _is_expired(self, credentials):
        """Check if credentials are expired.

        "Expired" here means less than ``_expiry_window_seconds`` remain
        before the response's ``Expiration``.
        """
        expiration = _parse_if_needed(credentials['Credentials']['Expiration'])
        remaining = total_seconds(expiration - _local_now())
        return remaining < self._expiry_window_seconds
|
|
|
|
class BaseAssumeRoleCredentialFetcher(CachedCredentialFetcher):
    """Shared state and cache-key logic for the assume-role fetchers."""

    def __init__(
        self,
        client_creator,
        role_arn,
        extra_args=None,
        cache=None,
        expiry_window_seconds=None,
    ):
        """
        :param client_creator: Callable used by subclasses to build clients.
        :param role_arn: ARN of the role to assume; always sent as
            ``RoleArn``.
        :param extra_args: Optional additional request arguments; the
            mapping is copied, never modified.
        :param cache: Forwarded to ``CachedCredentialFetcher``.
        :param expiry_window_seconds: Forwarded to
            ``CachedCredentialFetcher``.
        """
        self._client_creator = client_creator
        self._role_arn = role_arn

        request_args = {} if extra_args is None else deepcopy(extra_args)
        request_args['RoleArn'] = self._role_arn
        self._assume_kwargs = request_args

        self._role_session_name = request_args.get('RoleSessionName')
        self._using_default_session_name = False
        if not self._role_session_name:
            self._generate_assume_role_name()

        # Must run last: the base initializer calls ``_create_cache_key``,
        # which reads the attributes assigned above.
        super().__init__(cache, expiry_window_seconds)

    def _generate_assume_role_name(self):
        """Install a time-based default ``RoleSessionName``."""
        generated_name = f'botocore-session-{int(time.time())}'
        self._role_session_name = generated_name
        self._assume_kwargs['RoleSessionName'] = generated_name
        self._using_default_session_name = True

    def _create_cache_key(self):
        """Create a predictable cache key for the current configuration.

        The cache key is intended to be compatible with file names.
        """
        key_args = deepcopy(self._assume_kwargs)

        if self._using_default_session_name:
            # The generated session name embeds the current time and
            # would make the key different on every run.
            key_args.pop('RoleSessionName')

        if 'Policy' in key_args:
            # The policy is a JSON string; decoding it lets the sorted
            # dump below produce the same text for equivalent policies
            # whose keys are ordered differently.
            key_args['Policy'] = json.loads(key_args['Policy'])

        serialized = json.dumps(key_args, sort_keys=True)
        digest = sha1(serialized.encode('utf-8')).hexdigest()
        return self._make_file_safe(digest)

    def _add_account_id_to_response(self, response):
        """Copy the account id from the assumed role ARN into ``response``.

        When the ARN is present and valid, its account is stored as
        ``response['Credentials']['AccountId']``; otherwise the response
        is left untouched and a debug message is logged.
        """
        role_arn = response.get('AssumedRoleUser', {}).get('Arn')
        if not ArnParser.is_arn(role_arn):
            logger.debug("Unable to extract account ID from Arn: %s", role_arn)
            return
        parsed_arn = ArnParser().parse_arn(role_arn)
        response['Credentials']['AccountId'] = parsed_arn['account']
|
|
|
|
class AssumeRoleCredentialFetcher(BaseAssumeRoleCredentialFetcher):
    """Fetch credentials by calling ``AssumeRole`` with source credentials."""

    def __init__(
        self,
        client_creator,
        source_credentials,
        role_arn,
        extra_args=None,
        mfa_prompter=None,
        cache=None,
        expiry_window_seconds=None,
    ):
        """
        :type client_creator: callable
        :param client_creator: A callable that creates a client taking
            arguments like ``Session.create_client``.

        :type source_credentials: Credentials
        :param source_credentials: The credentials to use to create the
            client for the call to AssumeRole.

        :type role_arn: str
        :param role_arn: The ARN of the role to be assumed.

        :type extra_args: dict
        :param extra_args: Any additional arguments to add to the assume
            role request using the format of the botocore operation.
            Possible keys include, but may not be limited to,
            DurationSeconds, Policy, SerialNumber, ExternalId and
            RoleSessionName.

        :type mfa_prompter: callable
        :param mfa_prompter: A callable that returns input provided by the
            user (i.e raw_input, getpass.getpass, etc.). Defaults to
            ``getpass.getpass``.

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``. An example of this is
            the ``JSONFileCache`` class in aws-cli.

        :type expiry_window_seconds: int
        :param expiry_window_seconds: The amount of time, in seconds,
            before their expiration at which cached credentials are
            treated as expired and fetched again. When ``None`` the
            ``DEFAULT_EXPIRY_WINDOW_SECONDS`` of the fetcher is used.
        """
        self._source_credentials = source_credentials
        self._mfa_prompter = mfa_prompter
        if self._mfa_prompter is None:
            self._mfa_prompter = getpass.getpass

        super().__init__(
            client_creator,
            role_arn,
            extra_args=extra_args,
            cache=cache,
            expiry_window_seconds=expiry_window_seconds,
        )

    def _get_credentials(self):
        """Get credentials by calling assume role."""
        register_feature_ids(self.feature_ids)
        kwargs = self._assume_role_kwargs()
        client = self._create_client()
        response = client.assume_role(**kwargs)
        self._add_account_id_to_response(response)
        return response

    def _assume_role_kwargs(self):
        """Get the arguments for assume role based on current configuration.

        Returns a copy of the configured arguments. When a
        ``SerialNumber`` is configured, the MFA prompter is invoked and
        its answer is added as ``TokenCode``. All other arguments,
        including ``DurationSeconds``, are passed through unchanged.
        """
        # Work on a copy so the prompted token never leaks into the
        # stored arguments (and therefore never into the cache key).
        assume_role_kwargs = deepcopy(self._assume_kwargs)

        mfa_serial = assume_role_kwargs.get('SerialNumber')
        if mfa_serial is not None:
            prompt = f'Enter MFA code for {mfa_serial}: '
            assume_role_kwargs['TokenCode'] = self._mfa_prompter(prompt)

        return assume_role_kwargs

    def _create_client(self):
        """Create an STS client using the source credentials."""
        # A frozen snapshot guarantees the three values belong to the
        # same credential set even if the source refreshes itself.
        frozen_credentials = self._source_credentials.get_frozen_credentials()
        return self._client_creator(
            'sts',
            aws_access_key_id=frozen_credentials.access_key,
            aws_secret_access_key=frozen_credentials.secret_key,
            aws_session_token=frozen_credentials.token,
        )
|
|
|
|
class AssumeRoleWithWebIdentityCredentialFetcher(
    BaseAssumeRoleCredentialFetcher
):
    """Fetch credentials by calling ``AssumeRoleWithWebIdentity``."""

    def __init__(
        self,
        client_creator,
        web_identity_token_loader,
        role_arn,
        extra_args=None,
        cache=None,
        expiry_window_seconds=None,
    ):
        """
        :type client_creator: callable
        :param client_creator: A callable that creates a client taking
            arguments like ``Session.create_client``.

        :type web_identity_token_loader: callable
        :param web_identity_token_loader: A callable that takes no arguments
            and returns a web identity token str.

        :type role_arn: str
        :param role_arn: The ARN of the role to be assumed.

        :type extra_args: dict
        :param extra_args: Any additional arguments to add to the assume
            role request using the format of the botocore operation,
            for example DurationSeconds, Policy or RoleSessionName.

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``. An example of this is
            the ``JSONFileCache`` class in aws-cli.

        :type expiry_window_seconds: int
        :param expiry_window_seconds: The amount of time, in seconds,
            before their expiration at which cached credentials are
            treated as expired and fetched again.
        """
        self._web_identity_token_loader = web_identity_token_loader
        super().__init__(
            client_creator,
            role_arn,
            extra_args=extra_args,
            cache=cache,
            expiry_window_seconds=expiry_window_seconds,
        )

    def _get_credentials(self):
        """Get credentials by calling assume role with web identity."""
        register_feature_ids(self.feature_ids)
        request_kwargs = self._assume_role_kwargs()
        # The client is configured not to sign requests: no source
        # credentials are supplied, only the web identity token carried
        # in the request arguments.
        unsigned_config = Config(signature_version=UNSIGNED)
        sts_client = self._client_creator('sts', config=unsigned_config)
        response = sts_client.assume_role_with_web_identity(**request_kwargs)
        self._add_account_id_to_response(response)
        return response

    def _assume_role_kwargs(self):
        """Return a copy of the request arguments with a freshly loaded token."""
        request_kwargs = deepcopy(self._assume_kwargs)
        # The loader is invoked on every call, so each request carries
        # whatever token it returns at that moment.
        request_kwargs['WebIdentityToken'] = self._web_identity_token_loader()
        return request_kwargs
|
|
|
|
class CredentialProvider:
    """Base class for the individual sources of credentials."""

    # Short provider name; subclasses pass it as the ``method`` of the
    # credentials they create and as the ``provider`` of raised errors.
    METHOD = None

    # Provider name for canonical-name based lookups.
    # NOTE(review): presumably consumed by CanonicalNameCredentialSourcer
    # for ``credential_source`` style configuration -- confirm.
    CANONICAL_NAME = None

    def __init__(self, session=None):
        self.session = session

    def load(self):
        """
        Load credentials from this provider's source.

        Subclasses should override this (reading from disk, the
        environment, the network or wherever) and return the loaded
        credentials, or a falsy value when none are found so that the
        ``CredentialResolver`` falls back to the next available method.

        The default implementation does nothing and simply returns
        ``True``, assuming the user has set the
        ``access_key/secret_key/token`` themselves.

        :returns: ``True`` in this base implementation.
        """
        return True

    def _extract_creds_from_mapping(self, mapping, *key_names):
        """Return the values of ``key_names`` from ``mapping``, in order.

        :raises PartialCredentialsError: For the first key that is absent.
        """
        values = []
        for name in key_names:
            try:
                value = mapping[name]
            except KeyError:
                raise PartialCredentialsError(
                    provider=self.METHOD, cred_var=name
                )
            values.append(value)
        return values
|
|
|
|
class ProcessProvider(CredentialProvider):
    """Load credentials by running a profile's ``credential_process``."""

    METHOD = 'custom-process'

    def __init__(self, profile_name, load_config, popen=subprocess.Popen):
        """
        :param profile_name: Profile whose ``credential_process`` is used.
        :param load_config: Zero-argument callable returning the full
            config; it is called at most once and the result is kept.
        :param popen: Process factory, ``subprocess.Popen`` by default.
        """
        self._profile_name = profile_name
        self._load_config = load_config
        self._loaded_config = None
        self._popen = popen

    def load(self):
        """Run the configured process and wrap its output in credentials.

        :return: ``None`` when the profile defines no
            ``credential_process``; ``RefreshableCredentials`` when the
            output carries an expiration; plain ``Credentials`` otherwise.
        """
        command = self._credential_process
        if command is None:
            return None

        register_feature_id('CREDENTIALS_PROFILE_PROCESS')
        creds_dict = self._retrieve_credentials_using(command)
        register_feature_id('CREDENTIALS_PROCESS')

        if creds_dict.get('expiry_time') is None:
            # No expiration reported: nothing to refresh later.
            return Credentials(
                access_key=creds_dict['access_key'],
                secret_key=creds_dict['secret_key'],
                token=creds_dict.get('token'),
                method=self.METHOD,
                account_id=creds_dict.get('account_id'),
            )

        # Expiring credentials are refreshed by re-running the command.
        return RefreshableCredentials.create_from_metadata(
            creds_dict,
            lambda: self._retrieve_credentials_using(command),
            self.METHOD,
        )

    def _retrieve_credentials_using(self, credential_process):
        """Execute ``credential_process`` and translate its JSON output.

        :raises CredentialRetrievalError: If the process exits non-zero,
            reports a ``Version`` other than 1, or omits a required key.
        """
        # The command is split into an argument list and handed to popen
        # as such; no shell is involved.
        argv = compat_shell_split(credential_process)
        process = self._popen(
            argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            raise CredentialRetrievalError(
                provider=self.METHOD, error_msg=stderr.decode('utf-8')
            )

        parsed = botocore.compat.json.loads(stdout.decode('utf-8'))
        version = parsed.get('Version', '<Version key not provided>')
        if version != 1:
            raise CredentialRetrievalError(
                provider=self.METHOD,
                error_msg=(
                    f"Unsupported version '{version}' for credential process "
                    f"provider, supported versions: 1"
                ),
            )

        try:
            # Only the two keys are mandatory; token, expiration and
            # account id are optional in the process output.
            return {
                'access_key': parsed['AccessKeyId'],
                'secret_key': parsed['SecretAccessKey'],
                'token': parsed.get('SessionToken'),
                'expiry_time': parsed.get('Expiration'),
                'account_id': self._get_account_id(parsed),
            }
        except KeyError as e:
            raise CredentialRetrievalError(
                provider=self.METHOD,
                error_msg=f"Missing required key in response: {e}",
            )

    @property
    def _credential_process(self):
        """The profile's ``credential_process`` setting, or ``None``."""
        return self.profile_config.get('credential_process')

    @property
    def profile_config(self):
        """Config section of this provider's profile (``{}`` if absent)."""
        if self._loaded_config is None:
            # Load lazily and keep the result for later accesses.
            self._loaded_config = self._load_config()
        profiles = self._loaded_config.get('profiles', {})
        return profiles.get(self._profile_name, {})

    def _get_account_id(self, parsed):
        """Prefer the process output's ``AccountId``; fall back to the
        profile's ``aws_account_id``."""
        return parsed.get('AccountId') or self.profile_config.get(
            'aws_account_id'
        )
|
|
|
|
class InstanceMetadataProvider(CredentialProvider):
    """Credential provider that obtains IAM role credentials from a fetcher."""

    METHOD = 'iam-role'
    CANONICAL_NAME = 'Ec2InstanceMetadata'

    def __init__(self, iam_role_fetcher):
        """
        :param iam_role_fetcher: Object exposing
            ``retrieve_iam_role_credentials()``.
        """
        self._role_fetcher = iam_role_fetcher

    def load(self):
        """Fetch role credentials once and wrap them as refreshable.

        :returns: ``None`` if the fetcher returns nothing; otherwise
            refreshable credentials that re-invoke the fetcher to refresh.
        """
        role_fetcher = self._role_fetcher
        metadata = role_fetcher.retrieve_iam_role_credentials()
        if not metadata:
            return None

        register_feature_id('CREDENTIALS_IMDS')
        logger.info(
            'Found credentials from IAM Role: %s', metadata['role_name']
        )
        # The metadata fetched above seeds the credentials; later refreshes
        # call the fetcher again.
        return RefreshableCredentials.create_from_metadata(
            metadata,
            method=self.METHOD,
            refresh_using=role_fetcher.retrieve_iam_role_credentials,
        )
|
|
|
|
class EnvProvider(CredentialProvider):
    """Credential provider that reads credentials from environment variables."""

    METHOD = 'env'
    CANONICAL_NAME = 'Environment'
    ACCESS_KEY = 'AWS_ACCESS_KEY_ID'
    SECRET_KEY = 'AWS_SECRET_ACCESS_KEY'
    # Token variables are consulted in this order; the first non-empty
    # value wins.
    TOKENS = ['AWS_SECURITY_TOKEN', 'AWS_SESSION_TOKEN']
    EXPIRY_TIME = 'AWS_CREDENTIAL_EXPIRATION'
    ACCOUNT_ID = 'AWS_ACCOUNT_ID'

    def __init__(self, environ=None, mapping=None):
        """
        :param environ: The environment variables (``os.environ`` is used
            when no value is provided).
        :param mapping: Optional overrides for the environment variable
            names to read. Recognised keys are ``access_key``,
            ``secret_key``, ``token``, ``expiry_time`` and ``account_id``;
            any key left out falls back to the class default.
        """
        if environ is None:
            environ = os.environ
        self.environ = environ
        self._mapping = self._build_mapping(mapping)

    def _build_mapping(self, mapping):
        """Merge caller-supplied variable names over the class defaults."""
        defaults = {
            'access_key': self.ACCESS_KEY,
            'secret_key': self.SECRET_KEY,
            'token': self.TOKENS,
            'expiry_time': self.EXPIRY_TIME,
            'account_id': self.ACCOUNT_ID,
        }
        if mapping is None:
            return defaults

        resolved = {
            name: mapping.get(name, default)
            for name, default in defaults.items()
        }
        # A single token variable name is normalised to a one-item list.
        if not isinstance(resolved['token'], list):
            resolved['token'] = [resolved['token']]
        return resolved

    def load(self):
        """
        Search for credentials in explicit environment variables.

        :returns: ``None`` when the access key variable is unset or empty;
            refreshable credentials when an expiry time is set; plain
            ``Credentials`` otherwise.
        """
        access_key = self.environ.get(self._mapping['access_key'], '')
        if not access_key:
            return None

        logger.info('Found credentials in environment variables.')
        fetcher = self._create_credentials_fetcher()
        credentials = fetcher(require_expiry=False)
        register_feature_id('CREDENTIALS_ENV_VARS')

        expiry_time = credentials['expiry_time']
        if expiry_time is None:
            return Credentials(
                credentials['access_key'],
                credentials['secret_key'],
                credentials['token'],
                method=self.METHOD,
                account_id=credentials['account_id'],
            )

        return RefreshableCredentials(
            credentials['access_key'],
            credentials['secret_key'],
            credentials['token'],
            parse(expiry_time),
            refresh_using=fetcher,
            method=self.METHOD,
            account_id=credentials['account_id'],
        )

    def _create_credentials_fetcher(self):
        """Build a callable that re-reads credentials from the environment.

        The returned function closes over the mapping, method name and
        environment captured here rather than over ``self``.
        """
        mapping = self._mapping
        method = self.METHOD
        environ = self.environ

        def read_required(name):
            # Missing and empty values are both treated as absent.
            value = environ.get(mapping[name], '')
            if not value:
                raise PartialCredentialsError(
                    provider=method, cred_var=mapping[name]
                )
            return value

        def fetch_credentials(require_expiry=True):
            access_key = read_required('access_key')
            secret_key = read_required('secret_key')

            # First non-empty token variable wins; None when all are empty.
            token_values = (
                environ.get(token_var, '') for token_var in mapping['token']
            )
            token = next((value for value in token_values if value), None)

            expiry_time = environ.get(mapping['expiry_time'], '')
            if require_expiry and not expiry_time:
                raise PartialCredentialsError(
                    provider=method, cred_var=mapping['expiry_time']
                )

            account_id = environ.get(mapping['account_id'], '')

            return {
                'access_key': access_key,
                'secret_key': secret_key,
                'token': token,
                'expiry_time': expiry_time if expiry_time else None,
                'account_id': account_id if account_id else None,
            }

        return fetch_credentials
|
|
|
|
class OriginalEC2Provider(CredentialProvider):
    """Credential provider for the key/value credential file named by the
    ``AWS_CREDENTIAL_FILE`` environment variable."""

    METHOD = 'ec2-credentials-file'
    CANONICAL_NAME = 'Ec2Config'

    CRED_FILE_ENV = 'AWS_CREDENTIAL_FILE'
    ACCESS_KEY = 'AWSAccessKeyId'
    SECRET_KEY = 'AWSSecretKey'

    def __init__(self, environ=None, parser=None):
        """
        :param environ: The environment variables (``os.environ`` is used
            when no value is provided).
        :param parser: Callable that takes a file path and returns a
            mapping of the file's contents (defaults to
            ``parse_key_val_file``).
        """
        if environ is None:
            environ = os.environ
        if parser is None:
            parser = parse_key_val_file
        self._environ = environ
        self._parser = parser

    def load(self):
        """
        Search for a credential file used by original EC2 CLI tools.

        :returns: ``Credentials`` when the file named by ``CRED_FILE_ENV``
            contains an access key, ``None`` otherwise.
        :raises PartialCredentialsError: If the file has an access key but
            no secret key.
        """
        # Use the class constant (not a repeated literal) so that the
        # variable name has a single source of truth.
        if self.CRED_FILE_ENV not in self._environ:
            return None

        full_path = os.path.expanduser(self._environ[self.CRED_FILE_ENV])
        creds = self._parser(full_path)
        if self.ACCESS_KEY not in creds:
            return None

        logger.info('Found credentials in %s.', self.CRED_FILE_ENV)
        # Report an incomplete file the same way the other file-based
        # providers do, instead of leaking a bare KeyError.
        access_key, secret_key = self._extract_creds_from_mapping(
            creds, self.ACCESS_KEY, self.SECRET_KEY
        )
        # Only the access key and secret key are read from this file.
        return Credentials(access_key, secret_key, method=self.METHOD)
|
|
|
|
class SharedCredentialProvider(CredentialProvider):
    """Credential provider that reads static keys from a profile section of
    the shared credentials file."""

    METHOD = 'shared-credentials-file'
    CANONICAL_NAME = 'SharedCredentials'

    ACCESS_KEY = 'aws_access_key_id'
    SECRET_KEY = 'aws_secret_access_key'
    # Token keys are checked in this order; the first one present is used.
    TOKENS = ['aws_security_token', 'aws_session_token']
    ACCOUNT_ID = 'aws_account_id'

    def __init__(self, creds_filename, profile_name=None, ini_parser=None):
        """
        :param creds_filename: Path handed to the INI parser.
        :param profile_name: Section to read; ``'default'`` when omitted.
        :param ini_parser: Callable that parses the file into a mapping of
            sections (defaults to
            ``botocore.configloader.raw_config_parse``).
        """
        self._creds_filename = creds_filename
        self._profile_name = 'default' if profile_name is None else profile_name
        if ini_parser is None:
            ini_parser = botocore.configloader.raw_config_parse
        self._ini_parser = ini_parser

    def load(self):
        """Return credentials from the configured profile, if present.

        :returns: ``Credentials``, or ``None`` when the file is missing,
            the profile is absent, or the profile has no access key.
        :raises PartialCredentialsError: If the access key is present but
            the secret key is not.
        """
        try:
            sections = self._ini_parser(self._creds_filename)
        except ConfigNotFound:
            return None

        if self._profile_name not in sections:
            return None
        section = sections[self._profile_name]
        if self.ACCESS_KEY not in section:
            return None

        logger.info(
            "Found credentials in shared credentials file: %s",
            self._creds_filename,
        )
        access_key, secret_key = self._extract_creds_from_mapping(
            section, self.ACCESS_KEY, self.SECRET_KEY
        )
        token = self._get_session_token(section)
        account_id = self._get_account_id(section)
        register_feature_id('CREDENTIALS_PROFILE')
        return Credentials(
            access_key,
            secret_key,
            token,
            method=self.METHOD,
            account_id=account_id,
        )

    def _get_session_token(self, config):
        """Return the first token key found in ``config``, else ``None``."""
        return next(
            (config[key] for key in self.TOKENS if key in config),
            None,
        )

    def _get_account_id(self, config):
        """Return the profile's account id, or ``None`` when unset."""
        return config.get(self.ACCOUNT_ID)
|
|
|
|
class ConfigProvider(CredentialProvider):
    """INI based config provider with profile sections."""

    METHOD = 'config-file'
    CANONICAL_NAME = 'SharedConfig'

    ACCESS_KEY = 'aws_access_key_id'
    SECRET_KEY = 'aws_secret_access_key'
    # Token keys are checked in this order; the first one present is used.
    TOKENS = ['aws_security_token', 'aws_session_token']
    ACCOUNT_ID = 'aws_account_id'

    def __init__(self, config_filename, profile_name, config_parser=None):
        """
        :param config_filename: Path of the config file; it is passed
            unchanged to ``config_parser``.
        :param profile_name: The name of the current profile.
        :param config_parser: A config parser callable that takes the
            filename and returns a mapping with a ``profiles`` entry
            (defaults to ``botocore.configloader.load_config``).
        """
        self._config_filename = config_filename
        self._profile_name = profile_name
        if config_parser is None:
            config_parser = botocore.configloader.load_config
        self._config_parser = config_parser

    def load(self):
        """
        If there are credentials in the config file for the configured
        profile, use those.

        :returns: ``Credentials``, or ``None`` when the file is missing,
            the profile is absent, or the profile has no access key.
        :raises PartialCredentialsError: If the access key is present but
            the secret key is not.
        """
        try:
            full_config = self._config_parser(self._config_filename)
        except ConfigNotFound:
            return None

        # Tolerate a parser result without a 'profiles' entry, as the
        # other profile-based providers in this module do.
        profiles = full_config.get('profiles', {})
        if self._profile_name not in profiles:
            return None
        profile_config = profiles[self._profile_name]
        if self.ACCESS_KEY not in profile_config:
            return None

        logger.info(
            "Credentials found in config file: %s",
            self._config_filename,
        )
        access_key, secret_key = self._extract_creds_from_mapping(
            profile_config, self.ACCESS_KEY, self.SECRET_KEY
        )
        token = self._get_session_token(profile_config)
        account_id = self._get_account_id(profile_config)
        register_feature_id('CREDENTIALS_PROFILE')
        return Credentials(
            access_key,
            secret_key,
            token,
            method=self.METHOD,
            account_id=account_id,
        )

    def _get_session_token(self, profile_config):
        """Return the first token key found in the profile, else ``None``."""
        for token_name in self.TOKENS:
            if token_name in profile_config:
                return profile_config[token_name]
        return None

    def _get_account_id(self, config):
        """Return the profile's account id, or ``None`` when unset."""
        return config.get(self.ACCOUNT_ID)
|
|
|
|
class BotoProvider(CredentialProvider):
    """Credential provider that reads the ``Credentials`` section of a boto
    config file."""

    METHOD = 'boto-config'
    CANONICAL_NAME = 'Boto2Config'

    BOTO_CONFIG_ENV = 'BOTO_CONFIG'
    DEFAULT_CONFIG_FILENAMES = ['/etc/boto.cfg', '~/.boto']
    ACCESS_KEY = 'aws_access_key_id'
    SECRET_KEY = 'aws_secret_access_key'

    def __init__(self, environ=None, ini_parser=None):
        """
        :param environ: The environment variables (``os.environ`` is used
            when no value is provided).
        :param ini_parser: Callable that parses a file into a mapping of
            sections (defaults to
            ``botocore.configloader.raw_config_parse``).
        """
        self._environ = os.environ if environ is None else environ
        if ini_parser is None:
            ini_parser = botocore.configloader.raw_config_parse
        self._ini_parser = ini_parser

    def load(self):
        """
        Look for credentials in boto config file.

        When ``BOTO_CONFIG`` is set only that file is consulted; otherwise
        the default locations are tried in order.

        :returns: ``Credentials`` from the first file that has an access
            key, or ``None`` if no file provides one.
        :raises PartialCredentialsError: If a file has an access key but
            no secret key.
        """
        candidates = (
            [self._environ[self.BOTO_CONFIG_ENV]]
            if self.BOTO_CONFIG_ENV in self._environ
            else self.DEFAULT_CONFIG_FILENAMES
        )
        for path in candidates:
            try:
                parsed = self._ini_parser(path)
            except ConfigNotFound:
                # A missing file just means we move on to the next one.
                continue
            if 'Credentials' not in parsed:
                continue
            section = parsed['Credentials']
            if self.ACCESS_KEY not in section:
                continue

            logger.info("Found credentials in boto config file: %s", path)
            access_key, secret_key = self._extract_creds_from_mapping(
                section, self.ACCESS_KEY, self.SECRET_KEY
            )
            register_feature_id('CREDENTIALS_BOTO2_CONFIG_FILE')
            return Credentials(access_key, secret_key, method=self.METHOD)
        return None
|
|
|
|
class AssumeRoleProvider(CredentialProvider):
    """Credential provider for profiles that configure ``role_arn`` together
    with ``source_profile`` or ``credential_source``.

    Profiles that also set ``web_identity_token_file`` are not handled
    here (see ``_has_assume_role_config_vars``).
    """

    METHOD = 'assume-role'
    # No canonical name, so this provider can never be selected by name
    # as a ``credential_source``.
    CANONICAL_NAME = None
    ROLE_CONFIG_VAR = 'role_arn'
    WEB_IDENTITY_TOKE_FILE_VAR = 'web_identity_token_file'
    # NOTE(review): not referenced inside this class; presumably kept for
    # external users of the attribute.
    EXPIRY_WINDOW_SECONDS = 60 * 15
    # Maps a ``credential_source`` name to the feature id recorded when
    # that source supplies the credentials.
    NAMED_PROVIDER_FEATURE_MAP = {
        'Ec2InstanceMetadata': 'CREDENTIALS_IMDS',
        'Environment': 'CREDENTIALS_ENV_VARS',
        'EcsContainer': 'CREDENTIALS_HTTP',
    }

    def __init__(
        self,
        load_config,
        client_creator,
        cache,
        profile_name,
        prompter=getpass.getpass,
        credential_sourcer=None,
        profile_provider_builder=None,
    ):
        """
        :type load_config: callable
        :param load_config: A function that accepts no arguments, and
            when called, will return the full configuration dictionary
            for the session (``session.full_config``).

        :type client_creator: callable
        :param client_creator: A factory function that will create
            a client when called.  Has the same interface as
            ``botocore.session.Session.create_client``.

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``.  An example
            of this is the ``JSONFileCache`` class in the CLI.

        :type profile_name: str
        :param profile_name: The name of the profile.

        :type prompter: callable
        :param prompter: A callable that returns input provided
            by the user (i.e raw_input, getpass.getpass, etc.).

        :type credential_sourcer: CanonicalNameCredentialSourcer
        :param credential_sourcer: A credential provider that takes a
            configuration, which is used to provide the source credentials
            for the STS call.

        :param profile_provider_builder: Optional object whose
            ``providers(profile_name=..., disable_env_vars=...)`` method
            returns the providers used to resolve a source profile's
            credentials. Without it, only static keys in the source
            profile (or a nested assume-role profile) can be used.
        """
        # Handed to the credential fetcher created for each role.
        self.cache = cache
        self._load_config = load_config
        self._client_creator = client_creator
        self._profile_name = profile_name
        self._prompter = prompter
        # Populated by load(); configuration loading is deferred until
        # credentials are actually requested.
        self._loaded_config = {}
        self._credential_sourcer = credential_sourcer
        self._profile_provider_builder = profile_provider_builder
        self._visited_profiles = [self._profile_name]
        self._feature_ids = set()

    def load(self):
        """Return deferred assume-role credentials for the profile.

        :returns: Credentials that assume the role on first use, or
            ``None`` if the profile is not an assume-role profile.
        """
        # Start every resolution from a clean slate. Without this, a
        # second load() on the same instance would find the source
        # profiles recorded by the first call in _visited_profiles and
        # report a bogus InfiniteLoopConfigError.
        self._visited_profiles = [self._profile_name]
        self._feature_ids = set()

        self._loaded_config = self._load_config()
        profiles = self._loaded_config.get('profiles', {})
        profile = profiles.get(self._profile_name, {})
        if self._has_assume_role_config_vars(profile):
            return self._load_creds_via_assume_role(self._profile_name)
        return None

    def _has_assume_role_config_vars(self, profile):
        """True when the profile sets ``role_arn`` but not
        ``web_identity_token_file``."""
        return (
            self.ROLE_CONFIG_VAR in profile
            and self.WEB_IDENTITY_TOKE_FILE_VAR not in profile
        )

    def _load_creds_via_assume_role(self, profile_name):
        """Build deferred credentials that assume ``profile_name``'s role."""
        role_config = self._get_role_config(profile_name)
        source_credentials = self._resolve_source_credentials(
            role_config, profile_name
        )

        # Optional profile settings and the request argument each one
        # is forwarded as.
        optional_args = (
            ('role_session_name', 'RoleSessionName'),
            ('external_id', 'ExternalId'),
            ('mfa_serial', 'SerialNumber'),
            ('duration_seconds', 'DurationSeconds'),
        )
        extra_args = {}
        for config_key, arg_name in optional_args:
            value = role_config.get(config_key)
            if value is not None:
                extra_args[arg_name] = value
        mfa_serial = role_config.get('mfa_serial')

        fetcher = AssumeRoleCredentialFetcher(
            client_creator=self._client_creator,
            source_credentials=source_credentials,
            role_arn=role_config['role_arn'],
            extra_args=extra_args,
            mfa_prompter=self._prompter,
            cache=self.cache,
        )
        fetcher.feature_ids = self._feature_ids.copy()
        refresher = fetcher.fetch_credentials
        if mfa_serial is not None:
            refresher = create_mfa_serial_refresher(refresher)

        self._feature_ids.add('CREDENTIALS_STS_ASSUME_ROLE')
        register_feature_ids(self._feature_ids)
        # Deferred credentials are returned, so the refresher is not
        # invoked here.
        return DeferredRefreshableCredentials(
            method=self.METHOD,
            refresh_using=refresher,
            time_fetcher=_local_now,
        )

    def _get_role_config(self, profile_name):
        """Retrieves and validates the role configuration for the profile.

        :raises InvalidConfigError: If both ``source_profile`` and
            ``credential_source`` are set, or the chosen one is invalid.
        :raises PartialCredentialsError: If neither is set.
        :raises InfiniteLoopConfigError: If the source profile chain loops.
        """
        profiles = self._loaded_config.get('profiles', {})

        profile = profiles[profile_name]
        source_profile = profile.get('source_profile')
        role_arn = profile['role_arn']
        credential_source = profile.get('credential_source')
        mfa_serial = profile.get('mfa_serial')
        external_id = profile.get('external_id')
        role_session_name = profile.get('role_session_name')
        duration_seconds = profile.get('duration_seconds')

        role_config = {
            'role_arn': role_arn,
            'external_id': external_id,
            'mfa_serial': mfa_serial,
            'role_session_name': role_session_name,
            'source_profile': source_profile,
            'credential_source': credential_source,
        }

        if duration_seconds is not None:
            try:
                role_config['duration_seconds'] = int(duration_seconds)
            except ValueError:
                # Best effort: a non-integer value is ignored, but leave
                # a trace so the misconfiguration can be diagnosed.
                logger.debug(
                    'Ignoring non-integer duration_seconds %r in profile %s',
                    duration_seconds,
                    profile_name,
                )

        # Exactly one of source_profile / credential_source is required.
        if credential_source is not None and source_profile is not None:
            raise InvalidConfigError(
                error_msg=(
                    f'The profile "{profile_name}" contains both '
                    'source_profile and credential_source.'
                )
            )
        elif credential_source is None and source_profile is None:
            raise PartialCredentialsError(
                provider=self.METHOD,
                cred_var='source_profile or credential_source',
            )
        elif credential_source is not None:
            self._validate_credential_source(profile_name, credential_source)
        else:
            self._validate_source_profile(profile_name, source_profile)

        return role_config

    def _validate_credential_source(self, parent_profile, credential_source):
        """Ensure a sourcer is configured and supports ``credential_source``."""
        if self._credential_sourcer is None:
            raise InvalidConfigError(
                error_msg=(
                    f"The credential_source \"{credential_source}\" is specified "
                    f"in profile \"{parent_profile}\", "
                    f"but no source provider was configured."
                )
            )
        if not self._credential_sourcer.is_supported(credential_source):
            raise InvalidConfigError(
                error_msg=(
                    f"The credential source \"{credential_source}\" referenced "
                    f"in profile \"{parent_profile}\" is not valid."
                )
            )

    def _source_profile_has_credentials(self, profile):
        """True when the profile has static keys or assume-role settings."""
        return any(
            [
                self._has_static_credentials(profile),
                self._has_assume_role_config_vars(profile),
            ]
        )

    def _validate_source_profile(
        self, parent_profile_name, source_profile_name
    ):
        """Ensure the source profile exists and does not form a loop."""
        profiles = self._loaded_config.get('profiles', {})
        if source_profile_name not in profiles:
            raise InvalidConfigError(
                error_msg=(
                    f"The source_profile \"{source_profile_name}\" referenced in "
                    f"the profile \"{parent_profile_name}\" does not exist."
                )
            )

        source_profile = profiles[source_profile_name]

        # A profile that has not been visited yet cannot close a loop.
        if source_profile_name not in self._visited_profiles:
            return

        # Revisiting a *different* profile means the chain loops back.
        if source_profile_name != parent_profile_name:
            raise InfiniteLoopConfigError(
                source_profile=source_profile_name,
                visited_profiles=self._visited_profiles,
            )

        # A profile may name itself as its source only when it also
        # carries static keys to act as the source credentials.
        if not self._has_static_credentials(source_profile):
            raise InfiniteLoopConfigError(
                source_profile=source_profile_name,
                visited_profiles=self._visited_profiles,
            )

    def _has_static_credentials(self, profile):
        """True when the profile contains either static key setting."""
        static_keys = ['aws_secret_access_key', 'aws_access_key_id']
        return any(static_key in profile for static_key in static_keys)

    def _resolve_source_credentials(self, role_config, profile_name):
        """Resolve the credentials that will be used to assume the role."""
        credential_source = role_config.get('credential_source')
        if credential_source is not None:
            self._feature_ids.add('CREDENTIALS_PROFILE_NAMED_PROVIDER')
            return self._resolve_credentials_from_source(
                credential_source, profile_name
            )

        source_profile = role_config['source_profile']
        self._visited_profiles.append(source_profile)
        self._feature_ids.add('CREDENTIALS_PROFILE_SOURCE_PROFILE')
        return self._resolve_credentials_from_profile(source_profile)

    def _resolve_credentials_from_profile(self, profile_name):
        """Resolve credentials from a source profile.

        :raises InvalidConfigError: If the source profile yields no
            credentials.
        """
        profiles = self._loaded_config.get('profiles', {})
        profile = profiles[profile_name]
        self._feature_ids.add('CREDENTIALS_PROFILE')

        has_static = self._has_static_credentials(profile)
        if has_static and not self._profile_provider_builder:
            # No builder available: read the static keys directly.
            return self._resolve_static_credentials_from_profile(profile)

        if has_static or not self._has_assume_role_config_vars(profile):
            if not self._profile_provider_builder:
                # Neither static keys nor role settings, and no builder
                # to try other provider types: fail with a config error
                # rather than an AttributeError on None.
                raise InvalidConfigError(
                    error_msg=(
                        'The source profile "%s" must have credentials.'
                        % profile_name
                    ),
                )
            profile_providers = self._profile_provider_builder.providers(
                profile_name=profile_name,
                disable_env_vars=True,
            )
            profile_chain = CredentialResolver(profile_providers)
            credentials = profile_chain.load_credentials()
            if credentials is None:
                error_message = (
                    'The source profile "%s" must have credentials.'
                )
                raise InvalidConfigError(
                    error_msg=error_message % profile_name,
                )
            return credentials

        # The source profile is itself an assume-role profile: recurse.
        return self._load_creds_via_assume_role(profile_name)

    def _resolve_static_credentials_from_profile(self, profile):
        """Build ``Credentials`` from the static keys in ``profile``."""
        try:
            return Credentials(
                access_key=profile['aws_access_key_id'],
                secret_key=profile['aws_secret_access_key'],
                token=profile.get('aws_session_token'),
            )
        except KeyError as e:
            raise PartialCredentialsError(
                provider=self.METHOD, cred_var=str(e)
            )

    def _resolve_credentials_from_source(
        self, credential_source, profile_name
    ):
        """Load credentials from the named ``credential_source``.

        :raises CredentialRetrievalError: If the source yields nothing.
        """
        credentials = self._credential_sourcer.source_credentials(
            credential_source
        )
        if credentials is None:
            raise CredentialRetrievalError(
                provider=credential_source,
                error_msg=(
                    'No credentials found in credential_source referenced '
                    f'in profile {profile_name}'
                ),
            )
        named_provider_feature_id = self.NAMED_PROVIDER_FEATURE_MAP.get(
            credential_source
        )
        if named_provider_feature_id:
            self._feature_ids.add(named_provider_feature_id)
        return credentials
|
|
|
|
class AssumeRoleWithWebIdentityProvider(CredentialProvider):
    """Credential provider that assumes a role using a web identity token
    file configured via environment variables or the profile."""

    METHOD = 'assume-role-with-web-identity'
    CANONICAL_NAME = None
    # Config key -> environment variable that takes precedence over it.
    _CONFIG_TO_ENV_VAR = {
        'web_identity_token_file': 'AWS_WEB_IDENTITY_TOKEN_FILE',
        'role_session_name': 'AWS_ROLE_SESSION_NAME',
        'role_arn': 'AWS_ROLE_ARN',
    }

    def __init__(
        self,
        load_config,
        client_creator,
        profile_name,
        cache=None,
        disable_env_vars=False,
        token_loader_cls=None,
    ):
        """
        :param load_config: Zero-argument callable returning the full
            configuration dictionary; invoked lazily.
        :param client_creator: Factory passed on to the credential fetcher.
        :param profile_name: Name of the profile to read settings from.
        :param cache: Cache passed on to the credential fetcher.
        :param disable_env_vars: When true, environment variables are
            ignored and only the profile is consulted.
        :param token_loader_cls: Callable taking the token file path
            (defaults to ``FileWebIdentityTokenLoader``).
        """
        self.cache = cache
        self._load_config = load_config
        self._client_creator = client_creator
        self._profile_name = profile_name
        self._profile_config = None
        self._disable_env_vars = disable_env_vars
        if token_loader_cls is None:
            token_loader_cls = FileWebIdentityTokenLoader
        self._token_loader_cls = token_loader_cls
        self._feature_ids = set()

    def load(self):
        """Return deferred credentials, or ``None`` if no token file is
        configured."""
        return self._assume_role_with_web_identity()

    def _get_profile_config(self, key):
        """Read ``key`` from the profile, loading the config on first use."""
        if self._profile_config is None:
            loaded_config = self._load_config()
            profiles = loaded_config.get('profiles', {})
            self._profile_config = profiles.get(self._profile_name, {})
        return self._profile_config.get(key)

    def _get_env_config(self, key):
        """Read ``key`` from its environment variable, unless disabled."""
        if self._disable_env_vars:
            return None
        env_key = self._CONFIG_TO_ENV_VAR.get(key)
        if env_key and env_key in os.environ:
            return os.environ[env_key]
        return None

    def _get_config(self, key):
        """Look ``key`` up in the environment first, then the profile,
        recording which source supplied it."""
        env_value = self._get_env_config(key)
        if env_value is not None:
            self._feature_ids.add('CREDENTIALS_ENV_VARS_STS_WEB_ID_TOKEN')
            return env_value

        config_value = self._get_profile_config(key)
        if config_value is not None:
            self._feature_ids.add('CREDENTIALS_PROFILE_STS_WEB_ID_TOKEN')
            return config_value

        return None

    def _assume_role_with_web_identity(self):
        """Build deferred credentials for the configured web identity role.

        :raises InvalidConfigError: If a token file is configured but no
            role ARN is.
        """
        token_path = self._get_config('web_identity_token_file')
        if not token_path:
            return None
        token_loader = self._token_loader_cls(token_path)

        role_arn = self._get_config('role_arn')
        if not role_arn:
            # The space after "role_arn" was missing, which rendered as
            # "role_arnconfiguration" in the message shown to users.
            error_msg = (
                'The provided profile or the current environment is '
                'configured to assume role with web identity but has no '
                'role ARN configured. Ensure that the profile has the '
                'role_arn configuration set or the AWS_ROLE_ARN env var '
                'is set.'
            )
            raise InvalidConfigError(error_msg=error_msg)

        extra_args = {}
        role_session_name = self._get_config('role_session_name')
        if role_session_name is not None:
            extra_args['RoleSessionName'] = role_session_name

        fetcher = AssumeRoleWithWebIdentityCredentialFetcher(
            client_creator=self._client_creator,
            web_identity_token_loader=token_loader,
            role_arn=role_arn,
            extra_args=extra_args,
            cache=self.cache,
        )
        fetcher.feature_ids = self._feature_ids.copy()

        self._feature_ids.add('CREDENTIALS_STS_ASSUME_ROLE_WEB_ID')
        register_feature_ids(self._feature_ids)
        # Deferred credentials are returned, so the fetcher is not
        # invoked here.
        return DeferredRefreshableCredentials(
            method=self.METHOD,
            refresh_using=fetcher.fetch_credentials,
        )
|
|
|
|
class CanonicalNameCredentialSourcer:
    """Looks up credential providers by their ``CANONICAL_NAME``."""

    def __init__(self, providers):
        """
        :param providers: Iterable of provider objects exposing
            ``CANONICAL_NAME`` and ``METHOD``.
        """
        self._providers = providers

    def is_supported(self, source_name):
        """Validates a given source name.

        The comparison is an exact (case-sensitive) match.

        :type source_name: str
        :param source_name: The value of credential_source in the config
            file. This is the canonical name of the credential provider.

        :rtype: bool
        :returns: True if the credential provider is supported,
            False otherwise.
        """
        known_names = [
            provider.CANONICAL_NAME for provider in self._providers
        ]
        return source_name in known_names

    def source_credentials(self, source_name):
        """Loads source credentials based on the provided configuration.

        :type source_name: str
        :param source_name: The value of credential_source in the config
            file. This is the canonical name of the credential provider.

        :rtype: Credentials
        """
        source = self._get_provider(source_name)
        # A resolver chain exposes load_credentials(); a single provider
        # exposes load().
        loader = (
            source.load_credentials
            if isinstance(source, CredentialResolver)
            else source.load
        )
        return loader()

    def _get_provider(self, canonical_name):
        """Return a credential provider by its canonical name.

        :type canonical_name: str
        :param canonical_name: The canonical name of the provider.

        :raises UnknownCredentialError: Raised if no
            credential provider by the provided name
            is found.
        """
        provider = self._get_provider_by_canonical_name(canonical_name)

        # For the two shared-file names, the assume-role provider (when
        # registered) is consulted as well: on its own if the named
        # provider is absent, otherwise ahead of it in a resolver chain.
        targets_shared_file = canonical_name.lower() in [
            'sharedconfig',
            'sharedcredentials',
        ]
        assume_role_provider = None
        if targets_shared_file:
            assume_role_provider = self._get_provider_by_method('assume-role')

        if assume_role_provider is not None:
            if provider is None:
                return assume_role_provider
            return CredentialResolver([assume_role_provider, provider])

        if provider is None:
            raise UnknownCredentialError(name=canonical_name)
        return provider

    def _get_provider_by_canonical_name(self, canonical_name):
        """Return a credential provider by its canonical name.

        Matching ignores case. This function is strict, it does not
        attempt to address compatibility issues. Returns ``None`` when
        nothing matches.
        """
        for candidate in self._providers:
            name = candidate.CANONICAL_NAME
            # Providers without a canonical name can never match.
            if not name:
                continue
            if name.lower() == canonical_name.lower():
                return candidate
        return None

    def _get_provider_by_method(self, method):
        """Return a credential provider by its METHOD name, else ``None``."""
        return next(
            (
                candidate
                for candidate in self._providers
                if candidate.METHOD == method
            ),
            None,
        )
|
|
|
|
class ContainerProvider(CredentialProvider):
    """Credential provider that fetches credentials from a container
    credentials endpoint named by environment variables."""

    METHOD = 'container-role'
    CANONICAL_NAME = 'EcsContainer'
    ENV_VAR = 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI'
    ENV_VAR_FULL = 'AWS_CONTAINER_CREDENTIALS_FULL_URI'
    ENV_VAR_AUTH_TOKEN = 'AWS_CONTAINER_AUTHORIZATION_TOKEN'
    ENV_VAR_AUTH_TOKEN_FILE = 'AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE'

    def __init__(self, environ=None, fetcher=None):
        """
        :param environ: The environment variables (``os.environ`` is used
            when no value is provided).
        :param fetcher: Object used to build and retrieve the endpoint
            URI (defaults to a new ``ContainerMetadataFetcher``).
        """
        if environ is None:
            environ = os.environ
        if fetcher is None:
            fetcher = ContainerMetadataFetcher()
        self._environ = environ
        self._fetcher = fetcher

    def load(self):
        """Return credentials when either URI variable is set, else
        ``None``."""
        uri_vars = (self.ENV_VAR, self.ENV_VAR_FULL)
        if not any(name in self._environ for name in uri_vars):
            return None
        return self._retrieve_or_fail()

    def _retrieve_or_fail(self):
        """Fetch credentials once and wrap them as refreshable.

        The relative URI variable takes precedence over the full URI one.
        """
        if self._provided_relative_uri():
            endpoint = self._fetcher.full_url(self._environ[self.ENV_VAR])
        else:
            endpoint = self._environ[self.ENV_VAR_FULL]

        refresh = self._create_fetcher(endpoint)
        initial = refresh()
        return RefreshableCredentials(
            access_key=initial['access_key'],
            secret_key=initial['secret_key'],
            token=initial['token'],
            method=self.METHOD,
            expiry_time=_parse_if_needed(initial['expiry_time']),
            refresh_using=refresh,
            account_id=initial.get('account_id'),
        )

    def _build_headers(self):
        """Build the ``Authorization`` header, if a token is configured.

        The token file variable takes precedence over the inline token
        variable. Returns ``None`` when neither is set.

        :raises ValueError: If the token contains a CR or LF character.
        """
        environ = self._environ
        if self.ENV_VAR_AUTH_TOKEN_FILE in environ:
            token_path = environ[self.ENV_VAR_AUTH_TOKEN_FILE]
            with open(token_path) as token_file:
                auth_token = token_file.read()
        elif self.ENV_VAR_AUTH_TOKEN in environ:
            auth_token = environ[self.ENV_VAR_AUTH_TOKEN]
        else:
            return None

        if auth_token is None:
            return None
        self._validate_auth_token(auth_token)
        return {'Authorization': auth_token}

    def _validate_auth_token(self, auth_token):
        """Reject tokens that contain line breaks."""
        if "\r" in auth_token or "\n" in auth_token:
            raise ValueError("Auth token value is not a legal header value")

    def _create_fetcher(self, full_uri, *args, **kwargs):
        """Return a zero-argument callable that retrieves credentials from
        ``full_uri``.

        Headers are rebuilt on every call. Extra positional and keyword
        arguments are accepted but unused.
        """

        def fetch_creds():
            try:
                request_headers = self._build_headers()
                payload = self._fetcher.retrieve_full_uri(
                    full_uri, headers=request_headers
                )
                register_feature_id('CREDENTIALS_HTTP')
            except MetadataRetrievalError as e:
                logger.debug(
                    "Error retrieving container metadata: %s", e, exc_info=True
                )
                raise CredentialRetrievalError(
                    provider=self.METHOD, error_msg=str(e)
                )
            return {
                'access_key': payload['AccessKeyId'],
                'secret_key': payload['SecretAccessKey'],
                'token': payload['Token'],
                'expiry_time': payload['Expiration'],
                'account_id': payload.get('AccountId'),
            }

        return fetch_creds

    def _provided_relative_uri(self):
        """True when the relative URI variable is set."""
        return self.ENV_VAR in self._environ
|
|
|
|
class CredentialResolver:
    """An ordered chain of credential providers."""

    def __init__(self, providers):
        """
        :param providers: A list of ``CredentialProvider`` instances.
        """
        self.providers = providers

    def insert_before(self, name, credential_provider):
        """
        Inserts a provider into the chain so that it is tried before an
        existing one.

        :param name: The ``METHOD`` name of the existing provider to insert
            before (ex. ``env`` or ``config``).
        :type name: string

        :param credential_provider: The provider instance to add to the
            chain.

        :raises UnknownCredentialError: If no provider has that name.
        """
        position = self._get_provider_offset(name)
        self.providers.insert(position, credential_provider)

    def insert_after(self, name, credential_provider):
        """
        Inserts a provider into the chain so that it is tried after an
        existing one.

        :param name: The ``METHOD`` name of the existing provider to insert
            after (ex. ``env`` or ``config``).
        :type name: string

        :param credential_provider: The provider instance to add to the
            chain.

        :raises UnknownCredentialError: If no provider has that name.
        """
        position = self._get_provider_offset(name)
        self.providers.insert(position + 1, credential_provider)

    def remove(self, name):
        """
        Removes the first provider with the given name from the chain.

        Unknown names are ignored.

        :param name: The ``METHOD`` name of the provider to remove.
        :type name: string
        """
        method_names = [provider.METHOD for provider in self.providers]
        try:
            position = method_names.index(name)
        except ValueError:
            # Nothing registered under that name: nothing to remove.
            return
        self.providers.pop(position)

    def get_provider(self, name):
        """Return a credential provider by name.

        :type name: str
        :param name: The name of the provider.

        :raises UnknownCredentialError: Raised if no
            credential provider by the provided name
            is found.
        """
        position = self._get_provider_offset(name)
        return self.providers[position]

    def _get_provider_offset(self, name):
        """Return the index of the first provider whose ``METHOD`` is
        ``name``.

        :raises UnknownCredentialError: If no provider has that name.
        """
        method_names = [provider.METHOD for provider in self.providers]
        try:
            return method_names.index(name)
        except ValueError:
            raise UnknownCredentialError(name=name)

    def load_credentials(self):
        """
        Goes through the credentials chain, returning the first non-``None``
        result of a provider's ``load()``.

        :returns: The loaded credentials, or ``None`` if every provider
            returned ``None``.
        """
        for candidate in self.providers:
            logger.debug("Looking for credentials via: %s", candidate.METHOD)
            loaded = candidate.load()
            if loaded is None:
                continue
            return loaded

        # Exhausting the chain is reported as None rather than raised.
        return None
|
|
|
|
class SSOCredentialFetcher(CachedCredentialFetcher):
    """Fetches role credentials from the ``sso`` service using an SSO token.

    The access token comes from ``token_provider`` when one is supplied,
    otherwise from ``token_loader`` called with the start URL.

    NOTE(review): ``_get_credentials`` reads ``self.feature_ids``, which
    this class's ``__init__`` does not assign; it appears to be set by the
    caller (or possibly the base class) -- confirm.
    """

    _UTC_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

    def __init__(
        self,
        start_url,
        sso_region,
        role_name,
        account_id,
        client_creator,
        token_loader=None,
        cache=None,
        expiry_window_seconds=None,
        token_provider=None,
        sso_session_name=None,
        time_fetcher=_local_now,
    ):
        # What to request.
        self._start_url = start_url
        self._sso_region = sso_region
        self._role_name = role_name
        self._account_id = account_id
        self._sso_session_name = sso_session_name
        # How to request it.
        self._client_creator = client_creator
        self._token_loader = token_loader
        self._token_provider = token_provider
        self._time_fetcher = time_fetcher
        super().__init__(cache, expiry_window_seconds)

    def _create_cache_key(self):
        """Create a predictable cache key for the current configuration.

        The key is the SHA-1 hex digest of a canonical JSON rendering of the
        role name, account id and either the sso-session name (when set) or
        the start URL, passed through ``_make_file_safe``.
        """
        key_fields = {
            'roleName': self._role_name,
            'accountId': self._account_id,
        }
        if self._sso_session_name:
            key_fields['sessionName'] = self._sso_session_name
        else:
            key_fields['startUrl'] = self._start_url
        # Sorted keys and fixed separators make the serialized form (and
        # therefore the hash) independent of insertion order and of the
        # JSON encoder's default whitespace.
        serialized = json.dumps(
            key_fields, sort_keys=True, separators=(',', ':')
        )
        digest = sha1(serialized.encode('utf-8')).hexdigest()
        return self._make_file_safe(digest)

    def _parse_timestamp(self, timestamp_ms):
        """Format a millisecond epoch timestamp as a UTC date string."""
        # fromtimestamp() takes seconds, the input is in milliseconds.
        moment = datetime.datetime.fromtimestamp(
            timestamp_ms / 1000.0, tzutc()
        )
        return moment.strftime(self._UTC_DATE_FORMAT)

    def _get_credentials(self):
        """Get credentials by calling SSO get role credentials.

        :raises UnauthorizedSSOTokenError: If the token loaded through
            ``token_loader`` has already expired, or the service rejects
            the request with ``UnauthorizedException``.
        """
        sso_client = self._client_creator(
            'sso',
            config=Config(
                signature_version=UNSIGNED,
                region_name=self._sso_region,
            ),
        )

        if self._token_provider:
            loaded = self._token_provider.load_token()
            access_token = loaded.get_frozen_token().token
        else:
            cached_token = self._token_loader(self._start_url)
            access_token = cached_token['accessToken']
            # Fail early on an already-expired cached token instead of
            # spending a service call that would be rejected anyway.
            expires_at = dateutil.parser.parse(cached_token['expiresAt'])
            seconds_left = total_seconds(expires_at - self._time_fetcher())
            if seconds_left <= 0:
                raise UnauthorizedSSOTokenError()

        try:
            register_feature_ids(self.feature_ids)
            response = sso_client.get_role_credentials(
                roleName=self._role_name,
                accountId=self._account_id,
                accessToken=access_token,
            )
        except sso_client.exceptions.UnauthorizedException:
            raise UnauthorizedSSOTokenError()

        role_creds = response['roleCredentials']
        return {
            'ProviderType': 'sso',
            'Credentials': {
                'AccessKeyId': role_creds['accessKeyId'],
                'SecretAccessKey': role_creds['secretAccessKey'],
                'SessionToken': role_creds['sessionToken'],
                'Expiration': self._parse_timestamp(role_creds['expiration']),
                'AccountId': self._account_id,
            },
        }
|
|
|
|
class SSOProvider(CredentialProvider):
    """Credential provider for profiles configured to use SSO.

    A profile is treated as an SSO profile when it contains at least one
    of ``sso_role_name`` / ``sso_account_id``. The start URL and region may
    be given on the profile itself or through a referenced ``sso_session``.
    """

    METHOD = 'sso'

    _SSO_TOKEN_CACHE_DIR = os.path.expanduser(
        os.path.join('~', '.aws', 'sso', 'cache')
    )
    _PROFILE_REQUIRED_CONFIG_VARS = (
        'sso_role_name',
        'sso_account_id',
    )
    _SSO_REQUIRED_CONFIG_VARS = (
        'sso_start_url',
        'sso_region',
    )
    _ALL_REQUIRED_CONFIG_VARS = (
        _PROFILE_REQUIRED_CONFIG_VARS + _SSO_REQUIRED_CONFIG_VARS
    )

    def __init__(
        self,
        load_config,
        client_creator,
        profile_name,
        cache=None,
        token_cache=None,
        token_provider=None,
    ):
        """
        :param load_config: Zero-argument callable returning the loaded
            configuration mapping (read for ``profiles`` and
            ``sso_sessions``).
        :param client_creator: Callable handed to the credential fetcher
            for creating service clients.
        :param profile_name: Name of the profile to read.
        :param cache: Credential cache passed to the fetcher; a fresh dict
            when ``None``.
        :param token_cache: Cache used to load legacy SSO tokens; defaults
            to a ``JSONFileCache`` over ``~/.aws/sso/cache``.
        :param token_provider: Token provider handed to the fetcher when
            the profile references an ``sso_session``.
        """
        self._load_config = load_config
        self._client_creator = client_creator
        self._profile_name = profile_name
        self._token_provider = token_provider
        self._token_cache = (
            JSONFileCache(self._SSO_TOKEN_CACHE_DIR)
            if token_cache is None
            else token_cache
        )
        self.cache = {} if cache is None else cache
        self._feature_ids = set()

    def _load_sso_config(self):
        """Return the resolved SSO settings for the profile, or ``None``.

        :return: ``None`` when the profile has none of the profile-level
            SSO keys; otherwise a dict of every required setting.
        :raises InvalidConfigError: If the profile is an SSO profile but
            some required setting is missing.
        """
        loaded = self._load_config()
        profile = loaded.get('profiles', {}).get(self._profile_name, {})
        sessions = loaded.get('sso_sessions', {})

        # Not an SSO profile at all unless at least one profile-level SSO
        # key is present.
        if not any(
            var in profile for var in self._PROFILE_REQUIRED_CONFIG_VARS
        ):
            return None

        resolved, extra_required = self._resolve_sso_session_reference(
            profile, sessions
        )

        required = self._ALL_REQUIRED_CONFIG_VARS + extra_required
        absent = [var for var in required if var not in resolved]
        if absent:
            profile_name = self._profile_name
            missing = ', '.join(absent)
            raise InvalidConfigError(
                error_msg=(
                    f'The profile "{profile_name}" is configured to use SSO '
                    f'but is missing required configuration: {missing}'
                )
            )
        return {var: resolved[var] for var in required}

    def _resolve_sso_session_reference(self, profile_config, sso_sessions):
        """Merge a referenced ``sso_session`` section into the profile.

        :return: A ``(config, extra_required)`` tuple. Without an
            ``sso_session`` key this is the profile config unchanged and an
            empty tuple; otherwise a merged copy and ``('sso_session',)``.
        :raises InvalidConfigError: If the named session does not exist or
            one of its values contradicts a value set on the profile.
        """
        session_name = profile_config.get('sso_session')
        if session_name is None:
            # No reference to resolve; the profile stands on its own.
            return profile_config, ()

        if session_name not in sso_sessions:
            raise InvalidConfigError(
                error_msg=(
                    f'The specified sso-session does not exist: "{session_name}"'
                )
            )

        merged = profile_config.copy()
        for key, session_value in sso_sessions[session_name].items():
            # A key may appear in both places only if the values agree.
            if merged.get(key, session_value) != session_value:
                raise InvalidConfigError(
                    error_msg=(
                        f"The value for {key} is inconsistent between "
                        f"profile ({merged[key]}) and sso-session ({session_value})."
                    )
                )
            merged[key] = session_value
        return merged, ('sso_session',)

    def load(self):
        """Return deferred refreshable SSO credentials for the profile.

        :return: ``None`` when the profile is not an SSO profile, otherwise
            a ``DeferredRefreshableCredentials`` refreshed through an
            ``SSOCredentialFetcher``.
        """
        sso_config = self._load_sso_config()
        if not sso_config:
            return None

        uses_session = 'sso_session' in sso_config
        if uses_session:
            profile_feature = 'CREDENTIALS_PROFILE_SSO'
            source_feature = 'CREDENTIALS_SSO'
        else:
            profile_feature = 'CREDENTIALS_PROFILE_SSO_LEGACY'
            source_feature = 'CREDENTIALS_SSO_LEGACY'

        fetcher_kwargs = {
            'start_url': sso_config['sso_start_url'],
            'sso_region': sso_config['sso_region'],
            'role_name': sso_config['sso_role_name'],
            'account_id': sso_config['sso_account_id'],
            'client_creator': self._client_creator,
            'token_loader': SSOTokenLoader(cache=self._token_cache),
            'cache': self.cache,
        }
        if uses_session:
            fetcher_kwargs['sso_session_name'] = sso_config['sso_session']
            fetcher_kwargs['token_provider'] = self._token_provider
        self._feature_ids.add(profile_feature)

        sso_fetcher = SSOCredentialFetcher(**fetcher_kwargs)
        # The fetcher gets a snapshot taken before the source feature id is
        # added, so it carries only the profile-level id(s).
        sso_fetcher.feature_ids = self._feature_ids.copy()

        self._feature_ids.add(source_feature)
        register_feature_ids(self._feature_ids)

        return DeferredRefreshableCredentials(
            method=self.METHOD,
            refresh_using=sso_fetcher.fetch_credentials,
        )
|
|
|
|
| def _base64_url_encode_no_padding(data): |
| return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii') |
|
|
|
|
def _build_dpop_header(private_key, uri, uid=None, ts=None):
    """Build a signed DPoP proof in JWT compact form for a POST to ``uri``.

    :param private_key: Key object providing ``get_public_coords()`` (the
        public point as two byte strings) and ``sign(digest)``.
    :param uri: Value of the ``htu`` claim.
    :param uid: Value of the ``jti`` claim. A random UUID4 string is
        generated only when this is ``None``.
    :param ts: Value of the ``iat`` claim. The current time in whole
        seconds is used only when this is ``None``.

    :rtype: str
    :return: ``<header>.<payload>.<signature>``, each part URL-safe base64
        without padding.

    :raises MissingDependencyException: If ``EC`` is unavailable.
    """
    if EC is None:
        raise MissingDependencyException(
            msg=(
                "This operation requires an additional dependency. You"
                " will need to pip install \"botocore[crt]\" before proceeding."
            )
        )

    # Only fall back when the caller passed nothing: an explicit falsy
    # override such as ``ts=0`` or ``uid=''`` must be honoured as given.
    issued_at = int(time.time()) if ts is None else ts
    token_id = str(uuid.uuid4()) if uid is None else uid

    x, y = private_key.get_public_coords()
    header = {
        "typ": "dpop+jwt",
        "alg": "ES256",
        "jwk": {
            "kty": "EC",
            "x": _base64_url_encode_no_padding(x),
            "y": _base64_url_encode_no_padding(y),
            "crv": "P-256",
        },
    }
    payload = {
        "htm": "POST",
        "htu": uri,
        "iat": issued_at,
        "jti": token_id,
    }

    # Compact separators keep the encoded JSON free of extra whitespace.
    header_b64 = _base64_url_encode_no_padding(
        json.dumps(header, separators=(',', ':')).encode()
    )
    payload_b64 = _base64_url_encode_no_padding(
        json.dumps(payload, separators=(',', ':')).encode()
    )

    # The signature covers the SHA-256 digest of "<header>.<payload>".
    signing_input = f"{header_b64}.{payload_b64}".encode()
    signature = private_key.sign(sha256(signing_input).digest())
    # NOTE(review): judging by the helper's name, this converts a DER
    # signature into a pair of values each padded to 32 bytes -- confirm
    # against the EC helper's documentation.
    signature_bytes = EC.decode_der_signature_to_padded_pair(
        signature, pad_to=32
    )
    signature_b64 = _base64_url_encode_no_padding(signature_bytes)

    return f"{header_b64}.{payload_b64}.{signature_b64}"
|
|
|
|
| def _build_add_dpop_header_handler(private_key): |
| """Builds a before-call handler for calculating and setting the DPoP header""" |
|
|
| def _add_dpop_header_handler(**kwargs): |
| kwargs['params']['headers']['DPoP'] = _build_dpop_header( |
| private_key, kwargs['params']['url'] |
| ) |
|
|
| return _add_dpop_header_handler |
|
|
|
|
class LoginCredentialFetcher:
    """
    Converts login access tokens from the cached token to
    credentials, and supports refreshing them.
    """

    # A cached token with more than this many seconds left is reused as-is
    # by ``refresh_credentials`` instead of calling the service.
    _REFRESH_THRESHOLD = 5 * 60
    # Keys that must be present for ``load_cached_credentials`` to succeed.
    _REQUIRED_TOKEN_FIELDS = (
        'accessToken',
        'refreshToken',
        'dpopKey',
        'clientId',
    )

    def __init__(
        self,
        session_name,
        token_loader,
        client_creator,
        time_fetcher=_local_now,
        feature_ids=None,
    ):
        """
        :param session_name: Name of the login session whose token is
            loaded from / saved to the token loader.
        :param token_loader: Object providing ``load_token(session_name)``
            and ``save_token(session_name, token)``.
        :param client_creator: Callable used to create the ``signin``
            client.
        :param time_fetcher: Zero-argument callable returning the current
            time as a datetime.
        :param feature_ids: Optional set of feature ids registered on each
            refresh; an empty set when ``None``.
        """
        self._session_name = session_name
        self._token_loader = token_loader
        self._client_creator = client_creator
        self._time_fetcher = time_fetcher
        if feature_ids is None:
            feature_ids = set()
        self.feature_ids = feature_ids

    def _load_token(self):
        """Load the cached token for this session.

        :raises LoginTokenLoadError: If the loader returns ``None``.
        """
        token = self._token_loader.load_token(self._session_name)
        if token is None:
            raise LoginTokenLoadError(
                error_msg='Unable to load a existing login session for session '
                f'{self._session_name}. Please reauthenticate with '
                "'aws login'.",
            )
        return token

    def load_cached_credentials(self):
        """Loads cached credentials without checking their expiry.

        :raises LoginTokenLoadError: If there is no cached token for the
            session or it lacks any of the required fields.
        """
        token = self._load_token()

        missing_fields = [
            key for key in self._REQUIRED_TOKEN_FIELDS if key not in token
        ]
        if missing_fields:
            missing = ", ".join(missing_fields)
            raise LoginTokenLoadError(
                error_msg=f'Failed to load access token from token cache, missing required fields: {missing}.'
            )

        return self._token_to_credentials(token)

    def refresh_credentials(self):
        """Refreshes login credentials, including saving them to the cache.

        The cached token is reused when it has more than
        ``_REFRESH_THRESHOLD`` seconds left; otherwise a new token is
        requested from the ``signin`` service and written back to the
        cache.

        :raises LoginTokenLoadError: If the cached token is gone, its
            private key cannot be loaded, or the service response has no
            ``tokenOutput``.
        :raises LoginRefreshRequired: On an access-denied error of type
            ``TOKEN_EXPIRED`` or ``USER_CREDENTIALS_CHANGED``.
        :raises LoginInsufficientPermissions: On an access-denied error of
            type ``INSUFFICIENT_PERMISSIONS``.
        :raises LoginError: On any other access-denied error.
        """
        if self.feature_ids:
            register_feature_ids(self.feature_ids)

        # Re-read the token from the cache rather than trusting an earlier
        # copy; it may have been replaced or removed since it was loaded.
        # A missing entry must surface as LoginTokenLoadError, not as a
        # TypeError from the membership test in _load_private_key.
        token = self._load_token()
        private_key = self._load_private_key(token)

        # Reuse the cached token while it is comfortably within its
        # lifetime.
        if 'accessToken' in token and 'expiresAt' in token['accessToken']:
            expiry_time = _parse_if_needed(token['accessToken']['expiresAt'])
            remaining_time = total_seconds(expiry_time - self._time_fetcher())
            if remaining_time > self._REFRESH_THRESHOLD:
                return self._token_to_credentials(token)

        client = self._client_creator(
            'signin',
            config=Config(signature_version=UNSIGNED),
        )
        # The request is unsigned; a DPoP header built from the cached
        # private key is attached before each CreateOAuth2Token call.
        client.meta.events.register(
            'before-call.signin.CreateOAuth2Token',
            _build_add_dpop_header_handler(private_key),
        )

        try:
            response = client.create_o_auth2_token(
                tokenInput={
                    'clientId': token['clientId'],
                    'refreshToken': token['refreshToken'],
                    'grantType': 'refresh_token',
                },
            )
        except client.exceptions.AccessDeniedException as e:
            error_type = e.response.get('error', '')
            if error_type in ('TOKEN_EXPIRED', 'USER_CREDENTIALS_CHANGED'):
                raise LoginRefreshRequired() from e
            elif error_type == 'INSUFFICIENT_PERMISSIONS':
                raise LoginInsufficientPermissions() from e
            raise LoginError() from e

        if response is None or 'tokenOutput' not in response:
            raise LoginTokenLoadError(
                error_msg=(
                    "Unable to refresh access token due to an invalid service response. "
                    "Please try running 'aws login' again. If the issue persists, there "
                    "may be a temporary signin service problem."
                )
            )

        output = response['tokenOutput']

        # The service reports a lifetime in seconds; convert it to an
        # absolute UTC timestamp based on the current time.
        expires_timestamp = self._time_fetcher().astimezone(
            tzutc()
        ) + datetime.timedelta(seconds=output['expiresIn'])

        # Only the access token and refresh token are replaced; every other
        # cached field is kept. The account id is carried over from the
        # previously cached access token rather than read from the response.
        new_access_token = output['accessToken']
        token.update(
            {
                'accessToken': {
                    'accessKeyId': new_access_token['accessKeyId'],
                    'secretAccessKey': new_access_token['secretAccessKey'],
                    'sessionToken': new_access_token['sessionToken'],
                    'accountId': token['accessToken']['accountId'],
                    'expiresAt': expires_timestamp.strftime(
                        '%Y-%m-%dT%H:%M:%SZ'
                    ),
                },
                'refreshToken': output['refreshToken'],
            }
        )
        self._token_loader.save_token(self._session_name, token)

        return self._token_to_credentials(token)

    @staticmethod
    def _token_to_credentials(token):
        """Map a cached token's ``accessToken`` section to a credentials dict."""
        access_token = token['accessToken']
        return {
            'access_key': access_token['accessKeyId'],
            'secret_key': access_token['secretAccessKey'],
            'token': access_token['sessionToken'],
            'expiry_time': access_token['expiresAt'],
            'account_id': access_token['accountId'],
        }

    @staticmethod
    def _load_private_key(token):
        """Load the EC private key stored under the token's ``dpopKey``.

        :raises LoginTokenLoadError: If ``dpopKey`` is absent or its
            content cannot be decoded into a key.
        """
        if 'dpopKey' not in token:
            raise LoginTokenLoadError(
                error_msg='Private key not found in cached token.'
            )

        # Drop the BEGIN/END armor lines, keeping only the base64 body.
        content_lines = [
            line
            for line in token['dpopKey'].splitlines()
            if not line.startswith(('-----BEGIN', '-----END'))
        ]
        contents = ''.join(content_lines).strip()

        try:
            return EC.new_key_from_der_data(base64.b64decode(contents))
        except ValueError as e:
            raise LoginTokenLoadError(
                error_msg='Unable to load private key from cached token.'
            ) from e
|
|
|
|
class LoginProvider(CredentialProvider):
    """Credential provider for profiles that define ``login_session``.

    Credentials are read from the cached login token and refreshed through
    a ``LoginCredentialFetcher``.
    """

    METHOD = 'login'

    def __init__(
        self,
        load_config,
        client_creator,
        profile_name,
        token_cache=None,
    ):
        """
        :param load_config: Zero-argument callable returning the loaded
            configuration mapping (read for ``profiles``).
        :param client_creator: Callable handed to the credential fetcher
            for creating service clients.
        :param profile_name: Name of the profile to read.
        :param token_cache: Cache holding login tokens; defaults to a
            ``JSONFileCache`` over the login token cache directory.
        """
        super().__init__()
        self._token_cache = (
            JSONFileCache(get_login_token_cache_directory())
            if token_cache is None
            else token_cache
        )
        self._load_config = load_config
        self._client_creator = client_creator
        self._profile_name = profile_name
        self._feature_ids = {'CREDENTIALS_PROFILE_LOGIN', 'CREDENTIALS_LOGIN'}

    def load(self):
        """Return refreshable login credentials for the profile.

        :return: ``None`` when the profile has no ``login_session`` key,
            otherwise a ``RefreshableCredentials`` seeded from the cached
            token.
        :raises MissingDependencyException: If the profile uses a login
            session but ``EC`` is unavailable.
        """
        all_profiles = self._load_config().get('profiles', {})
        profile = all_profiles.get(self._profile_name, {})
        if 'login_session' not in profile:
            return None

        if EC is None:
            raise MissingDependencyException(
                msg=(
                    "Using the login credential provider requires an "
                    "additional dependency. You will need to pip install "
                    "\"botocore[crt]\" before proceeding."
                )
            )

        login_fetcher = LoginCredentialFetcher(
            session_name=profile['login_session'],
            token_loader=LoginTokenLoader(self._token_cache),
            client_creator=self._client_creator,
            time_fetcher=_local_now,
            feature_ids=self._feature_ids,
        )
        register_feature_ids(self._feature_ids)

        # Seed the credentials from whatever is cached, without an expiry
        # check here; expiry is left to RefreshableCredentials, which calls
        # the fetcher's refresh_credentials when it needs new values.
        initial = login_fetcher.load_cached_credentials()

        return RefreshableCredentials(
            access_key=initial['access_key'],
            secret_key=initial['secret_key'],
            token=initial['token'],
            expiry_time=_parse_if_needed(initial['expiry_time']),
            account_id=initial['account_id'],
            method=self.METHOD,
            refresh_using=login_fetcher.refresh_credentials,
            time_fetcher=_local_now,
        )
|
|