Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| import numpy as np | |
| import faiss | |
| from typing import List, Dict | |
| from sentence_transformers import SentenceTransformer | |
| logger = logging.getLogger(__name__) | |
| # Constants | |
| STORAGE_DIR = "storage" | |
| EMB_FILE = os.path.join(STORAGE_DIR, "embeddings_float16.npz") | |
| FAISS_FILE = os.path.join(STORAGE_DIR, "faiss_index.idx") | |
| # Model Init | |
def init_model(model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
    """Return a SentenceTransformer for *model_name*.

    Weights are downloaded once and cached locally by HuggingFace, so
    repeated calls are cheap.
    """
    logger.info(f"Loading embedding model: {model_name}")
    model = SentenceTransformer(model_name)
    return model
| # Embeddings | |
def build_embeddings(documents: List[Dict], model) -> np.ndarray:
    """Create or load document embeddings.

    Embeddings are persisted compressed as float16 (``.npz``) to roughly
    halve storage size. The returned array is always float32 because FAISS
    only accepts float32.

    Args:
        documents: list of dicts, each with a ``"text"`` key holding raw text.
        model: SentenceTransformer-like object exposing ``encode``.

    Returns:
        ``(num_documents, dim)`` float32 array of embeddings.

    NOTE(review): the cache is keyed only on file existence — if *documents*
    change, EMB_FILE must be deleted manually to force a rebuild.
    """
    os.makedirs(STORAGE_DIR, exist_ok=True)

    if os.path.exists(EMB_FILE):
        logger.info(f"Loading compressed embeddings from {EMB_FILE}")
        data = np.load(EMB_FILE)
        embeddings = data["emb"].astype(np.float32)
        logger.info(f"Loaded embeddings: {embeddings.shape}")
        return embeddings

    # Compute embeddings from scratch
    logger.info("Creating embeddings from scratch... (first run may take time)")
    texts = [doc["text"] for doc in documents]
    embeddings = model.encode(texts, convert_to_numpy=True, batch_size=32)

    # Save compressed float16
    emb_f16 = embeddings.astype(np.float16)
    np.savez_compressed(EMB_FILE, emb=emb_f16)
    logger.info(f"Saved compressed embeddings → {EMB_FILE} ({emb_f16.nbytes / 1e6:.1f} MB float16)")

    # BUGFIX: return the float16 round-trip rather than the full-precision
    # encode output. Previously the first run returned full-precision vectors
    # while every later (cache-loading) run returned float16-quantized ones,
    # so the FAISS index and search results could differ between the first
    # run and all subsequent runs. Returning the round-trip makes results
    # reproducible across runs.
    return emb_f16.astype(np.float32)
| # FAISS Index | |
def build_faiss_index(embeddings: np.ndarray):
    """Load the FAISS index from disk, or build and persist a new one.

    Uses an exact L2 index (IndexFlatL2), so search scores are distances:
    lower means more similar.
    """
    os.makedirs(STORAGE_DIR, exist_ok=True)

    # Fast path: a previously persisted index.
    if os.path.exists(FAISS_FILE):
        logger.info(f"Loading FAISS index from {FAISS_FILE}")
        return faiss.read_index(FAISS_FILE)

    logger.info("Building new FAISS index (IndexFlatL2)...")
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)
    faiss.write_index(index, FAISS_FILE)
    logger.info(f"FAISS index saved → {FAISS_FILE}")
    return index
| # BM25 | |
def build_bm25(documents: List[Dict]):
    """Build and return a BM25Okapi sparse index.

    Documents are tokenized by simple whitespace splitting of their
    ``"text"`` field.
    """
    from rank_bm25 import BM25Okapi

    tokenized_corpus = [doc["text"].split() for doc in documents]
    logger.info("Building BM25 index...")
    return BM25Okapi(tokenized_corpus)
| # Semantic Search (via FAISS) | |
def semantic_search(query: str, model, faiss_index, documents, k=5):
    """Return the top-k documents by dense (FAISS) similarity.

    Scores are raw L2 distances from IndexFlatL2, so *smaller* means more
    similar. FAISS pads missing hits with ``-1`` ids; those are skipped.
    """
    query_emb = model.encode([query], convert_to_numpy=True).astype(np.float32)
    distances, indices = faiss_index.search(query_emb, k)

    return [
        {"score": float(dist), "document": documents[doc_id]}
        for doc_id, dist in zip(indices[0], distances[0])
        if doc_id != -1
    ]
| # BM25 Search | |
def bm25_search(query: str, bm25, documents, k=5):
    """Return the top-k documents by sparse lexical BM25 score.

    Higher score means a better lexical match; the query is tokenized by
    whitespace splitting, mirroring how the index was built.
    """
    scores = bm25.get_scores(query.split())
    # argsort ascending, reversed → indices of the k highest scores.
    top_indices = np.argsort(scores)[::-1][:k]

    return [
        {"score": float(scores[idx]), "document": documents[idx]}
        for idx in top_indices
    ]
| # Hybrid Search (FAISS + BM25) | |
def hybrid_search(query: str, model, faiss_index, bm25, documents, k=5, alpha=0.5):
    """Hybrid retrieval: weighted fusion of dense (FAISS) and sparse (BM25).

    Args:
        query: free-text query.
        model: embedding model for the dense leg.
        faiss_index: FAISS index over the document embeddings.
        bm25: BM25Okapi index over the same documents.
        documents: corpus; both legs return references into this list.
        k: number of results to return.
        alpha: weight of the dense (semantic) score in [0, 1]; the sparse
            (lexical) score gets weight ``1 - alpha``.

    Returns:
        Up to k dicts ``{"score": float, "document": ...}`` sorted by
        descending fused score (higher = better).
    """
    dense = semantic_search(query, model, faiss_index, documents, k)
    sparse = bm25_search(query, bm25, documents, k)

    # Min-max normalize each leg to [0, 1].
    # BUGFIX: FAISS IndexFlatL2 returns *distances* (lower = more similar),
    # so the normalized dense score must be inverted. Previously the raw
    # distance was used as a similarity, inverting the semantic ranking.
    dense_scores = np.array([d["score"] for d in dense], dtype=np.float64)
    if dense_scores.size:
        span = dense_scores.max() - dense_scores.min()
        dense_scores = 1.0 - (dense_scores - dense_scores.min()) / (span + 1e-9)

    # BM25 scores are already higher-is-better; normalize as-is.
    sparse_scores = np.array([s["score"] for s in sparse], dtype=np.float64)
    if sparse_scores.size:
        span = sparse_scores.max() - sparse_scores.min()
        sparse_scores = (sparse_scores - sparse_scores.min()) / (span + 1e-9)

    # BUGFIX: fuse scores per *document*, not per rank position. Previously
    # dense[i] and sparse[i] — usually two different documents — were summed
    # pairwise and one of them silently discarded based on alpha; that also
    # raised IndexError whenever either leg returned fewer than k hits.
    # Documents may be unhashable dicts, so key on object identity (both
    # legs return references into `documents`).
    fused = {}  # id(document) -> [accumulated score, document]
    for hits, weight, norm_scores in (
        (dense, alpha, dense_scores),
        (sparse, 1.0 - alpha, sparse_scores),
    ):
        for i, hit in enumerate(hits):
            entry = fused.setdefault(id(hit["document"]), [0.0, hit["document"]])
            entry[0] += weight * float(norm_scores[i])

    combined = [{"score": score, "document": doc} for score, doc in fused.values()]
    combined.sort(key=lambda x: x["score"], reverse=True)
    return combined[:k]