Kris8an's picture
Upload folder using huggingface_hub
a06facb verified
from botocore.retryhandler import (
ChecksumError,
CRC32Checker,
ExceptionRaiser,
HTTPStatusCodeChecker,
MaxAttemptsDecorator,
MultiChecker,
RetryHandler,
ServiceErrorCodeChecker,
_extract_retryable_exception,
crc32,
create_retry_action_from_config,
logger,
)
from ._helpers import resolve_awaitable
def create_retry_handler(config, operation_name=None):
checker = create_checker_from_retry_config(
config, operation_name=operation_name
)
action = create_retry_action_from_config(
config, operation_name=operation_name
)
return AioRetryHandler(checker=checker, action=action)
def create_checker_from_retry_config(config, operation_name=None):
checkers = []
max_attempts = None
retryable_exceptions = []
if '__default__' in config:
policies = config['__default__'].get('policies', [])
max_attempts = config['__default__']['max_attempts']
for key in policies:
current_config = policies[key]
checkers.append(_create_single_checker(current_config))
retry_exception = _extract_retryable_exception(current_config)
if retry_exception is not None:
retryable_exceptions.extend(retry_exception)
if operation_name is not None and config.get(operation_name) is not None:
operation_policies = config[operation_name]['policies']
for key in operation_policies:
checkers.append(_create_single_checker(operation_policies[key]))
retry_exception = _extract_retryable_exception(
operation_policies[key]
)
if retry_exception is not None:
retryable_exceptions.extend(retry_exception)
if len(checkers) == 1:
# Don't need to use a MultiChecker
return AioMaxAttemptsDecorator(checkers[0], max_attempts=max_attempts)
else:
multi_checker = AioMultiChecker(checkers)
return AioMaxAttemptsDecorator(
multi_checker,
max_attempts=max_attempts,
retryable_exceptions=tuple(retryable_exceptions),
)
def _create_single_checker(config):
if 'response' in config['applies_when']:
return _create_single_response_checker(
config['applies_when']['response']
)
elif 'socket_errors' in config['applies_when']:
return ExceptionRaiser()
def _create_single_response_checker(response):
if 'service_error_code' in response:
checker = ServiceErrorCodeChecker(
status_code=response['http_status_code'],
error_code=response['service_error_code'],
)
elif 'http_status_code' in response:
checker = HTTPStatusCodeChecker(
status_code=response['http_status_code']
)
elif 'crc32body' in response:
checker = AioCRC32Checker(header=response['crc32body'])
else:
# TODO: send a signal.
raise ValueError("Unknown retry policy")
return checker
class AioRetryHandler(RetryHandler):
async def _call(self, attempts, response, caught_exception, **kwargs):
"""Handler for a retry.
Intended to be hooked up to an event handler (hence the **kwargs),
this will process retries appropriately.
"""
checker_kwargs = {
'attempt_number': attempts,
'response': response,
'caught_exception': caught_exception,
}
if isinstance(self._checker, MaxAttemptsDecorator):
retries_context = kwargs['request_dict']['context'].get('retries')
checker_kwargs.update({'retries_context': retries_context})
if await resolve_awaitable(self._checker(**checker_kwargs)):
result = self._action(attempts=attempts)
logger.debug("Retry needed, action of: %s", result)
return result
logger.debug("No retry needed.")
def __call__(self, *args, **kwargs):
return self._call(*args, **kwargs) # return awaitable
class AioMaxAttemptsDecorator(MaxAttemptsDecorator):
async def _call(
self, attempt_number, response, caught_exception, retries_context
):
if retries_context:
retries_context['max'] = max(
retries_context.get('max', 0), self._max_attempts
)
should_retry = await self._should_retry(
attempt_number, response, caught_exception
)
if should_retry:
if attempt_number >= self._max_attempts:
# explicitly set MaxAttemptsReached
if response is not None and 'ResponseMetadata' in response[1]:
response[1]['ResponseMetadata'][
'MaxAttemptsReached'
] = True
logger.debug(
"Reached the maximum number of retry attempts: %s",
attempt_number,
)
return False
else:
return should_retry
else:
return False
def __call__(self, *args, **kwargs):
return self._call(*args, **kwargs)
async def _should_retry(self, attempt_number, response, caught_exception):
if self._retryable_exceptions and attempt_number < self._max_attempts:
try:
return await resolve_awaitable(
self._checker(attempt_number, response, caught_exception)
)
except self._retryable_exceptions as e:
logger.debug(
"retry needed, retryable exception caught: %s",
e,
exc_info=True,
)
return True
else:
# If we've exceeded the max attempts we just let the exception
# propagate if one has occurred.
return await resolve_awaitable(
self._checker(attempt_number, response, caught_exception)
)
class AioMultiChecker(MultiChecker):
async def _call(self, attempt_number, response, caught_exception):
for checker in self._checkers:
checker_response = await resolve_awaitable(
checker(attempt_number, response, caught_exception)
)
if checker_response:
return checker_response
return False
def __call__(self, *args, **kwargs):
return self._call(*args, **kwargs)
class AioCRC32Checker(CRC32Checker):
async def _call(self, attempt_number, response, caught_exception):
if response is not None:
return await self._check_response(attempt_number, response)
elif caught_exception is not None:
return self._check_caught_exception(
attempt_number, caught_exception
)
else:
raise ValueError("Both response and caught_exception are None.")
def __call__(self, *args, **kwargs):
return self._call(*args, **kwargs)
async def _check_response(self, attempt_number, response):
http_response = response[0]
expected_crc = http_response.headers.get(self._header_name)
if expected_crc is None:
logger.debug(
"crc32 check skipped, the %s header is not "
"in the http response.",
self._header_name,
)
else:
actual_crc32 = crc32(await response[0].content) & 0xFFFFFFFF
if not actual_crc32 == int(expected_crc):
logger.debug(
"retry needed: crc32 check failed, expected != actual: "
"%s != %s",
int(expected_crc),
actual_crc32,
)
raise ChecksumError(
checksum_type='crc32',
expected_checksum=int(expected_crc),
actual_checksum=actual_crc32,
)