# advance-multidoc-rag / src/embeddings_utils.py
# Author: Fnu Mahnoor
# Last commit: "Fix inference" (ab97519)
import os
import pickle
import faiss
import numpy as np
from typing import List, Optional
import logging
import torch
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModel
def load_faiss_index(index_path: str):
    """
    Read a serialized FAISS index from disk.

    Args:
        index_path: Filesystem path of the binary index file.

    Returns:
        The deserialized faiss.Index object.

    Raises:
        FileNotFoundError: if no file exists at ``index_path``.
    """
    if os.path.exists(index_path):
        return faiss.read_index(index_path)
    raise FileNotFoundError(f"FAISS index not found at {index_path}")
def normalize_embeddings(embeddings: np.ndarray) -> np.ndarray:
    """
    Apply row-wise L2 normalization to a 2-D embedding matrix.

    With unit-length rows, inner-product (and Euclidean) search becomes
    equivalent to cosine-similarity search.

    Args:
        embeddings: Array of shape (n, d).

    Returns:
        Array of the same shape where each non-zero row has unit L2 norm.
        All-zero rows are returned unchanged (zeros) instead of producing
        NaN from a 0/0 division, which the previous version did.
    """
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    # Guard against division by zero: an all-zero row has norm 0, and
    # dividing by it would yield NaN. Substituting 1.0 leaves it as zeros.
    safe_norms = np.where(norms == 0, 1.0, norms)
    return embeddings / safe_norms
def save_faiss_index(index: faiss.Index, index_path: str):
    """
    Persist a FAISS index to disk, creating parent directories as needed.

    Args:
        index: The FAISS index object to serialize.
        index_path: Destination file path.

    Raises:
        Re-raises any underlying OS or FAISS error after logging it.
    """
    try:
        parent = os.path.dirname(index_path)
        # os.makedirs("") raises FileNotFoundError, so only create the
        # directory when the path actually contains a directory component.
        if parent:
            os.makedirs(parent, exist_ok=True)
        faiss.write_index(index, index_path)
        logging.info(f"Successfully saved FAISS index to {index_path}")
    except Exception as e:
        logging.error(f"Failed to save FAISS index: {e}")
        raise
def save_metadata(metadata: list, meta_path: str):
    """
    Save the document metadata list to disk using pickle.

    Args:
        metadata: List of metadata entries (any picklable objects).
        meta_path: Destination file path; parent directories are created
            when the path contains a directory component.

    Raises:
        Re-raises any underlying OS or pickling error after logging it.
    """
    try:
        parent = os.path.dirname(meta_path)
        # os.makedirs("") raises FileNotFoundError, so guard against a
        # bare filename with no directory component.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(meta_path, 'wb') as f:
            pickle.dump(metadata, f)
        logging.info(f"Successfully saved metadata to {meta_path}")
    except Exception as e:
        logging.error(f"Failed to save metadata: {e}")
        raise
