# Insurance-RAG — rag/vector_store.py
# Update app code and initialize runtime databases (commit 72bff80)
import os
import threading
import json
import hashlib
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_core.documents import Document
from typing import List
from langchain_core.embeddings import Embeddings
from typing import List
class CachedEmbeddings(Embeddings):
    """
    Wrapper around another Embeddings implementation that caches results
    in a local JSON file, keyed by the MD5 hash of the input text.

    Avoids re-computing embeddings for identical text across calls and
    across process restarts. The wrapped object must expose
    ``embed_documents(list[str])`` and ``embed_query(str)``.
    """

    def __init__(self, wrapped_embeddings, cache_path: str = "rag/embeddings_cache.json"):
        """
        Args:
            wrapped_embeddings: The underlying embeddings model to delegate to.
            cache_path: JSON file used to persist the text-hash -> vector map.
        """
        self.wrapped = wrapped_embeddings
        self.cache_path = cache_path
        self.cache = {}
        # Create the lock BEFORE any other method runs, so every code path
        # that might save the cache can rely on it existing.
        self._lock = threading.Lock()
        self._load_cache()

    def _load_cache(self):
        """Populate self.cache from disk; start empty if the file is missing or corrupt."""
        if os.path.exists(self.cache_path):
            try:
                with open(self.cache_path, "r", encoding="utf-8") as f:
                    self.cache = json.load(f)
            except (OSError, ValueError):
                # Unreadable or corrupt cache file (json.JSONDecodeError is a
                # ValueError): fall back to an empty in-memory cache.
                self.cache = {}

    def _save_cache(self):
        """Persist the cache to disk. Best effort: failures are logged, never raised."""
        with self._lock:
            try:
                cache_dir = os.path.dirname(self.cache_path)
                # dirname is "" for a bare filename; makedirs("") would raise.
                if cache_dir:
                    os.makedirs(cache_dir, exist_ok=True)
                with open(self.cache_path, "w", encoding="utf-8") as f:
                    json.dump(self.cache, f)
            except Exception as e:
                print(f"Failed to save embedding cache: {e}")

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """
        Embed a batch of texts, serving cached vectors where available and
        delegating only cache misses to the wrapped model.

        Returns:
            One embedding vector per input text, in input order.
        """
        # Hash each text exactly once; reused for both lookup and storage.
        hashes = [hashlib.md5(text.encode()).hexdigest() for text in texts]
        results: List = [None] * len(texts)
        texts_to_embed = []
        indices_to_embed = []
        for i, h in enumerate(hashes):
            if h in self.cache:
                results[i] = self.cache[h]
            else:
                texts_to_embed.append(texts[i])
                indices_to_embed.append(i)
        # Compute only the misses, preserving original positions.
        if texts_to_embed:
            print(f"Computing embeddings for {len(texts_to_embed)} new items...")
            new_embeddings = self.wrapped.embed_documents(texts_to_embed)
            for idx, emb in zip(indices_to_embed, new_embeddings):
                results[idx] = emb
                self.cache[hashes[idx]] = emb
            # Save incrementally so progress survives an interrupted run.
            self._save_cache()
        return results

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query string, using/updating the cache."""
        h = hashlib.md5(text.encode()).hexdigest()
        if h in self.cache:
            return self.cache[h]
        emb = self.wrapped.embed_query(text)
        self.cache[h] = emb
        self._save_cache()
        return emb
class VectorStoreManager:
    """
    Manages a local FAISS index: creation, loading, incremental updates and
    per-source deletion.

    The embeddings model is expensive to load, so it is created once per
    process (class attribute) and shared by all instances. ``_lock`` guards
    both that one-time initialization and all index file I/O.
    """

    _embeddings = None          # process-wide cached embeddings (lazy singleton)
    _lock = threading.Lock()    # guards singleton init and index save/load

    def __init__(self, index_path: str = "rag/faiss_index"):
        """
        Args:
            index_path: Directory where the FAISS index is persisted.
        """
        self.index_path = index_path
        # Double-checked locking: the unsynchronized fast path avoids lock
        # contention once initialized; the re-check under the lock ensures
        # concurrent constructors load the model only once.
        if VectorStoreManager._embeddings is None:
            with VectorStoreManager._lock:
                if VectorStoreManager._embeddings is None:
                    base_embeddings = HuggingFaceEmbeddings(
                        model_name="sentence-transformers/all-MiniLM-L6-v2"
                    )
                    # Wrap with caching to skip re-embedding identical text.
                    VectorStoreManager._embeddings = CachedEmbeddings(base_embeddings)
        self.embeddings = VectorStoreManager._embeddings

    def _add_in_batches(self, vector_store, documents: List[Document],
                        batch_size: int, desc: str):
        """Add documents to an existing store in batches with a tqdm progress bar."""
        from tqdm import tqdm  # deferred: keep tqdm optional at import time
        for i in tqdm(range(0, len(documents), batch_size), desc=desc):
            vector_store.add_documents(documents[i : i + batch_size])

    def create_vector_store(self, documents: List[Document], batch_size: int = 100):
        """
        Create a new FAISS index from the provided documents and save it locally.

        Args:
            documents: Chunks to index. If empty, nothing is created.
            batch_size: Number of chunks embedded per batch.

        Returns:
            The new FAISS store, or None when ``documents`` is empty.
        """
        if not documents:
            print("No documents to index.")
            return
        print(f"Creating vector store with {len(documents)} chunks...")
        # FAISS.from_documents needs at least one document, so seed the store
        # with the first batch and append the rest.
        vector_store = FAISS.from_documents(documents[:batch_size], self.embeddings)
        if len(documents) > batch_size:
            self._add_in_batches(vector_store, documents[batch_size:],
                                 batch_size, desc="Creating Vectors")
        with VectorStoreManager._lock:
            vector_store.save_local(self.index_path)
        print(f"Vector store saved to {self.index_path}")
        return vector_store

    def load_vector_store(self):
        """
        Load the existing FAISS index from disk.

        Raises:
            FileNotFoundError: If no index exists at ``self.index_path``.
        """
        if not os.path.exists(self.index_path):
            raise FileNotFoundError(f"Index not found at {self.index_path}. Run ingestion first.")
        with VectorStoreManager._lock:
            # NOTE: the index pickle is produced by this app, so deserializing
            # it is accepted here; do not point index_path at untrusted data.
            return FAISS.load_local(
                self.index_path,
                self.embeddings,
                allow_dangerous_deserialization=True
            )

    def update_vector_store(self, documents: List[Document], batch_size: int = 100):
        """
        Load an existing index, add new documents in batches, and save it.
        Falls back to creating a fresh index when none exists yet.
        """
        if not documents:
            return
        if not os.path.exists(self.index_path):
            print("Index doesn't exist. Creating new one...")
            return self.create_vector_store(documents, batch_size=batch_size)
        print(f"Updating existing index with {len(documents)} new chunks...")
        vector_store = self.load_vector_store()
        self._add_in_batches(vector_store, documents, batch_size,
                             desc="Updating Vectors")
        with VectorStoreManager._lock:
            vector_store.save_local(self.index_path)
        print(f"Update complete. Saved to {self.index_path}")

    def delete_documents_by_source(self, source_path: str):
        """
        Remove all indexed chunks whose metadata ``source`` equals ``source_path``.
        No-op when the index does not exist or no chunks match.
        """
        if not os.path.exists(self.index_path):
            return
        vector_store = self.load_vector_store()
        # HACK: FAISS exposes no public iteration API, so we walk the private
        # docstore._dict ({id: Document}) to find matching chunk IDs.
        # source_path must match exactly how it was recorded at ingestion.
        ids_to_delete = [
            doc_id
            for doc_id, doc in vector_store.docstore._dict.items()
            if doc.metadata.get("source") == source_path
        ]
        if ids_to_delete:
            print(f"Deleting {len(ids_to_delete)} chunks for source: {source_path}")
            with VectorStoreManager._lock:
                vector_store.delete(ids_to_delete)
                vector_store.save_local(self.index_path)
            print("Deletion complete and index saved.")
        else:
            print(f"No documents found for source: {source_path}")