|
|
from botocore.retryhandler import ( |
|
|
ChecksumError, |
|
|
CRC32Checker, |
|
|
ExceptionRaiser, |
|
|
HTTPStatusCodeChecker, |
|
|
MaxAttemptsDecorator, |
|
|
MultiChecker, |
|
|
RetryHandler, |
|
|
ServiceErrorCodeChecker, |
|
|
_extract_retryable_exception, |
|
|
crc32, |
|
|
create_retry_action_from_config, |
|
|
logger, |
|
|
) |
|
|
|
|
|
from ._helpers import resolve_awaitable |
|
|
|
|
|
|
|
|
def create_retry_handler(config, operation_name=None):
    """Build an ``AioRetryHandler`` from a retry configuration.

    :param config: Retry configuration; handed unchanged to both
        ``create_checker_from_retry_config`` and
        ``create_retry_action_from_config``.
    :param operation_name: Optional operation name, forwarded as a keyword
        argument to both factories.
    :return: An ``AioRetryHandler`` constructed with the resulting checker
        and action.
    """
    factory_kwargs = {'operation_name': operation_name}
    # The checker is built before the action, so a configuration error in
    # the checker policies surfaces first.
    retry_checker = create_checker_from_retry_config(config, **factory_kwargs)
    retry_action = create_retry_action_from_config(config, **factory_kwargs)
    return AioRetryHandler(checker=retry_checker, action=retry_action)
|
|
|
|
|
|
|
|
def create_checker_from_retry_config(config, operation_name=None):
    """Build the attempt-limited checker described by a retry configuration.

    Policies are read from ``config['__default__']`` (when that key is
    present) and then from ``config[operation_name]`` (when an operation name
    is given and has a non-``None`` entry). One checker is created per policy,
    in that order.

    :param config: Mapping of retry configuration. The ``'__default__'``
        entry, if present, must contain ``'max_attempts'``; an operation
        entry, if used, must contain ``'policies'``.
    :param operation_name: Optional name of an operation-specific entry in
        ``config``.
    :return: An ``AioMaxAttemptsDecorator`` wrapping either the single
        checker that was created, or an ``AioMultiChecker`` over all of them.
    :raises KeyError: If a required key named above is missing.

    NOTE(review): when exactly one checker is created it is wrapped without
    ``retryable_exceptions``; only the multi-checker path forwards them.
    Confirm this asymmetry is intended.
    """
    checkers = []
    retryable_exceptions = []

    def _register(policies):
        # Append one checker per policy and gather any retryable exceptions
        # the policy declares.
        for policy_name in policies:
            policy = policies[policy_name]
            checkers.append(_create_single_checker(policy))
            declared = _extract_retryable_exception(policy)
            if declared is not None:
                retryable_exceptions.extend(declared)

    max_attempts = None
    if '__default__' in config:
        defaults = config['__default__']
        default_policies = defaults.get('policies', [])
        # Read before any checker is built so a missing key fails early.
        max_attempts = defaults['max_attempts']
        _register(default_policies)

    if operation_name is not None and config.get(operation_name) is not None:
        _register(config[operation_name]['policies'])

    if len(checkers) != 1:
        return AioMaxAttemptsDecorator(
            AioMultiChecker(checkers),
            max_attempts=max_attempts,
            retryable_exceptions=tuple(retryable_exceptions),
        )

    # A lone checker does not need to be wrapped in a multi-checker.
    return AioMaxAttemptsDecorator(checkers[0], max_attempts=max_attempts)
