# vector_store/faiss_manager.py — FAISS vector index management (build, search, persistence)
| # DEPENDENCIES | |
| import time | |
| import faiss | |
| import numpy as np | |
| from typing import List | |
| from typing import Tuple | |
| from pathlib import Path | |
| from typing import Optional | |
| from config.settings import get_settings | |
| from config.logging_config import get_logger | |
| from utils.error_handler import handle_errors | |
| from utils.error_handler import IndexingError | |
| from vector_store.index_persister import get_index_persister | |
# Module-level configuration and logger, shared by all FAISSManager instances
settings = get_settings()
logger = get_logger(__name__)
class FAISSManager:
    """
    FAISS vector index management: Handles creation, optimization, and search operations for vector similarity search
    Supports multiple index types based on dataset size (Flat, IVF, HNSW)
    """
    def __init__(self, vector_store_dir: Optional[Path] = None):
        """
        Initialize FAISS manager
        Arguments:
        ----------
        vector_store_dir { Path } : Directory for index storage (falls back to settings.VECTOR_STORE_DIR)
        """
        self.logger = logger
        self.vector_store_dir = Path(vector_store_dir or settings.VECTOR_STORE_DIR)
        # FAISS configuration pulled from project settings
        self.faiss_nprobe = settings.FAISS_NPROBE
        self.embedding_dim = settings.EMBEDDING_DIMENSION
        # Index state: FAISS index plus the parallel position -> chunk-ID mapping
        self.index = None
        self.chunk_ids: List[str] = []
        self.index_metadata: dict = {}
        # Persister handles saving/loading the index to/from disk
        self.persister = get_index_persister(self.vector_store_dir)
        # Operation counters
        self.search_count = 0
        self.build_count = 0
        self.logger.info(f"Initialized FAISSManager: dim={self.embedding_dim}, nprobe={self.faiss_nprobe}")
