Kris8an's picture
Upload folder using huggingface_hub
a06facb verified
import asyncio
from botocore.endpoint import (
DEFAULT_TIMEOUT,
MAX_POOL_CONNECTIONS,
Endpoint,
EndpointCreator,
HTTPClientError,
create_request_object,
history_recorder,
is_valid_endpoint_url,
is_valid_ipv6_endpoint_url,
logger,
)
from botocore.hooks import first_non_none_response
from urllib3.response import HTTPHeaderDict
from aiobotocore.httpchecksum import handle_checksum_body
from aiobotocore.httpsession import AIOHTTPSession
from aiobotocore.response import StreamingBody
async def convert_to_response_dict(http_response, operation_model):
"""Convert an HTTP response object to a request dict.
This converts the requests library's HTTP response object to
a dictionary.
:type http_response: botocore.vendored.requests.model.Response
:param http_response: The HTTP response from an AWS service request.
:rtype: dict
:return: A response dictionary which will contain the following keys:
* headers (dict)
* status_code (int)
* body (string or file-like object)
"""
response_dict = {
# botocore converts keys to str, so make sure that they are in
# the expected case. See detailed discussion here:
# https://github.com/aio-libs/aiobotocore/pull/116
# aiohttp's CIMultiDict camel cases the headers :(
'headers': HTTPHeaderDict(
{
k.decode('utf-8').lower(): v.decode('utf-8')
for k, v in http_response.raw.raw_headers
}
),
'status_code': http_response.status_code,
'context': {
'operation_name': operation_model.name,
},
}
if response_dict['status_code'] >= 300:
response_dict['body'] = await http_response.content
elif operation_model.has_event_stream_output:
response_dict['body'] = http_response.raw
elif operation_model.has_streaming_output:
length = response_dict['headers'].get('content-length')
response_dict['body'] = StreamingBody(http_response.raw, length)
else:
response_dict['body'] = await http_response.content
return response_dict
class AioEndpoint(Endpoint):
async def close(self):
await self.http_session.close()
async def create_request(self, params, operation_model=None):
request = create_request_object(params)
if operation_model:
request.stream_output = any(
[
operation_model.has_streaming_output,
operation_model.has_event_stream_output,
]
)
service_id = operation_model.service_model.service_id.hyphenize()
event_name = 'request-created.{service_id}.{op_name}'.format(
service_id=service_id, op_name=operation_model.name
)
await self._event_emitter.emit(
event_name,
request=request,
operation_name=operation_model.name,
)
prepared_request = self.prepare_request(request)
return prepared_request
async def _send_request(self, request_dict, operation_model):
attempts = 1
context = request_dict['context']
self._update_retries_context(context, attempts)
request = await self.create_request(request_dict, operation_model)
success_response, exception = await self._get_response(
request, operation_model, context
)
while await self._needs_retry(
attempts,
operation_model,
request_dict,
success_response,
exception,
):
attempts += 1
self._update_retries_context(context, attempts, success_response)
# If there is a stream associated with the request, we need
# to reset it before attempting to send the request again.
# This will ensure that we resend the entire contents of the
# body.
request.reset_stream()
# Create a new request when retried (including a new signature).
request = await self.create_request(request_dict, operation_model)
success_response, exception = await self._get_response(
request, operation_model, context
)
if (
success_response is not None
and 'ResponseMetadata' in success_response[1]
):
# We want to share num retries, not num attempts.
total_retries = attempts - 1
success_response[1]['ResponseMetadata'][
'RetryAttempts'
] = total_retries
if exception is not None:
raise exception
else:
return success_response
async def _get_response(self, request, operation_model, context):
# This will return a tuple of (success_response, exception)
# and success_response is itself a tuple of
# (http_response, parsed_dict).
# If an exception occurs then the success_response is None.
# If no exception occurs then exception is None.
success_response, exception = await self._do_get_response(
request, operation_model, context
)
kwargs_to_emit = {
'response_dict': None,
'parsed_response': None,
'context': context,
'exception': exception,
}
if success_response is not None:
http_response, parsed_response = success_response
kwargs_to_emit['parsed_response'] = parsed_response
kwargs_to_emit['response_dict'] = await convert_to_response_dict(
http_response, operation_model
)
service_id = operation_model.service_model.service_id.hyphenize()
await self._event_emitter.emit(
f"response-received.{service_id}.{operation_model.name}",
**kwargs_to_emit,
)
return success_response, exception
async def _do_get_response(self, request, operation_model, context):
try:
logger.debug("Sending http request: %s", request)
history_recorder.record(
'HTTP_REQUEST',
{
'method': request.method,
'headers': request.headers,
'streaming': operation_model.has_streaming_input,
'url': request.url,
'body': request.body,
},
)
service_id = operation_model.service_model.service_id.hyphenize()
event_name = f"before-send.{service_id}.{operation_model.name}"
responses = await self._event_emitter.emit(
event_name, request=request
)
http_response = first_non_none_response(responses)
if http_response is None:
http_response = await self._send(request)
except HTTPClientError as e:
return (None, e)
except Exception as e:
logger.debug(
"Exception received when sending HTTP request.", exc_info=True
)
return (None, e)
# This returns the http_response and the parsed_data.
response_dict = await convert_to_response_dict(
http_response, operation_model
)
await handle_checksum_body(
http_response,
response_dict,
context,
operation_model,
)
http_response_record_dict = response_dict.copy()
http_response_record_dict[
'streaming'
] = operation_model.has_streaming_output
history_recorder.record('HTTP_RESPONSE', http_response_record_dict)
protocol = operation_model.metadata['protocol']
parser = self._response_parser_factory.create_parser(protocol)
if asyncio.iscoroutinefunction(parser.parse):
parsed_response = await parser.parse(
response_dict, operation_model.output_shape
)
else:
parsed_response = parser.parse(
response_dict, operation_model.output_shape
)
if http_response.status_code >= 300:
await self._add_modeled_error_fields(
response_dict,
parsed_response,
operation_model,
parser,
)
history_recorder.record('PARSED_RESPONSE', parsed_response)
return (http_response, parsed_response), None
async def _add_modeled_error_fields(
self,
response_dict,
parsed_response,
operation_model,
parser,
):
error_code = parsed_response.get("Error", {}).get("Code")
if error_code is None:
return
service_model = operation_model.service_model
error_shape = service_model.shape_for_error_code(error_code)
if error_shape is None:
return
if asyncio.iscoroutinefunction(parser.parse):
modeled_parse = await parser.parse(response_dict, error_shape)
else:
modeled_parse = parser.parse(response_dict, error_shape)
# TODO: avoid naming conflicts with ResponseMetadata and Error
parsed_response.update(modeled_parse)
# NOTE: The only line changed here changing time.sleep to asyncio.sleep
async def _needs_retry(
self,
attempts,
operation_model,
request_dict,
response=None,
caught_exception=None,
):
service_id = operation_model.service_model.service_id.hyphenize()
event_name = f"needs-retry.{service_id}.{operation_model.name}"
responses = await self._event_emitter.emit(
event_name,
response=response,
endpoint=self,
operation=operation_model,
attempts=attempts,
caught_exception=caught_exception,
request_dict=request_dict,
)
handler_response = first_non_none_response(responses)
if handler_response is None:
return False
else:
# Request needs to be retried, and we need to sleep
# for the specified number of times.
logger.debug(
"Response received to retry, sleeping for %s seconds",
handler_response,
)
await asyncio.sleep(handler_response)
return True
async def _send(self, request):
return await self.http_session.send(request)
class AioEndpointCreator(EndpointCreator):
def create_endpoint(
self,
service_model,
region_name,
endpoint_url,
verify=None,
response_parser_factory=None,
timeout=DEFAULT_TIMEOUT,
max_pool_connections=MAX_POOL_CONNECTIONS,
http_session_cls=AIOHTTPSession,
proxies=None,
socket_options=None,
client_cert=None,
proxies_config=None,
connector_args=None,
):
if not is_valid_endpoint_url(
endpoint_url
) and not is_valid_ipv6_endpoint_url(endpoint_url):
raise ValueError("Invalid endpoint: %s" % endpoint_url)
if proxies is None:
proxies = self._get_proxies(endpoint_url)
endpoint_prefix = service_model.endpoint_prefix
logger.debug('Setting %s timeout as %s', endpoint_prefix, timeout)
http_session = http_session_cls(
timeout=timeout,
proxies=proxies,
verify=self._get_verify_value(verify),
max_pool_connections=max_pool_connections,
socket_options=socket_options,
client_cert=client_cert,
proxies_config=proxies_config,
connector_args=connector_args,
)
return AioEndpoint(
endpoint_url,
endpoint_prefix=endpoint_prefix,
event_emitter=self._event_emitter,
response_parser_factory=response_parser_factory,
http_session=http_session,
)