"""Hybrid search implementation combining vector and sparse retrieval.""" import os import pickle from pathlib import Path from typing import List, Dict, Any, Tuple import numpy as np from langchain_qdrant import QdrantVectorStore from langchain_community.retrievers import BM25Retriever try: from langchain.docstore.document import Document except: from langchain_core.documents import Document from .filter import create_filter class HybridRetriever: """ Hybrid retrieval system combining vector search (dense) and BM25 (sparse) search. Supports configurable search modes: vector_only, sparse_only, or hybrid. """ def __init__(self, config: Dict[str, Any]): """ Initialize hybrid retriever. Args: config: Configuration dictionary with hybrid search settings """ self.config = config self.bm25_retriever = None self.documents = [] self._bm25_cache_file = None def _get_bm25_cache_path(self) -> str: """Get path for BM25 cache file.""" cache_dir = Path("cache/bm25") cache_dir.mkdir(parents=True, exist_ok=True) return str(cache_dir / "bm25_retriever.pkl") def initialize_bm25(self, documents: List[Document], force_rebuild: bool = False) -> None: """ Initialize BM25 retriever with documents. Args: documents: List of Document objects to index force_rebuild: Whether to force rebuilding the BM25 index """ self.documents = documents self._bm25_cache_file = self._get_bm25_cache_path() # Try to load cached BM25 retriever if not force_rebuild and os.path.exists(self._bm25_cache_file): try: print("Loading cached BM25 retriever...") with open(self._bm25_cache_file, 'rb') as f: self.bm25_retriever = pickle.load(f) print(f"✅ Loaded cached BM25 retriever with {len(self.documents)} documents") return except Exception as e: print(f"⚠️ Failed to load cached BM25 retriever: {e}") print("Building new BM25 index...") # Build new BM25 retriever print("Building BM25 index...") try: # Use langchain's BM25Retriever self.bm25_retriever = BM25Retriever.from_documents(documents) # Configure BM25 parameters bm25_config = self.config.get("bm25", {}) k = bm25_config.get("top_k", 20) self.bm25_retriever.k = k # Cache the BM25 retriever with open(self._bm25_cache_file, 'wb') as f: pickle.dump(self.bm25_retriever, f) print(f"✅ Built and cached BM25 retriever with {len(documents)} documents") except Exception as e: print(f"❌ Failed to build BM25 retriever: {e}") print("BM25 search will be disabled") self.bm25_retriever = None def _filter_documents_by_metadata( self, documents: List[Document], reports: List[str] = None, sources: str = None, subtype: List[str] = None, year: List[str] = None ) -> List[Document]: """ Filter documents by metadata criteria. Args: documents: List of documents to filter reports: List of specific report filenames sources: Source category subtype: List of subtypes year: List of years Returns: Filtered list of documents """ if not any([reports, sources, subtype, year]): return documents filtered_docs = [] for doc in documents: metadata = doc.metadata # Filter by reports if reports: filename = metadata.get('filename', '') if not any(report in filename for report in reports): continue # Filter by sources if sources: doc_source = metadata.get('source', '') if sources != doc_source: continue # Filter by subtype if subtype: doc_subtype = metadata.get('subtype', '') if doc_subtype not in subtype: continue # Filter by year if year: doc_year = str(metadata.get('year', '')) if doc_year not in year: continue filtered_docs.append(doc) return filtered_docs def _bm25_search( self, query: str, k: int = 20, reports: List[str] = None, sources: str = None, subtype: List[str] = None, year: List[str] = None ) -> List[Tuple[Document, float]]: """ Perform BM25 sparse search. Args: query: Search query k: Number of documents to retrieve reports: List of specific report filenames sources: Source category subtype: List of subtypes year: List of years Returns: List of (Document, score) tuples """ if not self.bm25_retriever: print("⚠️ BM25 retriever not available") return [] try: # Get BM25 results self.bm25_retriever.k = k bm25_docs = self.bm25_retriever.invoke(query) # Apply metadata filtering if any([reports, sources, subtype, year]): bm25_docs = self._filter_documents_by_metadata( bm25_docs, reports, sources, subtype, year ) # BM25Retriever doesn't return scores directly, so we'll use placeholder scores # In a production system, you'd want to access the actual BM25 scores results = [] for i, doc in enumerate(bm25_docs): # Assign decreasing scores based on rank (higher rank = higher score) # Normalize to [0, 1] range for consistency with vector search score = max(0.1, 1.0 - (i / max(len(bm25_docs), 1))) results.append((doc, score)) return results except Exception as e: print(f"❌ BM25 search failed: {e}") return [] def _vector_search( self, vectorstore: QdrantVectorStore, query: str, k: int = 20, reports: List[str] = None, sources: str = None, subtype: List[str] = None, year: List[str] = None ) -> List[Tuple[Document, float]]: """ Perform vector similarity search. Args: vectorstore: QdrantVectorStore instance query: Search query k: Number of documents to retrieve reports: List of specific report filenames sources: Source category subtype: List of subtypes year: List of years Returns: List of (Document, score) tuples """ try: # Create filter filter_obj = create_filter( reports=reports, sources=sources, subtype=subtype, year=year ) # Perform vector search if filter_obj: results = vectorstore.similarity_search_with_score( query, k=k, filter=filter_obj ) else: results = vectorstore.similarity_search_with_score(query, k=k) return results except Exception as e: print(f"❌ Vector search failed: {e}") return [] def _normalize_scores(self, results: List[Tuple[Document, float]], method: str = "min_max") -> List[Tuple[Document, float]]: """ Normalize scores to [0, 1] range. Args: results: List of (Document, score) tuples method: Normalization method ('min_max' or 'z_score') Returns: List of (Document, normalized_score) tuples """ if not results: return results scores = [score for _, score in results] if method == "min_max": min_score = min(scores) max_score = max(scores) if max_score == min_score: normalized_results = [(doc, 1.0) for doc, _ in results] else: normalized_results = [ (doc, (score - min_score) / (max_score - min_score)) for doc, score in results ] elif method == "z_score": mean_score = np.mean(scores) std_score = np.std(scores) if std_score == 0: normalized_results = [(doc, 1.0) for doc, _ in results] else: normalized_results = [ (doc, max(0, (score - mean_score) / std_score)) for doc, score in results ] else: normalized_results = results return normalized_results def _combine_results( self, vector_results: List[Tuple[Document, float]], bm25_results: List[Tuple[Document, float]], alpha: float = 0.5 ) -> List[Tuple[Document, float]]: """ Combine vector and BM25 results with weighted scoring. Args: vector_results: Vector search results bm25_results: BM25 search results alpha: Weight for vector scores (1-alpha for BM25 scores) Returns: Combined and ranked results """ # Normalize scores vector_results = self._normalize_scores(vector_results) bm25_results = self._normalize_scores(bm25_results) # Create document ID mapping for both result sets vector_docs = {id(doc): (doc, score) for doc, score in vector_results} bm25_docs = {id(doc): (doc, score) for doc, score in bm25_results} # Combine scores combined_scores = {} all_doc_ids = set(vector_docs.keys()) | set(bm25_docs.keys()) for doc_id in all_doc_ids: vector_score = vector_docs.get(doc_id, (None, 0.0))[1] bm25_score = bm25_docs.get(doc_id, (None, 0.0))[1] # Weighted combination combined_score = alpha * vector_score + (1 - alpha) * bm25_score # Get document object doc = vector_docs.get(doc_id, bm25_docs.get(doc_id))[0] combined_scores[doc_id] = (doc, combined_score) # Sort by combined score (descending) sorted_results = sorted( combined_scores.values(), key=lambda x: x[1], reverse=True ) return sorted_results def retrieve( self, vectorstore: QdrantVectorStore, query: str, mode: str = "hybrid", reports: List[str] = None, sources: str = None, subtype: List[str] = None, year: List[str] = None, alpha: float = 0.5, k: int = None ) -> List[Document]: """ Retrieve documents using the specified search mode. Args: vectorstore: QdrantVectorStore instance query: Search query mode: Search mode ('vector_only', 'sparse_only', or 'hybrid') reports: List of specific report filenames sources: Source category subtype: List of subtypes year: List of years alpha: Weight for vector scores in hybrid mode (0.5 = equal weight) k: Number of documents to retrieve Returns: List of relevant Document objects """ if k is None: k = self.config.get("retriever", {}).get("top_k", 20) results = [] if mode == "vector_only": # Vector search only vector_results = self._vector_search( vectorstore, query, k, reports, sources, subtype, year ) results = [(doc, score) for doc, score in vector_results] elif mode == "sparse_only": # BM25 search only bm25_results = self._bm25_search( query, k, reports, sources, subtype, year ) results = [(doc, score) for doc, score in bm25_results] elif mode == "hybrid": # Hybrid search - combine both # Get more results from each method to have better fusion retrieval_k = min(k * 2, 50) # Get more candidates for fusion vector_results = self._vector_search( vectorstore, query, retrieval_k, reports, sources, subtype, year ) bm25_results = self._bm25_search( query, retrieval_k, reports, sources, subtype, year ) results = self._combine_results(vector_results, bm25_results, alpha) else: raise ValueError(f"Unknown search mode: {mode}") # Limit to top k results results = results[:k] # Return just the documents return [doc for doc, score in results] def retrieve_with_scores( self, vectorstore: QdrantVectorStore, query: str, mode: str = "hybrid", reports: List[str] = None, sources: str = None, subtype: List[str] = None, year: List[str] = None, alpha: float = 0.5, k: int = None ) -> List[Tuple[Document, float]]: """ Retrieve documents with scores using the specified search mode. Args: vectorstore: QdrantVectorStore instance query: Search query mode: Search mode ('vector_only', 'sparse_only', or 'hybrid') reports: List of specific report filenames sources: Source category subtype: List of subtypes year: List of years alpha: Weight for vector scores in hybrid mode (0.5 = equal weight) k: Number of documents to retrieve Returns: List of (Document, score) tuples """ if k is None: k = self.config.get("retriever", {}).get("top_k", 20) results = [] if mode == "vector_only": # Vector search only results = self._vector_search( vectorstore, query, k, reports, sources, subtype, year ) elif mode == "sparse_only": # BM25 search only results = self._bm25_search( query, k, reports, sources, subtype, year ) elif mode == "hybrid": # Hybrid search - combine both # Get more results from each method to have better fusion retrieval_k = min(k * 2, 50) # Get more candidates for fusion vector_results = self._vector_search( vectorstore, query, retrieval_k, reports, sources, subtype, year ) bm25_results = self._bm25_search( query, retrieval_k, reports, sources, subtype, year ) results = self._combine_results(vector_results, bm25_results, alpha) else: raise ValueError(f"Unknown search mode: {mode}") # Limit to top k results return results[:k] def get_available_search_modes() -> List[str]: """Get list of available search modes.""" return ["vector_only", "sparse_only", "hybrid"] def get_search_mode_description() -> Dict[str, str]: """Get descriptions for each search mode.""" return { "vector_only": "Semantic search using dense embeddings - good for conceptual matching", "sparse_only": "Keyword search using BM25 - good for exact term matching", "hybrid": "Combined semantic and keyword search - balanced approach" }