"""Utilities for a FAISS vector store with pickled metadata.

Embeddings are L2-normalized so that inner-product search on an
``IndexFlatIP`` index is equivalent to cosine-similarity search.
Metadata is stored as a stream of stacked pickle frames, appended
with ``'ab'`` mode so existing data never has to be loaded to write.
"""

import logging
import os
import pickle
from typing import List, Optional

import numpy as np

# Optional heavy dependencies: the module stays importable without them so
# the pure index/metadata helpers remain usable. Call sites already guard
# on None (see compute_embeddings), which only works if these imports are
# allowed to fail softly.
try:
    import faiss
except ImportError:  # pragma: no cover
    faiss = None

try:
    import torch
except ImportError:  # pragma: no cover
    torch = None

try:
    from sentence_transformers import SentenceTransformer
except ImportError:  # pragma: no cover
    SentenceTransformer = None

try:
    from transformers import AutoTokenizer, AutoModel
except ImportError:  # pragma: no cover
    AutoTokenizer = None
    AutoModel = None

logger = logging.getLogger(__name__)


def _ensure_parent_dir(path: str) -> None:
    """Create the parent directory of *path* if it has one.

    ``os.path.dirname`` returns "" for bare filenames, and
    ``os.makedirs("")`` raises — so only create when non-empty.
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)


def load_faiss_index(index_path: str):
    """Load a binary FAISS index from disk.

    Raises:
        FileNotFoundError: if *index_path* does not exist.
    """
    if not os.path.exists(index_path):
        raise FileNotFoundError(f"FAISS index not found at {index_path}")
    return faiss.read_index(index_path)


def normalize_embeddings(embeddings: np.ndarray) -> np.ndarray:
    """Apply row-wise L2 normalization to *embeddings*.

    This converts Euclidean / inner-product search into cosine-similarity
    search. All-zero rows are left unchanged instead of producing NaN.
    """
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    # Guard against division by zero for all-zero rows.
    norms[norms == 0] = 1.0
    return embeddings / norms


def save_faiss_index(index: "faiss.Index", index_path: str):
    """Save the FAISS binary index to disk, creating parent dirs as needed.

    Raises:
        Exception: re-raises any failure after logging it.
    """
    try:
        _ensure_parent_dir(index_path)
        faiss.write_index(index, index_path)
        logger.info("Successfully saved FAISS index to %s", index_path)
    except Exception as e:
        logger.error("Failed to save FAISS index: %s", e)
        raise


def save_metadata(metadata: list, meta_path: str):
    """Save the document metadata list using pickle (overwrites the file).

    Raises:
        Exception: re-raises any failure after logging it.
    """
    try:
        _ensure_parent_dir(meta_path)
        with open(meta_path, "wb") as f:
            pickle.dump(metadata, f)
        logger.info("Successfully saved metadata to %s", meta_path)
    except Exception as e:
        logger.error("Failed to save metadata: %s", e)
        raise


def add_embeddings_to_index(index_path: str, embeddings: np.ndarray):
    """Append *embeddings* to the FAISS index at *index_path*, creating it
    if absent.

    Rows are L2-normalized (on a float32 copy) so that inner-product
    search on the ``IndexFlatIP`` index equals cosine similarity.

    Raises:
        ValueError: if an existing index has a different dimensionality.
    """
    # astype always copies, so the in-place normalize below cannot
    # clobber the caller's array.
    embeddings = embeddings.astype('float32')
    # 1. Always normalize for 'Inner Product' to simulate Cosine Similarity.
    faiss.normalize_L2(embeddings)
    dim = embeddings.shape[1]
    if os.path.exists(index_path):
        idx = faiss.read_index(index_path)
        if idx.d != dim:
            raise ValueError(f"Dimension mismatch: Index {idx.d} vs New {dim}")
        idx.add(embeddings)
    else:
        # 2. IndexFlatIP (inner product) + unit vectors == cosine similarity.
        idx = faiss.IndexFlatIP(dim)
        idx.add(embeddings)
    faiss.write_index(idx, index_path)


def append_metadata(meta_path: str, new_meta: list) -> int:
    """Append *new_meta* as a new pickle frame at the end of *meta_path*.

    Uses 'ab' (append binary) mode so the existing metadata is never
    loaded into memory for the write; the file becomes a stream of
    stacked pickle objects.

    Returns:
        The TOTAL count of chunks in the file after the append (list
        frames contribute their length; any other frame counts as 1).
    """
    _ensure_parent_dir(meta_path)

    # 1. Perform the append as an additional pickle frame.
    with open(meta_path, "ab") as f:
        pickle.dump(new_meta, f, protocol=pickle.HIGHEST_PROTOCOL)

    # 2. Calculate the total size by re-reading the stacked frames.
    total_count = 0
    try:
        with open(meta_path, "rb") as f:
            while True:
                try:
                    data = pickle.load(f)
                except EOFError:
                    break
                # If data is a list, add its length; otherwise count it as 1.
                total_count += len(data) if isinstance(data, list) else 1
    except Exception as e:
        # Best-effort count: report 0-so-far rather than fail the append.
        logger.error("Error calculating metadata size: %s", e)
    logger.info("Total metadata chunks after append: %d", total_count)
    return total_count


def load_metadata(path: str) -> list:
    """Load all objects from a stacked-pickle file into a single flat list.

    Mirrors append_metadata: list frames are flattened into the result,
    any other frame is appended as a single element.

    NOTE: pickle is not safe on untrusted files; only load trusted data.
    """
    if not os.path.exists(path):
        return []
    all_data: list = []
    with open(path, "rb") as f:
        while True:
            try:
                data = pickle.load(f)
            except EOFError:
                break
            if isinstance(data, list):
                all_data.extend(data)
            else:
                all_data.append(data)
    return all_data


def compute_embeddings(
    texts: List[str],
    model_name: str = "nomic-ai/nomic-embed-text-v1",
    batch_size: int = 32,
) -> np.ndarray:
    """Embed *texts*, preferring SentenceTransformer with an HF fallback.

    Args:
        texts: strings to embed; empty input short-circuits.
        model_name: HF model id used by both backends.
        batch_size: batch size for either backend.

    Returns:
        A float32 array of shape (len(texts), dim); (0, 0) when empty.

    Raises:
        RuntimeError: if neither backend's dependencies are installed.
    """
    if not texts:
        return np.zeros((0, 0), dtype='float32')

    # Path 1: SentenceTransformer (highly optimized, batches internally).
    if SentenceTransformer is not None:
        try:
            device = "cuda" if torch is not None and torch.cuda.is_available() else "cpu"
            model = SentenceTransformer(model_name, device=device)
            return model.encode(
                texts, batch_size=batch_size, show_progress_bar=False
            ).astype('float32')
        except Exception as e:
            # Fall through to the raw HF pipeline, but record why.
            logger.warning(
                "SentenceTransformer path failed (%s); falling back to HF", e
            )

    # Path 2: HF fallback with 'device_map' for memory safety.
    if not all([AutoTokenizer, AutoModel, torch]):
        raise RuntimeError("Missing dependencies: torch, transformers")
    # 'auto' shards large models across GPU/CPU automatically.
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    model_hf = AutoModel.from_pretrained(
        model_name, trust_remote_code=True, device_map="auto"
    )
    all_emb = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i : i + batch_size]
        toks = tokenizer(
            batch, return_tensors="pt", padding=True, truncation=True
        ).to(model_hf.device)
        with torch.no_grad():
            out = model_hf(**toks)
        # Mean pooling over the attention mask, using built-in torch ops.
        mask = (
            toks["attention_mask"]
            .unsqueeze(-1)
            .expand(out.last_hidden_state.size())
            .float()
        )
        summed = torch.sum(out.last_hidden_state * mask, 1)
        counts = torch.clamp(mask.sum(1), min=1e-9)
        emb_batch = (summed / counts).cpu().numpy()
        all_emb.append(emb_batch.astype('float32'))
    return np.vstack(all_emb)