| def build_index(self, embeddings: np.ndarray, chunk_ids: List[str], rebuild: bool = False) -> dict: | |
| """ | |
| Build FAISS index from embeddings | |
| Arguments: | |
| ---------- | |
| embeddings { np.ndarray } : Embedding vectors (n x dim) | |
| chunk_ids { list } : Corresponding chunk IDs | |
| rebuild { bool } : Whether to rebuild existing index | |
| Returns: | |
| -------- | |
| { dict } : Build statistics | |
| """ | |
| if (embeddings.shape[0] != len(chunk_ids)): | |
| raise IndexingError(f"Embeddings count {embeddings.shape[0]} doesn't match chunk IDs count {len(chunk_ids)}") | |
| if (embeddings.shape[1] != self.embedding_dim): | |
| self.logger.warning(f"Embedding dimension {embeddings.shape[1]} doesn't match expected {self.embedding_dim}") | |
| if (self.index is not None) and (not rebuild): | |
| self.logger.info("Index already exists, use rebuild=True to rebuild") | |
| return self.get_index_stats() | |
| self.logger.info(f"Building FAISS index for {len(chunk_ids)} vectors") | |
| start_time = time.time() | |
| # Select appropriate index type based on dataset size | |
| index = self._create_optimal_index(embeddings.shape[0]) | |
| # Train index if needed | |
| if hasattr(index, 'train') and index.ntotal == 0: | |
| self.logger.info("Training FAISS index") | |
| index.train(embeddings) | |
| # Add vectors to index | |
| index.add(embeddings) | |
| # Configure search parameters | |
| if hasattr(index, 'nprobe'): | |
| index.nprobe = self.faiss_nprobe | |
| # Update instance state | |
| self.index = index | |
| self.chunk_ids = chunk_ids | |
| self.build_count += 1 | |
| build_time = time.time() - start_time | |
| # Save to disk | |
| metadata = {"build_time" : build_time, | |
| "vector_count" : len(chunk_ids), | |
| "embedding_dim" : embeddings.shape[1], | |
| "index_type" : type(index).__name__, | |
| } | |
| self.persister.save_faiss_index(index, chunk_ids, metadata) | |
| stats = {"vectors" : len(chunk_ids), | |
| "build_time_seconds": build_time, | |
| "index_type" : type(index).__name__, | |
| "embedding_dim" : embeddings.shape[1], | |
| "vectors_per_second": len(chunk_ids) / build_time if build_time > 0 else 0, | |
| } | |
| self.logger.info(f"FAISS index built: {len(chunk_ids)} vectors in {build_time:.2f}s") | |
| return stats | |
| def _create_optimal_index(self, vector_count: int) -> faiss.Index: | |
| """ | |
| Create optimal FAISS index type based on dataset size | |
| Arguments: | |
| ---------- | |
| vector_count { int } : Number of vectors to index | |
| Returns: | |
| -------- | |
| { faiss.Index } : FAISS index instance | |
| """ | |
| # Based on project document specifications: | |
| # - Small datasets (<100K): IndexFlatL2 (exact search) | |
| # - Medium datasets (100K-1M): IndexIVFFlat (balanced) | |
| # - Large datasets (>1M): IndexHNSW (approximate, fast) | |
| if (vector_count < 100000): | |
| # Exact search for small datasets | |
| index = faiss.IndexFlatL2(self.embedding_dim) | |
| self.logger.info(f"Using IndexFlatL2 for {vector_count} vectors (exact search)") | |
| elif (vector_count < 1000000): | |
| # Balanced approach for medium datasets | |
| nlist = min(4096, vector_count // 39) # FAISS recommendation | |
| quantizer = faiss.IndexFlatL2(self.embedding_dim) | |
| index = faiss.IndexIVFFlat(quantizer, self.embedding_dim, nlist) | |
| self.logger.info(f"Using IndexIVFFlat for {vector_count} vectors (nlist={nlist})") | |
| else: | |
| # Approximate search for large datasets | |
| M = 32 # Connections per node | |
| index = faiss.IndexHNSWFlat(self.embedding_dim, M) | |
| self.logger.info(f"Using IndexHNSW for {vector_count} vectors (M={M})") | |
| return index | |
| def add_to_index(self, embeddings: np.ndarray, chunk_ids: List[str]) -> dict: | |
| """ | |
| Add new vectors to existing index | |
| Arguments: | |
| ---------- | |
| embeddings { np.ndarray } : New embedding vectors | |
| chunk_ids { list } : New chunk IDs | |
| Returns: | |
| -------- | |
| { dict } : Add operation statistics | |
| """ | |
| if self.index is None: | |
| raise IndexingError("No index exists. Build index first.") | |
| if (embeddings.shape[0] != len(chunk_ids)): | |
| raise IndexingError(f"Embeddings count {embeddings.shape[0]} doesn't match chunk IDs count {len(chunk_ids)}") | |
| self.logger.info(f"Adding {len(chunk_ids)} vectors to FAISS index") | |
| start_time = time.time() | |
| # Add to index | |
| self.index.add(embeddings) | |
| # Update chunk IDs | |
| self.chunk_ids.extend(chunk_ids) | |
| add_time = time.time() - start_time | |
| # Save updated index | |
| metadata = {"added_vectors" : len(chunk_ids), | |
| "total_vectors" : len(self.chunk_ids), | |
| "add_time" : add_time, | |
| } | |
| self.persister.save_faiss_index(self.index, self.chunk_ids, metadata) | |
| stats = {"added" : len(chunk_ids), | |
| "new_total" : len(self.chunk_ids), | |
| "add_time_seconds" : add_time, | |
| "vectors_per_second" : len(chunk_ids) / add_time if add_time > 0 else 0, | |
| } | |
| self.logger.info(f"Added {len(chunk_ids)} vectors to FAISS index in {add_time:.2f}s") | |
| return stats | |
| def search(self, query_embedding: np.ndarray, top_k: int = 10) -> List[Tuple[str, float]]: | |
| """ | |
| Search for similar vectors | |
| Arguments: | |
| ---------- | |
| query_embedding { np.ndarray } : Query embedding vector | |
| top_k { int } : Number of results to return | |
| Returns: | |
| -------- | |
| { list } : List of (chunk_id, score) tuples | |
| """ | |
| if self.index is None: | |
| # Try to load from disk | |
| self._load_index_from_disk() | |
| if self.index is None: | |
| raise IndexingError("No FAISS index available for search") | |
| if (query_embedding.ndim != 1) or (query_embedding.shape[0] != self.embedding_dim): | |
| raise IndexingError(f"Query embedding must be 1D vector of dimension {self.embedding_dim}") | |
| # Reshape for FAISS (1 x dim) | |
| query_embedding = query_embedding.reshape(1, -1).astype('float32') | |
| # Search | |
| scores, indices = self.index.search(query_embedding, top_k) | |
| # Convert to results | |
| results = list() | |
| for i, (score, idx) in enumerate(zip(scores[0], indices[0])): | |
| if ((idx < len(self.chunk_ids)) and (idx >= 0)): | |
| chunk_id = self.chunk_ids[idx] | |
| # Convert L2 distance to similarity score (higher = more similar) | |
| similarity_score = 1.0 / (1.0 + score) if score > 0 else 1.0 | |
| results.append((chunk_id, float(similarity_score))) | |
| self.search_count += 1 | |
| self.logger.debug(f"FAISS search returned {len(results)} results") | |
| return results | |
| def _load_index_from_disk(self): | |
| """ | |
| Load index from disk if available | |
| """ | |
| try: | |
| index, chunk_ids, metadata = self.persister.load_faiss_index() | |
| if index is not None: | |
| self.index = index | |
| self.chunk_ids = chunk_ids | |
| self.index_metadata = metadata | |
| # Configure search parameters | |
| if hasattr(self.index, 'nprobe'): | |
| self.index.nprobe = self.faiss_nprobe | |
| self.logger.info(f"Loaded FAISS index from disk: {len(chunk_ids)} vectors") | |
| except Exception as e: | |
| self.logger.warning(f"Could not load FAISS index from disk: {e}") | |
| def is_index_built(self) -> bool: | |
| """ | |
| Check if index is built and ready | |
| Returns: | |
| -------- | |
| { bool } : True if index is built | |
| """ | |
| if self.index is not None: | |
| return True | |
| # Check if index exists on disk | |
| return self.persister.index_files_exist() | |
| def get_index_stats(self) -> dict: | |
| """ | |
| Get FAISS index statistics | |
| Returns: | |
| -------- | |
| { dict } : Index statistics | |
| """ | |
| if self.index is None: | |
| self._load_index_from_disk() | |
| if self.index is None: | |
| return {"built": False} | |
| stats = {"built" : True, | |
| "vector_count" : len(self.chunk_ids), | |
| "embedding_dim" : self.embedding_dim, | |
| "index_type" : type(self.index).__name__, | |
| "search_count" : self.search_count, | |
| "build_count" : self.build_count, | |
| "faiss_nprobe" : self.faiss_nprobe, | |
| } | |
| # Add index-specific stats | |
| if hasattr(self.index, 'ntotal'): | |
| stats["index_ntotal"] = self.index.ntotal | |
| if hasattr(self.index, 'nlist'): | |
| stats["index_nlist"] = self.index.nlist | |
| # Add metadata | |
| stats.update(self.index_metadata) | |
| return stats | |
| def optimize_index(self) -> dict: | |
| """ | |
| Optimize index for better performance | |
| Returns: | |
| -------- | |
| { dict } : Optimization results | |
| """ | |
| if self.index is None: | |
| return {"optimized": False, "message": "No index to optimize"} | |
| self.logger.info("Optimizing FAISS index") | |
| # For IVF indexes, we can adjust nprobe | |
| if hasattr(self.index, 'nprobe'): | |
| old_nprobe = self.index.nprobe | |
| new_nprobe = min(old_nprobe * 2, 50) # Increase but cap | |
| if new_nprobe != old_nprobe: | |
| self.index.nprobe = new_nprobe | |
| self.faiss_nprobe = new_nprobe | |
| self.logger.info(f"Adjusted nprobe: {old_nprobe} -> {new_nprobe}") | |
| return {"optimized" : True, | |
| "faiss_nprobe" : self.faiss_nprobe, | |
| "message" : "FAISS index optimization completed", | |
| } | |
| def clear_index(self): | |
| """ | |
| Clear the index | |
| """ | |
| self.index = None | |
| self.chunk_ids = [] | |
| self.index_metadata = {} | |
| self.search_count = 0 | |
| self.logger.info("FAISS index cleared") | |
| def get_index_size(self) -> dict: | |
| """ | |
| Get index size information | |
| Returns: | |
| -------- | |
| { dict } : Size information | |
| """ | |
| if self.index is None: | |
| return {"memory_mb": 0, "disk_mb": 0} | |
| # Estimate memory usage | |
| vector_count = len(self.chunk_ids) | |
| memory_bytes = vector_count * self.embedding_dim * 4 # float32 = 4 bytes | |
| # Get disk size from persister | |
| files_info = self.persister.get_index_files_info() | |
| faiss_files = {k: v for k, v in files_info.items() if 'faiss' in k} | |
| disk_size = sum(info.get('size_mb', 0) for info in faiss_files.values()) | |
| return {"memory_mb" : memory_bytes / (1024 * 1024), | |
| "disk_mb" : disk_size, | |
| "vector_count" : vector_count, | |
| "files" : faiss_files, | |
| } | |
# Global FAISS manager instance (lazy module-level singleton; created on
# first call to get_faiss_manager)
_faiss_manager = None
def get_faiss_manager(vector_store_dir: Optional[Path] = None) -> FAISSManager:
    """
    Get global FAISS manager instance (lazy singleton).
    Arguments:
    ----------
    vector_store_dir { Path } : Vector store directory; only honored on the
                                first call, when the singleton is created
    Returns:
    --------
    { FAISSManager } : FAISSManager instance
    """
    global _faiss_manager
    manager = _faiss_manager
    if manager is None:
        manager = FAISSManager(vector_store_dir)
        _faiss_manager = manager
    return manager
def search_similar_vectors(query_embedding: np.ndarray, top_k: int = 10, **kwargs) -> List[Tuple[str, float]]:
    """
    Convenience function for vector search via the global manager.
    Arguments:
    ----------
    query_embedding { np.ndarray } : Query embedding
    top_k { int } : Number of results
    **kwargs : Additional arguments forwarded to FAISSManager.search
    Returns:
    --------
    { list } : Search results as (chunk_id, score) tuples
    """
    # NOTE(review): FAISSManager.search currently accepts no extra keyword
    # arguments, so passing any kwargs here raises TypeError — confirm intent.
    return get_faiss_manager().search(query_embedding, top_k, **kwargs)