Spaces:
Sleeping
Sleeping
| from typing import List, Any | |
| # Using community version to avoid 'BaseBlobParser' version conflict in langchain-chroma/core | |
| from langchain_community.vectorstores import Chroma | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from src.config import ( | |
| REVIEW_HIGHLIGHTS_TXT, | |
| CHROMA_DB_DIR, | |
| EMBEDDING_MODEL, | |
| RERANK_CANDIDATES_MAX, | |
| RRF_K, | |
| SMALL_TO_BIG_OVER_RETRIEVE_FACTOR, | |
| ) | |
| from src.utils import setup_logger | |
| from src.data.stores.metadata_store import metadata_store | |
| from src.data.stores.online_books_store import online_books_store | |
| logger = setup_logger(__name__) | |
class VectorDB:
    """
    Hybrid Vector Database combining ChromaDB (Dense) and SQLite FTS5 (Sparse).

    ENGINEERING IMPROVEMENT:
    Transitioned from in-memory `rank_bm25` logic to a disk-based SQLite FTS5
    architecture for keyword search. This allows for zero-RAM search indexing and
    eliminates the need for dataset pruning.

    Features:
    - Zero-RAM Keyword Indexing (via FTS5).
    - Hybrid RRF scoring (ChromaDB + FTS5).
    - Persistence on disk for 221k+ items.
    """

    # Singleton storage: one shared VectorDB per process.
    _instance = None

    def __new__(cls):
        # Create the singleton on first use; later calls return the same object.
        if cls._instance is None:
            instance = super().__new__(cls)
            # Both backends start unloaded; __init__ triggers lazy setup.
            instance.db = None
            instance.embeddings = None
            cls._instance = instance
        return cls._instance
