Spaces:
Sleeping
Sleeping
File size: 1,533 Bytes
bdc2878 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | """Bounded-concurrency batch processing utility."""
import asyncio
from collections.abc import Awaitable, Callable, Iterable
from typing import Any, TypeVar
T = TypeVar("T")  # type of one input item handed to the handler
R = TypeVar("R")  # type of the result the handler produces for one item


async def run_batch(
    items: Iterable[T],
    handler: Callable[[T], Awaitable[R]],
    *,
    concurrency: int = 10,
    pause_sec: float = 0.0,
    batch_size: int = 0,
) -> list[R]:
    """Process *items* with bounded concurrency.

    Args:
        items: Input iterable. It is materialised into a list up front.
        handler: Async callable applied to each item.
        concurrency: Maximum simultaneous handler calls. Values below 1
            are clamped to 1.
        pause_sec: Sleep inserted between consecutive batches. Only used
            when batching is active, and never after the final batch.
        batch_size: Group size for inter-batch pauses. Zero, a negative
            value, or a value >= the number of items disables grouping
            and runs everything as a single batch.

    Returns:
        Results in the same order as *items*.

    Raises:
        Exception: Nothing is caught here; an exception raised by
            *handler* propagates to the caller through ``asyncio.gather``.
    """
    item_list = list(items)
    if not item_list:
        return []

    # Clamp so that a zero/negative limit cannot create a semaphore that
    # either raises or never lets a task through.
    semaphore = asyncio.Semaphore(max(1, concurrency))

    async def _guarded(item: T) -> R:
        """Run *handler* on one item while holding a semaphore slot."""
        async with semaphore:
            return await handler(item)

    # A non-positive batch size means "no grouping". Without the explicit
    # ``<= 0`` test a negative value would reach ``range()`` as a negative
    # step, yield no chunks, and silently drop every item.
    if batch_size <= 0 or batch_size >= len(item_list):
        return list(await asyncio.gather(*(_guarded(i) for i in item_list)))

    results: list[Any] = []
    for start in range(0, len(item_list), batch_size):
        chunk = item_list[start : start + batch_size]
        # gather() preserves argument order, so extending chunk by chunk
        # keeps the overall result order aligned with the input order.
        results.extend(await asyncio.gather(*(_guarded(i) for i in chunk)))
        # Pause only between batches, not after the last one.
        if pause_sec > 0 and start + batch_size < len(item_list):
            await asyncio.sleep(pause_sec)
    return results
|