Kris8an's picture
Upload folder using huggingface_hub
a06facb verified
import asyncio
from botocore.docs.docstring import WaiterDocstring
# WaiterModel is required for client.py import
from botocore.exceptions import ClientError
from botocore.utils import get_service_module_name
from botocore.waiter import (
NormalizedOperationMethod as _NormalizedOperationMethod,
)
from botocore.waiter import WaiterModel # noqa: F401 lgtm[py/unused-import]
from botocore.waiter import (
Waiter,
WaiterError,
is_valid_waiter_error,
logger,
xform_name,
)
def create_waiter_with_client(waiter_name, waiter_model, client):
"""
:type waiter_name: str
:param waiter_name: The name of the waiter. The name should match
the name (including the casing) of the key name in the waiter
model file (typically this is CamelCasing).
:type waiter_model: botocore.waiter.WaiterModel
:param waiter_model: The model for the waiter configuration.
:type client: botocore.client.BaseClient
:param client: The botocore client associated with the service.
:rtype: botocore.waiter.Waiter
:return: The waiter object.
"""
single_waiter_config = waiter_model.get_waiter(waiter_name)
operation_name = xform_name(single_waiter_config.operation)
operation_method = NormalizedOperationMethod(
getattr(client, operation_name)
)
# Create a new wait method that will serve as a proxy to the underlying
# Waiter.wait method. This is needed to attach a docstring to the
# method.
async def wait(self, **kwargs):
return await AIOWaiter.wait(self, **kwargs)
wait.__doc__ = WaiterDocstring(
waiter_name=waiter_name,
event_emitter=client.meta.events,
service_model=client.meta.service_model,
service_waiter_model=waiter_model,
include_signature=False,
)
# Rename the waiter class based on the type of waiter.
waiter_class_name = str(
'%s.Waiter.%s'
% (get_service_module_name(client.meta.service_model), waiter_name)
)
# Create the new waiter class
documented_waiter_cls = type(waiter_class_name, (Waiter,), {'wait': wait})
# Return an instance of the new waiter class.
return documented_waiter_cls(
waiter_name, single_waiter_config, operation_method
)
class NormalizedOperationMethod(_NormalizedOperationMethod):
async def __call__(self, **kwargs):
try:
return await self._client_method(**kwargs)
except ClientError as e:
return e.response
class AIOWaiter(Waiter):
async def wait(self, **kwargs):
acceptors = list(self.config.acceptors)
current_state = 'waiting'
# pop the invocation specific config
config = kwargs.pop('WaiterConfig', {})
sleep_amount = config.get('Delay', self.config.delay)
max_attempts = config.get('MaxAttempts', self.config.max_attempts)
last_matched_acceptor = None
num_attempts = 0
while True:
response = await self._operation_method(**kwargs)
num_attempts += 1
for acceptor in acceptors:
if acceptor.matcher_func(response):
last_matched_acceptor = acceptor
current_state = acceptor.state
break
else:
# If none of the acceptors matched, we should
# transition to the failure state if an error
# response was received.
if is_valid_waiter_error(response):
# Transition to a failure state, which we
# can just handle here by raising an exception.
raise WaiterError(
name=self.name,
reason='An error occurred (%s): %s'
% (
response['Error'].get('Code', 'Unknown'),
response['Error'].get('Message', 'Unknown'),
),
last_response=response,
)
if current_state == 'success':
logger.debug(
"Waiting complete, waiter matched the " "success state."
)
return response
if current_state == 'failure':
reason = 'Waiter encountered a terminal failure state: %s' % (
acceptor.explanation
)
raise WaiterError(
name=self.name,
reason=reason,
last_response=response,
)
if num_attempts >= max_attempts:
if last_matched_acceptor is None:
reason = 'Max attempts exceeded'
else:
reason = (
'Max attempts exceeded. Previously accepted state: %s'
% (acceptor.explanation)
)
raise WaiterError(
name=self.name,
reason=reason,
last_response=response,
)
await asyncio.sleep(sleep_amount)