| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import functools |
| import logging |
| import random |
| from binascii import crc32 |
|
|
| from botocore.exceptions import ( |
| ChecksumError, |
| ConnectionClosedError, |
| ConnectionError, |
| EndpointConnectionError, |
| ReadTimeoutError, |
| ) |
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Maps the symbolic error-group names that can appear in a policy's
# ``applies_when['socket_errors']`` list to the exception classes that the
# group covers.  ``_extract_retryable_exception`` indexes this mapping
# directly, so a name that is missing here raises ``KeyError`` there.
EXCEPTION_MAP = {
    'GENERAL_CONNECTION_ERROR': [
        ConnectionError,
        ConnectionClosedError,
        ReadTimeoutError,
        EndpointConnectionError,
    ],
}
|
|
|
|
def delay_exponential(base, growth_factor, attempts):
    """Return the time to sleep before a retry, growing exponentially.

    The value is computed as::

        base * growth_factor ^ (attempts - 1)

    Passing the string ``'rand'`` as ``base`` substitutes the result of
    ``random.random()`` for the base.  Any other ``base`` must be strictly
    greater than 0.

    :param base: Positive number, or the string ``'rand'``.
    :param growth_factor: Number raised to the ``attempts - 1`` power.
    :param attempts: The attempt count used as the exponent source.
    :raises ValueError: If ``base`` is not ``'rand'`` and is <= 0.

    """
    if base == 'rand':
        multiplier = random.random()
    else:
        if base <= 0:
            raise ValueError(
                f"The 'base' param must be greater than 0, got: {base}"
            )
        multiplier = base
    exponent = attempts - 1
    return multiplier * (growth_factor**exponent)
|
|
|
|
def create_exponential_delay_function(base, growth_factor):
    """Return ``delay_exponential`` with ``base``/``growth_factor`` bound.

    The returned ``functools.partial`` only needs the ``attempts`` argument
    to compute a delay, which lets callers treat it as a one-argument
    delay function.

    """
    bound_arguments = {'base': base, 'growth_factor': growth_factor}
    return functools.partial(delay_exponential, **bound_arguments)
|
|
|
|
def create_retry_handler(config, operation_name=None):
    """Assemble a ``RetryHandler`` from a retry configuration.

    The checker (the "should we retry?" decision) and the action (what to
    return when a retry is needed) are both derived from ``config``; the
    checker is built first, then the action.

    :param config: Retry configuration mapping.
    :param operation_name: Optional operation whose extra policies should
        be included when building the checker.

    """
    return RetryHandler(
        checker=create_checker_from_retry_config(
            config, operation_name=operation_name
        ),
        action=create_retry_action_from_config(
            config, operation_name=operation_name
        ),
    )
|
|
|
|
def create_retry_action_from_config(config, operation_name=None):
    """Create the retry action described by ``config``.

    Only the ``config['__default__']['delay']`` section is consulted;
    ``operation_name`` is accepted but not used here.  The only delay
    ``type`` handled is ``'exponential'``; for any other type this returns
    ``None``.

    :return: A delay function taking ``attempts``, or ``None``.

    """
    delay_settings = config['__default__']['delay']
    if delay_settings['type'] != 'exponential':
        # No other delay types are implemented.
        return None
    return create_exponential_delay_function(
        base=delay_settings['base'],
        growth_factor=delay_settings['growth_factor'],
    )
