"""Generic parallelization utilities.

This module provides small, reusable helpers for ProcessPool-based
parallel mapping with backpressure. Keep format-specific I/O utilities in
`synplan.utils.files`.
"""
from __future__ import annotations

from concurrent.futures import (
    FIRST_COMPLETED,
    ProcessPoolExecutor,
    as_completed,
    wait,
)
from typing import Any, Callable, Iterable, Iterator
def process_pool_map_stream(
    items: Iterable[Any],
    worker_fn: Callable[[Any], Any],
    *,
    max_workers: int,
    max_pending: int | None = None,
) -> Iterator[Any]:
    """Submit tasks lazily and yield results as they finish.

    Limits the number of in-flight futures to avoid memory spikes when
    ``items`` is a large or infinite iterator. Results are yielded in
    completion order, not submission order.

    Args:
        items: Task inputs; consumed lazily, one item per submission.
        worker_fn: Picklable callable applied to each item in a worker
            process.
        max_workers: Number of worker processes; must be >= 1.
        max_pending: Cap on in-flight futures. ``None`` selects the default
            of ``4 * max_workers``; values below 1 are clamped to 1.

    Yields:
        ``worker_fn(item)`` results in completion order.

    Raises:
        ValueError: If ``max_workers`` is < 1.
        Exception: The first worker exception is re-raised to the caller;
            still-queued futures are cancelled first so executor shutdown
            does not wait on work that will never be consumed.
    """
    if max_workers < 1:
        raise ValueError("max_workers must be >= 1")
    # Explicit None test: `max_pending or default` would silently treat an
    # explicit 0 as "use the default" because 0 is falsy.
    if max_pending is None:
        max_pending = 4 * max_workers
    max_pending = max(max_pending, 1)

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        iterator = iter(items)
        pending: set = set()

        def _refill() -> None:
            # Top up the in-flight set; stops silently when items run out.
            try:
                while len(pending) < max_pending:
                    pending.add(executor.submit(worker_fn, next(iterator)))
            except StopIteration:
                pass

        _refill()
        try:
            while pending:
                # wait(FIRST_COMPLETED) re-examines the *current* pending set
                # every round, so refilled futures are yielded as soon as
                # they finish (an as_completed() call only watches the
                # snapshot it was given).
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                # Refill before yielding so workers stay busy while the
                # consumer processes results.
                _refill()
                for future in done:
                    yield future.result()
        except BaseException:
            # A worker error (or the generator being closed early) must not
            # leave queued tasks running while the executor shuts down.
            for future in pending:
                future.cancel()
            raise