import os
import threading
import json
import hashlib
from typing import List

from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
class CachedEmbeddings(Embeddings):
    """
    Wrapper for embeddings to cache results locally.
    Avoids re-computing embeddings for identical text.
    """

    def __init__(self, wrapped_embeddings, cache_path="rag/embeddings_cache.json"):
        self.wrapped = wrapped_embeddings
        self.cache_path = cache_path
        self.cache = {}
        self._lock = threading.Lock()
        self._load_cache()

    def _load_cache(self):
        if os.path.exists(self.cache_path):
            try:
                with open(self.cache_path, "r", encoding="utf-8") as f:
                    self.cache = json.load(f)
            except (OSError, json.JSONDecodeError):
                self.cache = {}

    def _save_cache(self):
        with self._lock:
            try:
                os.makedirs(os.path.dirname(self.cache_path), exist_ok=True)
                with open(self.cache_path, "w", encoding="utf-8") as f:
                    json.dump(self.cache, f)
            except Exception as e:
                print(f"Failed to save embedding cache: {e}")
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        results = []
        texts_to_embed = []
        indices_to_embed = []

        # Check cache
        for i, text in enumerate(texts):
            h = hashlib.md5(text.encode()).hexdigest()
            if h in self.cache:
                results.append(self.cache[h])
            else:
                results.append(None)  # Placeholder
                texts_to_embed.append(text)
                indices_to_embed.append(i)

        # Compute missing
        if texts_to_embed:
            print(f"Computing embeddings for {len(texts_to_embed)} new items...")
            new_embeddings = self.wrapped.embed_documents(texts_to_embed)
            for idx, emb, text in zip(indices_to_embed, new_embeddings, texts_to_embed):
                results[idx] = emb
                h = hashlib.md5(text.encode()).hexdigest()
                self.cache[h] = emb
            # Save incrementally
            self._save_cache()

        return results
    def embed_query(self, text: str) -> List[float]:
        h = hashlib.md5(text.encode()).hexdigest()
        if h in self.cache:
            return self.cache[h]
        emb = self.wrapped.embed_query(text)
        self.cache[h] = emb
        self._save_cache()
        return emb
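
# Example (illustrative, not part of the original module): CachedEmbeddings can
# wrap any langchain Embeddings implementation; a second call with identical
# text is served from the JSON cache instead of the model.
#
#     base = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
#     cached = CachedEmbeddings(base, cache_path="rag/embeddings_cache.json")
#     cached.embed_documents(["hello world"])  # computed and written to cache
#     cached.embed_documents(["hello world"])  # cache hit, no model call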
class VectorStoreManager:
    _embeddings = None
    _lock = threading.Lock()

    def __init__(self, index_path: str = "rag/faiss_index"):
        self.index_path = index_path
        with VectorStoreManager._lock:
            if VectorStoreManager._embeddings is None:
                # Load the embeddings model once and share it across instances
                base_embeddings = HuggingFaceEmbeddings(
                    model_name="sentence-transformers/all-MiniLM-L6-v2"
                )
                # Wrap with caching
                VectorStoreManager._embeddings = CachedEmbeddings(base_embeddings)
        self.embeddings = VectorStoreManager._embeddings
    def create_vector_store(self, documents: List[Document], batch_size: int = 100):
        """
        Creates a new FAISS index from the provided documents and saves it locally.
        Uses batching and tqdm to show progress.
        """
        if not documents:
            print("No documents to index.")
            return

        from tqdm import tqdm

        print(f"Creating vector store with {len(documents)} chunks...")

        # Initialize with first batch
        first_batch = documents[:batch_size]
        vector_store = FAISS.from_documents(first_batch, self.embeddings)

        # Add remaining batches with progress bar
        if len(documents) > batch_size:
            for i in tqdm(range(batch_size, len(documents), batch_size), desc="Creating Vectors"):
                batch = documents[i : i + batch_size]
                vector_store.add_documents(batch)

        # Save to disk
        with VectorStoreManager._lock:
            vector_store.save_local(self.index_path)
        print(f"Vector store saved to {self.index_path}")
        return vector_store
    def load_vector_store(self):
        """
        Loads the existing FAISS index from disk.
        """
        if not os.path.exists(self.index_path):
            raise FileNotFoundError(f"Index not found at {self.index_path}. Run ingestion first.")
        with VectorStoreManager._lock:
            return FAISS.load_local(
                self.index_path,
                self.embeddings,
                allow_dangerous_deserialization=True
            )
    def update_vector_store(self, documents: List[Document], batch_size: int = 100):
        """
        Loads an existing index, adds new documents, and saves.
        Uses batching and tqdm to show progress.
        """
        if not documents:
            return
        if not os.path.exists(self.index_path):
            print("Index doesn't exist. Creating new one...")
            return self.create_vector_store(documents, batch_size=batch_size)

        from tqdm import tqdm

        print(f"Updating existing index with {len(documents)} new chunks...")
        vector_store = self.load_vector_store()
        for i in tqdm(range(0, len(documents), batch_size), desc="Updating Vectors"):
            batch = documents[i : i + batch_size]
            vector_store.add_documents(batch)

        with VectorStoreManager._lock:
            vector_store.save_local(self.index_path)
        print(f"Update complete. Saved to {self.index_path}")
    def delete_documents_by_source(self, source_path: str):
        """
        Removes all documents from the index that match the given source path.
        """
        if not os.path.exists(self.index_path):
            return

        vector_store = self.load_vector_store()

        # Identify IDs to delete; docstore._dict maps {id: Document}
        ids_to_delete = []
        for doc_id, doc in vector_store.docstore._dict.items():
            # source_path must match the metadata exactly as it was ingested
            # (be consistent about absolute vs. relative paths).
            if doc.metadata.get("source") == source_path:
                ids_to_delete.append(doc_id)

        if ids_to_delete:
            print(f"Deleting {len(ids_to_delete)} chunks for source: {source_path}")
            with VectorStoreManager._lock:
                vector_store.delete(ids_to_delete)
                vector_store.save_local(self.index_path)
            print("Deletion complete and index saved.")
        else:
            print(f"No documents found for source: {source_path}")