Spaces:
Paused
Paused
| # filename: analysis_runner.py | |
| """ | |
| Module for running text analysis in parallel with rate limiting. | |
| """ | |
| from concurrent.futures import as_completed | |
| from ratelimit import limits, RateLimitException | |
| from log_config import get_logger | |
| logger = get_logger('AnalysisRunner') | |
| # Constants used to build the rate limiter below | |
| RATE_LIMIT = 8 # Maximum number of calls allowed per RATE_PERIOD (passed as limits(calls=...)) | |
| RATE_PERIOD = 1 # Length of the rate-limit window in seconds (passed as limits(period=...)) | |
| # Build the rate limiter decorator. NOTE(review): `calls` is only created here; nothing in the visible code applies it to a function - confirm intent | |
| calls = limits(calls=RATE_LIMIT, period=RATE_PERIOD) | |
def analyze_text_parallel(raw_text: str, selected_analyses: list, analysis_prompts: dict, optimal_models: list, thread_pool, progress) -> dict:
    """
    Run the selected text analyses concurrently and collect their results.

    Each analysis is paired positionally with a model from ``optimal_models``
    and submitted to ``thread_pool``. Pairs whose model is falsy (or that have
    no counterpart because the two lists differ in length) are not submitted
    and therefore do not appear in the returned dictionary.

    Args:
        raw_text (str): The text to be analyzed. Not referenced by this
            function; the prompts in ``analysis_prompts`` are sent as-is.
        selected_analyses (list): Names of the analyses to perform.
        analysis_prompts (dict): Maps each analysis name to the prompt passed
            to ``model.invoke``.
        optimal_models (list): Models aligned by index with
            ``selected_analyses``; each must expose ``invoke(prompt)``.
        thread_pool (concurrent.futures.Executor): Executor used to run the
            analyses.
        progress (callable): Called as ``progress(fraction, desc=...)`` after
            each analysis finishes and once more with ``1.0`` at the end.

    Returns:
        dict: Maps each submitted analysis name to the model's result, or to
        an explanatory failure string if the analysis raised.
    """
    logger.debug(f"Selected Analyses: {selected_analyses}, Type: {type(selected_analyses)}")
    logger.debug(f"Optimal Models: {optimal_models}, Type: {type(optimal_models)}")

    def run_analysis(analysis: str, model) -> tuple:
        """Invoke ``model`` for one analysis; never raises, returns (name, result)."""
        model_name = model.__class__.__name__
        try:
            # The prompt lookup stays inside the try so a missing prompt is
            # reported as a failed analysis rather than crashing the batch.
            result = model.invoke(analysis_prompts[analysis])
        except RateLimitException as e:
            error_message = f"Rate limit exceeded for analysis '{analysis}' using {model_name}. Error: {str(e)}"
            logger.warning(error_message)
            return analysis, "Analysis failed due to rate limit exceeded."
        except Exception as e:
            error_message = f"Error running analysis '{analysis}' using {model_name}. Error type: {type(e).__name__}. Error message: {str(e)}"
            logger.error(error_message, exc_info=True)
            return analysis, f"Analysis failed due to an error: {str(e)}"
        else:
            logger.info(f"Analysis '{analysis}' completed successfully using {model_name}.")
            return analysis, result

    # A list (not a dict keyed by name) so that a repeated analysis name does
    # not silently drop an already-submitted future from tracking.
    futures = [
        thread_pool.submit(run_analysis, analysis, model)
        for analysis, model in zip(selected_analyses, optimal_models)
        if model
    ]

    skipped = len(selected_analyses) - len(futures)
    if skipped > 0:
        logger.warning(f"{skipped} selected analyses were not submitted because no model was available.")

    analysis_results = {}
    # Progress is measured against what was actually submitted, so the
    # fraction can reach 1.0 even when some selections had no model.
    total_analyses = len(futures)
    completed_analyses = 0

    for future in as_completed(futures):
        analysis, result = future.result()
        analysis_results[analysis] = result
        # The counter is updated only here, in the collecting thread: `+=` on
        # a shared variable from worker threads is not atomic. Failed analyses
        # count as finished too, so progress does not stall on errors.
        completed_analyses += 1
        progress(completed_analyses / total_analyses, desc=f"Completed {completed_analyses}/{total_analyses} analyses.")

    progress(1.0, desc="Analysis completed.")
    return analysis_results
| # file: analysis_runner.py (end) | |