|
|
|
|
|
|
|
|
def _create_single_checker(config): |
|
|
if 'response' in config['applies_when']: |
|
|
return _create_single_response_checker( |
|
|
config['applies_when']['response'] |
|
|
) |
|
|
elif 'socket_errors' in config['applies_when']: |
|
|
return ExceptionRaiser() |
|
|
|
|
|
|
|
|
def _create_single_response_checker(response): |
|
|
if 'service_error_code' in response: |
|
|
checker = ServiceErrorCodeChecker( |
|
|
status_code=response['http_status_code'], |
|
|
error_code=response['service_error_code'], |
|
|
) |
|
|
elif 'http_status_code' in response: |
|
|
checker = HTTPStatusCodeChecker( |
|
|
status_code=response['http_status_code'] |
|
|
) |
|
|
elif 'crc32body' in response: |
|
|
checker = AioCRC32Checker(header=response['crc32body']) |
|
|
else: |
|
|
|
|
|
raise ValueError("Unknown retry policy") |
|
|
return checker |
|
|
|
|
|
|
|
|
class AioRetryHandler(RetryHandler):
    """``RetryHandler`` variant whose call protocol is a coroutine.

    Calling an instance returns the coroutine created by ``_call``; the
    caller is responsible for awaiting it.
    """

    async def _call(self, attempts, response, caught_exception, **kwargs):
        """Run the checker and, when a retry is needed, the retry action.

        Extra keyword arguments are accepted so the handler can be
        registered as an event handler. ``kwargs['request_dict']`` is only
        read when the checker is a ``MaxAttemptsDecorator``.

        :param attempts: Attempt count, passed to the checker as
            ``attempt_number`` and to the action as ``attempts``.
        :param response: Response passed through to the checker.
        :param caught_exception: Exception passed through to the checker.
        :return: The value returned by the action when a retry is needed,
            otherwise ``None``.
        """
        checker_kwargs = dict(
            attempt_number=attempts,
            response=response,
            caught_exception=caught_exception,
        )
        if isinstance(self._checker, MaxAttemptsDecorator):
            # Attempt-limited checkers also receive the request's retry
            # context (``None`` when the context has no 'retries' entry).
            request_context = kwargs['request_dict']['context']
            checker_kwargs['retries_context'] = request_context.get('retries')

        # NOTE(review): resolve_awaitable presumably lets both plain and
        # coroutine-returning checkers be used here -- confirm in _helpers.
        needs_retry = await resolve_awaitable(self._checker(**checker_kwargs))
        if not needs_retry:
            logger.debug("No retry needed.")
            return None

        result = self._action(attempts=attempts)
        logger.debug("Retry needed, action of: %s", result)
        return result

    def __call__(self, *args, **kwargs):
        """Return the ``_call`` coroutine for the given arguments."""
        return self._call(*args, **kwargs)
|
|
|
|
|
|
|
|
class AioMaxAttemptsDecorator(MaxAttemptsDecorator):
    """``MaxAttemptsDecorator`` variant whose call protocol is a coroutine.

    Wraps another checker and refuses further retries once the attempt
    number reaches ``self._max_attempts``.
    """

    async def _call(
        self, attempt_number, response, caught_exception, retries_context
    ):
        """Ask the wrapped checker for a verdict, capped by the attempt limit.

        :param attempt_number: Number of the attempt being evaluated.
        :param response: Response passed to the wrapped checker; when not
            ``None`` it is indexed as ``response[1]`` to reach the parsed
            metadata.
        :param caught_exception: Exception passed to the wrapped checker.
        :param retries_context: Optional mutable mapping; when truthy its
            ``'max'`` entry is raised to at least ``self._max_attempts``.
        :return: The wrapped checker's truthy verdict while attempts remain,
            otherwise ``False``.
        """
        if retries_context:
            known_max = retries_context.get('max', 0)
            retries_context['max'] = max(known_max, self._max_attempts)

        should_retry = await self._should_retry(
            attempt_number, response, caught_exception
        )
        if not should_retry:
            return False

        if attempt_number >= self._max_attempts:
            # Record on the response metadata that retries were exhausted.
            if response is not None and 'ResponseMetadata' in response[1]:
                metadata = response[1]['ResponseMetadata']
                metadata['MaxAttemptsReached'] = True
            logger.debug(
                "Reached the maximum number of retry attempts: %s",
                attempt_number,
            )
            return False

        return should_retry

    def __call__(self, *args, **kwargs):
        """Return the ``_call`` coroutine for the given arguments."""
        return self._call(*args, **kwargs)

    async def _should_retry(self, attempt_number, response, caught_exception):
        """Evaluate the wrapped checker, treating retryable exceptions as "retry".

        While attempts remain and retryable exceptions are configured, any
        such exception raised by the checker is logged and converted into
        ``True``. Otherwise the checker's exceptions propagate to the caller.
        """
        may_swallow = (
            self._retryable_exceptions
            and attempt_number < self._max_attempts
        )
        if not may_swallow:
            # Out of attempts (or nothing is retryable): let any exception
            # raised by the checker propagate.
            return await resolve_awaitable(
                self._checker(attempt_number, response, caught_exception)
            )

        try:
            # The checker call itself stays inside the try so exceptions
            # raised before anything is awaited are caught as well.
            return await resolve_awaitable(
                self._checker(attempt_number, response, caught_exception)
            )
        except self._retryable_exceptions as e:
            logger.debug(
                "retry needed, retryable exception caught: %s",
                e,
                exc_info=True,
            )
            return True
