Spaces:
Sleeping
Sleeping
| import os | |
| import pickle | |
| import faiss | |
| import numpy as np | |
| from typing import List, Optional | |
| import logging | |
| import torch | |
| from sentence_transformers import SentenceTransformer | |
| from transformers import AutoTokenizer, AutoModel | |
def load_faiss_index(index_path: str):
    """Read a serialized FAISS index from disk.

    Args:
        index_path: Filesystem path of the binary index file.

    Returns:
        The deserialized ``faiss.Index`` object.

    Raises:
        FileNotFoundError: If no file exists at *index_path*.
    """
    # Fail fast with a clear message instead of letting faiss raise
    # a low-level I/O error on a missing file.
    if os.path.exists(index_path):
        return faiss.read_index(index_path)
    raise FileNotFoundError(f"FAISS index not found at {index_path}")
def normalize_embeddings(embeddings: np.ndarray) -> np.ndarray:
    """Apply row-wise L2 normalization to a 2-D embedding matrix.

    Normalizing to unit length turns Euclidean / inner-product search
    into cosine-similarity search.

    Args:
        embeddings: Array of shape (n, dim).

    Returns:
        Array of the same shape with each non-zero row scaled to unit norm.
        All-zero rows are returned unchanged (instead of becoming NaN).
    """
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    # Bug fix: a zero vector has norm 0; dividing by it yields NaNs and
    # a runtime warning. Substitute 1.0 so zero rows pass through as zeros.
    safe_norms = np.where(norms == 0.0, 1.0, norms)
    return embeddings / safe_norms
def save_faiss_index(index: faiss.Index, index_path: str):
    """Serialize a FAISS index to disk, creating parent directories as needed.

    Args:
        index: The FAISS index object to persist.
        index_path: Destination file path. A bare filename (no directory
            component) writes to the current working directory.

    Raises:
        Exception: Re-raises whatever ``faiss.write_index`` (or directory
            creation) raised, after logging it.
    """
    try:
        parent = os.path.dirname(index_path)
        # Bug fix: os.path.dirname() returns "" for a bare filename, and
        # os.makedirs("") raises FileNotFoundError. Only create directories
        # when there actually is a parent component.
        if parent:
            os.makedirs(parent, exist_ok=True)
        faiss.write_index(index, index_path)
        logging.info(f"Successfully saved FAISS index to {index_path}")
    except Exception as e:
        # Log for observability, then propagate so callers can react.
        logging.error(f"Failed to save FAISS index: {e}")
        raise
def save_metadata(metadata: list, meta_path: str):
    """Pickle the document metadata list to disk, creating parent dirs.

    Args:
        metadata: List of metadata records to persist.
        meta_path: Destination file path. A bare filename (no directory
            component) writes to the current working directory.

    Raises:
        Exception: Re-raises whatever pickling or directory creation
            raised, after logging it.
    """
    try:
        parent = os.path.dirname(meta_path)
        # Bug fix: os.path.dirname() returns "" for a bare filename, and
        # os.makedirs("") raises FileNotFoundError. Guard the call.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(meta_path, 'wb') as f:
            pickle.dump(metadata, f)
        logging.info(f"Successfully saved metadata to {meta_path}")
    except Exception as e:
        # Log for observability, then propagate so callers can react.
        logging.error(f"Failed to save metadata: {e}")
        raise
