File size: 1,533 Bytes
bdc2878
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
"""Bounded-concurrency batch processing utility."""

import asyncio
from collections.abc import Awaitable, Callable, Iterable
from typing import Any, TypeVar

T = TypeVar("T")  # type of each input item
R = TypeVar("R")  # type of each handler result


async def run_batch(
    items: Iterable[T],
    handler: Callable[[T], Awaitable[R]],
    *,
    concurrency: int = 10,
    pause_sec: float = 0.0,
    batch_size: int = 0,
) -> list[R]:
    """Process *items* with bounded concurrency.

    Args:
        items: Input iterable; it is fully materialised into a list before
            any handler is started.
        handler: Async callable applied to each item.
        concurrency: Maximum simultaneous handler calls; values below 1 are
            treated as 1.
        pause_sec: Sleep between batches (only when *batch_size* > 0 and
            there is more than one batch). No sleep follows the last batch.
        batch_size: Group size for inter-batch pauses; 0 or a negative value
            means no grouping (all items form a single batch).

    Returns:
        Results in the same order as *items*.

    Raises:
        Exception: The first exception raised by *handler* propagates to the
            caller and no further batches are started. Per
            ``asyncio.gather`` semantics, handler calls already in flight in
            the failing batch are not cancelled.
    """
    item_list = list(items)
    if not item_list:
        return []

    semaphore = asyncio.Semaphore(max(1, concurrency))

    async def _guarded(item: T) -> R:
        # The semaphore caps how many handler calls run at once, regardless
        # of how many coroutines are handed to gather().
        async with semaphore:
            return await handler(item)

    total = len(item_list)
    # A non-positive batch_size means "no grouping". A negative value used
    # as a range() step would produce an empty range and silently drop every
    # item, so it is normalised to a single batch covering all items.
    step = batch_size if 0 < batch_size < total else total

    results: list[R] = []
    for start in range(0, total, step):
        chunk = item_list[start : start + step]
        # gather() returns results in argument order, preserving item order.
        results.extend(await asyncio.gather(*(_guarded(i) for i in chunk)))
        # Pause only between batches, never after the final one.
        if start + step < total and pause_sec > 0:
            await asyncio.sleep(pause_sec)
    return results