Kris8an's picture
Upload folder using huggingface_hub
a06facb verified
# Copyright 2022 Amethyst Reese
# Licensed under the MIT license
import asyncio
from typing import AsyncIterator
from unittest import TestCase
import aioitertools as ait
from .helpers import async_test
slist = ["A", "B", "C"]
srange = range(3)
srange1 = range(1, 4)
srange0 = range(1)
class BuiltinsTest(TestCase):
# aioitertools.all()
@async_test
async def test_all_list(self):
self.assertTrue(await ait.all([True, 1, "string"]))
self.assertFalse(await ait.all([True, 0, "string"]))
@async_test
async def test_all_range(self):
self.assertTrue(await ait.all(srange1))
self.assertFalse(await ait.all(srange))
@async_test
async def test_all_generator(self):
self.assertTrue(await ait.all(x for x in srange1))
self.assertFalse(await ait.all(x for x in srange))
@async_test
async def test_all_async_generator(self):
self.assertTrue(await ait.all(ait.iter(srange1)))
self.assertFalse(await ait.all(ait.iter(srange)))
# aioitertools.any()
@async_test
async def test_any_list(self):
self.assertTrue(await ait.any([False, 1, ""]))
self.assertFalse(await ait.any([False, 0, ""]))
@async_test
async def test_any_range(self):
self.assertTrue(await ait.any(srange))
self.assertTrue(await ait.any(srange1))
self.assertFalse(await ait.any(srange0))
@async_test
async def test_any_generator(self):
self.assertTrue(await ait.any(x for x in srange))
self.assertTrue(await ait.any(x for x in srange1))
self.assertFalse(await ait.any(x for x in srange0))
@async_test
async def test_any_async_generator(self):
self.assertTrue(await ait.any(ait.iter(srange)))
self.assertTrue(await ait.any(ait.iter(srange1)))
self.assertFalse(await ait.any(ait.iter(srange0)))
# aioitertools.iter()
@async_test
async def test_iter_list(self):
it = ait.iter(slist)
self.assertIsInstance(it, AsyncIterator)
idx = 0
async for item in it:
self.assertEqual(item, slist[idx])
idx += 1
@async_test
async def test_iter_range(self):
it = ait.iter(srange)
self.assertIsInstance(it, AsyncIterator)
idx = 0
async for item in it:
self.assertEqual(item, srange[idx])
idx += 1
@async_test
async def test_iter_iterable(self):
sentinel = object()
class async_iterable:
def __aiter__(self):
return sentinel
aiter = async_iterable()
self.assertEqual(ait.iter(aiter), sentinel)
@async_test
async def test_iter_iterator(self):
sentinel = object()
class async_iterator:
def __aiter__(self):
return sentinel
def __anext__(self):
return sentinel
aiter = async_iterator()
self.assertEqual(ait.iter(aiter), aiter)
@async_test
async def test_iter_async_generator(self):
async def async_gen():
yield 1
yield 2
agen = async_gen()
self.assertEqual(ait.iter(agen), agen)
# aioitertools.next()
@async_test
async def test_next_list(self):
it = ait.iter(slist)
self.assertEqual(await ait.next(it), "A")
self.assertEqual(await ait.next(it), "B")
self.assertEqual(await ait.next(it), "C")
with self.assertRaises(StopAsyncIteration):
await ait.next(it)
@async_test
async def test_next_range(self):
it = ait.iter(srange)
self.assertEqual(await ait.next(it), 0)
self.assertEqual(await ait.next(it), 1)
self.assertEqual(await ait.next(it), 2)
with self.assertRaises(StopAsyncIteration):
await ait.next(it)
@async_test
async def test_next_iterable(self):
class async_iter:
def __init__(self):
self.index = 0
def __aiter__(self):
return self
def __anext__(self):
if self.index > 2:
raise StopAsyncIteration()
return self.fake_next()
async def fake_next(self):
value = slist[self.index]
self.index += 1
return value
it = ait.iter(async_iter())
self.assertEqual(await ait.next(it), "A")
self.assertEqual(await ait.next(it), "B")
self.assertEqual(await ait.next(it), "C")
with self.assertRaises(StopAsyncIteration):
await ait.next(it)
it = iter(slist)
self.assertEqual(await ait.next(it), "A")
self.assertEqual(await ait.next(it), "B")
self.assertEqual(await ait.next(it), "C")
with self.assertRaises(StopAsyncIteration):
await ait.next(it)
@async_test
async def test_next_async_generator(self):
async def async_gen():
for item in slist:
yield item
it = ait.iter(async_gen())
self.assertEqual(await ait.next(it), "A")
self.assertEqual(await ait.next(it), "B")
self.assertEqual(await ait.next(it), "C")
with self.assertRaises(StopAsyncIteration):
await ait.next(it)
@async_test
async def test_next_default_iterable(self):
it = iter(["A"])
self.assertEqual(await ait.next(it, "?"), "A")
# End of iteration
self.assertEqual(await ait.next(it, "?"), "?")
@async_test
async def test_next_default_async_iterable(self):
it = ait.iter(["A"])
self.assertEqual(await ait.next(it, "?"), "A")
# End of iteration
self.assertEqual(await ait.next(it, "?"), "?")
# aioitertools.list()
@async_test
async def test_list(self):
self.assertEqual(await ait.list(ait.iter(slist)), slist)
# aioitertools.set()
@async_test
async def test_set(self):
self.assertEqual(await ait.set(ait.iter(slist)), set(slist))
# aioitertools.enumerate()
@async_test
async def test_enumerate(self):
async for index, value in ait.enumerate(slist):
self.assertEqual(value, slist[index])
@async_test
async def test_enumerate_start(self):
async for index, value in ait.enumerate(slist, 4):
self.assertEqual(value, slist[index - 4])
# aioitertools.map()
@async_test
async def test_map_function_list(self):
idx = 0
async for value in ait.map(str.lower, slist):
self.assertEqual(value, slist[idx].lower())
idx += 1
@async_test
async def test_map_function_async_generator(self):
async def gen():
for item in slist:
yield item
idx = 0
async for value in ait.map(str.lower, gen()):
self.assertEqual(value, slist[idx].lower())
idx += 1
@async_test
async def test_map_coroutine_list(self):
async def double(x):
await asyncio.sleep(0.0001)
return x * 2
idx = 0
async for value in ait.map(double, slist):
self.assertEqual(value, slist[idx] * 2)
idx += 1
@async_test
async def test_map_coroutine_generator(self):
async def gen():
for item in slist:
yield item
async def double(x):
await asyncio.sleep(0.0001)
return x * 2
idx = 0
async for value in ait.map(double, gen()):
self.assertEqual(value, slist[idx] * 2)
idx += 1
# aioitertools.max()
@async_test
async def test_max_basic(self):
async def gen():
for item in slist:
yield item
self.assertEqual(await ait.max(gen()), "C")
self.assertEqual(await ait.max(range(4)), 3)
with self.assertRaisesRegex(ValueError, "iterable is empty"):
await ait.max([])
with self.assertRaisesRegex(ValueError, "kwarg .+ not supported"):
await ait.max(None, foo="foo")
@async_test
async def test_max_default(self):
self.assertEqual(await ait.max(range(2), default="x"), 1)
self.assertEqual(await ait.max([], default="x"), "x")
self.assertEqual(await ait.max([], default=None), None)
@async_test
async def test_max_key(self):
words = ["star", "buzz", "guard"]
def reverse(s):
return s[::-1]
self.assertEqual(reverse("python"), "nohtyp")
self.assertEqual(await ait.max(words), "star")
self.assertEqual(await ait.max(words, key=reverse), "buzz")
# aioitertools.min()
@async_test
async def test_min_basic(self):
async def gen():
for item in slist:
yield item
self.assertEqual(await ait.min(gen()), "A")
self.assertEqual(await ait.min(range(4)), 0)
with self.assertRaisesRegex(ValueError, "iterable is empty"):
await ait.min([])
with self.assertRaisesRegex(ValueError, "kwarg .+ not supported"):
await ait.min(None, foo="foo")
@async_test
async def test_min_default(self):
self.assertEqual(await ait.min(range(2), default="x"), 0)
self.assertEqual(await ait.min([], default="x"), "x")
self.assertEqual(await ait.min([], default=None), None)
@async_test
async def test_min_key(self):
words = ["star", "buzz", "guard"]
def reverse(s):
return s[::-1]
self.assertEqual(reverse("python"), "nohtyp")
self.assertEqual(await ait.min(words), "buzz")
self.assertEqual(await ait.min(words, key=reverse), "guard")
# aioitertools.sum()
@async_test
async def test_sum_range_default(self):
self.assertEqual(await ait.sum(srange), sum(srange))
@async_test
async def test_sum_list_string(self):
self.assertEqual(await ait.sum(slist, "foo"), "fooABC")
# aioitertools.zip()
@async_test
async def test_zip_equal(self):
idx = 0
async for a, b in ait.zip(slist, srange):
self.assertEqual(a, slist[idx])
self.assertEqual(b, srange[idx])
idx += 1
@async_test
async def test_zip_shortest(self):
short = ["a", "b", "c"]
long = [0, 1, 2, 3, 5]
result = await ait.list(ait.zip(short, long))
expected = [["a", 0], ["b", 1], ["c", 2]]
self.assertListEqual(expected, result)