| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | import functools |
| | import logging |
| | import random |
| | from binascii import crc32 |
| |
|
| | from botocore.exceptions import ( |
| | ChecksumError, |
| | ConnectionClosedError, |
| | ConnectionError, |
| | EndpointConnectionError, |
| | ReadTimeoutError, |
| | ) |
| |
|
logger = logging.getLogger(__name__)

# Lookup table from a ``socket_errors`` policy name (as it appears in a
# retry config) to the exception classes that name stands for.  It is read
# by ``_extract_retryable_exception``.  Note that ``ConnectionError`` here
# is the class imported from ``botocore.exceptions`` at the top of the
# module, not the Python builtin of the same name.
EXCEPTION_MAP = {
    'GENERAL_CONNECTION_ERROR': [
        ConnectionError,
        ConnectionClosedError,
        ReadTimeoutError,
        EndpointConnectionError,
    ],
}
| |
|
| |
|
def delay_exponential(base, growth_factor, attempts):
    """Return the number of seconds to sleep before the next attempt.

    The delay follows an exponential curve::

        base * growth_factor ^ (attempts - 1)

    :param base: The multiplier applied to the exponential term.  The
        special string ``'rand'`` selects a fresh random value in the
        range ``[0, 1)`` on every call.
    :param growth_factor: The value raised to ``attempts - 1``.
    :param attempts: The attempt number used as the exponent source.

    :raises ValueError: If ``base`` is not ``'rand'`` and is less than or
        equal to zero.

    :return: The computed delay.

    """
    if base == 'rand':
        multiplier = random.random()
    else:
        if base <= 0:
            raise ValueError(
                f"The 'base' param must be greater than 0, got: {base}"
            )
        multiplier = base
    exponent = attempts - 1
    return multiplier * (growth_factor**exponent)
| |
|
| |
|
def create_exponential_delay_function(base, growth_factor):
    """Bind ``base`` and ``growth_factor`` onto ``delay_exponential``.

    The returned callable only needs the ``attempts`` argument.  Because
    the two bound values are supplied as keywords, ``attempts`` must be
    passed by keyword as well.

    :return: A ``functools.partial`` wrapping ``delay_exponential``.

    """
    bound_arguments = {'base': base, 'growth_factor': growth_factor}
    return functools.partial(delay_exponential, **bound_arguments)
| |
|
| |
|
def create_retry_handler(config, operation_name=None):
    """Build a ``RetryHandler`` from a retry configuration.

    The checker (deciding *whether* to retry) is built first, then the
    action (deciding *what* to do on retry); both receive the same
    ``config`` and ``operation_name``.

    :param config: The retry configuration mapping.
    :param operation_name: Optional operation whose entry in ``config``
        should be taken into account.

    :return: A ``RetryHandler`` wired with the checker and action.

    """
    retry_checker = create_checker_from_retry_config(
        config, operation_name=operation_name
    )
    retry_action = create_retry_action_from_config(
        config, operation_name=operation_name
    )
    return RetryHandler(checker=retry_checker, action=retry_action)
| |
|
| |
|
def create_retry_action_from_config(config, operation_name=None):
    """Build the retry action (delay function) from a retry config.

    Only the ``delay`` entry of the ``__default__`` section is consulted;
    ``operation_name`` is accepted but not used by this function.

    :param config: The retry configuration mapping.  It must contain
        ``config['__default__']['delay']['type']``.
    :param operation_name: Unused.

    :return: An exponential delay function when the delay type is
        ``'exponential'``, otherwise ``None``.

    """
    delay_config = config['__default__']['delay']
    if delay_config['type'] != 'exponential':
        # No other delay type is recognised here.
        return None
    return create_exponential_delay_function(
        base=delay_config['base'],
        growth_factor=delay_config['growth_factor'],
    )
