"""Vector store abstraction and implementations."""
from abc import ABC, abstractmethod
from typing import Dict, Generator, List, Tuple

import marqo
from langchain_community.vectorstores import Marqo
from langchain_community.vectorstores import Pinecone as LangChainPinecone
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from pinecone import Pinecone
# Dimensionality of OpenAI's text embedding vectors (e.g. text-embedding-ada-002).
OPENAI_EMBEDDING_SIZE = 1536

# A vector is a metadata dict paired with its embedding.
Vector = Tuple[Dict, List[float]]  # (metadata, embedding)
class VectorStore(ABC):
    """Abstract class for a vector store."""

    @abstractmethod
    def ensure_exists(self):
        """Ensures that the vector store exists. Creates it if it doesn't."""

    @abstractmethod
    def upsert_batch(self, vectors: List[Vector]):
        """Upserts a batch of vectors."""

    def upsert(self, vectors: Generator[Vector, None, None]):
        """Upserts in batches of 100, since vector stores have a limit on upsert size."""
        pending: List[Vector] = []
        for vector in vectors:
            pending.append(vector)
            # Flush as soon as a full batch has accumulated.
            if len(pending) == 100:
                self.upsert_batch(pending)
                pending = []
        # Flush the final partial batch, if any.
        if pending:
            self.upsert_batch(pending)

    @abstractmethod
    def to_langchain(self):
        """Converts the vector store to a LangChain vector store object."""
class PineconeVectorStore(VectorStore):
    """Vector store implementation using Pinecone."""

    def __init__(self, index_name: str, namespace: str, dimension: int = OPENAI_EMBEDDING_SIZE):
        """Initializes the Pinecone client and a handle to the target index.

        Args:
            index_name: Name of the Pinecone index to use.
            namespace: Pinecone namespace to upsert into and query from.
            dimension: Embedding dimension used if the index must be created.
        """
        self.index_name = index_name
        self.dimension = dimension
        self.namespace = namespace
        self.client = Pinecone()
        # NOTE(review): the index handle is created before ensure_exists() may
        # have run — presumably Index() is a lazy handle; confirm with the client version in use.
        self.index = self.client.Index(self.index_name)

    def ensure_exists(self):
        """Creates the index (cosine metric) if it is not already present."""
        if self.index_name not in self.client.list_indexes().names():
            self.client.create_index(name=self.index_name, dimension=self.dimension, metric="cosine")

    def upsert_batch(self, vectors: List[Vector]):
        """Upserts a batch of (metadata, embedding) pairs into the namespace.

        NOTE(review): when metadata lacks an "id", the fallback id is the
        batch-local enumeration index, so ids repeat across batches and later
        batches would overwrite earlier vectors. Callers should always supply
        metadata["id"].
        """
        pinecone_vectors = [
            (metadata.get("id", str(i)), embedding, metadata)
            for i, (metadata, embedding) in enumerate(vectors)
        ]
        self.index.upsert(vectors=pinecone_vectors, namespace=self.namespace)

    def to_langchain(self):
        """Converts the vector store to a LangChain vector store object."""
        # Bug fix: `Pinecone` in this module is the pinecone-client class, which
        # has no `from_existing_index`; that classmethod belongs to the LangChain
        # wrapper, imported here as `LangChainPinecone`.
        return LangChainPinecone.from_existing_index(
            index_name=self.index_name, embedding=OpenAIEmbeddings(), namespace=self.namespace
        )
class MarqoVectorStore(VectorStore):
    """Vector store implementation backed by a Marqo server."""

    def __init__(self, url: str, index_name: str):
        self.client = marqo.Client(url=url)
        self.index_name = index_name

    def ensure_exists(self):
        # Nothing to do for Marqo.
        pass

    def upsert_batch(self, vectors: List[Vector]):
        # No-op: Marqo is both an embedder and a vector store, so the embedder
        # side has already performed the upsert by the time we get here.
        pass

    def to_langchain(self):
        """Converts the vector store to a LangChain vector store object."""
        store = Marqo(client=self.client, index_name=self.index_name)

        # Monkey-patch _construct_documents_from_results_without_score so it
        # does not expect a "metadata" field in each hit and instead reads
        # "filename" directly off the result.
        def _build_documents(self, results):
            return [
                Document(page_content=hit["text"], metadata={"filename": hit["filename"]})
                for hit in results["hits"]
            ]

        store._construct_documents_from_results_without_score = _build_documents.__get__(
            store, store.__class__
        )
        return store
def build_from_args(args: "argparse.Namespace") -> "VectorStore":
    """Builds a vector store from the given command-line arguments.

    Args:
        args: Parsed command-line arguments. Must provide `vector_store_type`
            ("pinecone" or "marqo") plus the backend-specific attributes:
            `index_name` and `repo_id` for Pinecone, or `marqo_url` and
            `index_name` for Marqo.

    Returns:
        A concrete VectorStore for the requested backend.

    Raises:
        ValueError: If `args.vector_store_type` is not a recognized backend.
    """
    # Bug fix: the annotation previously said `dict`, but the body reads
    # attributes (args.vector_store_type), which a plain dict does not support.
    # Quoted forward-ref avoids importing argparse solely for the annotation.
    if args.vector_store_type == "pinecone":
        return PineconeVectorStore(index_name=args.index_name, namespace=args.repo_id)
    elif args.vector_store_type == "marqo":
        return MarqoVectorStore(url=args.marqo_url, index_name=args.index_name)
    else:
        raise ValueError(f"Unrecognized vector store type {args.vector_store_type}")