File size: 3,979 Bytes
559dd34
 
 
 
 
5f9eeb4
 
 
 
559dd34
 
5f9eeb4
559dd34
 
 
 
 
57007fe
559dd34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5f9eeb4
 
 
 
559dd34
 
 
 
5f9eeb4
559dd34
 
 
 
 
 
 
 
57007fe
559dd34
 
 
57007fe
559dd34
 
5f9eeb4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""Vector store abstraction and implementations."""

import argparse
import hashlib
import json
from abc import ABC, abstractmethod
from typing import Dict, Generator, List, Tuple

import marqo
from langchain_community.vectorstores import Marqo
from langchain_community.vectorstores import Pinecone as LangChainPinecone
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from pinecone import Pinecone, ServerlessSpec

# Default index dimension used by PineconeVectorStore.
# NOTE(review): presumably the output size of the default OpenAIEmbeddings model -- confirm if the model changes.
OPENAI_EMBEDDING_SIZE = 1536
# One upsertable item: a metadata dict paired with its embedding vector.
Vector = Tuple[Dict, List[float]]  # (metadata, embedding)


class VectorStore(ABC):
    """Abstract class for a vector store.

    Subclasses implement `ensure_exists`, `upsert_batch` and `to_langchain`. `upsert` is implemented here and
    feeds `upsert_batch` with bounded-size chunks of the incoming vectors.
    """

    @abstractmethod
    def ensure_exists(self):
        """Ensures that the vector store exists. Creates it if it doesn't."""

    @abstractmethod
    def upsert_batch(self, vectors: List["Vector"]):
        """Upserts a batch of vectors."""

    def upsert(self, vectors: Generator["Vector", None, None], batch_size: int = 100):
        """Upserts vectors in batches, since vector stores have a limit on upsert size.

        Args:
            vectors: Iterable of (metadata, embedding) pairs. It is consumed lazily, so at most `batch_size`
                items are held in memory at a time.
            batch_size: Maximum number of vectors handed to a single `upsert_batch` call. Defaults to 100.

        Raises:
            ValueError: If `batch_size` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")

        batch = []
        for metadata, embedding in vectors:
            batch.append((metadata, embedding))
            if len(batch) >= batch_size:
                self.upsert_batch(batch)
                # Start a fresh list rather than clearing, so the implementation may keep a reference to the
                # batch it was handed.
                batch = []
        # Flush the trailing partial batch, if any.
        if batch:
            self.upsert_batch(batch)

    @abstractmethod
    def to_langchain(self):
        """Converts the vector store to a LangChain vector store object."""


class PineconeVectorStore(VectorStore):
    """Vector store implementation using Pinecone.

    Args:
        index_name: Name of the Pinecone index to read from / write to.
        namespace: Pinecone namespace used for upserts and for the LangChain wrapper.
        dimension: Dimension of the index created by `ensure_exists`.
        spec: Deployment spec passed to `create_index` when the index has to be created. When omitted, a
            serverless index on AWS `us-east-1` is used.
            NOTE(review): the default cloud/region is a choice made here -- confirm it suits your deployment.
    """

    def __init__(self, index_name: str, namespace: str, dimension: int = OPENAI_EMBEDDING_SIZE, spec=None):
        self.index_name = index_name
        self.dimension = dimension
        self.namespace = namespace
        self.spec = spec if spec is not None else ServerlessSpec(cloud="aws", region="us-east-1")
        self.client = Pinecone()
        # The index handle is built on first use (see `index`), not here: the client looks the index up by
        # name, which fails if `ensure_exists` has not had a chance to create it yet.
        self._index = None

    @property
    def index(self):
        """Handle to the Pinecone index, created lazily on first access."""
        if self._index is None:
            self._index = self.client.Index(self.index_name)
        return self._index

    @index.setter
    def index(self, value):
        # Keeps `store.index = ...` working for callers that assigned the attribute directly.
        self._index = value

    def ensure_exists(self):
        """Creates the index (cosine metric, `self.dimension`) if it is not listed by the client."""
        if self.index_name not in self.client.list_indexes().names():
            self.client.create_index(
                name=self.index_name, dimension=self.dimension, metric="cosine", spec=self.spec
            )

    @staticmethod
    def _fallback_id(metadata: Dict, embedding: List[float]) -> str:
        """Returns a deterministic ID derived from the vector's content.

        Used when the metadata carries no "id". A content hash stays unique across batches (a per-batch
        position would not) and is stable across runs, so re-upserting the same data overwrites in place.
        """
        payload = json.dumps([metadata, list(embedding)], sort_keys=True, default=str)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def upsert_batch(self, vectors: List[Vector]):
        """Upserts one batch of (metadata, embedding) pairs into `self.namespace`.

        The vector ID is `metadata["id"]` when present; otherwise a content-derived fallback is used.
        """
        if not vectors:
            return
        pinecone_vectors = []
        for metadata, embedding in vectors:
            vector_id = metadata.get("id")
            if vector_id is None:
                vector_id = self._fallback_id(metadata, embedding)
            pinecone_vectors.append((vector_id, embedding, metadata))
        self.index.upsert(vectors=pinecone_vectors, namespace=self.namespace)

    def to_langchain(self):
        """Returns a LangChain Pinecone vector store bound to this index and namespace."""
        # `Pinecone` in this module is the Pinecone client class; `from_existing_index` lives on LangChain's
        # vector store wrapper, imported as `LangChainPinecone`.
        return LangChainPinecone.from_existing_index(
            index_name=self.index_name, embedding=OpenAIEmbeddings(), namespace=self.namespace
        )


class MarqoVectorStore(VectorStore):
    """Marqo-backed implementation of the vector store interface."""

    def __init__(self, url: str, index_name: str):
        self.client = marqo.Client(url=url)
        self.index_name = index_name

    def ensure_exists(self):
        """No-op for Marqo."""

    def upsert_batch(self, vectors: List[Vector]):
        """No-op: Marqo is both an embedder and a vector store, so the embedder is already doing the upsert."""

    def to_langchain(self):
        """Wraps the Marqo index in a LangChain `Marqo` vector store.

        The wrapper's `_construct_documents_from_results_without_score` is replaced on the returned instance so
        that it does not expect a "metadata" field in each hit and instead reads "filename" straight from the
        hit itself.
        """

        def _hits_to_documents(self, results):
            # One Document per hit, carrying the hit's text and its filename as the only metadata.
            return [
                Document(page_content=hit["text"], metadata={"filename": hit["filename"]})
                for hit in results["hits"]
            ]

        store = Marqo(client=self.client, index_name=self.index_name)
        # Bind the replacement to this instance only; the Marqo class itself is left untouched.
        bound_override = _hits_to_documents.__get__(store, store.__class__)
        store._construct_documents_from_results_without_score = bound_override
        return store


def build_from_args(args: argparse.Namespace) -> "VectorStore":
    """Builds a vector store from the given command-line arguments.

    Args:
        args: Parsed arguments, read via attribute access. `vector_store_type` selects the implementation:
            "pinecone" additionally reads `index_name` and `repo_id` (used as the namespace); "marqo"
            additionally reads `marqo_url` and `index_name`.

    Returns:
        The vector store matching `args.vector_store_type`.

    Raises:
        ValueError: If `args.vector_store_type` is neither "pinecone" nor "marqo".
    """
    store_type = args.vector_store_type
    if store_type == "pinecone":
        return PineconeVectorStore(index_name=args.index_name, namespace=args.repo_id)
    if store_type == "marqo":
        return MarqoVectorStore(url=args.marqo_url, index_name=args.index_name)
    raise ValueError(f"Unrecognized vector store type {store_type}")