# DEPENDENCIES
import json
import faiss
import pickle
from pathlib import Path
from typing import Optional
from datetime import datetime
from config.settings import get_settings
from utils.file_handler import FileHandler
from config.logging_config import get_logger
from utils.error_handler import handle_errors
from utils.error_handler import IndexingError

# Setup Settings and Logging
settings = get_settings()
logger   = get_logger(__name__)


class IndexPersister:
    """
    Handles persistence of indexes to disk: saves and loads FAISS indexes, BM25 indexes, and metadata
    """
    def __init__(self, vector_store_dir: Optional[Path] = None):
        """
        Initialize index persister

        Arguments:
        ----------
            vector_store_dir { Path } : Directory for index storage
        """
        self.logger           = logger
        self.vector_store_dir = Path(vector_store_dir or settings.VECTOR_STORE_DIR)

        # Ensure directory exists
        FileHandler.ensure_directory(self.vector_store_dir)

        # File paths
        self.faiss_index_path    = self.vector_store_dir / "faiss.index"
        self.faiss_metadata_path = self.vector_store_dir / "faiss_metadata.pkl"
        self.bm25_index_path     = self.vector_store_dir / "bm25_index.pkl"
        self.metadata_db_path    = self.vector_store_dir / "metadata.db"

        self.logger.info(f"Initialized IndexPersister: store_dir={self.vector_store_dir}")

    @handle_errors(error_type = IndexingError, log_error = True, reraise = True)
    def save_faiss_index(self, index: faiss.Index, chunk_ids: list, metadata: Optional[dict] = None) -> bool:
        """
        Save FAISS index and metadata to disk

        Arguments:
        ----------
            index { faiss.Index } : FAISS index object

            chunk_ids { list }    : List of chunk IDs in order

            metadata { dict }     : Additional metadata

        Returns:
        --------
            { bool } : True if successful
        """
        try:
            self.logger.info(f"Saving FAISS index with {len(chunk_ids)} chunks")

            # Save FAISS index
            faiss.write_index(index, str(self.faiss_index_path))

            # Save metadata
            faiss_metadata = {"chunk_ids"    : chunk_ids,
                              "total_chunks" : len(chunk_ids),
                              "timestamp"    : self._get_timestamp(),
                              "index_type"   : type(index).__name__,
                             }

            if metadata:
                faiss_metadata.update(metadata)

            with open(self.faiss_metadata_path, 'wb') as f:
                pickle.dump(faiss_metadata, f)

            self.logger.info(f"FAISS index saved: {self.faiss_index_path}")
            return True

        except Exception as e:
            self.logger.error(f"Failed to save FAISS index: {repr(e)}")
            raise IndexingError(f"FAISS index save failed: {repr(e)}")

    @handle_errors(error_type = IndexingError, log_error = True, reraise = True)
    def load_faiss_index(self) -> tuple[Optional[faiss.Index], list, dict]:
        """
        Load FAISS index and metadata from disk

        Returns:
        --------
            { tuple } : Tuple of (index, chunk_ids, metadata)
        """
        if not self.faiss_index_path.exists():
            self.logger.warning("FAISS index file not found")
            return None, [], {}

        try:
            self.logger.info("Loading FAISS index from disk")

            # Load FAISS index
            index = faiss.read_index(str(self.faiss_index_path))

            # Load metadata
            chunk_ids = list()
            metadata  = dict()

            if self.faiss_metadata_path.exists():
                with open(self.faiss_metadata_path, 'rb') as f:
                    loaded_metadata = pickle.load(f)

                chunk_ids = loaded_metadata.get("chunk_ids", [])
                metadata  = loaded_metadata

            self.logger.info(f"Loaded FAISS index with {len(chunk_ids)} chunks")
            return index, chunk_ids, metadata

        except Exception as e:
            self.logger.error(f"Failed to load FAISS index: {repr(e)}")
            raise IndexingError(f"FAISS index load failed: {repr(e)}")

    @handle_errors(error_type = IndexingError, log_error = True, reraise = True)
    def save_bm25_index(self, bm25_index, chunk_ids: list, metadata: Optional[dict] = None) -> bool:
        """
        Save BM25 index to disk

        Arguments:
        ----------
            bm25_index         : BM25 index object

            chunk_ids { list } : List of chunk IDs

            metadata { dict }  : Additional metadata

        Returns:
        --------
            { bool } : True if successful
        """
        try:
            self.logger.info(f"Saving BM25 index with {len(chunk_ids)} chunks")

            bm25_data = {"index"        : bm25_index,
                         "chunk_ids"    : chunk_ids,
                         "timestamp"    : self._get_timestamp(),
                         "total_chunks" : len(chunk_ids),
                        }

            if metadata:
                bm25_data.update(metadata)

            with open(self.bm25_index_path, 'wb') as f:
                pickle.dump(bm25_data, f)

            self.logger.info(f"BM25 index saved: {self.bm25_index_path}")
            return True

        except Exception as e:
            self.logger.error(f"Failed to save BM25 index: {repr(e)}")
            raise IndexingError(f"BM25 index save failed: {repr(e)}")

    @handle_errors(error_type = IndexingError, log_error = True, reraise = True)
    def load_bm25_index(self) -> tuple[Optional[object], list, dict]:
        """
        Load BM25 index from disk

        Returns:
        --------
            { tuple } : Tuple of (index, chunk_ids, metadata)
        """
        if not self.bm25_index_path.exists():
            self.logger.warning("BM25 index file not found")
            return None, [], {}

        try:
            self.logger.info("Loading BM25 index from disk")

            with open(self.bm25_index_path, 'rb') as f:
                bm25_data = pickle.load(f)

            index     = bm25_data.get("index")
            chunk_ids = bm25_data.get("chunk_ids", [])
            metadata  = {k: v for k, v in bm25_data.items() if k not in ["index", "chunk_ids"]}

            self.logger.info(f"Loaded BM25 index with {len(chunk_ids)} chunks")
            return index, chunk_ids, metadata

        except Exception as e:
            self.logger.error(f"Failed to load BM25 index: {repr(e)}")
            raise IndexingError(f"BM25 index load failed: {repr(e)}")

    def save_index_metadata(self, metadata: dict, filename: str = "index_metadata.json") -> bool:
        """
        Save general index metadata

        Arguments:
        ----------
            metadata { dict } : Metadata dictionary

            filename { str }  : Metadata filename

        Returns:
        --------
            { bool } : True if successful
        """
        try:
            metadata_path = self.vector_store_dir / filename

            # Add timestamp
            metadata["last_saved"] = self._get_timestamp()

            with open(metadata_path, 'w') as f:
                json.dump(obj    = metadata,
                          fp     = f,
                          indent = 4,
                         )

            self.logger.debug(f"Index metadata saved: {metadata_path}")
            return True

        except Exception as e:
            self.logger.error(f"Failed to save index metadata: {repr(e)}")
            return False

    def load_index_metadata(self, filename: str = "index_metadata.json") -> dict:
        """
        Load general index metadata

        Arguments:
        ----------
            filename { str } : Metadata filename

        Returns:
        --------
            { dict } : Metadata dictionary
        """
        metadata_path = self.vector_store_dir / filename

        if not metadata_path.exists():
            return {}

        try:
            with open(metadata_path, 'r') as f:
                metadata = json.load(f)

            return metadata

        except Exception as e:
            self.logger.error(f"Failed to load index metadata: {repr(e)}")
            return {}

    def index_files_exist(self) -> bool:
        """
        Check if index files exist on disk

        Returns:
        --------
            { bool } : True if index files exist
        """
        faiss_exists    = self.faiss_index_path.exists()
        bm25_exists     = self.bm25_index_path.exists()
        metadata_exists = self.faiss_metadata_path.exists()

        return faiss_exists and bm25_exists and metadata_exists

    def get_index_files_info(self) -> dict:
        """
        Get information about index files

        Returns:
        --------
            { dict } : File information
        """
        files_info = dict()

        for file_path in [self.faiss_index_path, self.faiss_metadata_path, self.bm25_index_path]:
            if file_path.exists():
                stat = file_path.stat()

                files_info[file_path.name] = {"size_bytes"    : stat.st_size,
                                              "size_mb"       : stat.st_size / (1024 * 1024),
                                              "modified_time" : stat.st_mtime,
                                              "exists"        : True,
                                             }

            else:
                files_info[file_path.name] = {"exists": False}
        return files_info

    def cleanup_old_indexes(self, keep_latest: bool = True) -> dict:
        """
        Clean up old index files

        Arguments:
        ----------
            keep_latest { bool } : Whether to keep the latest indexes

        Returns:
        --------
            { dict } : Cleanup results
        """
        # This would be implemented for versioned indexes
        files_info = self.get_index_files_info()

        return {"cleaned_files" : 0,
                "kept_files"    : len([f for f in files_info.values() if f.get("exists")]),
                "files_info"    : files_info,
                "message"       : "Index cleanup completed (basic implementation)",
               }

    @staticmethod
    def _get_timestamp() -> str:
        """
        Get current timestamp string

        Returns:
        --------
            { str } : Timestamp string
        """
        return datetime.now().isoformat()

    def get_persistence_stats(self) -> dict:
        """
        Get persistence statistics

        Returns:
        --------
            { dict } : Persistence statistics
        """
        files_info = self.get_index_files_info()
        total_size = sum(info.get("size_mb", 0) for info in files_info.values())
        file_count = sum(1 for info in files_info.values() if info.get("exists"))

        return {"total_size_mb"   : total_size,
                "file_count"      : file_count,
                "store_directory" : str(self.vector_store_dir),
                "files"           : files_info,
               }


# Global index persister instance
_index_persister = None

def get_index_persister(vector_store_dir: Optional[Path] = None) -> IndexPersister:
    """
    Get global index persister instance; the instance is created on first call,
    so vector_store_dir is only honored the first time and ignored afterwards

    Arguments:
    ----------
        vector_store_dir { Path } : Vector store directory

    Returns:
    --------
        { IndexPersister } : IndexPersister instance
    """
    global _index_persister

    if _index_persister is None:
        _index_persister = IndexPersister(vector_store_dir)

    return _index_persister
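
# ---------------------------------------------------------------------------
# Usage sketch (hypothetical, not part of the module's public API): a minimal
# save/load round trip for the FAISS path. Assumes faiss and numpy are
# installed and that the project's config/utils modules are importable; the
# temporary directory keeps the demo from touching the real vector store.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import tempfile
    import numpy as np

    with tempfile.TemporaryDirectory() as tmp_dir:
        persister = IndexPersister(vector_store_dir = Path(tmp_dir))

        # Build a tiny flat L2 index over random 8-dimensional vectors
        dim     = 8
        vectors = np.random.rand(16, dim).astype("float32")
        index   = faiss.IndexFlatL2(dim)
        index.add(vectors)

        chunk_ids = [f"chunk_{i}" for i in range(16)]

        # Save, reload, and check that the vector count and chunk IDs survive
        persister.save_faiss_index(index, chunk_ids, metadata = {"demo": True})
        loaded_index, loaded_ids, meta = persister.load_faiss_index()

        assert loaded_index is not None and loaded_index.ntotal == 16
        assert loaded_ids == chunk_ids

        print(persister.get_persistence_stats())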