"""Generic parallelization utilities.

This module provides small, reusable helpers for ProcessPool-based
parallel mapping with backpressure. Keep format-specific I/O utilities in
`synplan.utils.files`.
"""

from __future__ import annotations

from concurrent.futures import (
    FIRST_COMPLETED,
    ProcessPoolExecutor,
    as_completed,
    wait,
)
from typing import Any, Callable, Iterable, Iterator


def process_pool_map_stream(
    items: Iterable[Any],
    worker_fn: Callable[[Any], Any],
    *,
    max_workers: int,
    max_pending: int | None = None,
) -> Iterator[Any]:
    """Submit tasks lazily and yield results as they finish.

    Limits the number of in-flight futures to avoid memory spikes when
    ``items`` is a large or infinite iterator. Results are yielded in
    completion order.

    :param items: Source of task inputs; consumed lazily, one per task.
    :param worker_fn: Picklable callable executed in a worker process.
    :param max_workers: Number of worker processes; must be >= 1.
    :param max_pending: Cap on in-flight futures. Defaults to
        ``4 * max_workers`` when ``None``; values below 1 are clamped to 1.
    :raises ValueError: If ``max_workers`` is less than 1.
    """
    if max_workers < 1:
        raise ValueError("max_workers must be >= 1")

    # Explicit `is None` test (not `max_pending or ...`) so a caller-supplied
    # max_pending=0 is clamped to the minimum of 1, like every other sub-1
    # value, instead of being silently replaced by the default.
    if max_pending is None:
        max_pending = 4 * max_workers
    max_pending = max(1, max_pending)

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        iterator = iter(items)
        pending: set = set()

        def _refill() -> None:
            # Top up the in-flight set; returns quietly once `items` is
            # exhausted. Shared by the initial prime and per-result refills.
            try:
                while len(pending) < max_pending:
                    pending.add(executor.submit(worker_fn, next(iterator)))
            except StopIteration:
                pass

        _refill()
        while pending:
            # wait() re-examines the *current* pending set on every pass, so
            # futures submitted during a refill become eligible immediately.
            # (An as_completed() snapshot would delay a fast refill future
            # behind slower, older ones, breaking completion order.)
            done, not_done = wait(pending, return_when=FIRST_COMPLETED)
            pending = set(not_done)
            for future in done:
                # Refill before yielding so workers stay busy while the
                # consumer processes this result.
                _refill()
                yield future.result()