def add_embeddings_to_index(index_path: str, embeddings: np.ndarray):
    """Append L2-normalized embeddings to a FAISS index file (create if absent).

    Embeddings are normalized in place so that the inner-product index
    (``IndexFlatIP``) effectively performs cosine-similarity search.

    Args:
        index_path: Path of the FAISS index file to update or create.
        embeddings: Array of shape (n, dim); cast to float32 as FAISS requires.

    Raises:
        ValueError: If an existing index has a different dimensionality.
    """
    embeddings = embeddings.astype('float32')
    # 1. Always normalize so inner product == cosine similarity.
    #    NOTE: this mutates the (cast) array in place, which is fine since
    #    astype() above already produced a private copy.
    faiss.normalize_L2(embeddings)
    dim = embeddings.shape[1]
    if os.path.exists(index_path):
        idx = faiss.read_index(index_path)
        if idx.d != dim:
            raise ValueError(f"Dimension mismatch: Index {idx.d} vs New {dim}")
        idx.add(embeddings)
    else:
        # 2. IndexFlatIP (inner product) + unit vectors = cosine similarity.
        idx = faiss.IndexFlatIP(dim)
        idx.add(embeddings)
    # Consistency fix: like save_faiss_index/save_metadata, create the
    # parent directory before writing (guarding the empty-dirname case).
    parent = os.path.dirname(index_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    faiss.write_index(idx, index_path)
def append_metadata(meta_path: str, new_meta: list) -> int:
    """Append a metadata batch to a "stacked" pickle file and return the total count.

    The file is opened in ``'ab'`` mode, so each call appends one more
    pickled object; the existing data is never rewritten.

    NOTE(review): the count below re-reads and unpickles the entire file,
    so the memory saving only applies to the *write* path, not the count.

    Args:
        meta_path: Path of the stacked pickle file. A bare filename (no
            directory component) is written to the current directory.
        new_meta: Batch of metadata records to append.

    Returns:
        Total number of chunks now in the file: each stored list contributes
        its length, each non-list object counts as 1. Returns 0 if the
        count pass fails (the error is logged).
    """
    parent = os.path.dirname(meta_path)
    # Bug fix: os.makedirs("") raises FileNotFoundError for bare filenames.
    if parent:
        os.makedirs(parent, exist_ok=True)
    # 1. Perform the append.
    with open(meta_path, "ab") as f:
        pickle.dump(new_meta, f, protocol=pickle.HIGHEST_PROTOCOL)
    # 2. Count by walking the stacked pickles until EOF.
    total_count = 0
    try:
        with open(meta_path, "rb") as f:
            while True:
                try:
                    data = pickle.load(f)
                    # Lists contribute their length; single objects count as 1.
                    total_count += len(data) if isinstance(data, list) else 1
                except EOFError:
                    break
    except Exception as e:
        logging.error(f"Error calculating metadata size: {e}")
    logging.info(f"Total metadata chunks after append: {total_count}")
    return total_count
def load_metadata(path: str) -> list:
    """Load every object from a "stacked" pickle file into one flat list.

    Args:
        path: Path of the stacked pickle file (as written by append_metadata).

    Returns:
        Flat list of all chunks; empty list if the file does not exist.
    """
    all_data = []
    if not os.path.exists(path):
        return []
    with open(path, "rb") as f:
        while True:
            try:
                obj = pickle.load(f)
                # Bug fix: the sibling append_metadata treats a non-list
                # object as a single chunk, but extend() on e.g. a dict
                # would silently splice in its KEYS. Only extend lists;
                # append everything else as one item.
                if isinstance(obj, list):
                    all_data.extend(obj)
                else:
                    all_data.append(obj)
            except EOFError:
                break
    return all_data
def compute_embeddings(
    texts: List[str],
    model_name: str = "nomic-ai/nomic-embed-text-v1",
    batch_size: int = 32
) -> np.ndarray:
    """Embed a list of texts, preferring SentenceTransformer, falling back to raw HF.

    Args:
        texts: Input strings to embed.
        model_name: Hugging Face model identifier.
        batch_size: Encoding batch size for both paths.

    Returns:
        float32 array of shape (len(texts), dim); shape (0, 0) for empty input.

    Raises:
        RuntimeError: If torch/transformers are unavailable for the fallback.
    """
    if not texts:
        # Note: dimension is unknowable without loading the model.
        return np.zeros((0, 0), dtype='float32')
    # Path 1: SentenceTransformer (handles batching/pooling internally).
    if SentenceTransformer is not None:
        try:
            device = "cuda" if torch.cuda.is_available() else "cpu"
            model = SentenceTransformer(model_name, device=device)
            return model.encode(texts, batch_size=batch_size, show_progress_bar=False).astype('float32')
        except Exception as e:
            # Bug fix: previously swallowed silently, hiding e.g. download
            # or OOM failures. Log, then fall through to the HF path.
            logging.warning(f"SentenceTransformer path failed ({e}); falling back to transformers.")
    # Path 2: plain transformers fallback with mean pooling.
    if not all([AutoTokenizer, AutoModel, torch]):
        raise RuntimeError("Missing dependencies: torch, transformers")
    # SECURITY NOTE: trust_remote_code=True executes model-repo code; only
    # use with model names from trusted sources.
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    # device_map="auto" shards large models across GPU/CPU automatically.
    model_hf = AutoModel.from_pretrained(model_name, trust_remote_code=True, device_map="auto")
    all_emb = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i : i + batch_size]
        toks = tokenizer(batch, return_tensors="pt", padding=True, truncation=True).to(model_hf.device)
        with torch.no_grad():
            out = model_hf(**toks)
        # Mean pooling over valid tokens (mask out padding positions).
        mask = toks["attention_mask"].unsqueeze(-1).expand(out.last_hidden_state.size()).float()
        summed = torch.sum(out.last_hidden_state * mask, 1)
        # Clamp avoids division by zero for fully-masked (empty) sequences.
        counts = torch.clamp(mask.sum(1), min=1e-9)
        emb_batch = (summed / counts).cpu().numpy()
        all_emb.append(emb_batch.astype('float32'))
    return np.vstack(all_emb)