"""Embedding-backed vector stores (FAISS exact and Annoy approximate) sharing
one SentenceTransformer encoder."""

from sentence_transformers import SentenceTransformer
import numpy as np

import faiss
import json  # NOTE(review): retained from original; no longer used here but other chunks of this file may use it

from annoy import AnnoyIndex

# Shared encoder; every store below embeds with this model.
model = SentenceTransformer("all-MiniLM-L6-v2", cache_folder=".cache")
dim = model.get_sentence_embedding_dimension()


def embedd(documents):
    """Encode *documents* into a (len(documents), dim) numpy float array.

    NOTE(review): the (misspelled) public name is kept so existing callers
    keep working.
    """
    return model.encode(documents, convert_to_numpy=True)


class VectorStore:
    """Abstract interface for a vector store (stub methods, no behavior)."""

    def __init__(self, embeddings):
        pass

    def add(self, documents):
        pass

    def search(self, query_vector, top_k=5):
        pass


class FaissVectorStore(VectorStore):
    """Exact L2 nearest-neighbor search over embedded texts (flat FAISS index)."""

    def __init__(self):
        self.index = None    # faiss.IndexFlatL2, created lazily on first add()
        self.texts = []      # raw_text per item, position == FAISS row id
        self.metadata = []   # per-item metadata dicts (all keys except 'raw_text')

    def add(self, items):
        """Embed and index *items* — dicts with 'raw_text' plus metadata keys.

        Fix: removed a leftover debug dump of the raw texts to 'test.json'.
        Fix: empty batches are now a no-op (encoding [] would break indexing).
        """
        if not items:
            return
        raw_texts = [x["raw_text"] for x in items]
        metas = [{k: v for k, v in x.items() if k != "raw_text"} for x in items]
        vectors = model.encode(raw_texts, convert_to_numpy=True).astype('float32')
        if self.index is None:
            d = vectors.shape[1]
            self.index = faiss.IndexFlatL2(d)
        self.index.add(vectors)
        self.texts.extend(raw_texts)
        self.metadata.extend(metas)

    def search(self, queries, k=3):
        """Return, per query string, the top-*k* hits as
        {'score', 'text', 'meta'} dicts.

        NOTE(review): 'score' = 1 - d/2 equals cosine similarity only for
        unit-norm vectors with *squared* L2 distance; model.encode() does not
        normalize by default, so treat the score as a heuristic — confirm
        whether normalize_embeddings=True was intended.
        """
        q_vecs = model.encode(queries, convert_to_numpy=True).astype('float32')
        D, I = self.index.search(q_vecs, k)
        results = []
        for i in range(len(queries)):
            results.append([
                {
                    "score": 1 - D[i][j] / 2,
                    "text": self.texts[I[i][j]],
                    "meta": self.metadata[I[i][j]],
                }
                for j in range(k)
            ])
        return results


class AnnoyVectorStore(VectorStore):
    """Approximate nearest-neighbor search over embedded texts (Annoy, angular)."""

    def __init__(self, n_trees=10):
        self.index = None    # AnnoyIndex, rebuilt on every add()
        self.texts = []      # raw_text per item, position == Annoy item id
        self.metadata = []   # per-item metadata dicts
        self.n_trees = n_trees
        self._vectors = []   # all embeddings kept so the index can be rebuilt

    def add(self, items):
        """Embed *items* and (re)build the Annoy index over everything stored.

        Fix: the original called add_item() on an already-built index on the
        second add() call, which Annoy rejects (indexes are immutable after
        build()). The index is now rebuilt from all accumulated vectors on
        every call, and empty batches are a no-op.
        """
        if not items:
            return
        raw_texts = [x["raw_text"] for x in items]
        metas = [{k: v for k, v in x.items() if k != "raw_text"} for x in items]
        vectors = model.encode(raw_texts, convert_to_numpy=True)
        self._vectors.extend(vectors)
        self.texts.extend(raw_texts)
        self.metadata.extend(metas)
        # Annoy indexes cannot grow after build(): rebuild from scratch.
        self.index = AnnoyIndex(dim, 'angular')
        for item_id, vec in enumerate(self._vectors):
            self.index.add_item(item_id, vec)
        self.index.build(self.n_trees)

    def search(self, queries, k=3):
        """Return, per query string, up to *k* hits as
        {'score', 'text', 'meta'} dicts.

        NOTE(review): Annoy's 'angular' distance is sqrt(2 - 2*cos), so
        1 - dist/2 is NOT cosine similarity (that would be 1 - dist**2 / 2).
        The original formula is preserved — confirm the intended scoring.
        """
        q_vecs = model.encode(queries, convert_to_numpy=True)
        results = []
        for q in q_vecs:
            idxs, dists = self.index.get_nns_by_vector(q, k, include_distances=True)
            results.append([
                {
                    "score": 1 - dists[i] / 2,
                    "text": self.texts[idxs[i]],
                    "meta": self.metadata[idxs[i]],
                }
                for i in range(len(idxs))
            ])
        return results