|
|
|
|
def create_checker_from_retry_config(config, operation_name=None):
    """Build the retry checker described by ``config``.

    Policies are gathered from ``config['__default__']['policies']`` (when
    a ``'__default__'`` section exists) and then from
    ``config[operation_name]['policies']`` (when ``operation_name`` is given
    and has a non-None entry).  Each policy contributes one checker and,
    where applicable, a list of exception classes that should be treated as
    retryable.

    The result is always a ``MaxAttemptsDecorator``.  It wraps the single
    checker directly when there is exactly one, otherwise a ``MultiChecker``
    over all of them.

    The collected retryable exceptions are passed to the decorator in both
    cases.  Previously they were dropped when only one checker existed, so
    a configuration whose only policy was a ``socket_errors`` or
    ``crc32body`` policy let the exception escape instead of triggering a
    retry.  When no retryable exceptions were collected an empty tuple is
    passed, which the decorator treats the same as ``None``.

    :param config: Retry configuration mapping.
    :param operation_name: Optional name of an operation-specific section.
    :return: A ``MaxAttemptsDecorator`` instance.

    """
    checkers = []
    retryable_exceptions = []
    max_attempts = None

    def _collect(policies):
        # Turn every policy into a checker and remember which exception
        # classes that policy wants retried.
        for key in policies:
            policy = policies[key]
            checkers.append(_create_single_checker(policy))
            exceptions = _extract_retryable_exception(policy)
            if exceptions is not None:
                retryable_exceptions.extend(exceptions)

    if '__default__' in config:
        default_section = config['__default__']
        default_policies = default_section.get('policies', {})
        max_attempts = default_section['max_attempts']
        _collect(default_policies)
    if operation_name is not None and config.get(operation_name) is not None:
        _collect(config[operation_name]['policies'])

    if len(checkers) == 1:
        # A lone checker does not need the MultiChecker wrapper.
        checker = checkers[0]
    else:
        checker = MultiChecker(checkers)
    return MaxAttemptsDecorator(
        checker,
        max_attempts=max_attempts,
        retryable_exceptions=tuple(retryable_exceptions),
    )
|
|
|
|
| def _create_single_checker(config): |
| if 'response' in config['applies_when']: |
| return _create_single_response_checker( |
| config['applies_when']['response'] |
| ) |
| elif 'socket_errors' in config['applies_when']: |
| return ExceptionRaiser() |
|
|
|
|
| def _create_single_response_checker(response): |
| if 'service_error_code' in response: |
| checker = ServiceErrorCodeChecker( |
| status_code=response['http_status_code'], |
| error_code=response['service_error_code'], |
| ) |
| elif 'http_status_code' in response: |
| checker = HTTPStatusCodeChecker( |
| status_code=response['http_status_code'] |
| ) |
| elif 'crc32body' in response: |
| checker = CRC32Checker(header=response['crc32body']) |
| else: |
| |
| raise ValueError("Unknown retry policy") |
| return checker |
|
|
|
|
| def _extract_retryable_exception(config): |
| applies_when = config['applies_when'] |
| if 'crc32body' in applies_when.get('response', {}): |
| return [ChecksumError] |
| elif 'socket_errors' in applies_when: |
| exceptions = [] |
| for name in applies_when['socket_errors']: |
| exceptions.extend(EXCEPTION_MAP[name]) |
| return exceptions |
|
|
|
|
class RetryHandler:
    """Callable that decides whether to retry and what to do about it.

    A handler is built from two callables:

    * ``checker`` - given the attempt number plus either a response or a
      caught exception, reports whether a retry is warranted.
    * ``action`` - invoked with ``attempts=...`` when the checker asks for
      a retry; its return value becomes the handler's return value.

    """

    def __init__(self, checker, action):
        self._checker = checker
        self._action = action

    def __call__(self, attempts, response, caught_exception, **kwargs):
        """Run the checker and, if it asks for a retry, the action.

        Extra keyword arguments are accepted so the handler can be
        registered as an event callback.  When the checker is a
        ``MaxAttemptsDecorator``, ``kwargs['request_dict']['context']`` is
        read and its ``'retries'`` entry (possibly ``None``) is forwarded
        to the checker as ``retries_context``.

        :return: The action's result when a retry is needed, else ``None``.

        """
        checker_kwargs = dict(
            attempt_number=attempts,
            response=response,
            caught_exception=caught_exception,
        )
        if isinstance(self._checker, MaxAttemptsDecorator):
            request_context = kwargs['request_dict']['context']
            checker_kwargs['retries_context'] = request_context.get('retries')

        if not self._checker(**checker_kwargs):
            logger.debug("No retry needed.")
            return None
        result = self._action(attempts=attempts)
        logger.debug("Retry needed, action of: %s", result)
        return result
