|
|
import asyncio |
|
|
|
|
|
from botocore.endpoint import ( |
|
|
DEFAULT_TIMEOUT, |
|
|
MAX_POOL_CONNECTIONS, |
|
|
Endpoint, |
|
|
EndpointCreator, |
|
|
HTTPClientError, |
|
|
create_request_object, |
|
|
history_recorder, |
|
|
is_valid_endpoint_url, |
|
|
is_valid_ipv6_endpoint_url, |
|
|
logger, |
|
|
) |
|
|
from botocore.hooks import first_non_none_response |
|
|
from urllib3.response import HTTPHeaderDict |
|
|
|
|
|
from aiobotocore.httpchecksum import handle_checksum_body |
|
|
from aiobotocore.httpsession import AIOHTTPSession |
|
|
from aiobotocore.response import StreamingBody |
|
|
|
|
|
|
|
|
async def convert_to_response_dict(http_response, operation_model): |
|
|
"""Convert an HTTP response object to a request dict. |
|
|
|
|
|
This converts the requests library's HTTP response object to |
|
|
a dictionary. |
|
|
|
|
|
:type http_response: botocore.vendored.requests.model.Response |
|
|
:param http_response: The HTTP response from an AWS service request. |
|
|
|
|
|
:rtype: dict |
|
|
:return: A response dictionary which will contain the following keys: |
|
|
* headers (dict) |
|
|
* status_code (int) |
|
|
* body (string or file-like object) |
|
|
|
|
|
""" |
|
|
response_dict = { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
'headers': HTTPHeaderDict( |
|
|
{ |
|
|
k.decode('utf-8').lower(): v.decode('utf-8') |
|
|
for k, v in http_response.raw.raw_headers |
|
|
} |
|
|
), |
|
|
'status_code': http_response.status_code, |
|
|
'context': { |
|
|
'operation_name': operation_model.name, |
|
|
}, |
|
|
} |
|
|
if response_dict['status_code'] >= 300: |
|
|
response_dict['body'] = await http_response.content |
|
|
elif operation_model.has_event_stream_output: |
|
|
response_dict['body'] = http_response.raw |
|
|
elif operation_model.has_streaming_output: |
|
|
length = response_dict['headers'].get('content-length') |
|
|
response_dict['body'] = StreamingBody(http_response.raw, length) |
|
|
else: |
|
|
response_dict['body'] = await http_response.content |
|
|
return response_dict |
|
|
|
|
|
|
|
|
class AioEndpoint(Endpoint): |
|
|
async def close(self): |
|
|
await self.http_session.close() |
|
|
|
|
|
async def create_request(self, params, operation_model=None): |
|
|
request = create_request_object(params) |
|
|
if operation_model: |
|
|
request.stream_output = any( |
|
|
[ |
|
|
operation_model.has_streaming_output, |
|
|
operation_model.has_event_stream_output, |
|
|
] |
|
|
) |
|
|
service_id = operation_model.service_model.service_id.hyphenize() |
|
|
event_name = 'request-created.{service_id}.{op_name}'.format( |
|
|
service_id=service_id, op_name=operation_model.name |
|
|
) |
|
|
await self._event_emitter.emit( |
|
|
event_name, |
|
|
request=request, |
|
|
operation_name=operation_model.name, |
|
|
) |
|
|
prepared_request = self.prepare_request(request) |
|
|
return prepared_request |
|
|
|
|
|
async def _send_request(self, request_dict, operation_model): |
|
|
attempts = 1 |
|
|
context = request_dict['context'] |
|
|
self._update_retries_context(context, attempts) |
|
|
request = await self.create_request(request_dict, operation_model) |
|
|
success_response, exception = await self._get_response( |
|
|
request, operation_model, context |
|
|
) |
|
|
while await self._needs_retry( |
|
|
attempts, |
|
|
operation_model, |
|
|
request_dict, |
|
|
success_response, |
|
|
exception, |
|
|
): |
|
|
attempts += 1 |
|
|
self._update_retries_context(context, attempts, success_response) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
request.reset_stream() |
|
|
|
|
|
request = await self.create_request(request_dict, operation_model) |
|
|
success_response, exception = await self._get_response( |
|
|
request, operation_model, context |
|
|
) |
|
|
if ( |
|
|
success_response is not None |
|
|
and 'ResponseMetadata' in success_response[1] |
|
|
): |
|
|
|
|
|
total_retries = attempts - 1 |
|
|
success_response[1]['ResponseMetadata'][ |
|
|
'RetryAttempts' |
|
|
] = total_retries |
|
|
if exception is not None: |
|
|
raise exception |
|
|
else: |
|
|
return success_response |
|
|
|
|
|
async def _get_response(self, request, operation_model, context): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
success_response, exception = await self._do_get_response( |
|
|
request, operation_model, context |
|
|
) |
|
|
kwargs_to_emit = { |
|
|
'response_dict': None, |
|
|
'parsed_response': None, |
|
|
'context': context, |
|
|
'exception': exception, |
|
|
} |
|
|
if success_response is not None: |
|
|
http_response, parsed_response = success_response |
|
|
kwargs_to_emit['parsed_response'] = parsed_response |
|
|
kwargs_to_emit['response_dict'] = await convert_to_response_dict( |
|
|
http_response, operation_model |
|
|
) |
|
|
service_id = operation_model.service_model.service_id.hyphenize() |
|
|
await self._event_emitter.emit( |
|
|
f"response-received.{service_id}.{operation_model.name}", |
|
|
**kwargs_to_emit, |
|
|
) |
|
|
return success_response, exception |
|
|
|
|
|
async def _do_get_response(self, request, operation_model, context): |
|
|
try: |
|
|
logger.debug("Sending http request: %s", request) |
|
|
history_recorder.record( |
|
|
'HTTP_REQUEST', |
|
|
{ |
|
|
'method': request.method, |
|
|
'headers': request.headers, |
|
|
'streaming': operation_model.has_streaming_input, |
|
|
'url': request.url, |
|
|
'body': request.body, |
|
|
}, |
|
|
) |
|
|
service_id = operation_model.service_model.service_id.hyphenize() |
|
|
event_name = f"before-send.{service_id}.{operation_model.name}" |
|
|
responses = await self._event_emitter.emit( |
|
|
event_name, request=request |
|
|
) |
|
|
http_response = first_non_none_response(responses) |
|
|
if http_response is None: |
|
|
http_response = await self._send(request) |
|
|
except HTTPClientError as e: |
|
|
return (None, e) |
|
|
except Exception as e: |
|
|
logger.debug( |
|
|
"Exception received when sending HTTP request.", exc_info=True |
|
|
) |
|
|
return (None, e) |
|
|
|
|
|
|
|
|
response_dict = await convert_to_response_dict( |
|
|
http_response, operation_model |
|
|
) |
|
|
await handle_checksum_body( |
|
|
http_response, |
|
|
response_dict, |
|
|
context, |
|
|
operation_model, |
|
|
) |
|
|
|
|
|
http_response_record_dict = response_dict.copy() |
|
|
http_response_record_dict[ |
|
|
'streaming' |
|
|
] = operation_model.has_streaming_output |
|
|
history_recorder.record('HTTP_RESPONSE', http_response_record_dict) |
|
|
|
|
|
protocol = operation_model.metadata['protocol'] |
|
|
parser = self._response_parser_factory.create_parser(protocol) |
|
|
|
|
|
if asyncio.iscoroutinefunction(parser.parse): |
|
|
parsed_response = await parser.parse( |
|
|
response_dict, operation_model.output_shape |
|
|
) |
|
|
else: |
|
|
parsed_response = parser.parse( |
|
|
response_dict, operation_model.output_shape |
|
|
) |
|
|
|
|
|
if http_response.status_code >= 300: |
|
|
await self._add_modeled_error_fields( |
|
|
response_dict, |
|
|
parsed_response, |
|
|
operation_model, |
|
|
parser, |
|
|
) |
|
|
history_recorder.record('PARSED_RESPONSE', parsed_response) |
|
|
return (http_response, parsed_response), None |
|
|
|
|
|
async def _add_modeled_error_fields( |
|
|
self, |
|
|
response_dict, |
|
|
parsed_response, |
|
|
operation_model, |
|
|
parser, |
|
|
): |
|
|
error_code = parsed_response.get("Error", {}).get("Code") |
|
|
if error_code is None: |
|
|
return |
|
|
service_model = operation_model.service_model |
|
|
error_shape = service_model.shape_for_error_code(error_code) |
|
|
if error_shape is None: |
|
|
return |
|
|
|
|
|
if asyncio.iscoroutinefunction(parser.parse): |
|
|
modeled_parse = await parser.parse(response_dict, error_shape) |
|
|
else: |
|
|
modeled_parse = parser.parse(response_dict, error_shape) |
|
|
|
|
|
parsed_response.update(modeled_parse) |
|
|
|
|
|
|
|
|
async def _needs_retry( |
|
|
self, |
|
|
attempts, |
|
|
operation_model, |
|
|
request_dict, |
|
|
response=None, |
|
|
caught_exception=None, |
|
|
): |
|
|
service_id = operation_model.service_model.service_id.hyphenize() |
|
|
event_name = f"needs-retry.{service_id}.{operation_model.name}" |
|
|
responses = await self._event_emitter.emit( |
|
|
event_name, |
|
|
response=response, |
|
|
endpoint=self, |
|
|
operation=operation_model, |
|
|
attempts=attempts, |
|
|
caught_exception=caught_exception, |
|
|
request_dict=request_dict, |
|
|
) |
|
|
handler_response = first_non_none_response(responses) |
|
|
if handler_response is None: |
|
|
return False |
|
|
else: |
|
|
|
|
|
|
|
|
logger.debug( |
|
|
"Response received to retry, sleeping for %s seconds", |
|
|
handler_response, |
|
|
) |
|
|
await asyncio.sleep(handler_response) |
|
|
return True |
|
|
|
|
|
async def _send(self, request): |
|
|
return await self.http_session.send(request) |
|
|
|
|
|
|
|
|
class AioEndpointCreator(EndpointCreator): |
|
|
def create_endpoint( |
|
|
self, |
|
|
service_model, |
|
|
region_name, |
|
|
endpoint_url, |
|
|
verify=None, |
|
|
response_parser_factory=None, |
|
|
timeout=DEFAULT_TIMEOUT, |
|
|
max_pool_connections=MAX_POOL_CONNECTIONS, |
|
|
http_session_cls=AIOHTTPSession, |
|
|
proxies=None, |
|
|
socket_options=None, |
|
|
client_cert=None, |
|
|
proxies_config=None, |
|
|
connector_args=None, |
|
|
): |
|
|
if not is_valid_endpoint_url( |
|
|
endpoint_url |
|
|
) and not is_valid_ipv6_endpoint_url(endpoint_url): |
|
|
raise ValueError("Invalid endpoint: %s" % endpoint_url) |
|
|
|
|
|
if proxies is None: |
|
|
proxies = self._get_proxies(endpoint_url) |
|
|
endpoint_prefix = service_model.endpoint_prefix |
|
|
|
|
|
logger.debug('Setting %s timeout as %s', endpoint_prefix, timeout) |
|
|
http_session = http_session_cls( |
|
|
timeout=timeout, |
|
|
proxies=proxies, |
|
|
verify=self._get_verify_value(verify), |
|
|
max_pool_connections=max_pool_connections, |
|
|
socket_options=socket_options, |
|
|
client_cert=client_cert, |
|
|
proxies_config=proxies_config, |
|
|
connector_args=connector_args, |
|
|
) |
|
|
|
|
|
return AioEndpoint( |
|
|
endpoint_url, |
|
|
endpoint_prefix=endpoint_prefix, |
|
|
event_emitter=self._event_emitter, |
|
|
response_parser_factory=response_parser_factory, |
|
|
http_session=http_session, |
|
|
) |
|
|
|