Spaces:
Runtime error
Runtime error
File size: 11,769 Bytes
9e5bc69 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 |
"""
Vector augmentation engine implementing Phase E (Steps 13-14).
Handles vector search operations and result fusion:
- Vector similarity search for additional context (Step 13)
- Result fusion strategy for enhanced answers (Step 14)
"""
import logging
import numpy as np
from typing import Dict, List, Any, Tuple
from dataclasses import dataclass
from datetime import datetime
from .setup import GraphRAGSetup
from .query_preprocessing import DriftRoutingResult
@dataclass
class VectorSearchResult:
    """One hit returned by the Step 13 vector similarity search.

    Produced by VectorAugmentationEngine._perform_vector_search from a
    query-engine source node and consumed by the Step 14 fusion stage.
    """
    document_id: str  # node metadata 'doc_id' when present, else positional "doc_{i}"
    content: str  # node text (node.text / node.content / get_content() / str(node))
    similarity_score: float  # raw backend score; defaults to 0.8 when the node reports none
    metadata: Dict[str, Any]  # node metadata (or extra_info) passed through as-is
    source_type: str  # 'vector_db', 'semantic_search'
    relevance_score: float  # similarity_score * 0.9 — down-weighted vs raw similarity
@dataclass
class AugmentationResult:
    """Phase E (Steps 13-14) output: vector hits plus fused context.

    On failure the engine returns a fallback instance with empty
    results, fusion_strategy='graph_only' and confidence 0.0.
    """
    vector_results: List[VectorSearchResult]  # hits that passed the similarity threshold
    enhanced_context: str  # fused graph + vector context text (Step 14)
    fusion_strategy: str  # 'graph_vector_hybrid' on success, 'graph_only' on fallback
    augmentation_confidence: float  # 0..1 blend of avg similarity and result count
    execution_time: float  # wall-clock seconds for the whole phase
    metadata: Dict[str, Any]  # counts/averages on success; {'error', 'fallback'} on failure
