# Financial_RAG / vector_store.py
# Author: amaherovskyi — commit 5b933aa (verified)
import os
import logging
import numpy as np
import faiss
from typing import List, Dict
from sentence_transformers import SentenceTransformer
# Module-level logger, named after this module per the stdlib convention.
logger = logging.getLogger(__name__)
# Constants
# On-disk cache locations shared by build_embeddings / build_faiss_index.
STORAGE_DIR = "storage"
EMB_FILE = os.path.join(STORAGE_DIR, "embeddings_float16.npz")  # float16-compressed embedding matrix
FAISS_FILE = os.path.join(STORAGE_DIR, "faiss_index.idx")  # serialized FAISS index
# Model Init
def init_model(model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
    """Load and return a SentenceTransformer embedding model.

    The weights are cached locally by HuggingFace, so repeated calls with
    the same name are cheap after the first download.
    """
    logger.info(f"Loading embedding model: {model_name}")
    embedder = SentenceTransformer(model_name)
    return embedder
# Embeddings
def build_embeddings(documents: List[Dict], model) -> np.ndarray:
    """Create or load the document embedding matrix.

    Embeddings are cached on disk compressed as float16 (``.npz``) to
    drastically reduce storage size. They are always returned as float32
    because FAISS requires float32 input.

    Args:
        documents: List of dicts, each expected to carry a "text" key with
            the chunk text to embed.
        model: SentenceTransformer-like object exposing
            ``encode(texts, convert_to_numpy=True, batch_size=...)``.

    Returns:
        ``np.ndarray`` of shape (n_documents, dim), dtype float32.
    """
    os.makedirs(STORAGE_DIR, exist_ok=True)

    if os.path.exists(EMB_FILE):
        logger.info(f"Loading compressed embeddings from {EMB_FILE}")
        data = np.load(EMB_FILE)
        embeddings = data["emb"].astype(np.float32)
        # Fix: previously the cache was returned unconditionally, so a changed
        # corpus silently misaligned embedding rows with `documents`. Reuse the
        # cache only when the row count still matches (or no documents were
        # supplied to compare against).
        if not documents or embeddings.shape[0] == len(documents):
            logger.info(f"Loaded embeddings: {embeddings.shape}")
            return embeddings
        logger.warning(
            "Cached embeddings have %d rows but %d documents were supplied; recomputing.",
            embeddings.shape[0],
            len(documents),
        )

    # Compute embeddings from scratch (first run may take time).
    logger.info("Creating embeddings from scratch... (first run may take time)")
    texts = [doc["text"] for doc in documents]
    embeddings = model.encode(texts, convert_to_numpy=True, batch_size=32)

    # Persist a compressed float16 copy; the precision loss is acceptable
    # for similarity search and halves the on-disk footprint.
    emb_f16 = embeddings.astype(np.float16)
    np.savez_compressed(EMB_FILE, emb=emb_f16)
    logger.info(f"Saved compressed embeddings → {EMB_FILE} ({emb_f16.nbytes / 1e6:.1f} MB float16)")

    return embeddings.astype(np.float32)
# FAISS Index
def build_faiss_index(embeddings: np.ndarray):
    """Create or load a FAISS index for the given embedding matrix.

    If a persisted index exists and is consistent with ``embeddings``
    (same dimension and same number of vectors), it is loaded from disk;
    otherwise a fresh ``IndexFlatL2`` is built and saved.

    Args:
        embeddings: float32 array of shape (n_vectors, dim).

    Returns:
        A FAISS index containing ``embeddings``.
    """
    os.makedirs(STORAGE_DIR, exist_ok=True)
    dimension = embeddings.shape[1]

    if os.path.exists(FAISS_FILE):
        logger.info(f"Loading FAISS index from {FAISS_FILE}")
        index = faiss.read_index(FAISS_FILE)
        # Fix: previously a persisted index was returned without validation,
        # so a stale index (built from a different corpus or model) could be
        # served, crashing searches or misaligning ids with `documents`.
        if index.d == dimension and index.ntotal == embeddings.shape[0]:
            return index
        logger.warning(
            "Persisted index (d=%d, ntotal=%d) does not match embeddings "
            "(d=%d, n=%d); rebuilding.",
            index.d, index.ntotal, dimension, embeddings.shape[0],
        )

    logger.info("Building new FAISS index (IndexFlatL2)...")
    index = faiss.IndexFlatL2(dimension)
    index.add(embeddings)
    faiss.write_index(index, FAISS_FILE)
    logger.info(f"FAISS index saved → {FAISS_FILE}")
    return index
# BM25
def build_bm25(documents: List[Dict]):
    """Build a BM25Okapi sparse lexical index.

    Each document's "text" field is whitespace-tokenized before indexing.
    """
    from rank_bm25 import BM25Okapi

    tokenized_corpus = [entry["text"].split() for entry in documents]
    logger.info("Building BM25 index...")
    return BM25Okapi(tokenized_corpus)
# Semantic Search (via FAISS)
def semantic_search(query: str, model, faiss_index, documents, k=5):
    """Return the top-k documents ranked by dense semantic similarity (FAISS).

    Each hit is a dict with "score" (raw FAISS distance for that hit) and
    "document" (the matching entry from ``documents``).
    """
    query_emb = model.encode([query], convert_to_numpy=True).astype(np.float32)
    distances, indices = faiss_index.search(query_emb, k)

    hits = []
    for dist, doc_id in zip(distances[0], indices[0]):
        # FAISS pads with -1 when fewer than k vectors are available.
        if doc_id == -1:
            continue
        hits.append({"score": float(dist), "document": documents[doc_id]})
    return hits
# BM25 Search
def bm25_search(query: str, bm25, documents, k=5):
    """Return the top-k documents ranked by sparse lexical BM25 similarity.

    The query is whitespace-tokenized to match how the BM25 corpus was built.
    Each hit is a dict with "score" (raw BM25 score) and "document".
    """
    scores = bm25.get_scores(query.split())
    ranked_ids = np.argsort(scores)[::-1][:k]
    return [
        {"score": float(scores[idx]), "document": documents[idx]}
        for idx in ranked_ids
    ]
# Hybrid Search (FAISS + BM25)
def hybrid_search(query: str, model, faiss_index, bm25, documents, k=5, alpha=0.5):
    """Combine semantic FAISS and lexical BM25 search into one ranking.

    Fixes over the previous implementation:
      * FAISS ``IndexFlatL2`` scores are L2 *distances* (lower is better);
        they were combined as if higher were better. The normalized dense
        score is now inverted so both components agree on "higher = better".
      * Dense and sparse results were paired by list position, summing scores
        of two unrelated documents and picking one arbitrarily. Scores are
        now fused per document.
      * ``range(k)`` raised IndexError when either search returned fewer
        than k hits; the actual result lists are iterated instead.

    Args:
        query: Search string.
        model / faiss_index / bm25 / documents: Same objects used by
            ``semantic_search`` and ``bm25_search``.
        k: Number of results to return.
        alpha: Weight of the semantic component in [0, 1]; BM25 gets 1-alpha.

    Returns:
        Up to k dicts {"score", "document"} sorted by fused score, descending.
    """
    dense = semantic_search(query, model, faiss_index, documents, k)
    sparse = bm25_search(query, bm25, documents, k)

    def _minmax(values):
        # Min-max normalize to [0, 1]; epsilon avoids division by zero
        # when all scores are equal.
        arr = np.asarray(values, dtype=np.float64)
        if arr.size == 0:
            return arr
        return (arr - arr.min()) / (arr.max() - arr.min() + 1e-9)

    dense_norm = _minmax([d["score"] for d in dense])
    sparse_norm = _minmax([s["score"] for s in sparse])

    # Fuse per document. Both search paths hand back the same dict objects
    # from `documents`, so object identity is a safe join key.
    fused = {}  # id(document) -> [fused_score, document]
    for hit, norm in zip(dense, dense_norm):
        doc = hit["document"]
        # Invert: normalized L2 distance 0 is the best match.
        fused[id(doc)] = [alpha * (1.0 - float(norm)), doc]
    for hit, norm in zip(sparse, sparse_norm):
        doc = hit["document"]
        contribution = (1 - alpha) * float(norm)
        if id(doc) in fused:
            fused[id(doc)][0] += contribution
        else:
            fused[id(doc)] = [contribution, doc]

    combined = [{"score": score, "document": doc} for score, doc in fused.values()]
    combined.sort(key=lambda x: x["score"], reverse=True)
    return combined[:k]