Spaces:
Sleeping
Sleeping
File size: 7,233 Bytes
e8051be |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
"""
Advanced RAG Processor - Modular Version
Orchestrates all RAG components for document question answering.
Version: 3.0 - Modular Architecture
"""
import time
from typing import Dict, Tuple
from pathlib import Path
# Import all modular components
from RAG.rag_modules.query_expansion import QueryExpansionManager
from RAG.rag_modules.embedding_manager import EmbeddingManager
from RAG.rag_modules.search_manager import SearchManager
from RAG.rag_modules.reranking_manager import RerankingManager
from RAG.rag_modules.context_manager import ContextManager
from RAG.rag_modules.answer_generator import AnswerGenerator
from LLM.llm_handler import llm_handler
from config.config import OUTPUT_DIR, TOP_K
class AdvancedRAGProcessor:
    """
    Advanced RAG processor with modular architecture for better maintainability.

    Orchestrates query expansion, hybrid search, reranking, context creation
    and answer generation by delegating each stage to a dedicated manager.
    """

    def __init__(self):
        """Initialize the advanced RAG processor with all modules."""
        self.base_db_path = Path(OUTPUT_DIR)

        print("🚀 Initializing Advanced RAG Processor (Modular)...")

        # Core components. The search manager receives the embedding manager
        # instance, so the embedding manager has to be created first.
        self.embedding_manager = EmbeddingManager()
        self.query_expansion_manager = QueryExpansionManager()
        self.search_manager = SearchManager(self.embedding_manager)
        self.reranking_manager = RerankingManager()
        self.context_manager = ContextManager()
        self.answer_generator = AnswerGenerator()

        # Keep a reference to the shared LLM handler; it is only read for
        # reporting (provider / model name) in this class.
        self.llm_handler = llm_handler

        print(f"✅ Advanced RAG Processor initialized with {self.llm_handler.provider.upper()} LLM")
        print("📦 All modules loaded successfully:")
        print(" 🔄 Query Expansion Manager")
        print(" 🧠 Embedding Manager")
        print(" 🔍 Search Manager (Hybrid)")
        print(" 🎯 Reranking Manager")
        print(" 📝 Context Manager")
        print(" 💬 Answer Generator")

    @staticmethod
    def _record_stage(timings, stage, started_at, logger, request_id) -> float:
        """Store the elapsed time of one pipeline stage and report it to the logger.

        Args:
            timings: Dict that receives ``timings[stage] = elapsed``.
            stage: Stage name, used both as the timing key and the logged name.
            started_at: ``time.perf_counter()`` reading taken before the stage ran.
            logger: Optional object exposing ``log_pipeline_stage``.
            request_id: Optional request ID; logging is skipped without it.

        Returns:
            The elapsed time in seconds.
        """
        elapsed = time.perf_counter() - started_at
        timings[stage] = elapsed
        # Logging needs both a logger and a request ID to attribute the stage to.
        if logger and request_id:
            logger.log_pipeline_stage(request_id, stage, elapsed)
        return elapsed

    async def answer_question(
        self,
        question: str,
        doc_id: str,
        logger=None,
        request_id: str = None,
    ) -> Tuple[str, Dict[str, float]]:
        """
        Answer a question using advanced RAG techniques with detailed timing.

        This method does not raise for pipeline failures: any exception is
        caught and turned into an error message returned as the answer.

        Args:
            question: The question to answer
            doc_id: Document ID to search in
            logger: Optional logger for tracking; its ``log_pipeline_stage``
                method is called once per completed stage
            request_id: Optional request ID for logging

        Returns:
            Tuple of (answer, timing_breakdown). The timing dict holds one
            entry per completed stage ('query_expansion', 'hybrid_search',
            'reranking', 'context_creation', 'llm_generation') plus
            'total_pipeline' on success, or 'error_time' when an exception
            interrupted the pipeline. It is empty when the document's
            collection cannot be accessed.
        """
        timings: Dict[str, float] = {}
        # perf_counter is monotonic, so durations cannot be skewed by
        # wall-clock adjustments while a request is in flight.
        overall_start = time.perf_counter()

        try:
            # Check that the document's collection exists before doing any work.
            collection_name = f"{doc_id}_collection"
            try:
                client = self.search_manager.get_qdrant_client(doc_id)
                client.get_collection(collection_name)
            except Exception:
                # Any failure to reach the collection is reported to the
                # caller as "document not processed".
                return "I don't have information about this document. Please ensure the document has been processed.", timings

            print(f"🚀 Advanced RAG processing for: {question[:100]}...")

            # Step 1: Query Expansion
            step_start = time.perf_counter()
            expanded_queries = await self.query_expansion_manager.expand_query(question)
            expansion_time = self._record_stage(timings, 'query_expansion', step_start, logger, request_id)

            # Step 2: Hybrid Search with Fusion
            step_start = time.perf_counter()
            search_results = await self.search_manager.hybrid_search(expanded_queries, doc_id, TOP_K)
            search_time = self._record_stage(timings, 'hybrid_search', step_start, logger, request_id)

            # Nothing retrieved: stop early, keeping the timings gathered so far.
            if not search_results:
                return "I couldn't find relevant information to answer your question.", timings

            # Step 3: Reranking
            step_start = time.perf_counter()
            reranked_results = await self.reranking_manager.rerank_results(question, search_results)
            rerank_time = self._record_stage(timings, 'reranking', step_start, logger, request_id)

            # Step 4: Multi-perspective Context Creation
            step_start = time.perf_counter()
            context = self.context_manager.create_enhanced_context(question, reranked_results)
            context_time = self._record_stage(timings, 'context_creation', step_start, logger, request_id)

            # Step 5: Enhanced Answer Generation
            step_start = time.perf_counter()
            answer = await self.answer_generator.generate_enhanced_answer(question, context, expanded_queries)
            generation_time = self._record_stage(timings, 'llm_generation', step_start, logger, request_id)

            # Calculate total time
            total_time = time.perf_counter() - overall_start
            timings['total_pipeline'] = total_time

            print(f"\n✅ Advanced RAG processing completed in {total_time:.4f}s")
            print(f" 🔄 Query expansion: {expansion_time:.4f}s")
            print(f" 🔍 Hybrid search: {search_time:.4f}s")
            print(f" 🎯 Reranking: {rerank_time:.4f}s")
            print(f" 📝 Context creation: {context_time:.4f}s")
            print(f" 💬 LLM generation: {generation_time:.4f}s")

            return answer, timings

        except Exception as e:
            # Top-level boundary: report the failure as the answer instead of
            # propagating, and record how long the request ran before failing.
            timings['error_time'] = time.perf_counter() - overall_start
            print(f"❌ Error in advanced RAG processing: {e}")
            return f"I encountered an error while processing your question: {e}", timings

    def cleanup(self):
        """Cleanup all manager resources."""
        print("🧹 Cleaning up Advanced RAG processor resources...")

        # Only the search manager is cleaned up here (it has the most resources).
        self.search_manager.cleanup()

        print("✅ Advanced RAG cleanup completed")

    def get_system_info(self) -> Dict:
        """Get information about the RAG system.

        Returns:
            Dict with the processor version, the LLM provider and model name
            read from the LLM handler, the list of module class names, and
            the base database path as a string.
        """
        return {
            "version": "3.0 - Modular",
            "llm_provider": self.llm_handler.provider,
            "llm_model": self.llm_handler.model_name,
            "modules": [
                "QueryExpansionManager",
                "EmbeddingManager",
                "SearchManager",
                "RerankingManager",
                "ContextManager",
                "AnswerGenerator",
            ],
            "base_db_path": str(self.base_db_path),
        }
|