Spaces:
Paused
Paused
| # filename: text_analyzer.py | |
| from concurrent.futures import ThreadPoolExecutor | |
| import os | |
| import multiprocessing | |
| from typing import List, Callable, Dict, Any | |
| from document_processor import extract_text_from_document | |
| from analysis_config import get_analysis_prompts | |
| from model_selector import select_optimal_model | |
| from analysis_runner import analyze_text_parallel | |
| from log_config import get_logger | |
| # Configure logging | |
| logger = get_logger('TextAnalyzer') | |
class TextAnalyzer:
    """Run a set of analyses over a document using a shared thread pool.

    Each instance owns a ``ThreadPoolExecutor``. Call :meth:`close`, or use the
    instance as a context manager, to release the worker threads when done.
    """

    def __init__(self):
        """
        Initialize the TextAnalyzer with a thread pool executor.
        """
        # os.cpu_count() returns None when the CPU count cannot be determined.
        # multiprocessing.cpu_count() raises NotImplementedError in that same
        # situation, so it cannot serve as a fallback; assume a single CPU.
        num_cpus = os.cpu_count() or 1
        max_workers = max(1, num_cpus - 1)  # Leave one CPU core for other processes
        self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)

    def close(self) -> None:
        """
        Shut down the thread pool, waiting for already submitted work to finish.
        """
        self.thread_pool.shutdown(wait=True)

    def __enter__(self) -> "TextAnalyzer":
        """Return the analyzer itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Release the thread pool on leaving the ``with`` block."""
        self.close()

    def analyze_text(self, file_path: str, selected_analyses: List[str], progress: Callable[[float, str], None]) -> str:
        """
        Perform text analysis and manage execution in a thread pool.

        Args:
            file_path (str): Path to the document file.
            selected_analyses (List[str]): List of analyses to perform.
            progress (Callable[[float, str], None]): Progress callback function.
                It is invoked with the description passed as the keyword
                argument ``desc``.

        Returns:
            str: Formatted analysis results or an error message. Any exception
            raised while processing is logged with its traceback and reported
            through the returned message instead of being propagated.
        """
        try:
            # Process document and prepare for analysis
            raw_text = extract_text_from_document(file_path)
            analysis_prompts = get_analysis_prompts(raw_text)
            # One model selection per requested analysis, all based on the same text.
            optimal_models = [select_optimal_model(raw_text) for _ in selected_analyses]
            if any(model is None for model in optimal_models):
                error_message = "No suitable model found for one or more analyses."
                logger.warning(error_message)
                return error_message
            progress(0.0, desc="Initializing analysis...")
            analysis_results = analyze_text_parallel(
                raw_text,
                selected_analyses,
                analysis_prompts,
                optimal_models,
                self.thread_pool,
                progress,
            )
            return self.format_results(analysis_results)
        except Exception as e:
            error_message = f"Error analyzing text: {str(e)}"
            logger.error(error_message, exc_info=True)
            return error_message

    def format_results(self, analysis_results: Dict[str, Any]) -> str:
        """
        Format the analysis results into a structured markdown format.

        Args:
            analysis_results (Dict[str, Any]): The analysis results, keyed by
                analysis name. A value without a ``content`` attribute is
                rendered as ``No result available``.

        Returns:
            str: Formatted analysis results: a level-2 heading followed by one
            level-3 section per analysis, in the mapping's iteration order.
        """
        # Collect the pieces and join once instead of growing a string in a loop.
        sections = ["## Analysis Results\n\n"]
        for analysis, result in analysis_results.items():
            content = getattr(result, 'content', 'No result available')
            sections.append(f"### {analysis}\n{content}\n\n")
        return "".join(sections)
# Demo entry point: runs a sample analysis only when this file is executed
# directly, so importing it as a module has no side effects from this section.
if __name__ == "__main__":
    def report_progress(progress: float, desc: str):
        """Print the progress fraction as a percentage, prefixed by its label."""
        percent = progress * 100
        print(f"{desc}: {percent:.2f}%")

    # Example usage
    analyzer = TextAnalyzer()
    sample_analyses = ["Summary", "Sentiment"]
    try:
        results = analyzer.analyze_text("path_to_your_document.txt", sample_analyses, report_progress)
        print(results)
    except Exception as e:
        logger.error(f"Failed to analyze text: {e!s}")