code-crawler / src /vector_store.py
juliaturc's picture
Relax gradio version (causes urllib3 conflict in certain setups) (#20)
dfb1df2
raw
history blame
4.07 kB
"""Vector store abstraction and implementations."""
from abc import ABC, abstractmethod
from typing import Dict, Generator, List, Tuple
import marqo
from langchain_community.vectorstores import Marqo, Pinecone as LangChainPinecone
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from pinecone import Pinecone
Vector = Tuple[Dict, List[float]] # (metadata, embedding)
class VectorStore(ABC):
"""Abstract class for a vector store."""
@abstractmethod
def ensure_exists(self):
"""Ensures that the vector store exists. Creates it if it doesn't."""
@abstractmethod
def upsert_batch(self, vectors: List[Vector]):
"""Upserts a batch of vectors."""
def upsert(self, vectors: Generator[Vector, None, None]):
"""Upserts in batches of 100, since vector stores have a limit on upsert size."""
batch = []
for metadata, embedding in vectors:
batch.append((metadata, embedding))
if len(batch) == 100:
self.upsert_batch(batch)
batch = []
if batch:
self.upsert_batch(batch)
@abstractmethod
def to_langchain(self):
"""Converts the vector store to a LangChain vector store object."""
class PineconeVectorStore(VectorStore):
"""Vector store implementation using Pinecone."""
def __init__(self, index_name: str, namespace: str, dimension: int):
self.index_name = index_name
self.dimension = dimension
self.client = Pinecone()
self.index = self.client.Index(self.index_name)
self.namespace = namespace
def ensure_exists(self):
if self.index_name not in self.client.list_indexes().names():
self.client.create_index(name=self.index_name, dimension=self.dimension, metric="cosine")
def upsert_batch(self, vectors: List[Vector]):
pinecone_vectors = [
(metadata.get("id", str(i)), embedding, metadata) for i, (metadata, embedding) in enumerate(vectors)
]
self.index.upsert(vectors=pinecone_vectors, namespace=self.namespace)
def to_langchain(self):
return LangChainPinecone.from_existing_index(
index_name=self.index_name, embedding=OpenAIEmbeddings(), namespace=self.namespace
)
class MarqoVectorStore(VectorStore):
"""Vector store implementation using Marqo."""
def __init__(self, url: str, index_name: str):
self.client = marqo.Client(url=url)
self.index_name = index_name
def ensure_exists(self):
pass
def upsert_batch(self, vectors: List[Vector]):
# Since Marqo is both an embedder and a vector store, the embedder is already doing the upsert.
pass
def to_langchain(self):
vectorstore = Marqo(client=self.client, index_name=self.index_name)
# Monkey-patch the _construct_documents_from_results_without_score method to not expect a "metadata" field in
# the result, and instead take the "filename" directly from the result.
def patched_method(self, results):
documents: List[Document] = []
for res in results["hits"]:
documents.append(Document(page_content=res["text"], metadata={"filename": res["filename"]}))
return documents
vectorstore._construct_documents_from_results_without_score = patched_method.__get__(
vectorstore, vectorstore.__class__
)
return vectorstore
def build_from_args(args: dict) -> VectorStore:
"""Builds a vector store from the given command-line arguments."""
if args.vector_store_type == "pinecone":
dimension = args.embedding_size if "embedding_size" in args else None
return PineconeVectorStore(index_name=args.index_name, namespace=args.repo_id, dimension=dimension)
elif args.vector_store_type == "marqo":
return MarqoVectorStore(url=args.marqo_url, index_name=args.index_name)
else:
raise ValueError(f"Unrecognized vector store type {args.vector_store_type}")