File size: 7,233 Bytes
e8051be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
"""
Advanced RAG Processor - Modular Version
Orchestrates all RAG components for document question answering.
Version: 3.0 - Modular Architecture
"""

import time
from pathlib import Path
from typing import Dict, Optional, Tuple

# Import all modular components
from RAG.rag_modules.query_expansion import QueryExpansionManager
from RAG.rag_modules.embedding_manager import EmbeddingManager
from RAG.rag_modules.search_manager import SearchManager
from RAG.rag_modules.reranking_manager import RerankingManager
from RAG.rag_modules.context_manager import ContextManager
from RAG.rag_modules.answer_generator import AnswerGenerator

from LLM.llm_handler import llm_handler
from config.config import OUTPUT_DIR, TOP_K


class AdvancedRAGProcessor:
    """
    Advanced RAG processor with modular architecture for better maintainability.
    Orchestrates query expansion, hybrid search, reranking, and answer generation.

    Every pipeline stage is delegated to a dedicated manager created in
    ``__init__``; this class only sequences the stages, times them, and
    prints progress to stdout.
    """

    def __init__(self):
        """Initialize the advanced RAG processor with all modules.

        All managers are constructed eagerly; the search manager receives the
        embedding manager instance. A startup summary is printed to stdout.
        """
        self.base_db_path = Path(OUTPUT_DIR)

        # Initialize all managers
        print("🚀 Initializing Advanced RAG Processor (Modular)...")

        # Core components
        self.embedding_manager = EmbeddingManager()
        self.query_expansion_manager = QueryExpansionManager()
        self.search_manager = SearchManager(self.embedding_manager)
        self.reranking_manager = RerankingManager()
        self.context_manager = ContextManager()
        self.answer_generator = AnswerGenerator()

        # Keep reference to LLM handler for info
        self.llm_handler = llm_handler

        print(f"✅ Advanced RAG Processor initialized with {self.llm_handler.provider.upper()} LLM")
        print("📦 All modules loaded successfully:")
        print("   🔄 Query Expansion Manager")
        print("   🧠 Embedding Manager")
        print("   🔍 Search Manager (Hybrid)")
        print("   🎯 Reranking Manager")
        print("   📝 Context Manager")
        print("   💬 Answer Generator")

    @staticmethod
    def _record_stage(
        timings: Dict[str, float],
        stage: str,
        start: float,
        logger,
        request_id: Optional[str],
    ) -> float:
        """Record the time elapsed since *start* for one pipeline stage.

        Stores the duration under ``timings[stage]`` and, when both *logger*
        and *request_id* are truthy, forwards it through
        ``logger.log_pipeline_stage(request_id, stage, elapsed)``.

        Args:
            timings: Timing dict being built for the current request (mutated).
            stage: Stage name, used both as the dict key and the logged name.
            start: ``time.perf_counter()`` value taken when the stage began.
            logger: Optional pipeline logger.
            request_id: Optional request ID for the logger.

        Returns:
            The elapsed time in seconds.
        """
        elapsed = time.perf_counter() - start
        timings[stage] = elapsed
        if logger and request_id:
            logger.log_pipeline_stage(request_id, stage, elapsed)
        return elapsed

    async def answer_question(
        self,
        question: str,
        doc_id: str,
        logger=None,
        request_id: Optional[str] = None,
    ) -> Tuple[str, Dict[str, float]]:
        """
        Answer a question using advanced RAG techniques with detailed timing.

        Stages run in order: query expansion, hybrid search, reranking,
        context creation, and answer generation. Each stage's duration in
        seconds (measured with the monotonic ``time.perf_counter``) is stored
        in the returned dict under ``'query_expansion'``, ``'hybrid_search'``,
        ``'reranking'``, ``'context_creation'`` and ``'llm_generation'``.

        Args:
            question: The question to answer
            doc_id: Document ID to search in; the collection checked is
                ``f"{doc_id}_collection"``
            logger: Optional logger for tracking; must provide
                ``log_pipeline_stage(request_id, stage, seconds)``
            request_id: Optional request ID for logging; stage timings are
                forwarded to *logger* only when both are given

        Returns:
            Tuple of (answer, timing_breakdown). On success and on the early
            "no information" exits, ``timing_breakdown`` includes
            ``'total_pipeline'``. If an ``Exception`` escapes any stage it is
            caught, the answer is an error message, and the dict contains
            ``'error_time'`` instead of ``'total_pipeline'``.
        """
        timings: Dict[str, float] = {}
        overall_start = time.perf_counter()

        try:
            # Verify the document's collection exists before doing any work.
            # Any failure here is reported as "document not processed".
            collection_name = f"{doc_id}_collection"
            try:
                client = self.search_manager.get_qdrant_client(doc_id)
                client.get_collection(collection_name)
            except Exception:
                timings['total_pipeline'] = time.perf_counter() - overall_start
                return "I don't have information about this document. Please ensure the document has been processed.", timings

            print(f"🚀 Advanced RAG processing for: {question[:100]}...")

            # Step 1: Query Expansion
            step_start = time.perf_counter()
            expanded_queries = await self.query_expansion_manager.expand_query(question)
            expansion_time = self._record_stage(timings, 'query_expansion', step_start, logger, request_id)

            # Step 2: Hybrid Search with Fusion
            step_start = time.perf_counter()
            search_results = await self.search_manager.hybrid_search(expanded_queries, doc_id, TOP_K)
            search_time = self._record_stage(timings, 'hybrid_search', step_start, logger, request_id)

            if not search_results:
                timings['total_pipeline'] = time.perf_counter() - overall_start
                return "I couldn't find relevant information to answer your question.", timings

            # Step 3: Reranking (scored against the original, unexpanded question)
            step_start = time.perf_counter()
            reranked_results = await self.reranking_manager.rerank_results(question, search_results)
            rerank_time = self._record_stage(timings, 'reranking', step_start, logger, request_id)

            # Step 4: Multi-perspective Context Creation (synchronous call)
            step_start = time.perf_counter()
            context = self.context_manager.create_enhanced_context(question, reranked_results)
            context_time = self._record_stage(timings, 'context_creation', step_start, logger, request_id)

            # Step 5: Enhanced Answer Generation
            step_start = time.perf_counter()
            answer = await self.answer_generator.generate_enhanced_answer(question, context, expanded_queries)
            generation_time = self._record_stage(timings, 'llm_generation', step_start, logger, request_id)

            # Calculate total time
            total_time = time.perf_counter() - overall_start
            timings['total_pipeline'] = total_time

            print(f"\n✅ Advanced RAG processing completed in {total_time:.4f}s")
            print(f"   🔍 Query expansion: {expansion_time:.4f}s")
            print(f"   🔎 Hybrid search: {search_time:.4f}s")
            print(f"   🎯 Reranking: {rerank_time:.4f}s")
            print(f"   📝 Context creation: {context_time:.4f}s")
            print(f"   💬 LLM generation: {generation_time:.4f}s")

            return answer, timings

        except Exception as e:
            # Best-effort boundary: surface the failure to the caller as answer
            # text (with elapsed time) instead of propagating it.
            error_time = time.perf_counter() - overall_start
            timings['error_time'] = error_time
            print(f"❌ Error in advanced RAG processing: {str(e)}")
            return f"I encountered an error while processing your question: {str(e)}", timings

    def cleanup(self):
        """Cleanup all manager resources.

        Only the search manager's ``cleanup()`` is invoked; the other managers
        are not cleaned up here.
        """
        print("🧹 Cleaning up Advanced RAG processor resources...")

        # Cleanup search manager (which has the most resources)
        self.search_manager.cleanup()

        print("✅ Advanced RAG cleanup completed")

    def get_system_info(self) -> Dict:
        """Get information about the RAG system.

        Returns:
            Dict with the version string, the LLM provider and model name read
            from ``self.llm_handler``, the list of module class names, and the
            base DB path as a string.
        """
        return {
            "version": "3.0 - Modular",
            "llm_provider": self.llm_handler.provider,
            "llm_model": self.llm_handler.model_name,
            "modules": [
                "QueryExpansionManager",
                "EmbeddingManager",
                "SearchManager",
                "RerankingManager",
                "ContextManager",
                "AnswerGenerator"
            ],
            "base_db_path": str(self.base_db_path)
        }