# filename: analysis_runner.py
"""
Module for running text analysis in parallel with rate limiting.
"""

import threading
from concurrent.futures import as_completed

from ratelimit import limits, RateLimitException

from log_config import get_logger

logger = get_logger('AnalysisRunner')

# Constants for rate limiting
RATE_LIMIT = 8  # Maximum number of calls allowed per rate period
RATE_PERIOD = 1  # Rate period in seconds

# Shared rate limiter decorator; its call counter is shared by every function it wraps
calls = limits(calls=RATE_LIMIT, period=RATE_PERIOD)


def analyze_text_parallel(raw_text: str, selected_analyses: list, analysis_prompts: dict, optimal_models: list, thread_pool, progress) -> dict:
    """
    Run text analyses in parallel with rate limiting.

    Each analysis in ``selected_analyses`` is paired positionally with the
    model at the same index in ``optimal_models``. Pairs whose model is falsy
    are skipped (and logged). Every remaining pair is submitted to
    ``thread_pool``; the model is invoked with ``analysis_prompts[analysis]``.

    Failures never propagate to the caller: a failed analysis is reported as
    a human-readable error string in the result dictionary instead.

    Args:
        raw_text (str): The text to be analyzed. NOTE: currently not read by
            this function; only the entries of ``analysis_prompts`` are sent
            to the models.
        selected_analyses (list): A list of selected analyses to perform.
        analysis_prompts (dict): A dictionary mapping analysis name to prompt.
        optimal_models (list): A list of models, one per selected analysis.
            Each model must expose an ``invoke(prompt)`` method.
        thread_pool (concurrent.futures.Executor): The executor used to run analyses.
        progress (callable): Called as ``progress(fraction, desc=...)`` to report progress.

    Returns:
        dict: Maps each submitted analysis name to its result, or to an error
        message string if that analysis failed.
    """
    logger.debug(f"Selected Analyses: {selected_analyses}, Type: {type(selected_analyses)}")
    logger.debug(f"Optimal Models: {optimal_models}, Type: {type(optimal_models)}")

    # Decide up front which analyses will actually run, so the progress
    # denominator reflects submitted work rather than everything requested.
    jobs = []
    for analysis, model in zip(selected_analyses, optimal_models):
        if model:
            jobs.append((analysis, model))
        else:
            logger.warning(f"Skipping analysis '{analysis}': no model available.")

    analysis_results = {}
    total_analyses = len(jobs)
    completed_analyses = 0
    # Guards completed_analyses, which is updated from several worker threads.
    counter_lock = threading.Lock()

    def rate_limit_failure(analysis: str, model, exc: Exception) -> tuple:
        """Log a rate-limit failure and build the (analysis, message) result for it."""
        error_message = f"Rate limit exceeded for analysis '{analysis}' using {model.__class__.__name__}. Error: {str(exc)}"
        logger.warning(error_message)
        return analysis, "Analysis failed due to rate limit exceeded."

    @calls
    def run_analysis(analysis: str, model) -> tuple:
        nonlocal completed_analyses
        try:
            prompt = analysis_prompts[analysis]
            result = model.invoke(prompt)
            logger.info(f"Analysis '{analysis}' completed successfully using {model.__class__.__name__}.")
            # Increment and snapshot atomically; report progress outside the
            # lock so a slow progress callback does not block other workers.
            with counter_lock:
                completed_analyses += 1
                done = completed_analyses
            progress(done / total_analyses, desc=f"Analysis '{analysis}' completed.")
            return analysis, result
        except RateLimitException as e:
            # Only reached if the model call itself raises this exception type.
            return rate_limit_failure(analysis, model, e)
        except Exception as e:
            error_message = f"Error running analysis '{analysis}' using {model.__class__.__name__}. Error type: {type(e).__name__}. Error message: {str(e)}"
            logger.error(error_message, exc_info=True)
            return analysis, f"Analysis failed due to an error: {str(e)}"

    # Map each future back to its job so a failure raised outside the body of
    # run_analysis can still be attributed to the right analysis.
    futures = {
        thread_pool.submit(run_analysis, analysis, model): (analysis, model)
        for analysis, model in jobs
    }

    for future in as_completed(futures):
        try:
            analysis, result = future.result()
        except RateLimitException as e:
            # The @calls wrapper raises before run_analysis's own try block is
            # entered, so the limiter's exception surfaces here, not inside it.
            job_analysis, job_model = futures[future]
            analysis, result = rate_limit_failure(job_analysis, job_model, e)
        analysis_results[analysis] = result
        with counter_lock:
            done = completed_analyses
        progress(done / total_analyses, desc=f"Completed {done}/{total_analyses} analyses.")

    progress(1.0, desc="Analysis completed.")
    return analysis_results

# file: analysis_runner.py (end)