| | """ |
| | Retriever Agent |
| | |
| | Implements hybrid retrieval combining dense and sparse methods. |
| | Follows FAANG best practices for production RAG systems. |
| | |
| | Key Features: |
| | - Dense retrieval (embedding-based semantic search) |
| | - Sparse retrieval (BM25/TF-IDF keyword matching) |
| | - Reciprocal Rank Fusion (RRF) for combining results |
| | - Query expansion using planner output |
| | - Adaptive retrieval based on query intent |
| | """ |
| |
|
| | from typing import List, Optional, Dict, Any, Tuple |
| | from pydantic import BaseModel, Field |
| | from loguru import logger |
| | from dataclasses import dataclass |
| | from collections import defaultdict |
| | import re |
| | import math |
| |
|
| | from ..store import VectorStore, VectorSearchResult, get_vector_store, VectorStoreConfig |
| | from ..embeddings import EmbeddingAdapter, get_embedding_adapter, EmbeddingConfig |
| | from .query_planner import QueryPlan, SubQuery, QueryIntent |
| |
|
| |
|
class HybridSearchConfig(BaseModel):
    """Configuration for hybrid retrieval."""

    # Dense (embedding) channel: fusion weight and candidate depth.
    dense_weight: float = Field(default=0.7, ge=0.0, le=1.0)
    dense_top_k: int = Field(default=20, ge=1)

    # Sparse (BM25-style) channel: fusion weight and candidate depth.
    sparse_weight: float = Field(default=0.3, ge=0.0, le=1.0)
    sparse_top_k: int = Field(default=20, ge=1)

    # Reciprocal Rank Fusion smoothing constant and final result count.
    rrf_k: int = Field(default=60, description="RRF constant (typically 60)")
    final_top_k: int = Field(default=10, ge=1)

    # Query expansion: append planner-suggested terms to the original query,
    # capped at max_expanded_queries extra variants.
    use_query_expansion: bool = Field(default=True)
    max_expanded_queries: int = Field(default=3, ge=1)

    # Re-weight dense vs. sparse channels based on the planned query intent.
    adapt_to_intent: bool = Field(default=True)
| |
|
| |
|
class RetrievalResult(BaseModel):
    """Result from hybrid retrieval."""

    # Identity and content of the retrieved chunk.
    chunk_id: str
    document_id: str
    text: str
    # Fused RRF score used for final ranking.
    score: float
    # Per-channel scores and ranks; None when the chunk did not appear
    # in that channel's results.
    dense_score: Optional[float] = None
    sparse_score: Optional[float] = None
    dense_rank: Optional[int] = None
    sparse_rank: Optional[int] = None

    # Provenance fields, populated from the dense search payload when available.
    page: Optional[int] = None
    chunk_type: Optional[str] = None
    source_path: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)

    # Bounding box of the chunk on its page, when known.
    # NOTE(review): key schema is defined by the vector store — confirm there.
    bbox: Optional[Dict[str, float]] = None
