# QuerySphere: vector_store/index_builder.py
# DEPENDENCIES
import time
import faiss
import numpy as np
from pathlib import Path
from typing import List, Optional
from config.models import DocumentChunk
from config.settings import get_settings
from config.logging_config import get_logger
from utils.error_handler import handle_errors, IndexingError
from vector_store.bm25_index import BM25Index
from vector_store.faiss_manager import FAISSManager
from vector_store.metadata_store import MetadataStore
# Setup Settings and Logging
settings = get_settings()
logger = get_logger(__name__)
class IndexBuilder:
"""
Main index builder orchestrator: Builds and manages both vector and keyword indexes
Coordinates FAISS vector index, BM25 keyword index, and metadata storage
"""
def __init__(self, vector_store_dir: Optional[Path] = None):
"""
Initialize index builder
Arguments:
----------
vector_store_dir { Path } : Directory for index storage
"""
self.logger = logger
self.vector_store_dir = Path(vector_store_dir or settings.VECTOR_STORE_DIR)
# Initialize component managers
self.faiss_manager = FAISSManager(vector_store_dir = self.vector_store_dir)
self.bm25_index = BM25Index()
self.metadata_store = MetadataStore()
# Index statistics
self.total_chunks_indexed = 0
self.last_build_time = None
self.logger.info(f"Initialized IndexBuilder: store_dir={self.vector_store_dir}")
@handle_errors(error_type = IndexingError, log_error = True, reraise = True)
def build_indexes(self, chunks: List[DocumentChunk], rebuild: bool = False) -> dict:
"""
        Build both vector and keyword indexes from document chunks
Arguments:
----------
chunks { list } : List of DocumentChunk objects with embeddings
rebuild { bool } : Whether to rebuild existing indexes
Returns:
--------
{ dict } : Build statistics
"""
if not chunks:
raise IndexingError("No chunks provided for indexing")
# Validate chunks have embeddings
chunks_with_embeddings = [c for c in chunks if (c.embedding is not None)]
if (len(chunks_with_embeddings) != len(chunks)):
self.logger.warning(f"{len(chunks) - len(chunks_with_embeddings)} chunks missing embeddings")
if not chunks_with_embeddings:
raise IndexingError("No chunks with embeddings found")
self.logger.info(f"Building indexes for {len(chunks_with_embeddings)} chunks (rebuild={rebuild})")
start_time = time.time()
# Extract data for indexing
embeddings = self._extract_embeddings(chunks = chunks_with_embeddings)
texts = [chunk.text for chunk in chunks_with_embeddings]
chunk_ids = [chunk.chunk_id for chunk in chunks_with_embeddings]
# Build vector index (FAISS)
self.logger.info("Building FAISS vector index...")
faiss_stats = self.faiss_manager.build_index(embeddings = embeddings,
chunk_ids = chunk_ids,
rebuild = rebuild,
)
# Build keyword index (BM25)
self.logger.info("Building BM25 keyword index...")
bm25_stats = self.bm25_index.build_index(texts = texts,
chunk_ids = chunk_ids,
rebuild = rebuild,
)
# Store metadata
self.logger.info("Storing chunk metadata...")
metadata_stats = self.metadata_store.store_chunks(chunks = chunks_with_embeddings,
rebuild = rebuild,
)
# Update statistics
self.total_chunks_indexed += len(chunks_with_embeddings)
self.last_build_time = time.time()
build_time = time.time() - start_time
stats = {"total_chunks" : len(chunks_with_embeddings),
"build_time_seconds" : build_time,
"chunks_per_second" : len(chunks_with_embeddings) / build_time if build_time > 0 else 0,
"faiss" : faiss_stats,
"bm25" : bm25_stats,
"metadata" : metadata_stats,
"vector_dimension" : embeddings.shape[1] if (len(embeddings) > 0) else 0,
}
self.logger.info(f"Index building completed: {len(chunks_with_embeddings)} chunks in {build_time:.2f}s")
self.logger.info(f"FAISS index: {faiss_stats.get('vectors', 0)} vectors")
self.logger.info(f"BM25 index: {bm25_stats.get('documents', 0)} documents")
self.logger.info(f"Metadata: {metadata_stats.get('stored_chunks', 0)} chunks stored")
return stats
def _extract_embeddings(self, chunks: List[DocumentChunk]) -> np.ndarray:
"""
Extract embeddings from chunks as numpy array
Arguments:
----------
chunks { list } : List of DocumentChunk objects
Returns:
--------
{ np.ndarray } : Embeddings matrix
"""
embeddings = list()
for chunk in chunks:
if (chunk.embedding is not None):
embeddings.append(chunk.embedding)
if not embeddings:
raise IndexingError("No embeddings found in chunks")
return np.array(embeddings).astype('float32')
def get_index_stats(self) -> dict:
"""
Get comprehensive index statistics
Returns:
--------
{ dict } : Index statistics
"""
faiss_stats = self.faiss_manager.get_index_stats()
bm25_stats = self.bm25_index.get_index_stats()
metadata_stats = self.metadata_store.get_stats()
        # Also check VectorSearch stats (imported lazily to avoid a circular import;
        # the module path below is assumed from the project layout)
        try:
            from vector_store.vector_search import get_vector_search
            vector_search = get_vector_search()
            vector_stats = vector_search.get_index_stats()
        except Exception as e:
            vector_stats = {"error": str(e)}
stats = {"total_chunks_indexed" : self.total_chunks_indexed,
"last_build_time" : self.last_build_time,
"faiss" : faiss_stats,
"bm25" : bm25_stats,
"metadata" : metadata_stats,
"index_directory" : str(self.vector_store_dir),
}
return stats
def is_index_built(self) -> bool:
"""
Check if indexes are built and ready
Returns:
--------
{ bool } : True if indexes are built
"""
faiss_ready = self.faiss_manager.is_index_built()
bm25_ready = self.bm25_index.is_index_built()
metadata_ready = self.metadata_store.is_ready()
return faiss_ready and bm25_ready and metadata_ready
def optimize_indexes(self) -> dict:
"""
Optimize indexes for better performance
Returns:
--------
{ dict } : Optimization results
"""
self.logger.info("Optimizing indexes")
faiss_optimization = self.faiss_manager.optimize_index()
bm25_optimization = self.bm25_index.optimize_index()
optimization_stats = {"faiss" : faiss_optimization,
"bm25" : bm25_optimization,
"message" : "Index optimization completed",
}
return optimization_stats
def clear_indexes(self):
"""
Clear all indexes
"""
self.logger.warning("Clearing all indexes")
self.faiss_manager.clear_index()
self.bm25_index.clear_index()
self.metadata_store.clear()
self.total_chunks_indexed = 0
def get_index_size(self) -> dict:
"""
Get index sizes in memory and disk
Returns:
--------
{ dict } : Size information
"""
faiss_size = self.faiss_manager.get_index_size()
bm25_size = self.bm25_index.get_index_size()
metadata_size = self.metadata_store.get_size()
total_memory = (faiss_size.get("memory_mb", 0) + bm25_size.get("memory_mb", 0) + metadata_size.get("memory_mb", 0))
total_disk = (faiss_size.get("disk_mb", 0) + bm25_size.get("disk_mb", 0) + metadata_size.get("disk_mb", 0))
return {"total_memory_mb" : total_memory,
"total_disk_mb" : total_disk,
"faiss" : faiss_size,
"bm25" : bm25_size,
"metadata" : metadata_size,
}
# Global index builder instance
_index_builder = None
def get_index_builder(vector_store_dir: Optional[Path] = None) -> IndexBuilder:
"""
Get global index builder instance
Arguments:
----------
vector_store_dir { Path } : Vector store directory
Returns:
--------
{ IndexBuilder } : IndexBuilder instance
"""
global _index_builder
if _index_builder is None:
_index_builder = IndexBuilder(vector_store_dir)
return _index_builder
def build_indexes(chunks: List[DocumentChunk], **kwargs) -> dict:
"""
Convenience function to build indexes
Arguments:
----------
chunks { list } : List of DocumentChunk objects
**kwargs : Additional arguments
Returns:
--------
{ dict } : Build statistics
"""
builder = get_index_builder()
return builder.build_indexes(chunks, **kwargs)
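
# Example usage (a minimal sketch, not part of the production flow): the
# DocumentChunk constructor arguments and the embedding dimension below are
# assumptions based on how chunks are accessed in this module; the real model
# may require additional fields, and real embeddings come from the embedding
# pipeline rather than random vectors.
if __name__ == "__main__":
    demo_chunks = [DocumentChunk(chunk_id  = f"demo-chunk-{i}",
                                 text      = f"Sample passage number {i} about hybrid retrieval.",
                                 embedding = np.random.rand(384).astype("float32"),
                                 )
                   for i in range(3)]
    builder = get_index_builder()
    build_stats = builder.build_indexes(demo_chunks, rebuild = True)
    logger.info(f"Demo build stats: {build_stats}")
    logger.info(f"Indexes ready: {builder.is_index_built()}")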