from typing import List, Any
# Using community version to avoid 'BaseBlobParser' version conflict in langchain-chroma/core
from langchain_community.vectorstores import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from src.config import (
CHROMA_DB_DIR,
EMBEDDING_MODEL,
RERANK_CANDIDATES_MAX,
RRF_K,
SMALL_TO_BIG_OVER_RETRIEVE_FACTOR,
)
from src.utils import setup_logger
from src.data.stores.metadata_store import metadata_store
from src.data.stores.online_books_store import online_books_store
logger = setup_logger(__name__)
class VectorDB:
"""
Hybrid Vector Database combining ChromaDB (Dense) and SQLite FTS5 (Sparse).
    ENGINEERING IMPROVEMENT:
    Keyword search moved from an in-memory `rank_bm25` index to a disk-based
    SQLite FTS5 index. The keyword index now lives on disk instead of in
    process RAM, which eliminates the need for dataset pruning.
Features:
- Zero-RAM Keyword Indexing (via FTS5).
- Hybrid RRF scoring (ChromaDB + FTS5).
- Persistence on disk for 221k+ items.
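
    Example (illustrative; assumes the index was built via `python src/init_db.py`):
        >>> db = VectorDB()
        >>> docs = db.hybrid_search("dystopian classics", k=3)  # doctest: +SKIP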
"""
_instance = None
def __new__(cls):
if cls._instance is None:
            cls._instance = super().__new__(cls)
cls._instance.db = None
cls._instance.embeddings = None
return cls._instance
def __init__(self):
if self.db is None:
self._initialize_db()
def _initialize_db(self):
"""Initialize ChromaDB with local embeddings."""
try:
# Use local sentence-transformers model (no API calls)
logger.info(f"Loading embedding model: {EMBEDDING_MODEL}")
self.embeddings = HuggingFaceEmbeddings(
model_name=EMBEDDING_MODEL,
model_kwargs={"device": "cpu"},
encode_kwargs={"normalize_embeddings": True},
)
logger.info("Embedding model loaded successfully")
if CHROMA_DB_DIR.exists() and any(CHROMA_DB_DIR.iterdir()):
logger.info(f"Loading existing vector database from {CHROMA_DB_DIR}")
self.db = Chroma(
persist_directory=str(CHROMA_DB_DIR),
embedding_function=self.embeddings,
)
logger.info(
"Loaded %s documents from vector database",
self.db._collection.count(),
)
else:
error_msg = (
f"Vector Database not found at {CHROMA_DB_DIR}.\n"
"Please run the initialization script first to build the index:\n"
" python src/init_db.py"
)
logger.warning(error_msg)
self.db = None
except Exception as e:
logger.error("VectorDB init failed [%s]: %s", type(e).__name__, e)
# Graceful degradation: search() will return []
self.db = None
# 2. Initialize FTS5 (Sparse Retrieval)
self._init_fts5()
# 3. Initialize Temporal Data logic (Zero-RAM mode)
logger.info("VectorDB: Temporal Scoring will use SQLite metadata.")
def _init_fts5(self):
"""
Initialize FTS5 for Sparse (Keyword) Retrieval via SQLite.
"""
try:
conn = metadata_store.connection
if conn:
logger.info("VectorDB: FTS5 keyword search enabled via SQLite.")
self.fts_enabled = True
else:
logger.warning(
"VectorDB: SQLite connection not available. FTS5 disabled."
)
self.fts_enabled = False
except Exception as e:
logger.error("Failed to initialize FTS5: %s", e)
self.fts_enabled = False
def _sparse_fts_search(self, query: str, k: int = 5) -> List[Any]:
"""
        Sparse retrieval over the main FTS5 index plus the online staging
        FTS5 index. Writes land only in the staging DB, so they never lock
        the main DB.
"""
if not self.fts_enabled:
logger.warning("FTS5 not enabled, cannot perform sparse search.")
return []
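        # Duck-typed stand-in matching the langchain `Document` interface
        # (page_content + metadata) expected by downstream consumers.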
class MockDoc:
def __init__(self, content, metadata):
self.page_content = content
self.metadata = metadata
def mk_doc(row: dict) -> MockDoc:
title = row.get("title", "") or ""
desc = row.get("description", "") or ""
return MockDoc(
f"{title} {desc}",
{
"isbn": row.get("isbn13", ""),
"title": title,
"authors": row.get("authors", ""),
"categories": row.get("simple_categories", ""),
},
)
results: List[Any] = []
try:
# 1. Main store (read-only, no contention)
conn = metadata_store.connection
if conn:
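                # Escape embedded double quotes and wrap the query in quotes
                # so FTS5 treats it as a single phrase rather than parsing
                # MATCH syntax (guards against operators like AND/OR/NEAR).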
clean_query = query.strip().replace('"', '""')
if clean_query:
fts_query = f'"{clean_query}"'
cursor = conn.cursor()
cursor.execute(
"""
SELECT isbn13, title, description, authors, simple_categories
FROM books_fts WHERE books_fts MATCH ? ORDER BY rank LIMIT ?
""",
(fts_query, k),
)
for row in cursor.fetchall():
results.append(mk_doc(dict(row)))
# 2. Online staging store (separate DB)
for row in online_books_store.fts_search(query, k=k):
results.append(mk_doc(row))
logger.info(
"VectorDB: FTS5 keyword search found %d results.", len(results)
)
return results
except Exception as e:
logger.error("VectorDB: FTS5 keyword search failed: %s", e)
return []
def search(self, query: str, k: int = 5) -> List[Any]:
"""
Legacy semantic search.
"""
if not self.db:
return []
return self.db.similarity_search(query, k=k)
def get_book_details(self, isbn: str):
"""Get book metadata by ISBN from the centralized MetadataStore."""
return metadata_store.get_book_metadata(str(isbn))
def hybrid_search(
self,
query: str,
k: int = 5,
alpha: float = 0.5,
rerank: bool = False,
temporal: bool = False,
) -> List[Any]:
"""
Hybrid Search = Dense (Vector) + Sparse (FTS5) with Reciprocal Rank Fusion (RRF).
Optional: Cross-Encoder Reranking for high precision.
"""
if not self.db or not self.fts_enabled:
logger.warning("FTS5 or DB missing, falling back to simple search.")
return self.search(query, k)
# 1. Sparse Retrieval (FTS5)
# Get top K*2 candidates
sparse_results = self._sparse_fts_search(query, k=k * 2)
# Optimization: If alpha=1.0, return Sparse results directly (Skip Dense)
if alpha == 1.0:
return sparse_results[:k]
# 2. Dense Retrieval (Chroma)
dense_results = self.search(query, k=k * 2)
        # 3. Reciprocal Rank Fusion: score += weight / (RRF_K + rank)
fusion_scores = {}
# Helper to get ID (using ISBN as unique ID)
def get_id(doc):
# Try metadata, fallback to content parsing if needed
if "isbn" in doc.metadata and doc.metadata["isbn"]:
return str(doc.metadata["isbn"])
if "isbn13" in doc.metadata and doc.metadata["isbn13"]:
return str(doc.metadata["isbn13"])
# Fallback parsing
if "ISBN:" in doc.page_content:
return doc.page_content.split("ISBN:")[1].strip().split()[0]
# Fallback 2: Check first word (legacy format)
return doc.page_content.strip().split()[0]
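        # Worked RRF example (assuming RRF_K = 60, a common default): with
        # equal weights, a doc ranked 1st dense (rank 0) and 3rd sparse
        # (rank 2) scores 1/60 + 1/62 ≈ 0.0328, beating a doc that appears
        # 1st in only one list (1/60 ≈ 0.0167).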
        # Fusion: Dense (weighted by 1 - alpha)
        for rank, doc in enumerate(dense_results):
            doc_id = get_id(doc)
            if doc_id not in fusion_scores:
                fusion_scores[doc_id] = {"score": 0.0, "doc": doc}
            fusion_scores[doc_id]["score"] += (1 - alpha) / (rank + RRF_K)
        # Fusion: Sparse (weighted by alpha)
        for rank, doc in enumerate(sparse_results):
            doc_id = get_id(doc)
            if doc_id not in fusion_scores:
                fusion_scores[doc_id] = {"score": 0.0, "doc": doc}
            fusion_scores[doc_id]["score"] += alpha / (rank + RRF_K)
# Sort by RRF score
sorted_docs = sorted(
fusion_scores.values(), key=lambda x: x["score"], reverse=True
)
# Keep all unique candidates for reranking
top_candidates = [item["doc"] for item in sorted_docs]
# 4. Reranking (Cross-Encoder)
final_results = top_candidates[:k]
if rerank:
from src.core.reranker import reranker
rerank_candidates = top_candidates[
: min(len(top_candidates), RERANK_CANDIDATES_MAX)
]
logger.info("Reranking top %d candidates...", len(rerank_candidates))
final_results = reranker.rerank(query, rerank_candidates, top_k=k)
# 5. Temporal Dynamics (Optional)
        # Apply decay boost to 'final_results' (already reranked when rerank=True)
if temporal:
from src.core.temporal import temporal_ranker
logger.info("Applying Temporal Decay...")
# Populate local year map for candidates to avoid repeated queries
candidate_years = {}
for doc in final_results:
isbn = get_id(doc)
            if isbn:
                rec = metadata_store.get_book_metadata(isbn)
                if rec:  # guard: metadata may be missing for this ISBN
                    year = temporal_ranker.parse_year(rec.get("publishedDate"))
                    if year > 0:
                        candidate_years[isbn] = year
final_results = temporal_ranker.apply_decay(final_results, candidate_years)
return final_results
def small_to_big_search(self, query: str, k: int = 5) -> List[Any]:
"""
Small-to-Big Retrieval (Parent-Child / Chunk-to-Parent Pattern).
ALGORITHM:
1. Search FINE-GRAINED chunks (e.g. review snippets) for high-precision
semantic match. User queries like "twist ending" match chunk content.
2. Map matched chunks back to PARENT books via parent_isbn metadata.
3. Return full book documents (title, description) for LLM context.
WHY: Book-level vectors often miss query-specific details (e.g. "unreliable
narrator"). Chunk-level search finds those, then we lift to book-level.
Ref: LlamaIndex Recursive Retrieval, RAPTOR (Sarthi et al., 2024).
"""
chunk_persist_dir = "data/chroma_chunks"
# Load chunk index (lazy)
if not hasattr(self, "chunk_db"):
try:
self.chunk_db = Chroma(
persist_directory=chunk_persist_dir,
embedding_function=self.embeddings,
collection_name="review_chunks",
)
logger.info("Loaded chunk index from %s", chunk_persist_dir)
except Exception as e:
logger.warning(
"Chunk index not available: %s. Falling back to hybrid search.", e
)
return self.hybrid_search(query, k=k, rerank=True)
# Step 1: Search chunks (Fine-grained). Over-retrieve to improve recall.
chunk_results = self.chunk_db.similarity_search(
query, k=k * SMALL_TO_BIG_OVER_RETRIEVE_FACTOR
)
logger.info("Small-to-Big: Found %d chunk matches", len(chunk_results))
# Step 2: Extract unique parent ISBNs
parent_isbns = []
seen = set()
for chunk in chunk_results:
isbn = chunk.metadata.get("parent_isbn")
if isbn and isbn not in seen:
parent_isbns.append(isbn)
seen.add(isbn)
logger.info("Small-to-Big: Mapped to %d unique books", len(parent_isbns))
# Step 3: Fetch full book context from parent index
from langchain_core.documents import Document
parent_docs = []
for isbn in parent_isbns[:k]:
rec = metadata_store.get_book_metadata(isbn)
if rec:
doc = Document(
page_content=(
f"Title: {rec.get('title', 'Unknown')}\n"
f"ISBN: {rec.get('isbn13', isbn)}\n"
f"Description: {rec.get('description', '')}"
),
metadata={
"isbn": rec.get("isbn13", isbn),
"title": rec.get("title"),
},
)
parent_docs.append(doc)
        # Fallback: if the metadata lookup returned nothing, try a
        # similarity search using the ISBN as the query
if not parent_docs and self.db:
for isbn in parent_isbns[:k]:
results = self.db.similarity_search(isbn, k=1)
if results:
parent_docs.append(results[0])
# Final fallback: Return chunks with enriched context
if not parent_docs:
logger.warning(
"Parent lookup failed, returning chunks with ISBN context"
)
# Enrich chunks with their ISBN context
for chunk in chunk_results[:k]:
chunk.metadata["note"] = (
f"From review of ISBN: {chunk.metadata.get('parent_isbn')}"
)
return chunk_results[:k]
return parent_docs
def add_book(self, book_data: dict):
"""
Dynamically add a new book to the ChromaDB vector index.
Note: FTS5 incremental updates are handled separately via
metadata_store.insert_book_with_fts() called from RecommendationOrchestrator.add_new_book().
This method only handles the dense vector index.
Args:
book_data: Dict with isbn13, title, authors, description, etc.
"""
from langchain_core.documents import Document
isbn = str(book_data.get("isbn13"))
title = book_data.get("title", "")
author = book_data.get("authors", "")
description = book_data.get("description", "")
# Add to ChromaDB (dense vector index)
content = (
f"Title: {title}\nAuthor: {author}\nDescription: {description}\nISBN: {isbn}"
)
doc = Document(
page_content=content,
metadata={
"isbn": isbn,
"isbn13": isbn,
"title": title,
"authors": author,
"description": description,
},
)
if self.db:
self.db.add_documents([doc])
logger.info("Added book %s to ChromaDB", isbn)
__all__ = ["VectorDB"]
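

if __name__ == "__main__":
    # Minimal smoke test (illustrative): requires a built Chroma index under
    # CHROMA_DB_DIR and the SQLite metadata store; the query is arbitrary.
    vdb = VectorDB()
    for doc in vdb.hybrid_search("space opera with political intrigue", k=3):
        print(doc.metadata.get("title"), doc.metadata.get("isbn"))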