# rag-bajaj / RAG/advanced_rag_processor.py
# Uploaded by quantumbit ("Upload 39 files", commit e8051be, verified).
"""
Advanced RAG Processor - Modular Version
Orchestrates all RAG components for document question answering.
Version: 3.0 - Modular Architecture
"""
import time
from pathlib import Path
from typing import Dict, Optional, Tuple

# Import all modular components
from RAG.rag_modules.query_expansion import QueryExpansionManager
from RAG.rag_modules.embedding_manager import EmbeddingManager
from RAG.rag_modules.search_manager import SearchManager
from RAG.rag_modules.reranking_manager import RerankingManager
from RAG.rag_modules.context_manager import ContextManager
from RAG.rag_modules.answer_generator import AnswerGenerator
from LLM.llm_handler import llm_handler
from config.config import OUTPUT_DIR, TOP_K
class AdvancedRAGProcessor:
    """
    Advanced RAG processor with modular architecture for better maintainability.

    Orchestrates query expansion, hybrid search, reranking, context creation
    and answer generation by delegating each stage to a dedicated manager.
    """

    def __init__(self):
        """Initialize the advanced RAG processor and all of its stage managers."""
        self.base_db_path = Path(OUTPUT_DIR)

        print("🚀 Initializing Advanced RAG Processor (Modular)...")

        # Core components. SearchManager receives the same EmbeddingManager
        # instance that is stored on self.
        self.embedding_manager = EmbeddingManager()
        self.query_expansion_manager = QueryExpansionManager()
        self.search_manager = SearchManager(self.embedding_manager)
        self.reranking_manager = RerankingManager()
        self.context_manager = ContextManager()
        self.answer_generator = AnswerGenerator()

        # Keep a reference to the LLM handler for provider/model reporting.
        self.llm_handler = llm_handler

        print(f"✅ Advanced RAG Processor initialized with {self.llm_handler.provider.upper()} LLM")
        print("📦 All modules loaded successfully:")
        print(" 🔄 Query Expansion Manager")
        print(" 🧠 Embedding Manager")
        print(" 🔍 Search Manager (Hybrid)")
        print(" 🎯 Reranking Manager")
        print(" 📝 Context Manager")
        print(" 💬 Answer Generator")

    @staticmethod
    def _record_stage(
        timings: Dict[str, float],
        stage: str,
        start: float,
        logger=None,
        request_id: Optional[str] = None,
    ) -> float:
        """
        Record the time elapsed since ``start`` for one pipeline stage.

        Stores the elapsed seconds in ``timings[stage]`` and, when both
        ``logger`` and ``request_id`` are truthy, forwards it through
        ``logger.log_pipeline_stage(request_id, stage, elapsed)``.

        Returns:
            The elapsed time in seconds.
        """
        elapsed = time.time() - start
        timings[stage] = elapsed
        if logger and request_id:
            logger.log_pipeline_stage(request_id, stage, elapsed)
        return elapsed

    async def answer_question(
        self,
        question: str,
        doc_id: str,
        logger=None,
        request_id: Optional[str] = None,
    ) -> Tuple[str, Dict[str, float]]:
        """
        Answer a question about one document using the multi-stage RAG pipeline.

        Args:
            question: The question to answer.
            doc_id: Document ID to search in; the pipeline first checks that
                the collection ``f"{doc_id}_collection"`` can be fetched.
            logger: Optional object with a
                ``log_pipeline_stage(request_id, stage, seconds)`` method; it is
                only used when ``request_id`` is also provided.
            request_id: Optional request ID forwarded to ``logger``.

        Returns:
            Tuple of (answer, timing_breakdown). ``timing_breakdown`` maps
            stage names to seconds. It contains ``total_pipeline`` whenever the
            pipeline returns normally (including the early "no document" and
            "no relevant results" answers) and ``error_time`` when an
            unexpected exception was caught.

        Exceptions raised by the stages are not propagated; they are turned
        into a user-facing error answer.
        """
        timings: Dict[str, float] = {}
        overall_start = time.time()

        try:
            # Check that the document's collection exists before doing any work.
            collection_name = f"{doc_id}_collection"
            try:
                client = self.search_manager.get_qdrant_client(doc_id)
                client.get_collection(collection_name)
            except Exception as exc:
                # Best-effort check: any failure is reported to the user as an
                # unprocessed document, but print the cause so connection or
                # configuration errors are not silently mistaken for it.
                print(f"⚠️ Collection check failed for '{collection_name}': {exc}")
                timings['total_pipeline'] = time.time() - overall_start
                return "I don't have information about this document. Please ensure the document has been processed.", timings

            print(f"🚀 Advanced RAG processing for: {question[:100]}...")

            # Step 1: Query Expansion
            step_start = time.time()
            expanded_queries = await self.query_expansion_manager.expand_query(question)
            expansion_time = self._record_stage(timings, "query_expansion", step_start, logger, request_id)

            # Step 2: Hybrid Search with Fusion
            step_start = time.time()
            search_results = await self.search_manager.hybrid_search(expanded_queries, doc_id, TOP_K)
            search_time = self._record_stage(timings, "hybrid_search", step_start, logger, request_id)

            if not search_results:
                timings['total_pipeline'] = time.time() - overall_start
                return "I couldn't find relevant information to answer your question.", timings

            # Step 3: Reranking (against the original question, not the expansions)
            step_start = time.time()
            reranked_results = await self.reranking_manager.rerank_results(question, search_results)
            rerank_time = self._record_stage(timings, "reranking", step_start, logger, request_id)

            # Step 4: Multi-perspective Context Creation (synchronous call)
            step_start = time.time()
            context = self.context_manager.create_enhanced_context(question, reranked_results)
            context_time = self._record_stage(timings, "context_creation", step_start, logger, request_id)

            # Step 5: Enhanced Answer Generation
            step_start = time.time()
            answer = await self.answer_generator.generate_enhanced_answer(question, context, expanded_queries)
            generation_time = self._record_stage(timings, "llm_generation", step_start, logger, request_id)

            total_time = time.time() - overall_start
            timings['total_pipeline'] = total_time

            print(f"\n✅ Advanced RAG processing completed in {total_time:.4f}s")
            print(f" 🔍 Query expansion: {expansion_time:.4f}s")
            print(f" 🔎 Hybrid search: {search_time:.4f}s")
            print(f" 🎯 Reranking: {rerank_time:.4f}s")
            print(f" 📝 Context creation: {context_time:.4f}s")
            print(f" 💬 LLM generation: {generation_time:.4f}s")

            return answer, timings

        except Exception as e:
            timings['error_time'] = time.time() - overall_start
            print(f"❌ Error in advanced RAG processing: {str(e)}")
            return f"I encountered an error while processing your question: {str(e)}", timings

    def cleanup(self):
        """Release manager resources; only SearchManager.cleanup() is invoked."""
        print("🧹 Cleaning up Advanced RAG processor resources...")

        # Cleanup search manager (which has the most resources)
        self.search_manager.cleanup()

        print("✅ Advanced RAG cleanup completed")

    def get_system_info(self) -> Dict:
        """
        Describe the RAG system configuration.

        Returns:
            Dict with the version string, the LLM provider and model name taken
            from the LLM handler, the list of module class names, and the base
            database path as a string.
        """
        return {
            "version": "3.0 - Modular",
            "llm_provider": self.llm_handler.provider,
            "llm_model": self.llm_handler.model_name,
            "modules": [
                "QueryExpansionManager",
                "EmbeddingManager",
                "SearchManager",
                "RerankingManager",
                "ContextManager",
                "AnswerGenerator",
            ],
            "base_db_path": str(self.base_db_path),
        }