stocks / core /utils /execution.py
Arrechenash's picture
Initial Commit
da67450
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
logger = logging.getLogger(__name__)
class ParallelExecutor:
    """Standardized wrapper for ThreadPoolExecutor with progress tracking.

    Each call to :meth:`map` creates its own ``ThreadPoolExecutor`` sized by
    ``max_workers`` and shuts it down before returning.
    """

    def __init__(self, max_workers: int = 10) -> None:
        """Remember the thread-pool size to use for each :meth:`map` call.

        Args:
            max_workers: Value passed as ``max_workers`` to
                ``ThreadPoolExecutor``.
        """
        self.max_workers = max_workers

    def map(self, fn, items, desc: str = "Processing", **kwargs) -> list:
        """Map a function over items in parallel with a progress bar.

        Args:
            fn: Callable invoked as ``fn(item)`` for every element of
                ``items``.
            items: Any iterable of work items. It is consumed exactly once,
                so generators and other iterables without ``len()`` are
                accepted.
            desc: Label shown on the tqdm progress bar.
            **kwargs: Accepted for backward compatibility but currently
                unused; they are NOT forwarded to ``fn`` or to tqdm.
                NOTE(review): confirm the intended recipient before wiring
                them through.

        Returns:
            The non-``None`` return values of ``fn``, in completion order
            (not input order). Items whose call raised an ``Exception`` are
            logged at error level and contribute nothing to the result.
        """
        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Keyed by future so a failure can be reported against its item.
            futures = {executor.submit(fn, item): item for item in items}
            # Size the bar from the submitted futures, not ``len(items)``:
            # ``items`` may be a generator (no ``len``), and it has already
            # been exhausted by the comprehension above.
            for future in tqdm(as_completed(futures), total=len(futures), desc=desc):
                try:
                    res = future.result()
                except Exception as e:
                    # Best effort: one failing item must not abort the batch.
                    logger.error("Error processing %s: %s", futures[future], e)
                else:
                    # ``None`` is treated as "no result" and dropped.
                    if res is not None:
                        results.append(res)
        return results