Spaces:
Sleeping
Sleeping
| """ | |
| Advanced RAG Processor - Modular Version | |
| Orchestrates all RAG components for document question answering. | |
| Version: 3.0 - Modular Architecture | |
| """ | |
| import time | |
| from typing import Dict, Tuple | |
| from pathlib import Path | |
| # Import all modular components | |
| from RAG.rag_modules.query_expansion import QueryExpansionManager | |
| from RAG.rag_modules.embedding_manager import EmbeddingManager | |
| from RAG.rag_modules.search_manager import SearchManager | |
| from RAG.rag_modules.reranking_manager import RerankingManager | |
| from RAG.rag_modules.context_manager import ContextManager | |
| from RAG.rag_modules.answer_generator import AnswerGenerator | |
| from LLM.llm_handler import llm_handler | |
| from config.config import OUTPUT_DIR, TOP_K | |
| class AdvancedRAGProcessor: | |
| """ | |
| Advanced RAG processor with modular architecture for better maintainability. | |
| Orchestrates query expansion, hybrid search, reranking, and answer generation. | |
| """ | |
| def __init__(self): | |
| """Initialize the advanced RAG processor with all modules.""" | |
| self.base_db_path = Path(OUTPUT_DIR) | |
| # Initialize all managers | |
| print("π Initializing Advanced RAG Processor (Modular)...") | |
| # Core components | |
| self.embedding_manager = EmbeddingManager() | |
| self.query_expansion_manager = QueryExpansionManager() | |
| self.search_manager = SearchManager(self.embedding_manager) | |
| self.reranking_manager = RerankingManager() | |
| self.context_manager = ContextManager() | |
| self.answer_generator = AnswerGenerator() | |
| # Keep reference to LLM handler for info | |
| self.llm_handler = llm_handler | |
| print(f"β Advanced RAG Processor initialized with {self.llm_handler.provider.upper()} LLM") | |
| print("π¦ All modules loaded successfully:") | |
| print(" π Query Expansion Manager") | |
| print(" π§ Embedding Manager") | |
| print(" π Search Manager (Hybrid)") | |
| print(" π― Reranking Manager") | |
| print(" π Context Manager") | |
| print(" π¬ Answer Generator") | |
| async def answer_question(self, question: str, doc_id: str, logger=None, request_id: str = None) -> Tuple[str, Dict[str, float]]: | |
| """ | |
| Answer a question using advanced RAG techniques with detailed timing. | |
| Args: | |
| question: The question to answer | |
| doc_id: Document ID to search in | |
| logger: Optional logger for tracking | |
| request_id: Optional request ID for logging | |
| Returns: | |
| Tuple of (answer, timing_breakdown) | |
| """ | |
| timings = {} | |
| overall_start = time.time() | |
| try: | |
| # Check if collection exists | |
| collection_name = f"{doc_id}_collection" | |
| try: | |
| client = self.search_manager.get_qdrant_client(doc_id) | |
| collection_info = client.get_collection(collection_name) | |
| except Exception: | |
| return "I don't have information about this document. Please ensure the document has been processed.", timings | |
| print(f"π Advanced RAG processing for: {question[:100]}...") | |
| # Step 1: Query Expansion | |
| step_start = time.time() | |
| expanded_queries = await self.query_expansion_manager.expand_query(question) | |
| expansion_time = time.time() - step_start | |
| timings['query_expansion'] = expansion_time | |
| if logger and request_id: | |
| logger.log_pipeline_stage(request_id, "query_expansion", expansion_time) | |
| # Step 2: Hybrid Search with Fusion | |
| step_start = time.time() | |
| search_results = await self.search_manager.hybrid_search(expanded_queries, doc_id, TOP_K) | |
| search_time = time.time() - step_start | |
| timings['hybrid_search'] = search_time | |
| if logger and request_id: | |
| logger.log_pipeline_stage(request_id, "hybrid_search", search_time) | |
| if not search_results: | |
| return "I couldn't find relevant information to answer your question.", timings | |
| # Step 3: Reranking | |
| step_start = time.time() | |
| reranked_results = await self.reranking_manager.rerank_results(question, search_results) | |
| rerank_time = time.time() - step_start | |
| timings['reranking'] = rerank_time | |
| if logger and request_id: | |
| logger.log_pipeline_stage(request_id, "reranking", rerank_time) | |
| # Step 4: Multi-perspective Context Creation | |
| step_start = time.time() | |
| context = self.context_manager.create_enhanced_context(question, reranked_results) | |
| context_time = time.time() - step_start | |
| timings['context_creation'] = context_time | |
| if logger and request_id: | |
| logger.log_pipeline_stage(request_id, "context_creation", context_time) | |
| # Step 5: Enhanced Answer Generation | |
| step_start = time.time() | |
| answer = await self.answer_generator.generate_enhanced_answer(question, context, expanded_queries) | |
| generation_time = time.time() - step_start | |
| timings['llm_generation'] = generation_time | |
| if logger and request_id: | |
| logger.log_pipeline_stage(request_id, "llm_generation", generation_time) | |
| # Calculate total time | |
| total_time = time.time() - overall_start | |
| timings['total_pipeline'] = total_time | |
| print(f"\nβ Advanced RAG processing completed in {total_time:.4f}s") | |
| print(f" π Query expansion: {expansion_time:.4f}s") | |
| print(f" π Hybrid search: {search_time:.4f}s") | |
| print(f" π― Reranking: {rerank_time:.4f}s") | |
| print(f" π Context creation: {context_time:.4f}s") | |
| print(f" π¬ LLM generation: {generation_time:.4f}s") | |
| return answer, timings | |
| except Exception as e: | |
| error_time = time.time() - overall_start | |
| timings['error_time'] = error_time | |
| print(f"β Error in advanced RAG processing: {str(e)}") | |
| return f"I encountered an error while processing your question: {str(e)}", timings | |
| def cleanup(self): | |
| """Cleanup all manager resources.""" | |
| print("π§Ή Cleaning up Advanced RAG processor resources...") | |
| # Cleanup search manager (which has the most resources) | |
| self.search_manager.cleanup() | |
| print("β Advanced RAG cleanup completed") | |
| def get_system_info(self) -> Dict: | |
| """Get information about the RAG system.""" | |
| return { | |
| "version": "3.0 - Modular", | |
| "llm_provider": self.llm_handler.provider, | |
| "llm_model": self.llm_handler.model_name, | |
| "modules": [ | |
| "QueryExpansionManager", | |
| "EmbeddingManager", | |
| "SearchManager", | |
| "RerankingManager", | |
| "ContextManager", | |
| "AnswerGenerator" | |
| ], | |
| "base_db_path": str(self.base_db_path) | |
| } | |