|
|
|
|
|
|
|
|
class AioMultiChecker(MultiChecker):
    """``MultiChecker`` variant whose call protocol is a coroutine."""

    async def _call(self, attempt_number, response, caught_exception):
        """Return the first truthy verdict among ``self._checkers``.

        Checkers are consulted one at a time, in order, and evaluation stops
        at the first truthy result.

        :return: The first truthy checker result, or ``False`` if every
            checker declines.
        """
        check_args = (attempt_number, response, caught_exception)
        for checker in self._checkers:
            verdict = await resolve_awaitable(checker(*check_args))
            if verdict:
                return verdict
        return False

    def __call__(self, *args, **kwargs):
        """Return the ``_call`` coroutine for the given arguments."""
        return self._call(*args, **kwargs)
|
|
|
|
|
|
|
|
class AioCRC32Checker(CRC32Checker):
    """``CRC32Checker`` variant that awaits the response body before hashing."""

    async def _call(self, attempt_number, response, caught_exception):
        """Dispatch to the response or exception check.

        A non-``None`` response takes precedence over a caught exception.

        :raises ValueError: If both ``response`` and ``caught_exception``
            are ``None``.
        """
        if response is not None:
            return await self._check_response(attempt_number, response)
        if caught_exception is not None:
            # Inherited, non-awaited hook.
            return self._check_caught_exception(
                attempt_number, caught_exception
            )
        raise ValueError("Both response and caught_exception are None.")

    def __call__(self, *args, **kwargs):
        """Return the ``_call`` coroutine for the given arguments."""
        return self._call(*args, **kwargs)

    async def _check_response(self, attempt_number, response):
        """Compare the body's CRC32 with the value advertised in a header.

        :param attempt_number: Unused by this check.
        :param response: Sequence whose first item is the HTTP response; its
            ``headers`` are read and its ``content`` is awaited.
        :return: ``None`` when the header is absent or the checksums agree.
        :raises ChecksumError: If the computed CRC32 differs from the header
            value.
        """
        http_response = response[0]
        # NOTE(review): self._header_name is presumably set by the base
        # class from the ``header`` constructor argument -- confirm.
        expected_crc = http_response.headers.get(self._header_name)
        if expected_crc is None:
            logger.debug(
                "crc32 check skipped, the %s header is not "
                "in the http response.",
                self._header_name,
            )
            return None

        # The body is awaited before the header value is parsed, so a read
        # failure surfaces ahead of a malformed-header error.
        body = await http_response.content
        actual_crc32 = crc32(body) & 0xFFFFFFFF
        expected_crc32 = int(expected_crc)
        if actual_crc32 == expected_crc32:
            return None

        logger.debug(
            "retry needed: crc32 check failed, expected != actual: "
            "%s != %s",
            expected_crc32,
            actual_crc32,
        )
        raise ChecksumError(
            checksum_type='crc32',
            expected_checksum=expected_crc32,
            actual_checksum=actual_crc32,
        )
|
|
|