| |
|
| |
|
def create_checker_from_retry_config(config, operation_name=None):
    """Build the retry checker described by a retry configuration.

    Policies are read from the ``__default__`` section (if present) and
    then from the section named ``operation_name`` (if given and present).
    Each policy contributes one checker and, possibly, a list of exception
    classes that should be treated as retryable.

    :param config: The retry configuration mapping.  When it contains a
        ``__default__`` section, that section must define ``max_attempts``.
    :param operation_name: Optional name of an operation-specific section
        whose ``policies`` are added after the default ones.

    :raises KeyError: If a consulted section lacks a required key
        (``max_attempts``, operation ``policies``, ``applies_when``).

    :return: A ``MaxAttemptsDecorator`` wrapping either the single checker
        or a ``MultiChecker`` over all of them.

    """
    checkers = []
    max_attempts = None
    retryable_exceptions = []

    def _register(policy):
        # Record the checker for one policy together with any exception
        # classes that policy wants caught and retried.
        checkers.append(_create_single_checker(policy))
        policy_exceptions = _extract_retryable_exception(policy)
        if policy_exceptions is not None:
            retryable_exceptions.extend(policy_exceptions)

    if '__default__' in config:
        default_config = config['__default__']
        policies = default_config.get('policies', [])
        max_attempts = default_config['max_attempts']
        for key in policies:
            _register(policies[key])
    if operation_name is not None and config.get(operation_name) is not None:
        operation_policies = config[operation_name]['policies']
        for key in operation_policies:
            _register(operation_policies[key])

    # A lone checker does not need a MultiChecker wrapper, but it still
    # needs the retryable exceptions: checkers such as ExceptionRaiser and
    # CRC32Checker signal "retry" by raising, and only the decorator's
    # retryable_exceptions turn that raise into a retry.  Previously the
    # single-checker path dropped them, so such a policy never retried.
    if len(checkers) == 1:
        checker = checkers[0]
    else:
        checker = MultiChecker(checkers)
    return MaxAttemptsDecorator(
        checker,
        max_attempts=max_attempts,
        retryable_exceptions=tuple(retryable_exceptions),
    )
| |
|
| |
|
| | def _create_single_checker(config): |
| | if 'response' in config['applies_when']: |
| | return _create_single_response_checker( |
| | config['applies_when']['response'] |
| | ) |
| | elif 'socket_errors' in config['applies_when']: |
| | return ExceptionRaiser() |
| |
|
| |
|
| | def _create_single_response_checker(response): |
| | if 'service_error_code' in response: |
| | checker = ServiceErrorCodeChecker( |
| | status_code=response['http_status_code'], |
| | error_code=response['service_error_code'], |
| | ) |
| | elif 'http_status_code' in response: |
| | checker = HTTPStatusCodeChecker( |
| | status_code=response['http_status_code'] |
| | ) |
| | elif 'crc32body' in response: |
| | checker = CRC32Checker(header=response['crc32body']) |
| | else: |
| | |
| | raise ValueError("Unknown retry policy") |
| | return checker |
| |
|
| |
|
| | def _extract_retryable_exception(config): |
| | applies_when = config['applies_when'] |
| | if 'crc32body' in applies_when.get('response', {}): |
| | return [ChecksumError] |
| | elif 'socket_errors' in applies_when: |
| | exceptions = [] |
| | for name in applies_when['socket_errors']: |
| | exceptions.extend(EXCEPTION_MAP[name]) |
| | return exceptions |
| |
|
| |
|
class RetryHandler:
    """Callable that decides whether to retry and what to do about it.

    It is built from two callables:

    * ``checker`` -- given the attempt number, the response and any caught
      exception, reports whether a retry is warranted.
    * ``action`` -- invoked with the attempt number when a retry is
      warranted; its result is handed back to the caller.

    """

    def __init__(self, checker, action):
        self._checker = checker
        self._action = action

    def __call__(self, attempts, response, caught_exception, **kwargs):
        """Run the checker and, if it asks for a retry, the action.

        Extra keyword arguments are accepted so the handler can be hooked
        up to an event system.  When the checker is a
        ``MaxAttemptsDecorator``, ``kwargs['request_dict']['context']``
        must be present; its ``retries`` entry is forwarded to the checker.

        :return: The action's result when a retry is needed, otherwise
            ``None``.

        """
        checker_kwargs = dict(
            attempt_number=attempts,
            response=response,
            caught_exception=caught_exception,
        )
        if isinstance(self._checker, MaxAttemptsDecorator):
            request_context = kwargs['request_dict']['context']
            checker_kwargs['retries_context'] = request_context.get('retries')

        if not self._checker(**checker_kwargs):
            logger.debug("No retry needed.")
            return None
        result = self._action(attempts=attempts)
        logger.debug("Retry needed, action of: %s", result)
        return result
