"""Hybrid retrieval utilities: dense (FAISS) + sparse (BM25) search.

Embeddings are cached on disk in compressed float16 to cut storage size
and up-converted to float32 on load, since FAISS only accepts float32.
"""

import logging
import os
from typing import Dict, List

import faiss
import numpy as np
from sentence_transformers import SentenceTransformer

logger = logging.getLogger(__name__)

# Constants: on-disk locations for cached artifacts.
STORAGE_DIR = "storage"
EMB_FILE = os.path.join(STORAGE_DIR, "embeddings_float16.npz")
FAISS_FILE = os.path.join(STORAGE_DIR, "faiss_index.idx")


# Model Init
def init_model(model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
    """Initialize SentenceTransformer model (cached by HuggingFace)."""
    logger.info("Loading embedding model: %s", model_name)
    return SentenceTransformer(model_name)


# Embeddings
def build_embeddings(documents: List[Dict], model) -> np.ndarray:
    """
    Creates or loads embeddings.

    Saves them compressed in float16 (.npz) to drastically reduce storage
    size. Always returns a C-contiguous float32 array (FAISS requires
    contiguous float32 input).

    NOTE(review): the cache is keyed only on file existence — if
    `documents` change, delete EMB_FILE or stale embeddings are returned.
    """
    os.makedirs(STORAGE_DIR, exist_ok=True)

    if os.path.exists(EMB_FILE):
        logger.info("Loading compressed embeddings from %s", EMB_FILE)
        data = np.load(EMB_FILE)
        # ascontiguousarray guarantees the layout FAISS expects.
        embeddings = np.ascontiguousarray(data["emb"], dtype=np.float32)
        logger.info("Loaded embeddings: %s", embeddings.shape)
        return embeddings

    # Compute embeddings from scratch
    logger.info("Creating embeddings from scratch... (first run may take time)")
    texts = [doc["text"] for doc in documents]
    embeddings = model.encode(texts, convert_to_numpy=True, batch_size=32)

    # Save compressed float16 — halves disk footprint vs float32.
    emb_f16 = embeddings.astype(np.float16)
    np.savez_compressed(EMB_FILE, emb=emb_f16)
    logger.info(
        "Saved compressed embeddings → %s (%.1f MB float16)",
        EMB_FILE,
        emb_f16.nbytes / 1e6,
    )
    return np.ascontiguousarray(embeddings, dtype=np.float32)


# FAISS Index
def build_faiss_index(embeddings: np.ndarray):
    """
    Create or load a FAISS index.
    If missing, builds an exact IndexFlatL2 from `embeddings` and saves it
    to disk; otherwise loads the persisted index.
    """
    os.makedirs(STORAGE_DIR, exist_ok=True)

    if os.path.exists(FAISS_FILE):
        logger.info("Loading FAISS index from %s", FAISS_FILE)
        return faiss.read_index(FAISS_FILE)

    logger.info("Building new FAISS index (IndexFlatL2)...")
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)
    faiss.write_index(index, FAISS_FILE)
    logger.info("FAISS index saved → %s", FAISS_FILE)
    return index


# BM25
def build_bm25(documents: List[Dict]):
    """Build BM25 sparse index over whitespace-tokenized document texts."""
    from rank_bm25 import BM25Okapi

    texts = [doc["text"].split() for doc in documents]
    logger.info("Building BM25 index...")
    return BM25Okapi(texts)


# Semantic Search (via FAISS)
def semantic_search(query: str, model, faiss_index, documents, k=5):
    """
    Returns top-k documents ranked by dense semantic similarity (FAISS).

    "score" is the raw L2 distance — LOWER is better. FAISS pads missing
    hits with -1 ids; those slots are skipped.
    """
    query_emb = model.encode([query], convert_to_numpy=True).astype(np.float32)
    distances, indices = faiss_index.search(query_emb, k)

    results = []
    for dist, doc_id in zip(distances[0], indices[0]):
        if doc_id == -1:  # FAISS pads with -1 when fewer than k hits exist
            continue
        results.append({"score": float(dist), "document": documents[doc_id]})
    return results


# BM25 Search
def bm25_search(query: str, bm25, documents, k=5):
    """
    Returns top-k documents ranked by sparse lexical BM25 similarity.
    "score" is the BM25 score — HIGHER is better.
    """
    scores = bm25.get_scores(query.split())
    top_k = np.argsort(scores)[::-1][:k]
    return [
        {"score": float(scores[idx]), "document": documents[idx]}
        for idx in top_k
    ]


# Hybrid Search (FAISS + BM25)
def hybrid_search(query: str, model, faiss_index, bm25, documents, k=5, alpha=0.5):
    """
    Combines semantic FAISS + lexical BM25 search.

    alpha = weight for semantic search (0..1)

    Fusion is per DOCUMENT, not per rank position: FAISS L2 distances are
    inverted into similarities (smaller distance → higher score) and
    min-max normalized, BM25 scores are min-max normalized, then each
    candidate document receives alpha * dense + (1 - alpha) * sparse.
    Handles corpora smaller than k and FAISS -1 padding safely.
    """
    # --- Dense retrieval: L2 distances, lower = better.
    query_emb = model.encode([query], convert_to_numpy=True).astype(np.float32)
    distances, indices = faiss_index.search(query_emb, k)
    dense_hits = [
        (int(doc_id), float(dist))
        for doc_id, dist in zip(indices[0], distances[0])
        if doc_id != -1
    ]

    # --- Sparse retrieval: BM25 scores, higher = better.
    sparse_scores = bm25.get_scores(query.split())
    sparse_top = np.argsort(sparse_scores)[::-1][:k]

    # --- Fuse per document id.
    fused: Dict[int, float] = {}

    if dense_hits:
        dists = np.array([dist for _, dist in dense_hits])
        # Invert distances so the CLOSEST hit gets the highest normalized
        # score, then min-max scale to [0, 1].
        sims = (dists.max() - dists) / (dists.max() - dists.min() + 1e-9)
        for (doc_id, _), sim in zip(dense_hits, sims):
            fused[doc_id] = fused.get(doc_id, 0.0) + alpha * float(sim)

    if len(sparse_top) > 0:
        raw = sparse_scores[sparse_top]
        norm = (raw - raw.min()) / (raw.max() - raw.min() + 1e-9)
        for doc_id, score in zip(sparse_top, norm):
            doc_id = int(doc_id)
            fused[doc_id] = fused.get(doc_id, 0.0) + (1 - alpha) * float(score)

    # Sort final hybrid result by fused score, best first.
    ranked = sorted(fused.items(), key=lambda kv: kv[1], reverse=True)[:k]
    return [
        {"score": float(score), "document": documents[doc_id]}
        for doc_id, score in ranked
    ]