|
|
import inspect |
|
|
|
|
|
from botocore.discovery import ( |
|
|
EndpointDiscoveryHandler, |
|
|
EndpointDiscoveryManager, |
|
|
EndpointDiscoveryRefreshFailed, |
|
|
HTTPClientError, |
|
|
logger, |
|
|
) |
|
|
|
|
|
|
|
|
class AioEndpointDiscoveryManager(EndpointDiscoveryManager): |
|
|
async def _refresh_current_endpoints(self, **kwargs): |
|
|
cache_key = self._create_cache_key(**kwargs) |
|
|
try: |
|
|
response = self._describe_endpoints(**kwargs) |
|
|
|
|
|
if inspect.isawaitable(response): |
|
|
response = await response |
|
|
|
|
|
endpoints = self._parse_endpoints(response) |
|
|
self._cache[cache_key] = endpoints |
|
|
self._failed_attempts.pop(cache_key, None) |
|
|
return endpoints |
|
|
except (ConnectionError, HTTPClientError): |
|
|
self._failed_attempts[cache_key] = self._time() + 60 |
|
|
return None |
|
|
|
|
|
async def describe_endpoint(self, **kwargs): |
|
|
operation = kwargs['Operation'] |
|
|
discovery_required = self._model.discovery_required_for(operation) |
|
|
|
|
|
if not self._always_discover and not discovery_required: |
|
|
|
|
|
logger.debug( |
|
|
'Optional discovery disabled. Skipping discovery for Operation: %s' |
|
|
% operation |
|
|
) |
|
|
return None |
|
|
|
|
|
|
|
|
cache_key = self._create_cache_key(**kwargs) |
|
|
endpoints = self._get_current_endpoints(cache_key) |
|
|
if endpoints: |
|
|
return self._select_endpoint(endpoints) |
|
|
|
|
|
recently_failed = self._recently_failed(cache_key) |
|
|
if not recently_failed: |
|
|
|
|
|
endpoints = await self._refresh_current_endpoints(**kwargs) |
|
|
if endpoints: |
|
|
return self._select_endpoint(endpoints) |
|
|
|
|
|
logger.debug('Endpoint Discovery has failed for: %s', kwargs) |
|
|
stale_entries = self._cache.get(cache_key, None) |
|
|
if stale_entries: |
|
|
|
|
|
return self._select_endpoint(stale_entries) |
|
|
if discovery_required: |
|
|
|
|
|
|
|
|
if recently_failed: |
|
|
|
|
|
endpoints = await self._refresh_current_endpoints(**kwargs) |
|
|
if endpoints: |
|
|
return self._select_endpoint(endpoints) |
|
|
|
|
|
raise EndpointDiscoveryRefreshFailed() |
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
class AioEndpointDiscoveryHandler(EndpointDiscoveryHandler): |
|
|
async def discover_endpoint(self, request, operation_name, **kwargs): |
|
|
ids = request.context.get('discovery', {}).get('identifiers') |
|
|
if ids is None: |
|
|
return |
|
|
endpoint = await self._manager.describe_endpoint( |
|
|
Operation=operation_name, Identifiers=ids |
|
|
) |
|
|
if endpoint is None: |
|
|
logger.debug('Failed to discover and inject endpoint') |
|
|
return |
|
|
if not endpoint.startswith('http'): |
|
|
endpoint = 'https://' + endpoint |
|
|
logger.debug('Injecting discovered endpoint: %s', endpoint) |
|
|
request.url = endpoint |
|
|
|