| def __init__(self): | |
| if self.db is None: | |
| self._initialize_db() | |
| def _initialize_db(self): | |
| """Initialize ChromaDB with local embeddings.""" | |
| try: | |
| # Use local sentence-transformers model (no API calls) | |
| logger.info(f"Loading embedding model: {EMBEDDING_MODEL}") | |
| self.embeddings = HuggingFaceEmbeddings( | |
| model_name=EMBEDDING_MODEL, | |
| model_kwargs={"device": "cpu"}, | |
| encode_kwargs={"normalize_embeddings": True}, | |
| ) | |
| logger.info("Embedding model loaded successfully") | |
| if CHROMA_DB_DIR.exists() and any(CHROMA_DB_DIR.iterdir()): | |
| logger.info(f"Loading existing vector database from {CHROMA_DB_DIR}") | |
| self.db = Chroma( | |
| persist_directory=str(CHROMA_DB_DIR), | |
| embedding_function=self.embeddings, | |
| ) | |
| logger.info( | |
| "Loaded %s documents from vector database", | |
| self.db._collection.count(), | |
| ) | |
| else: | |
| error_msg = ( | |
| f"Vector Database not found at {CHROMA_DB_DIR}.\n" | |
| "Please run the initialization script first to build the index:\n" | |
| " python src/init_db.py" | |
| ) | |
| logger.warning(error_msg) | |
| self.db = None | |
| except Exception as e: | |
| logger.error("VectorDB init failed [%s]: %s", type(e).__name__, e) | |
| # Graceful degradation: search() will return [] | |
| self.db = None | |
| # 2. Initialize FTS5 (Sparse Retrieval) | |
| self._init_fts5() | |
| # 3. Initialize Temporal Data logic (Zero-RAM mode) | |
| logger.info("VectorDB: Temporal Scoring will use SQLite metadata.") | |
| def _init_fts5(self): | |
| """ | |
| Initialize FTS5 for Sparse (Keyword) Retrieval via SQLite. | |
| """ | |
| try: | |
| conn = metadata_store.connection | |
| if conn: | |
| logger.info("VectorDB: FTS5 keyword search enabled via SQLite.") | |
| self.fts_enabled = True | |
| else: | |
| logger.warning( | |
| "VectorDB: SQLite connection not available. FTS5 disabled." | |
| ) | |
| self.fts_enabled = False | |
| except Exception as e: | |
| logger.error("Failed to initialize FTS5: %s", e) | |
| self.fts_enabled = False | |
    def _sparse_fts_search(self, query: str, k: int = 5) -> List[Any]:
        """
        Sparse retrieval: main FTS5 + online staging FTS5. No lock on main DB from writes.

        Queries the read-only main `books_fts` table (up to k rows, FTS5
        rank order) and the separate online staging store (up to k more),
        so the combined result may contain up to 2*k documents.

        Args:
            query: Raw user query; double quotes are escaped and the whole
                string is wrapped in quotes to form a phrase MATCH query.
            k: Per-store result limit.

        Returns:
            List of MockDoc objects (duck-typed like LangChain Documents with
            .page_content and .metadata), or [] when FTS5 is disabled or fails.
        """
        if not self.fts_enabled:
            logger.warning("FTS5 not enabled, cannot perform sparse search.")
            return []

        # Minimal Document stand-in so downstream fusion code can treat
        # sparse and dense results uniformly.
        class MockDoc:
            def __init__(self, content, metadata):
                self.page_content = content
                self.metadata = metadata

        def mk_doc(row: dict) -> MockDoc:
            # `or ""` guards NULL columns coming back from SQLite.
            title = row.get("title", "") or ""
            desc = row.get("description", "") or ""
            return MockDoc(
                f"{title} {desc}",
                {
                    "isbn": row.get("isbn13", ""),
                    "title": title,
                    "authors": row.get("authors", ""),
                    "categories": row.get("simple_categories", ""),
                },
            )

        results: List[Any] = []
        try:
            # 1. Main store (read-only, no contention)
            conn = metadata_store.connection
            if conn:
                # Escape embedded quotes, then quote the whole query so FTS5
                # treats it as a phrase rather than as query syntax.
                clean_query = query.strip().replace('"', '""')
                if clean_query:
                    fts_query = f'"{clean_query}"'
                    cursor = conn.cursor()
                    cursor.execute(
                        """
                        SELECT isbn13, title, description, authors, simple_categories
                        FROM books_fts WHERE books_fts MATCH ? ORDER BY rank LIMIT ?
                        """,
                        (fts_query, k),
                    )
                    # NOTE(review): dict(row) assumes the connection uses
                    # sqlite3.Row as row_factory — confirm in metadata_store.
                    for row in cursor.fetchall():
                        results.append(mk_doc(dict(row)))

            # 2. Online staging store (separate DB); receives the raw query
            # and does its own escaping internally (presumably — verify).
            for row in online_books_store.fts_search(query, k=k):
                results.append(mk_doc(row))

            logger.info(
                "VectorDB: FTS5 keyword search found %d results.", len(results)
            )
            return results
        except Exception as e:
            logger.error("VectorDB: FTS5 keyword search failed: %s", e)
            return []