| |
|
| |
|
class RetrieverAgent:
    """
    Hybrid retrieval agent combining dense and sparse search.

    Capabilities:
    1. Dense retrieval via embedding similarity
    2. Sparse retrieval via BM25-style keyword matching
    3. Reciprocal Rank Fusion for result combination
    4. Query expansion from planner
    5. Intent-aware retrieval adaptation
    """

    def __init__(
        self,
        config: Optional[HybridSearchConfig] = None,
        vector_store: Optional[VectorStore] = None,
        embedding_adapter: Optional[EmbeddingAdapter] = None,
    ):
        """
        Initialize Retriever Agent.

        Args:
            config: Hybrid search configuration
            vector_store: Vector store for dense retrieval
            embedding_adapter: Embedding adapter for query encoding
        """
        self.config = config or HybridSearchConfig()
        self._store = vector_store
        self._embedder = embedding_adapter

        # BM25 hyperparameters (conventional defaults: k1=1.5, b=0.75).
        self._k1 = 1.5
        self._b = 0.75

        # Cached BM25 corpus statistics, keyed by the chunk-id set they were
        # computed from so a different filter set triggers a recompute
        # (previously the first-ever stats were reused forever).
        self._doc_stats: Optional[Dict[str, Any]] = None
        self._doc_stats_key: Optional[frozenset] = None

        logger.info("RetrieverAgent initialized with hybrid search")

    @property
    def store(self) -> VectorStore:
        """Get vector store (lazy initialization)."""
        if self._store is None:
            self._store = get_vector_store()
        return self._store

    @property
    def embedder(self) -> EmbeddingAdapter:
        """Get embedding adapter (lazy initialization)."""
        if self._embedder is None:
            self._embedder = get_embedding_adapter()
        return self._embedder

    def retrieve(
        self,
        query: str,
        plan: Optional[QueryPlan] = None,
        top_k: Optional[int] = None,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[RetrievalResult]:
        """
        Perform hybrid retrieval for a query.

        Args:
            query: Search query
            plan: Optional query plan for expansion and intent adaptation
            top_k: Number of results (overrides config.final_top_k)
            filters: Metadata filters passed through to both channels

        Returns:
            List of retrieval results ranked by fused RRF score (descending)
        """
        top_k = top_k or self.config.final_top_k

        # Original query plus planner-driven expanded variants.
        queries = self._get_queries(query, plan)

        # Intent-aware weighting of the two retrieval channels.
        dense_weight, sparse_weight = self._adapt_weights(plan)

        dense_results = self._dense_retrieve(queries, filters)
        sparse_results = self._sparse_retrieve(queries, filters)

        combined = self._reciprocal_rank_fusion(
            dense_results,
            sparse_results,
            dense_weight,
            sparse_weight,
        )

        results = sorted(combined.values(), key=lambda r: r.score, reverse=True)
        return results[:top_k]

    def retrieve_for_subqueries(
        self,
        sub_queries: List[SubQuery],
        filters: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, List[RetrievalResult]]:
        """
        Retrieve for multiple sub-queries, respecting dependency order.

        NOTE(review): dependencies currently only affect execution order;
        earlier results are not fed into later queries.

        Args:
            sub_queries: List of sub-queries from planner
            filters: Optional metadata filters

        Returns:
            Dict mapping sub-query ID to retrieval results
        """
        results: Dict[str, List[RetrievalResult]] = {}

        for sq in self._topological_sort(sub_queries):
            results[sq.id] = self.retrieve(
                sq.query,
                top_k=self.config.final_top_k,
                filters=filters,
            )

        return results

    def _get_queries(
        self,
        query: str,
        plan: Optional[QueryPlan],
    ) -> List[str]:
        """Build the list of query variants: the original plus expansions.

        Each planner-expanded term is appended to the original query to form
        one extra variant, capped at config.max_expanded_queries.
        """
        queries = [query]

        if plan and self.config.use_query_expansion:
            for term in plan.expanded_terms[:self.config.max_expanded_queries]:
                queries.append(f"{query} {term}")

        return queries

    def _adapt_weights(
        self,
        plan: Optional[QueryPlan],
    ) -> Tuple[float, float]:
        """Choose (dense_weight, sparse_weight) based on query intent.

        Falls back to the configured weights when there is no plan,
        adaptation is disabled, or the intent has no special-case mapping.
        """
        if not plan or not self.config.adapt_to_intent:
            return self.config.dense_weight, self.config.sparse_weight

        # Built per-call (not class-level) so QueryIntent is only touched
        # when a plan is actually supplied.
        intent_weights = {
            QueryIntent.FACTOID: (0.6, 0.4),
            QueryIntent.DEFINITION: (0.8, 0.2),
            QueryIntent.COMPARISON: (0.5, 0.5),
            QueryIntent.AGGREGATION: (0.75, 0.25),
            QueryIntent.LIST: (0.5, 0.5),
        }
        return intent_weights.get(
            plan.intent,
            (self.config.dense_weight, self.config.sparse_weight),
        )

    def _dense_retrieve(
        self,
        queries: List[str],
        filters: Optional[Dict[str, Any]],
    ) -> Dict[str, Tuple[int, float, VectorSearchResult]]:
        """
        Perform dense (embedding) retrieval for each query variant.

        Returns:
            Dict mapping chunk_id to (best_rank, best_score, result_obj).
            (Annotation fixed: the original hint claimed a 2-tuple while the
            code returns a 3-tuple carrying the search payload.)
        """
        all_results: Dict[str, List[Tuple[int, float, VectorSearchResult]]] = defaultdict(list)

        for query in queries:
            query_embedding = self.embedder.embed_text(query)
            results = self.store.search(
                query_embedding=query_embedding,
                top_k=self.config.dense_top_k,
                filters=filters,
            )
            # Ranks are 1-based, matching the RRF formula below.
            for rank, result in enumerate(results, 1):
                all_results[result.chunk_id].append((rank, result.similarity, result))

        # Aggregate across query variants: best rank and best score win;
        # the payload comes from the first variant that returned the chunk.
        aggregated = {}
        for chunk_id, hits in all_results.items():
            best_rank = min(hit[0] for hit in hits)
            best_score = max(hit[1] for hit in hits)
            aggregated[chunk_id] = (best_rank, best_score, hits[0][2])

        return aggregated

    def _sparse_retrieve(
        self,
        queries: List[str],
        filters: Optional[Dict[str, Any]],
    ) -> Dict[str, Tuple[int, float, Any]]:
        """
        Perform sparse (BM25-style) retrieval.

        Returns:
            Dict mapping chunk_id to (rank, score, None); the payload slot
            is None because sparse hits carry no VectorSearchResult.
        """
        try:
            all_chunks = self._get_all_chunks(filters)
        except Exception as e:
            # Sparse search is best-effort: degrade to dense-only on failure.
            logger.warning(f"Sparse retrieval failed: {e}")
            return {}

        if not all_chunks:
            return {}

        # Refresh BM25 corpus statistics whenever the candidate set changes
        # (e.g. different metadata filters yield different chunks).
        chunk_key = frozenset(all_chunks)
        if self._doc_stats is None or self._doc_stats_key != chunk_key:
            self._compute_doc_stats(all_chunks)
            self._doc_stats_key = chunk_key

        # Tokenize each chunk once (previously re-tokenized per query:
        # O(queries x chunks) tokenizations).
        chunk_terms = {cid: self._tokenize(text) for cid, text in all_chunks.items()}

        # Keep the best BM25 score per chunk across all query variants.
        best_scores: Dict[str, float] = {}
        for query in queries:
            query_terms = self._tokenize(query)
            for chunk_id, doc_terms in chunk_terms.items():
                score = self._bm25_score_from_terms(query_terms, doc_terms)
                prev = best_scores.get(chunk_id)
                if prev is None or score > prev:
                    best_scores[chunk_id] = score

        # Rank descending and keep the top sparse_top_k.
        ranked = sorted(best_scores.items(), key=lambda kv: kv[1], reverse=True)
        return {
            chunk_id: (rank, score, None)
            for rank, (chunk_id, score) in enumerate(ranked[:self.config.sparse_top_k], 1)
        }

    def _get_all_chunks(
        self,
        filters: Optional[Dict[str, Any]],
    ) -> Dict[str, str]:
        """Fetch candidate chunks (chunk_id -> text) for sparse scoring.

        NOTE(review): no "scan all" API is used here — the corpus is
        approximated by a broad dense search (top 1000 hits for a generic
        probe query). Chunks outside that window are invisible to BM25;
        confirm this is acceptable for the expected corpus size.
        """
        query_embedding = self.embedder.embed_text("document content information")
        results = self.store.search(
            query_embedding=query_embedding,
            top_k=1000,
            filters=filters,
        )
        return {result.chunk_id: result.text for result in results}

    def _compute_doc_stats(self, chunks: Dict[str, str]) -> None:
        """Compute corpus statistics for BM25 and cache them on self.

        Stores average document length, document count, and per-term
        document frequency in self._doc_stats.
        """
        doc_lengths = []
        df: Dict[str, int] = defaultdict(int)

        for text in chunks.values():
            terms = self._tokenize(text)
            doc_lengths.append(len(terms))
            # set() so each document contributes at most 1 per term.
            for term in set(terms):
                df[term] += 1

        self._doc_stats = {
            # Fallback of 1 guards the empty-corpus division in scoring.
            "avg_dl": sum(doc_lengths) / len(doc_lengths) if doc_lengths else 1,
            "n_docs": len(chunks),
            "df": dict(df),
        }

    def _tokenize(self, text: str) -> List[str]:
        """Lowercase, replace punctuation with spaces, split on whitespace."""
        return re.sub(r'[^\w\s]', ' ', text.lower()).split()

    def _bm25_score(self, query_terms: List[str], doc_text: str) -> float:
        """Compute the BM25 score of raw document text (tokenizes first)."""
        return self._bm25_score_from_terms(query_terms, self._tokenize(doc_text))

    def _bm25_score_from_terms(
        self,
        query_terms: List[str],
        doc_terms: List[str],
    ) -> float:
        """BM25 scoring over pre-tokenized document terms.

        Uses the standard Okapi BM25 formulation with +1 inside the log
        so the IDF component stays non-negative.
        """
        if not self._doc_stats:
            return 0.0

        dl = len(doc_terms)
        avg_dl = self._doc_stats["avg_dl"]
        n_docs = self._doc_stats["n_docs"]
        df = self._doc_stats["df"]

        # Term frequencies within this document.
        tf: Dict[str, int] = defaultdict(int)
        for term in doc_terms:
            tf[term] += 1

        score = 0.0
        for term in query_terms:
            term_freq = tf.get(term, 0)
            if not term_freq:
                continue

            doc_freq = df.get(term, 0)
            idf = math.log((n_docs - doc_freq + 0.5) / (doc_freq + 0.5) + 1)

            tf_component = (term_freq * (self._k1 + 1)) / (
                term_freq + self._k1 * (1 - self._b + self._b * dl / avg_dl)
            )

            score += idf * tf_component

        return score

    def _reciprocal_rank_fusion(
        self,
        dense_results: Dict[str, Tuple[int, float, Any]],
        sparse_results: Dict[str, Tuple[int, float, Any]],
        dense_weight: float,
        sparse_weight: float,
    ) -> Dict[str, RetrievalResult]:
        """
        Combine dense and sparse results using weighted RRF.

        RRF score = sum over rankings of weight / (k + rank).
        """
        k = self.config.rrf_k
        combined: Dict[str, RetrievalResult] = {}

        for chunk_id in set(dense_results) | set(sparse_results):
            # Single lookup per channel (was four .get() calls per chunk).
            dense_entry = dense_results.get(chunk_id)
            sparse_entry = sparse_results.get(chunk_id)

            rrf_score = 0.0
            if dense_entry is not None:
                rrf_score += dense_weight / (k + dense_entry[0])
            if sparse_entry is not None:
                rrf_score += sparse_weight / (k + sparse_entry[0])

            # Chunk payload (text/metadata) is only carried by the dense
            # channel; sparse-only hits keep the empty defaults.
            metadata: Dict[str, Any] = {}
            page = None
            chunk_type = None
            source_path = None
            text = ""
            document_id = ""
            bbox = None

            if dense_entry is not None and dense_entry[2] is not None:
                result_obj = dense_entry[2]
                text = result_obj.text
                document_id = result_obj.document_id
                page = result_obj.page
                chunk_type = result_obj.chunk_type
                metadata = result_obj.metadata
                source_path = metadata.get("source_path", "")
                bbox = result_obj.bbox

            combined[chunk_id] = RetrievalResult(
                chunk_id=chunk_id,
                document_id=document_id,
                text=text,
                score=rrf_score,
                dense_score=dense_entry[1] if dense_entry is not None else None,
                sparse_score=sparse_entry[1] if sparse_entry is not None else None,
                dense_rank=dense_entry[0] if dense_entry is not None else None,
                sparse_rank=sparse_entry[0] if sparse_entry is not None else None,
                page=page,
                chunk_type=chunk_type,
                source_path=source_path,
                metadata=metadata,
                bbox=bbox,
            )

        return combined

    def _topological_sort(self, sub_queries: List[SubQuery]) -> List[SubQuery]:
        """Order sub-queries so dependencies come before their dependents.

        On a cycle (or a dependency on an unknown id) the remaining
        queries are appended in their original order rather than dropped.
        """
        sorted_queries = []
        remaining = list(sub_queries)
        completed = set()

        while remaining:
            for sq in remaining[:]:
                if all(dep in completed for dep in sq.depends_on):
                    sorted_queries.append(sq)
                    completed.add(sq.id)
                    remaining.remove(sq)
                    break
            else:
                # No progress possible: cycle or missing dependency.
                sorted_queries.extend(remaining)
                break

        return sorted_queries
| |
|