#!/usr/bin/env python3
"""
Concurrent Test Executor with Exponential Backoff Retry

Provides concurrent execution for perturbation tests with:
- Configurable concurrency
- Exponential backoff retry
- Rate limiting
- Progress callbacks
"""

import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Callable, Optional, TypeVar
import logging
import threading
import time
import random

logger = logging.getLogger(__name__)

T = TypeVar('T')


class ConcurrentTestExecutor:
    """
    Concurrent test executor with:
    - Configurable concurrency
    - Exponential backoff retry
    - Rate limiting
    - Progress callbacks
    """

    def __init__(
        self,
        max_workers: int = 5,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        rate_limit_per_minute: int = 60
    ):
        """
        Initialize the executor.

        Args:
            max_workers: Maximum concurrent workers
            max_retries: Maximum retry attempts
            base_delay: Base delay for exponential backoff (seconds)
            max_delay: Maximum delay between retries (seconds)
            rate_limit_per_minute: Maximum requests per minute
        """
        self.max_workers = max_workers
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.rate_limit_per_minute = rate_limit_per_minute
        self._request_times: List[float] = []
        # Guards _request_times, which worker threads mutate concurrently.
        self._lock = threading.Lock()

    def _wait_for_rate_limit(self):
        """Check the rate limit and wait if needed.

        The lock is held for the whole check-and-sleep so that concurrent
        workers cannot jointly exceed the per-minute budget.
        """
        with self._lock:
            now = time.time()
            # Drop request records older than one minute.
            self._request_times = [t for t in self._request_times if now - t < 60]

            if len(self._request_times) >= self.rate_limit_per_minute:
                # Wait until the oldest request is more than one minute old.
                sleep_time = 60 - (now - self._request_times[0]) + 0.1
                if sleep_time > 0:
                    logger.info(f"Rate limit reached, waiting {sleep_time:.1f}s")
                    time.sleep(sleep_time)

            self._request_times.append(time.time())

    def _should_retry(self, error: Exception) -> bool:
        """Determine if an error should trigger a retry."""
        error_str = str(error).lower()
        retryable_patterns = [
            'rate limit', 'rate_limit', '429', 'too many requests',
            'timeout', 'connection', 'temporary', 'overloaded',
            'server error', '500', '502', '503', '504',
            'resource exhausted', 'quota exceeded'
        ]
        return any(pattern in error_str for pattern in retryable_patterns)

    def execute_with_retry(
        self,
        func: Callable[..., T],
        *args,
        **kwargs
    ) -> T:
        """
        Execute a function with exponential backoff retry.

        Args:
            func: Function to execute
            *args: Positional arguments for func
            **kwargs: Keyword arguments for func

        Returns:
            Result of func

        Raises:
            The last exception if all retries fail
        """
        last_exception = None

        for attempt in range(self.max_retries):
            try:
                self._wait_for_rate_limit()
                return func(*args, **kwargs)
            except Exception as e:
                last_exception = e

                # Retry only retryable errors, and never after the final attempt.
                if not self._should_retry(e) or attempt == self.max_retries - 1:
                    raise

                # Exponential backoff with jitter, capped at max_delay.
                delay = min(
                    self.base_delay * (2 ** attempt) + random.uniform(0, 1),
                    self.max_delay
                )
                logger.warning(
                    f"Attempt {attempt + 1}/{self.max_retries} failed: {e}. "
                    f"Retrying in {delay:.1f}s..."
                )
                time.sleep(delay)

        raise last_exception

    def execute_batch(
        self,
        items: List[Any],
        process_func: Callable[[Any], Any],
        progress_callback: Optional[Callable[[int, int, str], None]] = None
    ) -> List[Dict[str, Any]]:
        """
        Execute batch processing with concurrency.

        Args:
            items: List of items to process
            process_func: Function to process each item
            progress_callback: Optional callback (current, total, message)

        Returns:
            List of results in original order
        """
        if not items:
            return []

        results = [None] * len(items)
        completed = 0
        total = len(items)

        # Reset rate-limit tracking for this batch.
        self._request_times = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks.
            future_to_index = {
                executor.submit(self.execute_with_retry, process_func, item): i
                for i, item in enumerate(items)
            }

            # Collect results as they complete, writing back in original order.
            for future in as_completed(future_to_index):
                index = future_to_index[future]
                try:
                    results[index] = future.result()
                except Exception as e:
                    logger.error(f"Task {index} failed after retries: {e}")
                    results[index] = {"error": str(e), "index": index}

                completed += 1
                if progress_callback:
                    progress_callback(
                        completed, total, f"Completed {completed}/{total} tasks"
                    )

        return results
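
# --- Usage sketch (illustrative only, not part of the executor API) ---
# A minimal example of driving ConcurrentTestExecutor with a callable that can
# fail transiently. `flaky_call` and `print_progress` are hypothetical
# stand-ins for a real perturbation-test function and progress hook.

def _example_sync_usage() -> None:
    """Run a small batch through ConcurrentTestExecutor (demo sketch)."""
    def flaky_call(item: int) -> Dict[str, Any]:
        # Hypothetical workload: raises a retryable-looking error about 30%
        # of the time, succeeds otherwise.
        if random.random() < 0.3:
            raise RuntimeError("503 server error (simulated)")
        return {"item": item, "status": "ok"}

    def print_progress(current: int, total: int, message: str) -> None:
        print(f"[{current}/{total}] {message}")

    executor = ConcurrentTestExecutor(max_workers=3, max_retries=3, base_delay=0.5)
    results = executor.execute_batch(list(range(10)), flaky_call, print_progress)
    failures = [r for r in results if "error" in r]
    print(f"{len(results) - len(failures)} succeeded, {len(failures)} failed")
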
class AsyncConcurrentExecutor:
    """Async version of the concurrent executor for FastAPI integration."""

    def __init__(
        self,
        max_concurrent: int = 5,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0
    ):
        """
        Initialize the async executor.

        Args:
            max_concurrent: Maximum concurrent tasks
            max_retries: Maximum retry attempts
            base_delay: Base delay for exponential backoff (seconds)
            max_delay: Maximum delay between retries (seconds)
        """
        self.max_concurrent = max_concurrent
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

    def _should_retry(self, error: Exception) -> bool:
        """Determine if an error should trigger a retry."""
        error_str = str(error).lower()
        retryable_patterns = [
            'rate limit', 'rate_limit', '429', 'too many requests',
            'timeout', 'connection', 'temporary', 'overloaded',
            'server error', '500', '502', '503', '504'
        ]
        return any(pattern in error_str for pattern in retryable_patterns)

    async def execute_with_retry(
        self,
        coro_func: Callable,
        *args,
        semaphore: Optional[asyncio.Semaphore] = None,
        **kwargs
    ):
        """
        Execute an async function with exponential backoff retry.

        Args:
            coro_func: Async function to execute
            *args: Positional arguments
            semaphore: Optional semaphore for concurrency control
            **kwargs: Keyword arguments

        Returns:
            Result of the async function
        """
        async def _execute():
            for attempt in range(self.max_retries):
                try:
                    return await coro_func(*args, **kwargs)
                except Exception as e:
                    # Retry only retryable errors, never after the final attempt.
                    if not self._should_retry(e) or attempt == self.max_retries - 1:
                        raise
                    delay = min(
                        self.base_delay * (2 ** attempt) + random.uniform(0, 1),
                        self.max_delay
                    )
                    logger.warning(
                        f"Async retry {attempt + 1}: {e}, waiting {delay:.1f}s"
                    )
                    await asyncio.sleep(delay)

        if semaphore is not None:
            async with semaphore:
                return await _execute()
        return await _execute()

    async def execute_batch(
        self,
        items: List[Any],
        process_func: Callable,
        progress_callback: Optional[Callable] = None
    ) -> List[Any]:
        """
        Execute batch processing with async concurrency.

        Args:
            items: List of items to process
            process_func: Async function to process each item
            progress_callback: Optional async callback (current, total, message)

        Returns:
            List of results in original order; failed items become error dicts
        """
        semaphore = asyncio.Semaphore(self.max_concurrent)
        total = len(items)
        completed = 0

        async def process_with_tracking(item, index):
            nonlocal completed
            result = await self.execute_with_retry(
                process_func, item, semaphore=semaphore
            )
            # Tasks finish out of order, so report a shared completion count
            # rather than the item's index.
            completed += 1
            if progress_callback:
                await progress_callback(
                    completed, total, f"Completed {completed}/{total}"
                )
            return result

        tasks = [
            process_with_tracking(item, i)
            for i, item in enumerate(items)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Convert exceptions to error dicts so callers get a uniform shape.
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({"error": str(result), "index": i})
            else:
                processed_results.append(result)

        return processed_results
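
# --- Async usage sketch (illustrative only) ---
# A minimal example of AsyncConcurrentExecutor inside an event loop, e.g. from
# a FastAPI background task. `fetch_result` is a hypothetical async workload;
# note that the progress callback must itself be awaitable.

async def _example_async_usage() -> None:
    """Run a small batch through AsyncConcurrentExecutor (demo sketch)."""
    async def fetch_result(item: int) -> Dict[str, Any]:
        # Hypothetical async workload with a simulated transient failure.
        await asyncio.sleep(0.01)
        if random.random() < 0.2:
            raise RuntimeError("429 too many requests (simulated)")
        return {"item": item, "status": "ok"}

    async def report(current: int, total: int, message: str) -> None:
        print(message)

    executor = AsyncConcurrentExecutor(max_concurrent=4, max_retries=3, base_delay=0.5)
    results = await executor.execute_batch(list(range(10)), fetch_result, report)
    failed = sum(1 for r in results if "error" in r)
    print(f"{failed} items failed after retries")
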
def create_executor(
    max_workers: int = 5,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    rate_limit_per_minute: int = 60
) -> ConcurrentTestExecutor:
    """
    Factory function to create a configured executor.

    Args:
        max_workers: Maximum concurrent workers
        max_retries: Maximum retry attempts
        base_delay: Base delay for exponential backoff (seconds)
        max_delay: Maximum delay between retries (seconds)
        rate_limit_per_minute: Maximum requests per minute

    Returns:
        Configured ConcurrentTestExecutor
    """
    return ConcurrentTestExecutor(
        max_workers=max_workers,
        max_retries=max_retries,
        base_delay=base_delay,
        max_delay=max_delay,
        rate_limit_per_minute=rate_limit_per_minute
    )
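
# --- Smoke-test entry point (illustrative only) ---
# Running this module directly exercises the two demo sketches above; this is
# a manual-testing convenience, not part of the executor API.

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    _example_sync_usage()
    asyncio.run(_example_async_usage())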