"""
Parallel processing utilities for data pipeline optimization.

Supports:
- Parallel data ingestion
- Batch processing with multiprocessing
- Distributed processing support (foundation for Dask/Beam)
"""
from __future__ import annotations

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
from typing import Callable, Iterable, List, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def parallel_map(
    func: Callable[[T], R],
    items: Iterable[T],
    max_workers: Optional[int] = None,
    use_threads: bool = False,
    chunk_size: int = 1,
    **func_kwargs,
) -> List[R]:
"""
Parallel map function.
Args:
func: Function to apply to each item
items: Iterable of items to process
max_workers: Maximum number of workers (default: CPU count)
use_threads: Use threads instead of processes (for I/O-bound tasks)
chunk_size: Number of items per chunk (for ProcessPoolExecutor)
**func_kwargs: Additional kwargs to pass to func
Returns:
List of results in the same order as items
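
    Example (a minimal sketch, not from the original module; the worker is
    defined inline, so this uses threads, since ProcessPoolExecutor requires
    a picklable, module-level function):
        >>> def double(x):
        ...     return x * 2
        >>> parallel_map(double, [1, 2, 3], max_workers=2, use_threads=True)
        [2, 4, 6]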
"""
if max_workers is None:
max_workers = mp.cpu_count()
items_list = list(items)
if not items_list:
return []
# Prepare function with kwargs
if func_kwargs:
func = partial(func, **func_kwargs)
executor_class = ThreadPoolExecutor if use_threads else ProcessPoolExecutor
with executor_class(max_workers=max_workers) as executor:
if use_threads:
# ThreadPoolExecutor doesn't use chunk_size
futures = [executor.submit(func, item) for item in items_list]
else:
# ProcessPoolExecutor supports chunking
futures = executor.map(func, items_list, chunksize=chunk_size)
return list(futures)
# For ThreadPoolExecutor, collect results
results = []
for future in as_completed(futures):
try:
results.append(future.result())
except Exception as e:
# Log error but continue
print(f"Error in parallel_map: {e}")
results.append(None)
# Reorder results to match input order (approximate for as_completed)
# For exact ordering, use executor.map instead
return results


def batch_process(
    func: Callable[[List[T]], List[R]],
    items: Iterable[T],
    batch_size: int = 32,
    max_workers: Optional[int] = None,
    **func_kwargs,
) -> List[R]:
"""
Process items in batches in parallel.
Args:
func: Function that processes a batch and returns list of results
items: Iterable of items to process
batch_size: Number of items per batch
max_workers: Maximum number of parallel batches
**func_kwargs: Additional kwargs to pass to func
Returns:
Flattened list of results
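
    Example (a minimal sketch, not from the original module; ``double_batch``
    is a hypothetical helper that must live at module level so the worker
    processes can pickle it):

        def double_batch(batch):
            return [x * 2 for x in batch]

        batch_process(double_batch, range(5), batch_size=2)
        # -> [0, 2, 4, 6, 8]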
"""
items_list = list(items)
batches = [
items_list[i : i + batch_size] for i in range(0, len(items_list), batch_size)
]
if not batches:
return []
# Prepare function with kwargs
if func_kwargs:
func = partial(func, **func_kwargs)
if max_workers is None:
max_workers = min(len(batches), mp.cpu_count())
results = parallel_map(
func,
batches,
max_workers=max_workers,
use_threads=False,
)
# Flatten results
flattened = []
for batch_results in results:
if batch_results:
flattened.extend(batch_results)
return flattened


class ParallelProcessor:
    """Wrapper for parallel processing with configuration."""

    def __init__(
        self,
        max_workers: Optional[int] = None,
        use_threads: bool = False,
        chunk_size: int = 1,
    ):
        self.max_workers = max_workers or mp.cpu_count()
        self.use_threads = use_threads
        self.chunk_size = chunk_size

    def map(
        self, func: Callable[[T], R], items: Iterable[T], **func_kwargs
    ) -> List[R]:
        """Apply function to items in parallel."""
        return parallel_map(
            func,
            items,
            max_workers=self.max_workers,
            use_threads=self.use_threads,
            chunk_size=self.chunk_size,
            **func_kwargs,
        )

    def batch_map(
        self,
        func: Callable[[List[T]], List[R]],
        items: Iterable[T],
        batch_size: int = 32,
        **func_kwargs,
    ) -> List[R]:
        """Process items in batches in parallel."""
        return batch_process(
            func,
            items,
            batch_size=batch_size,
            max_workers=self.max_workers,
            **func_kwargs,
        )
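

# Usage sketch (an assumption, not part of the original module). The
# __main__ guard matters on platforms that spawn worker processes, and
# _square_batch must be defined at module level so it can be pickled.
def _square_batch(batch: List[int]) -> List[int]:
    """Hypothetical helper: square every item in a batch."""
    return [x * x for x in batch]


if __name__ == "__main__":
    processor = ParallelProcessor(max_workers=4, use_threads=True)
    # Thread-backed map for I/O-friendly work; results keep input order
    print(processor.map(len, ["a", "bb", "ccc"]))  # [1, 2, 3]
    # batch_map always runs batches in worker processes
    print(processor.batch_map(_square_batch, range(10), batch_size=3))
    # -> [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]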