class VectorAugmentationEngine:
    """
    Vector augmentation engine implementing Phase E (Steps 13-14).

    Step 13 runs a similarity search against the Milvus-backed vector
    query engine; Step 14 fuses those hits with the graph-based results
    into a single enhanced-context string.
    """

    def __init__(self, setup: GraphRAGSetup):
        """
        Args:
            setup: Shared GraphRAG setup providing the vector query
                engine, embedding model and configuration.
        """
        self.setup = setup
        self.vector_engine = setup.query_engine  # Milvus vector engine
        self.embedding_model = setup.embedding_model
        self.config = setup.config
        self.logger = logging.getLogger(self.__class__.__name__)
        # Vector search parameters
        self.similarity_threshold = 0.75  # hits scoring below this are dropped
        self.max_vector_results = 10  # cap on hits examined per query

    async def execute_vector_augmentation_phase(self,
                                                query_embedding: List[float],
                                                graph_results: Dict[str, Any],
                                                routing_result: DriftRoutingResult) -> AugmentationResult:
        """
        Execute vector augmentation phase with similarity search.

        Args:
            query_embedding: Query vector for similarity matching.
            graph_results: Results from graph-based search.
            routing_result: Routing decision parameters.

        Returns:
            Augmentation results with vector context. Any exception is
            caught and converted into a 'graph_only' fallback result so
            the surrounding pipeline can continue.
        """
        start_time = datetime.now()
        try:
            # Step 13: Vector Similarity Search
            self.logger.info("Starting Step 13: Vector Similarity Search")
            vector_results = await self._perform_vector_search(
                query_embedding, routing_result
            )

            # Step 14: Result Fusion and Enhancement
            self.logger.info("Starting Step 14: Result Fusion and Enhancement")
            enhanced_context = await self._fuse_results(
                vector_results, graph_results, routing_result
            )

            execution_time = (datetime.now() - start_time).total_seconds()
            # float(...) keeps np.float64 out of the (possibly serialized) metadata.
            avg_similarity = (
                float(np.mean([r.similarity_score for r in vector_results]))
                if vector_results else 0.0
            )
            augmentation_result = AugmentationResult(
                vector_results=vector_results,
                enhanced_context=enhanced_context,
                fusion_strategy='graph_vector_hybrid',
                augmentation_confidence=self._calculate_augmentation_confidence(vector_results),
                execution_time=execution_time,
                metadata={
                    'vector_results_count': len(vector_results),
                    'avg_similarity': avg_similarity,
                    'phase': 'vector_augmentation',
                    'step_range': '13-14'
                }
            )
            self.logger.info(
                f"Phase E completed: {len(vector_results)} vector results, "
                f"augmentation confidence: {augmentation_result.augmentation_confidence:.3f}"
            )
            return augmentation_result
        except Exception as e:
            self.logger.error(f"Vector augmentation phase failed: {e}")
            # Return empty augmentation on failure
            return AugmentationResult(
                vector_results=[],
                enhanced_context="",
                fusion_strategy='graph_only',
                augmentation_confidence=0.0,
                execution_time=(datetime.now() - start_time).total_seconds(),
                metadata={'error': str(e), 'fallback': True}
            )

    async def _perform_vector_search(self,
                                     query_embedding: List[float],
                                     routing_result: DriftRoutingResult) -> List[VectorSearchResult]:
        """
        Step 13: Perform comprehensive vector similarity search.

        Uses the Milvus vector database to find semantically similar
        content. NOTE(review): query_embedding is currently unused —
        the vector engine is queried by routing_result.original_query
        text and embeds it internally; confirm this is intended.
        """
        try:
            vector_results: List[VectorSearchResult] = []
            if not self.vector_engine:
                self.logger.warning("Vector engine not available, skipping vector search")
                return vector_results

            # Query the vector database via the existing query engine.
            search_results = self.vector_engine.query(routing_result.original_query)
            source_nodes = getattr(search_results, 'source_nodes', None) or []
            for i, node in enumerate(source_nodes[:self.max_vector_results]):
                similarity_score = self._node_similarity(node)
                # Only include results above similarity threshold.
                if similarity_score < self.similarity_threshold:
                    continue
                node_metadata = self._node_metadata(node)
                vector_results.append(VectorSearchResult(
                    document_id=node_metadata.get('doc_id', f"doc_{i}"),
                    content=self._node_content(node),
                    similarity_score=similarity_score,
                    metadata=node_metadata,
                    source_type='vector_db',
                    relevance_score=similarity_score * 0.9  # Slightly weighted down
                ))
            self.logger.info(f"Vector search completed: {len(vector_results)} results above threshold {self.similarity_threshold}")
            return vector_results
        except Exception as e:
            self.logger.error(f"Vector search failed: {e}")
            return []

    @staticmethod
    def _node_similarity(node) -> float:
        """Best-effort similarity extraction across node flavours.

        Fix vs. original: a node may expose `score`/`similarity` set to
        None (common for retriever nodes), which previously flowed into
        a `>=` comparison and raised TypeError; None is now skipped.
        """
        for attr in ('score', 'similarity'):
            value = getattr(node, attr, None)
            if value is not None:
                return float(value)
        metadata = getattr(node, 'metadata', None)
        if metadata and 'score' in metadata:
            return float(metadata['score'])
        return 0.8  # default when the backend reports no score

    @staticmethod
    def _node_content(node) -> str:
        """Extract text content, handling the different node types."""
        if hasattr(node, 'text'):
            return node.text
        if hasattr(node, 'content'):
            return node.content
        if hasattr(node, 'get_content'):
            return node.get_content()
        return str(node)

    @staticmethod
    def _node_metadata(node) -> Dict[str, Any]:
        """Extract node metadata safely; empty dict when absent."""
        metadata = getattr(node, 'metadata', None)
        if metadata:
            return metadata
        extra_info = getattr(node, 'extra_info', None)
        if extra_info:
            return extra_info
        return {}

    async def _fuse_results(self,
                            vector_results: List[VectorSearchResult],
                            graph_results: Dict[str, Any],
                            routing_result: DriftRoutingResult) -> str:
        """
        Step 14: Fuse vector and graph results for enhanced context.

        Combines graph-based entity relationships with vector similarity
        content. Returns "" when there is nothing to fuse.
        """
        try:
            fusion_parts: List[str] = []

            # Start with graph-based context (Phase C & D results).
            initial_answer = graph_results.get('initial_answer')
            if isinstance(initial_answer, dict) and 'content' in initial_answer:
                fusion_parts.extend([
                    "=== GRAPH-BASED KNOWLEDGE ===",
                    initial_answer['content'],
                    ""
                ])

            # Add vector-based augmentation.
            if vector_results:
                fusion_parts.extend([
                    "=== SEMANTIC AUGMENTATION ===",
                    "Additional relevant information from vector similarity search:",
                    ""
                ])
                for i, result in enumerate(vector_results[:5], 1):  # Top 5 vector results
                    fusion_parts.extend([
                        f"**{i}. Vector Result (Similarity: {result.similarity_score:.3f})**",
                        result.content,  # Show full content without truncation
                        ""
                    ])

            # Fix vs. original: only explain the methodology when something
            # was actually fused, instead of emitting boilerplate alone.
            if fusion_parts:
                fusion_parts.extend([
                    "=== FUSION METHODOLOGY ===",
                    "This enhanced answer combines graph-based entity relationships with vector semantic similarity search.",
                    "Graph results provide structured knowledge connections, while vector search adds contextual depth.",
                    ""
                ])

            enhanced_context = "\n".join(fusion_parts)
            self.logger.info(f"Result fusion completed: {len(fusion_parts)} context sections")
            return enhanced_context
        except Exception as e:
            self.logger.error(f"Result fusion failed: {e}")
            return "Graph-based results only (vector fusion failed)"

    def _calculate_augmentation_confidence(self, vector_results: List[VectorSearchResult]) -> float:
        """Confidence in [0, 1]: 70% average similarity, 30% result-count factor."""
        if not vector_results:
            return 0.0
        # float(...) so np.float64 does not leak through min() to callers.
        avg_similarity = float(np.mean([r.similarity_score for r in vector_results]))
        count_factor = min(len(vector_results) / 10, 1.0)  # Normalize to max 10 results
        confidence = (avg_similarity * 0.7) + (count_factor * 0.3)
        return min(confidence, 1.0)

    def get_augmentation_stats(self) -> Dict[str, Any]:
        """Get statistics about vector augmentation performance."""
        return {
            'similarity_threshold': self.similarity_threshold,
            'max_vector_results': self.max_vector_results,
            'vector_engine_ready': bool(self.vector_engine),
            'embedding_model': str(self.embedding_model) if self.embedding_model else None
        }
# Export main class and result dataclasses as the module's public API.
# Fix: removed a stray trailing " |" (scrape artifact) that made this
# line a syntax error.
__all__ = ['VectorAugmentationEngine', 'VectorSearchResult', 'AugmentationResult']