Spaces:
Running
Running
| """Vector store abstraction and implementations.""" | |
| from abc import ABC, abstractmethod | |
| from typing import Dict, Generator, List, Tuple | |
| import marqo | |
| from langchain_community.vectorstores import Marqo, Pinecone as LangChainPinecone | |
| from langchain_core.documents import Document | |
| from langchain_openai import OpenAIEmbeddings | |
| from pinecone import Pinecone | |
| Vector = Tuple[Dict, List[float]] # (metadata, embedding) | |
| class VectorStore(ABC): | |
| """Abstract class for a vector store.""" | |
| def ensure_exists(self): | |
| """Ensures that the vector store exists. Creates it if it doesn't.""" | |
| def upsert_batch(self, vectors: List[Vector]): | |
| """Upserts a batch of vectors.""" | |
| def upsert(self, vectors: Generator[Vector, None, None]): | |
| """Upserts in batches of 100, since vector stores have a limit on upsert size.""" | |
| batch = [] | |
| for metadata, embedding in vectors: | |
| batch.append((metadata, embedding)) | |
| if len(batch) == 100: | |
| self.upsert_batch(batch) | |
| batch = [] | |
| if batch: | |
| self.upsert_batch(batch) | |
| def to_langchain(self): | |
| """Converts the vector store to a LangChain vector store object.""" | |
| class PineconeVectorStore(VectorStore): | |
| """Vector store implementation using Pinecone.""" | |
| def __init__(self, index_name: str, namespace: str, dimension: int): | |
| self.index_name = index_name | |
| self.dimension = dimension | |
| self.client = Pinecone() | |
| self.index = self.client.Index(self.index_name) | |
| self.namespace = namespace | |
| def ensure_exists(self): | |
| if self.index_name not in self.client.list_indexes().names(): | |
| self.client.create_index(name=self.index_name, dimension=self.dimension, metric="cosine") | |
| def upsert_batch(self, vectors: List[Vector]): | |
| pinecone_vectors = [ | |
| (metadata.get("id", str(i)), embedding, metadata) for i, (metadata, embedding) in enumerate(vectors) | |
| ] | |
| self.index.upsert(vectors=pinecone_vectors, namespace=self.namespace) | |
| def to_langchain(self): | |
| return LangChainPinecone.from_existing_index( | |
| index_name=self.index_name, embedding=OpenAIEmbeddings(), namespace=self.namespace | |
| ) | |
| class MarqoVectorStore(VectorStore): | |
| """Vector store implementation using Marqo.""" | |
| def __init__(self, url: str, index_name: str): | |
| self.client = marqo.Client(url=url) | |
| self.index_name = index_name | |
| def ensure_exists(self): | |
| pass | |
| def upsert_batch(self, vectors: List[Vector]): | |
| # Since Marqo is both an embedder and a vector store, the embedder is already doing the upsert. | |
| pass | |
| def to_langchain(self): | |
| vectorstore = Marqo(client=self.client, index_name=self.index_name) | |
| # Monkey-patch the _construct_documents_from_results_without_score method to not expect a "metadata" field in | |
| # the result, and instead take the "filename" directly from the result. | |
| def patched_method(self, results): | |
| documents: List[Document] = [] | |
| for res in results["hits"]: | |
| documents.append(Document(page_content=res["text"], metadata={"filename": res["filename"]})) | |
| return documents | |
| vectorstore._construct_documents_from_results_without_score = patched_method.__get__( | |
| vectorstore, vectorstore.__class__ | |
| ) | |
| return vectorstore | |
| def build_from_args(args: dict) -> VectorStore: | |
| """Builds a vector store from the given command-line arguments.""" | |
| if args.vector_store_type == "pinecone": | |
| dimension = args.embedding_size if "embedding_size" in args else None | |
| return PineconeVectorStore(index_name=args.index_name, namespace=args.repo_id, dimension=dimension) | |
| elif args.vector_store_type == "marqo": | |
| return MarqoVectorStore(url=args.marqo_url, index_name=args.index_name) | |
| else: | |
| raise ValueError(f"Unrecognized vector store type {args.vector_store_type}") | |