Kris8an's picture
Upload folder using huggingface_hub
a06facb verified
import aioitertools
import jmespath
from botocore.exceptions import PaginationError
from botocore.paginate import PageIterator, Paginator
from botocore.utils import merge_dicts, set_value_from_jmespath
class AioPageIterator(PageIterator):
def __aiter__(self):
return self.__anext__()
async def __anext__(self):
current_kwargs = self._op_kwargs
previous_next_token = None
next_token = {key: None for key in self._input_token}
if self._starting_token is not None:
# If the starting token exists, populate the next_token with the
# values inside it. This ensures that we have the service's
# pagination token on hand if we need to truncate after the
# first response.
next_token = self._parse_starting_token()[0]
# The number of items from result_key we've seen so far.
total_items = 0
first_request = True
primary_result_key = self.result_keys[0]
starting_truncation = 0
self._inject_starting_params(current_kwargs)
while True:
response = await self._make_request(current_kwargs)
parsed = self._extract_parsed_response(response)
if first_request:
# The first request is handled differently. We could
# possibly have a resume/starting token that tells us where
# to index into the retrieved page.
if self._starting_token is not None:
starting_truncation = self._handle_first_request(
parsed, primary_result_key, starting_truncation
)
first_request = False
self._record_non_aggregate_key_values(parsed)
else:
# If this isn't the first request, we have already sliced into
# the first request and had to make additional requests after.
# We no longer need to add this to truncation.
starting_truncation = 0
current_response = primary_result_key.search(parsed)
if current_response is None:
current_response = []
num_current_response = len(current_response)
truncate_amount = 0
if self._max_items is not None:
truncate_amount = (
total_items + num_current_response - self._max_items
)
if truncate_amount > 0:
self._truncate_response(
parsed,
primary_result_key,
truncate_amount,
starting_truncation,
next_token,
)
yield response
break
else:
yield response
total_items += num_current_response
next_token = self._get_next_token(parsed)
if all(t is None for t in next_token.values()):
break
if (
self._max_items is not None
and total_items == self._max_items
):
# We're on a page boundary so we can set the current
# next token to be the resume token.
self.resume_token = next_token
break
if (
previous_next_token is not None
and previous_next_token == next_token
):
message = (
f"The same next token was received "
f"twice: {next_token}"
)
raise PaginationError(message=message)
self._inject_token_into_kwargs(current_kwargs, next_token)
previous_next_token = next_token
async def search(self, expression):
compiled = jmespath.compile(expression)
async for page in self:
results = compiled.search(page)
if isinstance(results, list):
for element in results:
yield element # unfortunately yield from not avail from async f
else:
yield results
def result_key_iters(self):
teed_results = aioitertools.tee(self, len(self.result_keys))
return [
ResultKeyIterator(i, result_key)
for i, result_key in zip(teed_results, self.result_keys)
]
async def build_full_result(self):
complete_result = {}
async for response in self:
page = response
# We want to try to catch operation object pagination
# and format correctly for those. They come in the form
# of a tuple of two elements: (http_response, parsed_responsed).
# We want the parsed_response as that is what the page iterator
# uses. We can remove it though once operation objects are removed.
if isinstance(response, tuple) and len(response) == 2:
page = response[1]
# We're incrementally building the full response page
# by page. For each page in the response we need to
# inject the necessary components from the page
# into the complete_result.
for result_expression in self.result_keys:
# In order to incrementally update a result key
# we need to search the existing value from complete_result,
# then we need to search the _current_ page for the
# current result key value. Then we append the current
# value onto the existing value, and re-set that value
# as the new value.
result_value = result_expression.search(page)
if result_value is None:
continue
existing_value = result_expression.search(complete_result)
if existing_value is None:
# Set the initial result
set_value_from_jmespath(
complete_result,
result_expression.expression,
result_value,
)
continue
# Now both result_value and existing_value contain something
if isinstance(result_value, list):
existing_value.extend(result_value)
elif isinstance(result_value, (int, float, str)):
# Modify the existing result with the sum or concatenation
set_value_from_jmespath(
complete_result,
result_expression.expression,
existing_value + result_value,
)
merge_dicts(complete_result, self.non_aggregate_part)
if self.resume_token is not None:
complete_result['NextToken'] = self.resume_token
return complete_result
class AioPaginator(Paginator):
PAGE_ITERATOR_CLS = AioPageIterator
class ResultKeyIterator:
"""Iterates over the results of paginated responses.
Each iterator is associated with a single result key.
Iterating over this object will give you each element in
the result key list.
:param pages_iterator: An iterator that will give you
pages of results (a ``PageIterator`` class).
:param result_key: The JMESPath expression representing
the result key.
"""
def __init__(self, pages_iterator, result_key):
self._pages_iterator = pages_iterator
self.result_key = result_key
def __aiter__(self):
return self.__anext__()
async def __anext__(self):
async for page in self._pages_iterator:
results = self.result_key.search(page)
if results is None:
results = []
for result in results:
yield result # yield from not avail from async func