"""
Advanced RAG Processor - Modular Version

Orchestrates all RAG components for document question answering.

Version: 3.0 - Modular Architecture
"""

import time
from pathlib import Path
from typing import Dict, Optional, Tuple

# Import all modular components
from RAG.rag_modules.query_expansion import QueryExpansionManager
from RAG.rag_modules.embedding_manager import EmbeddingManager
from RAG.rag_modules.search_manager import SearchManager
from RAG.rag_modules.reranking_manager import RerankingManager
from RAG.rag_modules.context_manager import ContextManager
from RAG.rag_modules.answer_generator import AnswerGenerator
from LLM.llm_handler import llm_handler
from config.config import OUTPUT_DIR, TOP_K


class AdvancedRAGProcessor:
    """
    Advanced RAG processor with a modular architecture.

    Orchestrates query expansion, hybrid search, reranking, context creation
    and answer generation, delegating each step to a dedicated manager.
    """

    def __init__(self):
        """Initialize the processor and instantiate every pipeline manager."""
        self.base_db_path = Path(OUTPUT_DIR)

        print("🚀 Initializing Advanced RAG Processor (Modular)...")

        # Core components. SearchManager is handed the same EmbeddingManager
        # instance that is stored on self.
        self.embedding_manager = EmbeddingManager()
        self.query_expansion_manager = QueryExpansionManager()
        self.search_manager = SearchManager(self.embedding_manager)
        self.reranking_manager = RerankingManager()
        self.context_manager = ContextManager()
        self.answer_generator = AnswerGenerator()

        # Keep a reference to the shared LLM handler for reporting (see get_system_info).
        self.llm_handler = llm_handler

        provider = self.llm_handler.provider.upper()
        print(f"✅ Advanced RAG Processor initialized with {provider} LLM")
        print("📦 All modules loaded successfully:")
        print(" 🔄 Query Expansion Manager")
        print(" 🧠 Embedding Manager")
        print(" 🔍 Search Manager (Hybrid)")
        print(" 🎯 Reranking Manager")
        print(" 📝 Context Manager")
        print(" 💬 Answer Generator")

    @staticmethod
    def _record_stage(
        timings: Dict[str, float],
        stage: str,
        elapsed: float,
        logger=None,
        request_id: Optional[str] = None,
    ) -> None:
        """Store a stage duration in ``timings`` and forward it to ``logger`` if usable.

        The logger is only called when both ``logger`` and ``request_id`` are truthy.
        """
        timings[stage] = elapsed
        if logger and request_id:
            logger.log_pipeline_stage(request_id, stage, elapsed)

    def _collection_exists(self, doc_id: str) -> bool:
        """Return True if ``{doc_id}_collection`` can be fetched from the search client.

        Any ``Exception`` raised while obtaining the client or fetching the
        collection is treated as "not available" (returns False).
        """
        collection_name = f"{doc_id}_collection"
        try:
            client = self.search_manager.get_qdrant_client(doc_id)
            # NOTE(review): this is a synchronous call made from the async
            # answer_question; if it performs blocking network I/O it stalls the
            # event loop — consider offloading it to a thread.
            client.get_collection(collection_name)
        except Exception as e:
            # Keep the original best-effort semantics (missing collection and
            # connection problems both mean "document unavailable"), but surface
            # the cause instead of swallowing it silently.
            print(f"⚠️ Collection check failed for '{collection_name}': {e}")
            return False
        return True

    async def answer_question(
        self,
        question: str,
        doc_id: str,
        logger=None,
        request_id: Optional[str] = None,
    ) -> Tuple[str, Dict[str, float]]:
        """
        Answer a question about a processed document using the RAG pipeline.

        Stages: query expansion -> hybrid search -> reranking ->
        context creation -> answer generation. Each stage's elapsed time in
        seconds (measured with a monotonic clock) is recorded in ``timings``.

        Args:
            question: The question to answer.
            doc_id: Document ID; the collection ``{doc_id}_collection`` is searched.
            logger: Optional object exposing
                ``log_pipeline_stage(request_id, stage, seconds)``. It is only
                called when ``request_id`` is also provided.
            request_id: Optional request ID forwarded to ``logger``.

        Returns:
            Tuple of (answer, timings).

            - On success ``timings`` has ``query_expansion``, ``hybrid_search``,
              ``reranking``, ``context_creation``, ``llm_generation`` and
              ``total_pipeline``.
            - If the collection cannot be fetched, or the search yields no
              results, a fixed message is returned with only the stages
              completed so far.
            - Any ``Exception`` raised by the pipeline is caught: the answer is
              an error message and ``timings`` gains ``error_time``.
        """
        timings: Dict[str, float] = {}
        overall_start = time.perf_counter()

        try:
            # Verify the document's collection is reachable before doing any work.
            if not self._collection_exists(doc_id):
                return (
                    "I don't have information about this document. "
                    "Please ensure the document has been processed.",
                    timings,
                )

            print(f"🚀 Advanced RAG processing for: {question[:100]}...")

            # Step 1: Query Expansion
            step_start = time.perf_counter()
            expanded_queries = await self.query_expansion_manager.expand_query(question)
            expansion_time = time.perf_counter() - step_start
            self._record_stage(timings, "query_expansion", expansion_time, logger, request_id)

            # Step 2: Hybrid Search with Fusion
            step_start = time.perf_counter()
            search_results = await self.search_manager.hybrid_search(
                expanded_queries, doc_id, TOP_K
            )
            search_time = time.perf_counter() - step_start
            self._record_stage(timings, "hybrid_search", search_time, logger, request_id)

            if not search_results:
                return "I couldn't find relevant information to answer your question.", timings

            # Step 3: Reranking (against the original question, not the expansions)
            step_start = time.perf_counter()
            reranked_results = await self.reranking_manager.rerank_results(
                question, search_results
            )
            rerank_time = time.perf_counter() - step_start
            self._record_stage(timings, "reranking", rerank_time, logger, request_id)

            # Step 4: Multi-perspective Context Creation (synchronous call)
            step_start = time.perf_counter()
            context = self.context_manager.create_enhanced_context(question, reranked_results)
            context_time = time.perf_counter() - step_start
            self._record_stage(timings, "context_creation", context_time, logger, request_id)

            # Step 5: Enhanced Answer Generation
            step_start = time.perf_counter()
            answer = await self.answer_generator.generate_enhanced_answer(
                question, context, expanded_queries
            )
            generation_time = time.perf_counter() - step_start
            self._record_stage(timings, "llm_generation", generation_time, logger, request_id)

            # Total wall time, including the collection check.
            total_time = time.perf_counter() - overall_start
            timings["total_pipeline"] = total_time

            print(f"\n✅ Advanced RAG processing completed in {total_time:.4f}s")
            print(f" 🔍 Query expansion: {expansion_time:.4f}s")
            print(f" 🔎 Hybrid search: {search_time:.4f}s")
            print(f" 🎯 Reranking: {rerank_time:.4f}s")
            print(f" 📝 Context creation: {context_time:.4f}s")
            print(f" 💬 LLM generation: {generation_time:.4f}s")

            return answer, timings

        except Exception as e:
            # Boundary handler: the caller always receives an answer string, so
            # failures are reported as text along with the elapsed time.
            timings["error_time"] = time.perf_counter() - overall_start
            print(f"❌ Error in advanced RAG processing: {e}")
            return f"I encountered an error while processing your question: {e}", timings

    def cleanup(self):
        """Release manager resources.

        Only the search manager's ``cleanup()`` is invoked here.
        """
        print("🧹 Cleaning up Advanced RAG processor resources...")

        # Cleanup search manager (which has the most resources)
        self.search_manager.cleanup()

        print("✅ Advanced RAG cleanup completed")

    def get_system_info(self) -> Dict:
        """Return a summary of the RAG system configuration.

        Returns:
            Dict with the version string, LLM provider and model name (read from
            the shared LLM handler), the list of module class names, and the
            base database path as a string.
        """
        return {
            "version": "3.0 - Modular",
            "llm_provider": self.llm_handler.provider,
            "llm_model": self.llm_handler.model_name,
            "modules": [
                "QueryExpansionManager",
                "EmbeddingManager",
                "SearchManager",
                "RerankingManager",
                "ContextManager",
                "AnswerGenerator",
            ],
            "base_db_path": str(self.base_db_path),
        }