File size: 1,004 Bytes
da67450
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

from tqdm import tqdm

logger = logging.getLogger(__name__)


class ParallelExecutor:
    """Run a function over many items on a thread pool with a progress bar.

    Thin wrapper around ``ThreadPoolExecutor`` that reports completion
    progress through ``tqdm`` and logs (rather than raises) per-item errors.
    """

    def __init__(self, max_workers: int = 10) -> None:
        """Store the pool size.

        Args:
            max_workers: Maximum number of worker threads used by ``map``.
        """
        self.max_workers = max_workers

    def map(self, fn, items, desc="Processing", **kwargs) -> list:
        """Map a function over items in parallel with a progress bar.

        Args:
            fn: Callable invoked as ``fn(item)`` for each element of ``items``.
            items: Any iterable of inputs. It does not need to support
                ``len()``; generators and other one-shot iterators work.
            desc: Label shown on the progress bar.
            **kwargs: Accepted for call-site compatibility; currently unused.

        Returns:
            The non-``None`` return values of ``fn``, in the order the tasks
            *completed* (not the order of ``items``). Items whose call raised
            an exception are logged via the module logger and omitted.
        """
        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Map each future back to its input so failures can name the item.
            futures = {executor.submit(fn, item): item for item in items}
            # Size the bar from the submitted futures, not from ``items``:
            # ``len(items)`` raises TypeError for generators/iterators, and by
            # this point the iterable has already been fully consumed.
            progress = tqdm(as_completed(futures), total=len(futures), desc=desc)
            for future in progress:
                try:
                    res = future.result()
                except Exception as e:
                    # Best-effort semantics: one failing item must not abort
                    # the rest of the batch.
                    logger.error("Error processing %s: %s", futures[future], e)
                    continue
                if res is not None:
                    results.append(res)
        return results