def add_embeddings_to_index(index_path: str, embeddings: np.ndarray):
    """
    Append embeddings to a FAISS index on disk, creating the index if absent.

    Vectors are L2-normalized before insertion so that the inner-product
    index (IndexFlatIP) performs cosine-similarity search.

    Args:
        index_path: Path of the FAISS index file (created if missing).
        embeddings: Array of shape (n, d). It is cast to float32 via
            astype, which copies, so the caller's array is not mutated
            by the in-place normalization below.

    Raises:
        ValueError: if an existing index has a different dimensionality.
    """
    embeddings = embeddings.astype('float32')  # copies; protects caller's array
    # 1. Always normalize so inner product equals cosine similarity.
    faiss.normalize_L2(embeddings)
    dim = embeddings.shape[1]
    if os.path.exists(index_path):
        idx = faiss.read_index(index_path)
        if idx.d != dim:
            raise ValueError(f"Dimension mismatch: Index {idx.d} vs New {dim}")
        idx.add(embeddings)
    else:
        # 2. IndexFlatIP (inner product) + unit vectors = cosine similarity.
        idx = faiss.IndexFlatIP(dim)
        idx.add(embeddings)
    # Consistent with save_faiss_index: ensure the parent directory exists
    # before writing (guarding against os.makedirs("") for bare filenames).
    parent = os.path.dirname(index_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    faiss.write_index(idx, index_path)
def append_metadata(meta_path: str, new_meta: list) -> int:
    """
    Append metadata to a pickle file and return the total chunk count.

    Uses 'ab' (append binary) mode, stacking a new pickle object onto the
    file so the existing metadata never has to be loaded just to append.

    Args:
        meta_path: Pickle file path; parent directories are created when
            the path contains a directory component.
        new_meta: List of metadata entries to append as one pickle object.

    Returns:
        Total number of chunks across all stacked objects in the file
        (lists contribute their length, any other object counts as 1).
        Returns a partial count if re-reading the file fails (logged).
    """
    parent = os.path.dirname(meta_path)
    # os.makedirs("") raises FileNotFoundError — only create a real parent.
    if parent:
        os.makedirs(parent, exist_ok=True)
    # 1. Perform the append without touching existing contents.
    with open(meta_path, "ab") as f:
        pickle.dump(new_meta, f, protocol=pickle.HIGHEST_PROTOCOL)
    # 2. Count chunks by walking the stacked pickle objects until EOF.
    total_count = 0
    try:
        with open(meta_path, "rb") as f:
            while True:
                try:
                    data = pickle.load(f)
                    # A list contributes its length; a single dict counts as 1.
                    total_count += len(data) if isinstance(data, list) else 1
                except EOFError:
                    break
    except Exception as e:
        logging.error(f"Error calculating metadata size: {e}")
    logging.info(f"Total metadata chunks after append: {total_count}")
    return total_count
def load_metadata(path: str) -> list:
    """
    Load every pickled object stacked in *path* into one flat list.

    Mirrors append_metadata's counting convention: a pickled list is
    flattened into the result, while any other object (e.g. a single
    dict) is appended as one item. The previous version unconditionally
    extend()-ed, which silently spread a dict's keys into the result
    and crashed on non-iterable objects.

    Args:
        path: Pickle file produced by append_metadata / save_metadata.

    Returns:
        A flat list of metadata entries; [] if the file does not exist.
    """
    all_data = []
    if not os.path.exists(path):
        return []
    with open(path, "rb") as f:
        while True:
            try:
                obj = pickle.load(f)
            except EOFError:
                break
            if isinstance(obj, list):
                all_data.extend(obj)
            else:
                all_data.append(obj)
    return all_data
def compute_embeddings(
    texts: List[str],
    model_name: str = "nomic-ai/nomic-embed-text-v1",
    batch_size: int = 32
) -> np.ndarray:
    """
    Embed a list of texts, preferring SentenceTransformers and falling
    back to a raw HuggingFace model with mean pooling.

    Args:
        texts: Input strings to embed.
        model_name: HF model id. NOTE(review): the fallback path passes
            trust_remote_code=True, which executes model-repo code —
            only use with trusted model ids.
        batch_size: Batch size used by both embedding paths.

    Returns:
        float32 array of shape (len(texts), d); shape (0, 0) for empty input.

    Raises:
        RuntimeError: if neither embedding backend is available.
    """
    if not texts:
        return np.zeros((0, 0), dtype='float32')
    # Path 1: SentenceTransformer (handles batching and pooling internally)
    if SentenceTransformer is not None:
        try:
            device = "cuda" if torch.cuda.is_available() else "cpu"
            model = SentenceTransformer(model_name, device=device)
            return model.encode(
                texts, batch_size=batch_size, show_progress_bar=False
            ).astype('float32')
        except Exception as e:
            # Previously swallowed silently; log why we fall back so the
            # slower HF path isn't taken invisibly.
            logging.warning(
                "SentenceTransformer path failed (%s); falling back to transformers", e
            )
    # Path 2: HF fallback with 'device_map' for memory safety
    if not all([AutoTokenizer, AutoModel, torch]):
        raise RuntimeError("Missing dependencies: torch, transformers")
    # 'auto' shards large models across GPU/CPU automatically.
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    model_hf = AutoModel.from_pretrained(model_name, trust_remote_code=True, device_map="auto")
    all_emb = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i : i + batch_size]
        toks = tokenizer(batch, return_tensors="pt", padding=True, truncation=True).to(model_hf.device)
        with torch.no_grad():
            out = model_hf(**toks)
        # Mean pooling over non-padding tokens via the attention mask.
        mask = toks["attention_mask"].unsqueeze(-1).expand(out.last_hidden_state.size()).float()
        summed = torch.sum(out.last_hidden_state * mask, 1)
        counts = torch.clamp(mask.sum(1), min=1e-9)
        emb_batch = (summed / counts).cpu().numpy()
        all_emb.append(emb_batch.astype('float32'))
    return np.vstack(all_emb)