|
|
|
|
class BaseChecker:
    """Base class for retry checkers.

    Each subclass checks a single criterion that determines whether or not
    a retry should happen.

    """

    def __call__(self, attempt_number, response, caught_exception):
        """Determine whether the retry criterion matches.

        Callers are expected to supply exactly one of ``response`` and
        ``caught_exception``; the other is ``None``.  If both are supplied,
        the response takes precedence.

        :type attempt_number: int
        :param attempt_number: The total number of times we've attempted
            to send the request.

        :param response: The HTTP response (if one was received).

        :type caught_exception: Exception
        :param caught_exception: Any exception that was caught while trying
            to send the HTTP request.

        :return: The result of the matching ``_check_*`` hook.  Subclasses
            return a truthy value when a retry should occur.

        :raises ValueError: If both ``response`` and ``caught_exception``
            are ``None``.

        """
        # Dispatching here spares subclasses from checking which of the
        # two inputs is populated.
        if response is not None:
            return self._check_response(attempt_number, response)
        if caught_exception is None:
            raise ValueError("Both response and caught_exception are None.")
        return self._check_caught_exception(attempt_number, caught_exception)

    def _check_response(self, attempt_number, response):
        """Hook for response-based checks; the default returns ``None``."""
        return None

    def _check_caught_exception(self, attempt_number, caught_exception):
        """Hook for exception-based checks; the default returns ``None``."""
        return None
|
|
|
|
class MaxAttemptsDecorator(BaseChecker):
    """Allow retries up to a maximum number of attempts.

    Calls are passed through to the wrapped checker as long as the attempt
    number is below ``max_attempts``.  While below the limit, any of the
    configured ``retryable_exceptions`` raised by the wrapped checker is
    caught and reported as "retry".  Once the limit is reached, ``False``
    is returned, or an exception that would previously have been caught is
    allowed to propagate.

    """

    def __init__(self, checker, max_attempts, retryable_exceptions=None):
        """Wrap ``checker`` with an attempt limit.

        :param checker: The checker to delegate to.
        :param max_attempts: Attempt number at which retries stop.
        :param retryable_exceptions: Optional tuple of exception classes to
            catch (and treat as "retry") while below the limit.  ``None``
            or an empty tuple disables the catching.

        """
        self._checker = checker
        self._max_attempts = max_attempts
        self._retryable_exceptions = retryable_exceptions

    def __call__(
        self,
        attempt_number,
        response,
        caught_exception,
        retries_context=None,
    ):
        """Decide whether to retry, honouring the attempt limit.

        ``retries_context`` defaults to ``None`` so this checker can be
        invoked with the same three arguments as every other
        ``BaseChecker``, for example when nested inside a ``MultiChecker``.

        :param attempt_number: Number of attempts made so far.
        :param response: The response, or ``None``.  When present it is
            indexed as ``response[1]`` to reach the parsed data holding
            ``'ResponseMetadata'``.
        :param caught_exception: The caught exception, or ``None``.
        :param retries_context: Optional mutable mapping; its ``'max'``
            entry is raised to at least ``max_attempts``.
        :return: The wrapped checker's truthy result if a retry should
            happen, otherwise ``False``.

        """
        # Test for None rather than truthiness so that an empty context
        # dict still gets its 'max' entry recorded.
        if retries_context is not None:
            retries_context['max'] = max(
                retries_context.get('max', 0), self._max_attempts
            )

        should_retry = self._should_retry(
            attempt_number, response, caught_exception
        )
        if not should_retry:
            return False
        if attempt_number < self._max_attempts:
            return should_retry

        # The wrapped checker wants a retry but the limit is reached.
        # Flag that on the response metadata so callers can tell.
        if response is not None and 'ResponseMetadata' in response[1]:
            response[1]['ResponseMetadata']['MaxAttemptsReached'] = True
        logger.debug(
            "Reached the maximum number of retry attempts: %s",
            attempt_number,
        )
        return False

    def _should_retry(self, attempt_number, response, caught_exception):
        """Ask the wrapped checker, catching retryable exceptions if allowed.

        Exceptions are only caught while ``attempt_number`` is below the
        limit.  At or beyond it, the wrapped checker is called unguarded so
        that its exception propagates to the caller.

        """
        if self._retryable_exceptions and attempt_number < self._max_attempts:
            try:
                return self._checker(
                    attempt_number, response, caught_exception
                )
            except self._retryable_exceptions as e:
                logger.debug(
                    "retry needed, retryable exception caught: %s",
                    e,
                    exc_info=True,
                )
                return True
        # Either nothing is configured as retryable or the limit has been
        # reached: let any exception from the checker propagate.
        return self._checker(attempt_number, response, caught_exception)
