File size: 3,101 Bytes
cb1a5c9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# filename: analysis_runner.py

"""
Module for running text analysis in parallel with rate limiting.
"""

from concurrent.futures import as_completed

from ratelimit import RateLimitException, limits, sleep_and_retry

from log_config import get_logger

# Module-level logger for this runner.
logger = get_logger('AnalysisRunner')

# Constants for rate limiting
RATE_LIMIT = 8   # Maximum number of calls allowed per RATE_PERIOD (not a concurrency cap)
RATE_PERIOD = 1  # Rate period in seconds

def calls(func):
    """Rate-limit *func* to RATE_LIMIT calls per RATE_PERIOD seconds, blocking until a slot frees.

    `sleep_and_retry` is essential here: a bare `limits(...)` decorator raises
    RateLimitException from its wrapper — *outside* the decorated function's
    body — so a try/except inside the decorated function can never catch it
    and the error would only surface later at `future.result()`.
    """
    return sleep_and_retry(limits(calls=RATE_LIMIT, period=RATE_PERIOD)(func))

def analyze_text_parallel(raw_text: str, selected_analyses: list, analysis_prompts: dict, optimal_models: list, thread_pool, progress) -> dict:
    """
    Run text analyses in parallel with rate limiting.

    Args:
        raw_text (str): The text to be analyzed.
            NOTE(review): currently unused — prompts arrive pre-built in
            analysis_prompts; confirm whether callers expect it to be injected.
        selected_analyses (list): A list of selected analyses to perform.
        analysis_prompts (dict): A dictionary containing analysis prompts.
        optimal_models (list): A list of optimal models for each analysis.
        thread_pool (concurrent.futures.Executor): The thread pool executor for running analyses.
        progress (callable): A callable to report progress.

    Returns:
        dict: Maps each analysis name to its result, or to an error-message
        string if that analysis failed.
    """
    from threading import Lock  # local import: only needed for the progress counter

    logger.debug(f"Selected Analyses: {selected_analyses}, Type: {type(selected_analyses)}")
    logger.debug(f"Optimal Models: {optimal_models}, Type: {type(optimal_models)}")

    analysis_results = {}
    total_analyses = len(selected_analyses)
    completed_analyses = 0
    # `completed_analyses += 1` is not atomic (load/add/store), and
    # run_analysis executes concurrently on multiple worker threads.
    counter_lock = Lock()

    @calls
    def run_analysis(analysis: str, model) -> tuple:
        nonlocal completed_analyses
        try:
            prompt = analysis_prompts[analysis]
            result = model.invoke(prompt)
            logger.info(f"Analysis '{analysis}' completed successfully using {model.__class__.__name__}.")
            with counter_lock:
                completed_analyses += 1
                done = completed_analyses
            progress(done / total_analyses, desc=f"Analysis '{analysis}' completed.")
            return analysis, result
        except RateLimitException as e:
            error_message = f"Rate limit exceeded for analysis '{analysis}' using {model.__class__.__name__}. Error: {str(e)}"
            logger.warning(error_message)
            return analysis, "Analysis failed due to rate limit exceeded."
        except Exception as e:
            error_message = f"Error running analysis '{analysis}' using {model.__class__.__name__}. Error type: {type(e).__name__}. Error message: {str(e)}"
            logger.error(error_message, exc_info=True)
            return analysis, f"Analysis failed due to an error: {str(e)}"

    # Map future -> analysis name, so exceptions raised *outside*
    # run_analysis's try block (e.g. by the rate-limiter wrapper) can still be
    # attributed to the right analysis when they surface at future.result().
    futures = {thread_pool.submit(run_analysis, analysis, model): analysis
               for analysis, model in zip(selected_analyses, optimal_models) if model}

    for future in as_completed(futures):
        try:
            analysis, result = future.result()
        except Exception as e:
            # Without this guard, one raising future would abort the whole
            # collection loop and silently drop every remaining result.
            analysis = futures[future]
            logger.error(f"Error running analysis '{analysis}'. Error type: {type(e).__name__}. Error message: {str(e)}", exc_info=True)
            result = f"Analysis failed due to an error: {str(e)}"
        analysis_results[analysis] = result
        # Report from collected results: `completed_analyses` only counts
        # successes, so it understates progress whenever an analysis fails.
        progress(len(analysis_results) / total_analyses, desc=f"Completed {len(analysis_results)}/{total_analyses} analyses.")

    progress(1.0, desc="Analysis completed.")
    return analysis_results

# file: analysis_runner.py (end)