editor-app-v10 / analysis_runner.py
gilzero's picture
Upload folder using huggingface_hub
cb1a5c9 verified
# filename: analysis_runner.py
"""
Module for running text analysis in parallel with rate limiting.
"""
from concurrent.futures import as_completed
from ratelimit import limits, RateLimitException
from log_config import get_logger
logger = get_logger('AnalysisRunner')

# Rate limiting: at most RATE_LIMIT calls to functions decorated with `calls`
# are allowed per RATE_PERIOD seconds. This caps call *frequency*; it does not
# limit how many requests are in flight at the same time.
RATE_LIMIT = 8  # Maximum number of calls allowed per RATE_PERIOD
RATE_PERIOD = 1  # Length of the rate-limit period, in seconds

# Shared rate-limit decorator (created here, applied where it is used). All
# functions decorated with this one instance share a single call counter.
# It is not combined with `sleep_and_retry`, so a call over the limit raises
# RateLimitException immediately instead of waiting for the next period.
calls = limits(calls=RATE_LIMIT, period=RATE_PERIOD)
def analyze_text_parallel(raw_text: str, selected_analyses: list, analysis_prompts: dict, optimal_models: list, thread_pool, progress) -> dict:
    """
    Run text analyses in parallel with rate limiting.

    Each entry of ``selected_analyses`` is paired by position with the entry of
    ``optimal_models`` at the same index; pairs whose model is falsy are skipped,
    and unpaired trailing entries (when the lists differ in length) are skipped
    with a warning. For each remaining pair, ``analysis_prompts[analysis]`` is
    passed to ``model.invoke`` on ``thread_pool``. Model calls go through the
    module-level ``calls`` rate limiter; a call over the limit is not retried and
    that analysis is reported as failed. A failing analysis never aborts the
    others: its entry in the result holds an error message string instead.

    Args:
        raw_text (str): The text to be analyzed. NOTE: not read by this function;
            the prompts from ``analysis_prompts`` are sent to the models as-is.
        selected_analyses (list): Names of the analyses to perform.
        analysis_prompts (dict): Maps analysis name -> prompt given to ``model.invoke``.
        optimal_models (list): Model for each analysis, aligned by index with
            ``selected_analyses``; falsy entries are skipped.
        thread_pool (concurrent.futures.Executor): Executor the analyses run on.
        progress (callable): Called as ``progress(fraction, desc=...)`` from the
            calling thread only.

    Returns:
        dict: Maps analysis name -> ``model.invoke`` result, or an error message
        string when that analysis failed.
    """
    logger.debug(f"Selected Analyses: {selected_analyses}, Type: {type(selected_analyses)}")
    logger.debug(f"Optimal Models: {optimal_models}, Type: {type(optimal_models)}")

    if len(selected_analyses) != len(optimal_models):
        # zip() below silently drops the unpaired tail; make that visible.
        logger.warning(
            f"Got {len(selected_analyses)} analyses but {len(optimal_models)} models; "
            "unpaired entries will be skipped."
        )

    @calls
    def invoke_rate_limited(model, prompt):
        # The limiter checks the rate in its wrapper *before* running this body,
        # so RateLimitException surfaces at the call site inside run_analysis's
        # try block, where it is handled (decorating run_analysis itself made
        # that handler unreachable).
        return model.invoke(prompt)

    def run_analysis(analysis: str, model) -> tuple:
        # Runs on a worker thread; touches no shared mutable state and never
        # raises for ordinary errors, so one failure cannot abort the others.
        try:
            prompt = analysis_prompts[analysis]
            result = invoke_rate_limited(model, prompt)
            logger.info(f"Analysis '{analysis}' completed successfully using {model.__class__.__name__}.")
            return analysis, result
        except RateLimitException as e:
            error_message = f"Rate limit exceeded for analysis '{analysis}' using {model.__class__.__name__}. Error: {str(e)}"
            logger.warning(error_message)
            return analysis, "Analysis failed due to rate limit exceeded."
        except Exception as e:
            error_message = f"Error running analysis '{analysis}' using {model.__class__.__name__}. Error type: {type(e).__name__}. Error message: {str(e)}"
            logger.error(error_message, exc_info=True)
            return analysis, f"Analysis failed due to an error: {str(e)}"

    # A list (not a dict keyed by name) so every submitted task is awaited,
    # even if an analysis name appears more than once.
    futures = [
        thread_pool.submit(run_analysis, analysis, model)
        for analysis, model in zip(selected_analyses, optimal_models)
        if model
    ]

    analysis_results = {}
    # Progress is measured against the tasks actually submitted; counting and
    # progress reporting happen only on this thread, so no locking is needed.
    total_analyses = len(futures)
    completed_analyses = 0
    for future in as_completed(futures):
        analysis, result = future.result()
        analysis_results[analysis] = result
        completed_analyses += 1
        progress(completed_analyses / total_analyses, desc=f"Completed {completed_analyses}/{total_analyses} analyses.")

    progress(1.0, desc="Analysis completed.")
    return analysis_results
# file: analysis_runner.py (end)