| |
|
| |
|
class BaseChecker:
    """Common dispatch logic for retry checkers.

    A checker evaluates one criterion that determines whether a retry
    should happen.  Subclasses override ``_check_response`` and/or
    ``_check_caught_exception``; the defaults return ``None``.

    """

    def __call__(self, attempt_number, response, caught_exception):
        """Dispatch to the response or the exception check.

        A non-``None`` ``response`` takes precedence; the exception check
        is only consulted when ``response`` is ``None``.

        :type attempt_number: int
        :param attempt_number: The total number of times the request has
            been attempted.

        :param response: The HTTP response, if one was received.

        :type caught_exception: Exception
        :param caught_exception: The exception caught while sending the
            request, if any.

        :raises ValueError: If both ``response`` and ``caught_exception``
            are ``None``.

        :return: Whatever the selected ``_check_*`` hook returns; a truthy
            value means the retry criterion matched.

        """
        if response is not None:
            return self._check_response(attempt_number, response)
        if caught_exception is None:
            raise ValueError("Both response and caught_exception are None.")
        return self._check_caught_exception(attempt_number, caught_exception)

    def _check_response(self, attempt_number, response):
        # Default hook: no opinion about responses.
        return None

    def _check_caught_exception(self, attempt_number, caught_exception):
        # Default hook: no opinion about exceptions.
        return None
| |
|
| |
|
class MaxAttemptsDecorator(BaseChecker):
    """Cap a wrapped checker at a maximum number of attempts.

    Calls are forwarded to the wrapped checker.  While the attempt number
    is below ``max_attempts``, any of the ``retryable_exceptions`` raised
    by the wrapped checker is swallowed and reported as "retry".  Once the
    limit is reached the decorator answers ``False``, and exceptions from
    the wrapped checker are no longer caught.

    """

    def __init__(self, checker, max_attempts, retryable_exceptions=None):
        self._checker = checker
        self._max_attempts = max_attempts
        self._retryable_exceptions = retryable_exceptions

    def __call__(
        self, attempt_number, response, caught_exception, retries_context
    ):
        """Decide whether a retry should happen for this attempt.

        :param retries_context: Optional mutable mapping; when truthy, its
            ``max`` entry is raised to at least ``max_attempts``.

        :return: The wrapped checker's truthy result when a retry is
            allowed, otherwise ``False``.

        """
        if retries_context:
            previous_max = retries_context.get('max', 0)
            retries_context['max'] = max(previous_max, self._max_attempts)

        should_retry = self._should_retry(
            attempt_number, response, caught_exception
        )
        if not should_retry:
            return False
        if attempt_number >= self._max_attempts:
            # Flag the parsed response so callers can tell the retry
            # budget was exhausted.
            if response is not None and 'ResponseMetadata' in response[1]:
                metadata = response[1]['ResponseMetadata']
                metadata['MaxAttemptsReached'] = True
            logger.debug(
                "Reached the maximum number of retry attempts: %s",
                attempt_number,
            )
            return False
        return should_retry

    def _should_retry(self, attempt_number, response, caught_exception):
        """Invoke the wrapped checker, absorbing retryable exceptions.

        Exceptions are only absorbed when retryable exceptions were
        configured and the attempt number is still below the maximum;
        otherwise anything the wrapped checker raises propagates.

        """
        may_absorb = (
            self._retryable_exceptions
            and attempt_number < self._max_attempts
        )
        if not may_absorb:
            return self._checker(attempt_number, response, caught_exception)
        try:
            return self._checker(attempt_number, response, caught_exception)
        except self._retryable_exceptions as e:
            logger.debug(
                "retry needed, retryable exception caught: %s",
                e,
                exc_info=True,
            )
            return True
