Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import uuid | |
| import pickle | |
| import hashlib | |
| from typing import List, Dict | |
| import faiss | |
| import numpy as np | |
| from sentence_transformers import SentenceTransformer | |
class VectorStore:
    """FAISS-backed vector store with on-disk persistence.

    Text chunks are embedded with a SentenceTransformer model and stored in a
    flat L2 FAISS index.  The index and the parallel chunk-metadata list are
    persisted to ``faiss_index.bin`` / ``faiss_docs.pkl`` in the working
    directory and reloaded on the next construction.
    """

    def __init__(self):
        """Initialize FAISS vector store and embedding model.

        Loads an existing index/metadata pair from disk if both files are
        present; otherwise starts a fresh empty index.
        """
        model_name = os.getenv("EMBEDDING_MODEL", "all-MiniLM-L6-v2")
        self.embedding_model = SentenceTransformer(model_name)
        # Derive the dimension from the model instead of hard-coding 384 so a
        # non-default EMBEDDING_MODEL cannot silently mismatch the index.
        # (The default all-MiniLM-L6-v2 model yields 384, as before.)
        self.embedding_dim = (
            self.embedding_model.get_sentence_embedding_dimension() or 384
        )
        self.index_path = "faiss_index.bin"
        self.docs_path = "faiss_docs.pkl"
        if os.path.exists(self.index_path) and os.path.exists(self.docs_path):
            self.index = faiss.read_index(self.index_path)
            # NOTE(security): pickle.load can execute arbitrary code if the
            # file is tampered with -- only load files this process wrote.
            with open(self.docs_path, "rb") as f:
                self.docs = pickle.load(f)
            print(f"β Loaded existing FAISS index with {len(self.docs)} documents")
        else:
            self.index = faiss.IndexFlatL2(self.embedding_dim)
            self.docs = []
            print("π Created new FAISS index")

    def add_documents(self, chunks: List[Dict]) -> bool:
        """Embed *chunks* and append them to the index, then persist to disk.

        Each chunk dict must have a ``'text'`` key; the dicts are mutated in
        place to record ``'vector_index'`` and a default ``'chunk_id'``.

        Returns True on success, False on empty input or any error.
        """
        try:
            if not chunks:
                print("β οΈ No chunks to add")
                return False
            print(f"π₯ Adding {len(chunks)} chunks to FAISS vector store...")
            texts = [chunk['text'] for chunk in chunks]
            vectors = self.embedding_model.encode(texts, show_progress_bar=True)
            # FAISS requires float32; encode() may return float64/float32.
            self.index.add(np.array(vectors).astype("float32"))
            # Record where each chunk landed in the index (positions are
            # append-only, so len(self.docs) before appending is the offset).
            for i, chunk in enumerate(chunks):
                chunk['vector_index'] = len(self.docs) + i
                chunk['chunk_id'] = chunk.get('chunk_id', i)
                self.docs.append(chunk)
            # Persist index and metadata together so they stay in sync.
            faiss.write_index(self.index, self.index_path)
            with open(self.docs_path, "wb") as f:
                pickle.dump(self.docs, f)
            print(f"β Successfully added and saved {len(chunks)} documents.")
            return True
        except Exception as e:
            print(f"β Error adding documents: {str(e)}")
            return False

    def search_similar(self, query: str, top_k: int = 5) -> List[Dict]:
        """Return up to *top_k* stored chunks most similar to *query*.

        Results are dicts with id/score/text/url/title/chunk_id, sorted by
        ascending L2 distance (smaller score = closer match).  Returns an
        empty list on error.
        """
        try:
            query_vec = self.embedding_model.encode([query])
            D, I = self.index.search(np.array(query_vec).astype("float32"), top_k)
            similar_docs = []
            for i, idx in enumerate(I[0]):
                # FAISS pads with -1 when fewer than top_k neighbors exist;
                # the original `idx < len(...)` check let -1 through and
                # returned self.docs[-1] for those slots.
                if 0 <= idx < len(self.docs):
                    doc = self.docs[idx]
                    score = float(D[0][i])  # FAISS L2 distance
                    similar_docs.append({
                        'id': self._create_chunk_id(doc, idx),
                        'score': score,
                        'text': doc.get('text', ''),
                        'url': doc.get('url', ''),
                        'title': doc.get('title', ''),
                        'chunk_id': doc.get('chunk_id', 0)
                    })
            # Ensure sorted by closest match (smallest L2 distance)
            similar_docs = sorted(similar_docs, key=lambda x: x['score'])
            print("\nπ§ Retrieved Chunks:")
            for doc in similar_docs:
                print(f"- Score: {doc['score']:.2f} | Text: {doc['text'][:120]}...\n")
            return similar_docs
        except Exception as e:
            print(f"β Error searching: {str(e)}")
            return []

    def get_index_stats(self) -> Dict:
        """Return the number of stored vectors and their dimensionality."""
        return {
            'total_vectors': self.index.ntotal,
            'dimension': self.embedding_dim
        }

    def delete_all(self) -> bool:
        """Reset to an empty index and remove the persisted files.

        Returns True on success, False on error.
        """
        try:
            self.index = faiss.IndexFlatL2(self.embedding_dim)
            self.docs = []
            if os.path.exists(self.index_path):
                os.remove(self.index_path)
            if os.path.exists(self.docs_path):
                os.remove(self.docs_path)
            print("ποΈ All FAISS vectors and docs deleted")
            return True
        except Exception as e:
            print(f"β Error deleting vectors: {str(e)}")
            return False

    def _create_chunk_id(self, chunk: Dict, index: int) -> str:
        """Build a human-readable, collision-resistant id for a result.

        Format: ``<url-with-scheme-stripped-and-slashes-as-underscores>_
        <vector index>_<8 random hex chars>``.  The uuid suffix makes ids
        unique across calls even for the same chunk.
        """
        url = chunk.get('url', 'unknown')
        url_base = url.replace('https://', '').replace('http://', '').replace('/', '_')
        return f"{url_base}_{index}_{str(uuid.uuid4())[:8]}"
# Test run
if __name__ == "__main__":
    vs = VectorStore()
    sample_chunks = [
        {
            'text': 'Machine learning is a subset of artificial intelligence that focuses on algorithms.',
            'url': 'https://cloud.google.com/learn/artificial-intelligence-vs-machine-learning?hl=en',
            'title': 'Machine Learning Basics',
            'chunk_id': 0
        },
        {
            'text': 'Deep learning uses neural networks with multiple layers to learn complex patterns.',
            'url': 'https://www.ibm.com/think/topics/deep-learning',
            'title': 'Deep Learning Guide',
            'chunk_id': 1
        }
    ]
    # Add the sample chunks once.  (The previous version called
    # add_documents() a second time further down, which re-embedded and
    # re-inserted every chunk, leaving duplicate vectors in the persisted
    # index.)
    if vs.add_documents(sample_chunks):
        results = vs.search_similar("What is machine learning?", top_k=2)
        for r in results:
            print(f"Score: {r['score']:.3f} | Text: {r['text'][:80]}...")
        print("π Stats:", vs.get_index_stats())
        print("\nβ Added chunks:")
        for i, chunk in enumerate(vs.docs[-len(sample_chunks):]):
            print(f"\nπ Chunk {i}:")
            print(chunk['text'])  # full text of the chunk