|
|
|
|
class HTTPStatusCodeChecker(BaseChecker):
    """Request a retry when the response has a specific HTTP status code."""

    def __init__(self, status_code):
        self._status_code = status_code

    def _check_response(self, attempt_number, response):
        """Return True when ``response[0].status_code`` matches."""
        http_response = response[0]
        if http_response.status_code == self._status_code:
            logger.debug(
                "retry needed: retryable HTTP status code received: %s",
                self._status_code,
            )
            return True
        return False
|
|
|
|
class ServiceErrorCodeChecker(BaseChecker):
    """Request a retry on a given HTTP status plus service error code."""

    def __init__(self, status_code, error_code):
        self._status_code = status_code
        self._error_code = error_code

    def _check_response(self, attempt_number, response):
        """Return True when both the status code and error code match.

        The status code is read from ``response[0].status_code``.  The
        error code is read from ``response[1]['Error']['Code']`` and is
        only looked at once the status code has matched; a missing
        ``'Error'`` or ``'Code'`` entry counts as no match.

        """
        status_matches = response[0].status_code == self._status_code
        if not status_matches:
            return False
        error_details = response[1].get('Error', {})
        actual_error_code = error_details.get('Code')
        if actual_error_code == self._error_code:
            logger.debug(
                "retry needed: matching HTTP status and error code seen: "
                "%s, %s",
                self._status_code,
                self._error_code,
            )
            return True
        return False
|
|
|
|
class MultiChecker(BaseChecker):
    """Combine several checkers; the first truthy result wins."""

    def __init__(self, checkers):
        self._checkers = checkers

    def __call__(self, attempt_number, response, caught_exception):
        """Consult each checker in order and return the first truthy result.

        Checkers after the first truthy one are not called.  ``False`` is
        returned when no checker asks for a retry.

        """
        for candidate in self._checkers:
            outcome = candidate(attempt_number, response, caught_exception)
            if not outcome:
                continue
            return outcome
        return False
|
|
|
|
class CRC32Checker(BaseChecker):
    """Validate a response body against a CRC32 value sent in a header."""

    def __init__(self, header):
        # Name of the response header that carries the expected crc32.
        self._header_name = header

    def _check_response(self, attempt_number, response):
        """Compare the body's crc32 with the value in the header.

        The check is skipped when the header is absent.  Nothing is
        returned on success or skip; a mismatch is signalled by raising.

        :raises ChecksumError: If the computed crc32 of
            ``response[0].content`` differs from the header value.

        """
        http_response = response[0]
        header_value = http_response.headers.get(self._header_name)
        if header_value is None:
            logger.debug(
                "crc32 check skipped, the %s header is not "
                "in the http response.",
                self._header_name,
            )
            return None

        # Mask to an unsigned 32-bit value before comparing.
        actual_crc32 = crc32(http_response.content) & 0xFFFFFFFF
        expected_crc32 = int(header_value)
        if actual_crc32 == expected_crc32:
            return None
        logger.debug(
            "retry needed: crc32 check failed, expected != actual: "
            "%s != %s",
            expected_crc32,
            actual_crc32,
        )
        raise ChecksumError(
            checksum_type='crc32',
            expected_checksum=expected_crc32,
            actual_checksum=actual_crc32,
        )
|
|
|
|
class ExceptionRaiser(BaseChecker):
    """Raise any caught exceptions.

    This class will raise any non None ``caught_exception``.

    """

    def _check_caught_exception(self, attempt_number, caught_exception):
        # This checker works together with MaxAttemptsDecorator.  The
        # decorator holds the tuple of exceptions it is willing to catch
        # and turn into a retry, but something has to actually raise the
        # caught exception from inside the checker call for that
        # ``except`` clause to see it.  That is the job of this class.
        # If the decorator is not catching this exception type (or the
        # attempt limit has been reached), the exception simply
        # propagates out to the caller.
        raise caught_exception
|
|