| |
|
| |
|
class HTTPStatusCodeChecker(BaseChecker):
    """Request a retry when the response has a specific HTTP status code."""

    def __init__(self, status_code):
        self._status_code = status_code

    def _check_response(self, attempt_number, response):
        """Return True when ``response[0].status_code`` matches."""
        http_response = response[0]
        if http_response.status_code == self._status_code:
            logger.debug(
                "retry needed: retryable HTTP status code received: %s",
                self._status_code,
            )
            return True
        return False
| |
|
| |
|
class ServiceErrorCodeChecker(BaseChecker):
    """Request a retry on a given HTTP status plus service error code."""

    def __init__(self, status_code, error_code):
        self._status_code = status_code
        self._error_code = error_code

    def _check_response(self, attempt_number, response):
        """Return True when both the status code and error code match.

        The status code is read from ``response[0]``; the error code is
        looked up as ``response[1]['Error']['Code']`` (missing keys count
        as no match) and is only consulted once the status code matched.

        """
        if (
            response[0].status_code == self._status_code
            and response[1].get('Error', {}).get('Code') == self._error_code
        ):
            logger.debug(
                "retry needed: matching HTTP status and error code seen: "
                "%s, %s",
                self._status_code,
                self._error_code,
            )
            return True
        return False
| |
|
| |
|
class MultiChecker(BaseChecker):
    """Combine several checkers; the first truthy answer wins."""

    def __init__(self, checkers):
        self._checkers = checkers

    def __call__(self, attempt_number, response, caught_exception):
        """Consult each checker in order until one asks for a retry.

        Checkers after the first truthy result are not called.

        :return: The first truthy checker result, or ``False`` when no
            checker matched.

        """
        # Lazy generator: each checker is only invoked when reached.
        verdicts = (
            checker(attempt_number, response, caught_exception)
            for checker in self._checkers
        )
        return next((verdict for verdict in verdicts if verdict), False)
| |
|
| |
|
class CRC32Checker(BaseChecker):
    """Validate the response body against a CRC32 response header."""

    def __init__(self, header):
        # Name of the response header carrying the expected checksum.
        self._header_name = header

    def _check_response(self, attempt_number, response):
        """Compare the body's CRC32 with the value from the header.

        The check is skipped when the header is absent.

        :raises ChecksumError: If the computed CRC32 of
            ``response[0].content`` differs from the header value.

        :return: ``None`` when the check is skipped or passes.

        """
        http_response = response[0]
        header_value = http_response.headers.get(self._header_name)
        if header_value is None:
            logger.debug(
                "crc32 check skipped, the %s header is not "
                "in the http response.",
                self._header_name,
            )
            return None
        # Mask to an unsigned 32-bit value before comparing.
        actual_crc32 = crc32(http_response.content) & 0xFFFFFFFF
        expected_crc32 = int(header_value)
        if actual_crc32 == expected_crc32:
            return None
        logger.debug(
            "retry needed: crc32 check failed, expected != actual: "
            "%s != %s",
            expected_crc32,
            actual_crc32,
        )
        raise ChecksumError(
            checksum_type='crc32',
            expected_checksum=expected_crc32,
            actual_checksum=actual_crc32,
        )
| |
|
| |
|
class ExceptionRaiser(BaseChecker):
    """Checker that re-raises whatever exception was caught.

    Responses are left to the inherited default hook; only a non-``None``
    ``caught_exception`` is acted upon.

    """

    def _check_caught_exception(self, attempt_number, caught_exception):
        # Re-raising lets a wrapping checker decide what to do with the
        # exception: MaxAttemptsDecorator._should_retry catches the
        # exception types it was configured with and turns them into a
        # retry, and lets everything else propagate.
        raise caught_exception
| |
|