| def search(self, query: str, k: int = 5) -> List[Any]: | |
| """ | |
| Legacy semantic search. | |
| """ | |
| if not self.db: | |
| return [] | |
| return self.db.similarity_search(query, k=k) | |
| def get_book_details(self, isbn: str): | |
| """Get book metadata by ISBN from the centralized MetadataStore.""" | |
| return metadata_store.get_book_metadata(str(isbn)) | |
| def hybrid_search( | |
| self, | |
| query: str, | |
| k: int = 5, | |
| alpha: float = 0.5, | |
| rerank: bool = False, | |
| temporal: bool = False, | |
| ) -> List[Any]: | |
| """ | |
| Hybrid Search = Dense (Vector) + Sparse (FTS5) with Reciprocal Rank Fusion (RRF). | |
| Optional: Cross-Encoder Reranking for high precision. | |
| """ | |
| if not self.db or not self.fts_enabled: | |
| logger.warning("FTS5 or DB missing, falling back to simple search.") | |
| return self.search(query, k) | |
| # 1. Sparse Retrieval (FTS5) | |
| # Get top K*2 candidates | |
| sparse_results = self._sparse_fts_search(query, k=k * 2) | |
| # Optimization: If alpha=1.0, return Sparse results directly (Skip Dense) | |
| if alpha == 1.0: | |
| return sparse_results[:k] | |
| # 2. Dense Retrieval (Chroma) | |
| dense_results = self.search(query, k=k * 2) | |
| # 3. Reciprocal Rank Fusion: score = 1 / (RRF_K + rank) | |
| fusion_scores = {} | |
| # Helper to get ID (using ISBN as unique ID) | |
| def get_id(doc): | |
| # Try metadata, fallback to content parsing if needed | |
| if "isbn" in doc.metadata and doc.metadata["isbn"]: | |
| return str(doc.metadata["isbn"]) | |
| if "isbn13" in doc.metadata and doc.metadata["isbn13"]: | |
| return str(doc.metadata["isbn13"]) | |
| # Fallback parsing | |
| if "ISBN:" in doc.page_content: | |
| return doc.page_content.split("ISBN:")[1].strip().split()[0] | |
| # Fallback 2: Check first word (legacy format) | |
| return doc.page_content.strip().split()[0] | |
| # Fusion: Dense | |
| for rank, doc in enumerate(dense_results): | |
| doc_id = get_id(doc) | |
| if doc_id not in fusion_scores: | |
| fusion_scores[doc_id] = {"score": 0.0, "doc": doc} | |
| fusion_scores[doc_id]["score"] += 1 / (rank + RRF_K) | |
| # Fusion: Sparse | |
| for rank, doc in enumerate(sparse_results): | |
| doc_id = get_id(doc) | |
| if doc_id not in fusion_scores: | |
| fusion_scores[doc_id] = {"score": 0.0, "doc": doc} | |
| fusion_scores[doc_id]["score"] += 1 / (rank + RRF_K) | |
| # Sort by RRF score | |
| sorted_docs = sorted( | |
| fusion_scores.values(), key=lambda x: x["score"], reverse=True | |
| ) | |
| # Keep all unique candidates for reranking | |
| top_candidates = [item["doc"] for item in sorted_docs] | |
| # 4. Reranking (Cross-Encoder) | |
| final_results = top_candidates[:k] | |
| if rerank: | |
| from src.core.reranker import reranker | |
| rerank_candidates = top_candidates[ | |
| : min(len(top_candidates), RERANK_CANDIDATES_MAX) | |
| ] | |
| logger.info("Reranking top %d candidates...", len(rerank_candidates)) | |
| final_results = reranker.rerank(query, rerank_candidates, top_k=k) | |
| # 5. Temporal Dynamics (Optional) | |
| # Apply boost to 'final_results' (which now have scores from reranker) | |
| if temporal: | |
| from src.core.temporal import temporal_ranker | |
| logger.info("Applying Temporal Decay...") | |
| # Populate local year map for candidates to avoid repeated queries | |
| candidate_years = {} | |
| for doc in final_results: | |
| isbn = get_id(doc) | |
| if isbn: | |
| rec = metadata_store.get_book_metadata(isbn) | |
| year = temporal_ranker.parse_year(rec.get("publishedDate")) | |
| if year > 0: | |
| candidate_years[isbn] = year | |
| final_results = temporal_ranker.apply_decay(final_results, candidate_years) | |
| return final_results | |
    def small_to_big_search(self, query: str, k: int = 5) -> List[Any]:
        """
        Small-to-Big Retrieval (Parent-Child / Chunk-to-Parent Pattern).

        ALGORITHM:
            1. Search FINE-GRAINED chunks (e.g. review snippets) for high-precision
               semantic match. User queries like "twist ending" match chunk content.
            2. Map matched chunks back to PARENT books via parent_isbn metadata.
            3. Return full book documents (title, description) for LLM context.

        WHY: Book-level vectors often miss query-specific details (e.g. "unreliable
        narrator"). Chunk-level search finds those, then we lift to book-level.

        Ref: LlamaIndex Recursive Retrieval, RAPTOR (Sarthi et al., 2024).

        Args:
            query: Free-text user query matched against review chunks.
            k: Number of parent books to return.

        Returns:
            Parent book Documents; falls back to hybrid search when the chunk
            index is unavailable, or to the raw chunks as a last resort.
        """
        from langchain_community.vectorstores import Chroma

        # NOTE(review): hard-coded path — presumably belongs in src.config
        # next to CHROMA_DB_DIR; confirm before moving.
        chunk_persist_dir = "data/chroma_chunks"

        # Lazily open the chunk-level index on first use. A failed load is
        # NOT cached (attribute stays unset), so later calls retry.
        if not hasattr(self, "chunk_db"):
            try:
                self.chunk_db = Chroma(
                    persist_directory=chunk_persist_dir,
                    embedding_function=self.embeddings,
                    collection_name="review_chunks",
                )
                logger.info("Loaded chunk index from %s", chunk_persist_dir)
            except Exception as e:
                logger.warning(
                    "Chunk index not available: %s. Falling back to hybrid search.", e
                )
                return self.hybrid_search(query, k=k, rerank=True)

        # Step 1: Search chunks (fine-grained). Over-retrieve to improve recall,
        # since several chunks may map back to the same parent book.
        chunk_results = self.chunk_db.similarity_search(
            query, k=k * SMALL_TO_BIG_OVER_RETRIEVE_FACTOR
        )
        logger.info("Small-to-Big: Found %d chunk matches", len(chunk_results))

        # Step 2: Extract unique parent ISBNs, preserving chunk-rank order.
        parent_isbns = []
        seen = set()
        for chunk in chunk_results:
            isbn = chunk.metadata.get("parent_isbn")
            if isbn and isbn not in seen:
                parent_isbns.append(isbn)
                seen.add(isbn)
        logger.info("Small-to-Big: Mapped to %d unique books", len(parent_isbns))

        # Step 3: Fetch full book context for the top-k parents from the
        # metadata store (skipping ISBNs with no record).
        from langchain_core.documents import Document

        parent_docs = []
        for isbn in parent_isbns[:k]:
            rec = metadata_store.get_book_metadata(isbn)
            if rec:
                doc = Document(
                    page_content=(
                        f"Title: {rec.get('title', 'Unknown')}\n"
                        f"ISBN: {rec.get('isbn13', isbn)}\n"
                        f"Description: {rec.get('description', '')}"
                    ),
                    metadata={
                        "isbn": rec.get("isbn13", isbn),
                        "title": rec.get("title"),
                    },
                )
                parent_docs.append(doc)

        # Fallback 1: every metadata lookup failed — probe the dense index
        # using the ISBN string itself as the similarity query.
        if not parent_docs and self.db:
            for isbn in parent_isbns[:k]:
                results = self.db.similarity_search(isbn, k=1)
                if results:
                    parent_docs.append(results[0])

        # Final fallback: return the raw chunks, annotated with their parent
        # ISBN so downstream consumers keep some provenance.
        if not parent_docs:
            logger.warning(
                "Parent lookup failed, returning chunks with ISBN context"
            )
            for chunk in chunk_results[:k]:
                chunk.metadata["note"] = (
                    f"From review of ISBN: {chunk.metadata.get('parent_isbn')}"
                )
            return chunk_results[:k]

        return parent_docs
| def add_book(self, book_data: dict): | |
| """ | |
| Dynamically add a new book to the ChromaDB vector index. | |
| Note: FTS5 incremental updates are handled separately via | |
| metadata_store.insert_book_with_fts() called from RecommendationOrchestrator.add_new_book(). | |
| This method only handles the dense vector index. | |
| Args: | |
| book_data: Dict with isbn13, title, authors, description, etc. | |
| """ | |
| from langchain_core.documents import Document | |
| isbn = str(book_data.get("isbn13")) | |
| title = book_data.get("title", "") | |
| author = book_data.get("authors", "") | |
| description = book_data.get("description", "") | |
| # Add to ChromaDB (dense vector index) | |
| content = ( | |
| f"Title: {title}\nAuthor: {author}\nDescription: {description}\nISBN: {isbn}" | |
| ) | |
| doc = Document( | |
| page_content=content, | |
| metadata={ | |
| "isbn": isbn, | |
| "isbn13": isbn, | |
| "title": title, | |
| "authors": author, | |
| "description": description, | |
| }, | |
| ) | |
| if self.db: | |
| self.db.add_documents([doc]) | |
| logger.info("Added book %s to ChromaDB", isbn) | |
| __all__ = ["VectorDB"] | |