File size: 3,502 Bytes
cb1a5c9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# filename: text_analyzer.py

from concurrent.futures import ThreadPoolExecutor
import os
import multiprocessing
from typing import List, Callable, Dict, Any

from document_processor import extract_text_from_document
from analysis_config import get_analysis_prompts
from model_selector import select_optimal_model
from analysis_runner import analyze_text_parallel
from log_config import get_logger

# Module-level logger for this file, obtained from the project's log_config
# helper under the name 'TextAnalyzer'.
logger = get_logger('TextAnalyzer')


class TextAnalyzer:
    """
    Run the selected analyses over a document and render them as markdown.

    Each instance owns a ThreadPoolExecutor that is passed to
    ``analyze_text_parallel``. Call ``close()`` (or use the instance as a
    context manager) to shut the pool down once the analyzer is no longer
    needed.
    """

    def __init__(self):
        """
        Initialize the TextAnalyzer with a thread pool executor.

        The pool is sized to the detected CPU count minus one, with a
        minimum of one worker.
        """
        # os.cpu_count() returns None when the count cannot be determined.
        # multiprocessing.cpu_count() raises NotImplementedError in exactly
        # that case, so it is not a usable fallback; default to 1 instead.
        num_cpus = os.cpu_count() or 1
        max_workers = max(1, num_cpus - 1)  # Leave one CPU core for other processes
        self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)

    def __enter__(self) -> "TextAnalyzer":
        """Return the analyzer itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Shut down the thread pool when leaving a ``with`` block."""
        self.close()

    def close(self, wait: bool = True) -> None:
        """
        Shut down the underlying thread pool.

        Args:
            wait (bool): Forwarded to ``ThreadPoolExecutor.shutdown``; when
                True, block until pending work has finished.
        """
        self.thread_pool.shutdown(wait=wait)

    def analyze_text(self, file_path: str, selected_analyses: List[str], progress: Callable[[float, str], None]) -> str:
        """
        Perform text analysis and manage execution in a thread pool.

        Any exception raised while extracting, selecting models, or running
        the analyses is logged and converted into an error string, so this
        method returns a string rather than propagating the exception.

        Args:
            file_path (str): Path to the document file.
            selected_analyses (List[str]): List of analyses to perform.
            progress (Callable[[float, str], None]): Progress callback function.
                It is invoked with the description passed as the ``desc``
                keyword argument.

        Returns:
            str: Formatted analysis results or an error message.
        """
        try:
            # Process document and prepare for analysis
            raw_text = extract_text_from_document(file_path)
            analysis_prompts = get_analysis_prompts(raw_text)
            # One model entry per selected analysis, each chosen from the same text.
            optimal_models = [select_optimal_model(raw_text) for _ in selected_analyses]

            if any(model is None for model in optimal_models):
                error_message = "No suitable model found for one or more analyses."
                logger.warning(error_message)
                return error_message

            progress(0.0, desc="Initializing analysis...")
            analysis_results = analyze_text_parallel(
                raw_text,
                selected_analyses,
                analysis_prompts,
                optimal_models,
                self.thread_pool,
                progress
            )

            return self.format_results(analysis_results)
        except Exception as e:
            error_message = f"Error analyzing text: {str(e)}"
            logger.error(error_message, exc_info=True)
            return error_message

    def format_results(self, analysis_results: Dict[str, Any]) -> str:
        """
        Format the analysis results into a structured markdown format.

        Each entry becomes a ``###`` section titled with its key. The section
        body is the result's ``content`` attribute, or the text
        'No result available' when the result has no such attribute.

        Args:
            analysis_results (Dict[str, Any]): The analysis results.

        Returns:
            str: Formatted analysis results.
        """
        sections = ["## Analysis Results\n\n"]
        for analysis, result in analysis_results.items():
            content = getattr(result, 'content', 'No result available')
            sections.append(f"### {analysis}\n{content}\n\n")
        # Join once instead of growing a string with += on every iteration.
        return "".join(sections)


# Ensure this script can be used as a module and provide testing capabilities
if __name__ == "__main__":
    analyzer = TextAnalyzer()

    # Example usage
    def report_progress(progress: float, desc: str):
        """Print the description followed by the progress fraction as a percentage."""
        print(f"{desc}: {progress * 100:.2f}%")

    try:
        results = analyzer.analyze_text("path_to_your_document.txt", ["Summary", "Sentiment"], report_progress)
        print(results)
    except Exception as e:
        # Include the traceback, matching how analyze_text logs its own errors.
        logger.error(f"Failed to analyze text: {str(e)}", exc_info=True)
    finally:
        # Release the analyzer's worker threads once the example run is over.
        analyzer.thread_pool.shutdown()