juliaturc committed on
Commit
e8553c3
·
1 Parent(s): 40b4763

Support for indexing with Marqo

Browse files
Files changed (4) hide show
  1. requirements.txt +1 -0
  2. src/chunker.py +17 -0
  3. src/embedder.py +70 -21
  4. src/index.py +39 -6
requirements.txt CHANGED
@@ -4,6 +4,7 @@ gradio==4.42.0
4
  langchain==0.2.14
5
  langchain-community==0.2.12
6
  langchain-openai==0.1.22
 
7
  nbformat==5.10.4
8
  openai==1.42.0
9
  pinecone==5.0.1
 
4
  langchain==0.2.14
5
  langchain-community==0.2.12
6
  langchain-openai==0.1.22
7
+ marqo==3.7.0
8
  nbformat==5.10.4
9
  openai==1.42.0
10
  pinecone==5.0.1
src/chunker.py CHANGED
@@ -30,6 +30,23 @@ class Chunk:
30
  """The text content to be embedded. Might contain information beyond just the text snippet from the file."""
31
  return self._content
32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
  def populate_content(self, file_content: str):
34
  """Populates the content of the chunk with the file path and file content."""
35
  self._content = (
 
30
  """The text content to be embedded. Might contain information beyond just the text snippet from the file."""
31
  return self._content
32
 
33
@property
def to_dict(self):
    """Converts the chunk to a dictionary that can be passed to a vector store."""
    # Some vector stores require IDs to be ASCII, so strip non-ASCII characters from the filename
    # before using it in the ID. (The original, non-stripped filename is still kept as metadata.)
    filename_ascii = self.filename.encode("ascii", "ignore").decode("ascii")
    return {
        "id": f"{filename_ascii}_{self.start_byte}_{self.end_byte}",
        "filename": self.filename,
        "start_byte": self.start_byte,
        "end_byte": self.end_byte,
        # Note to developer: When choosing a large chunk size, you might exceed the vector store's
        # metadata size limit. In that case, you can simply store the start/end bytes above, and
        # fetch the content directly from the repository when needed.
        "text": self.content,
    }
49
+
50
  def populate_content(self, file_content: str):
51
  """Populates the content of the chunk with the file path and file content."""
52
  self._content = (
src/embedder.py CHANGED
@@ -11,6 +11,7 @@ from openai import OpenAI
11
 
12
  from chunker import Chunk, Chunker
13
  from repo_manager import RepoManager
 
14
 
15
  Vector = Tuple[Dict, List[float]] # (metadata, embedding)
16
 
@@ -19,7 +20,7 @@ class BatchEmbedder(ABC):
19
  """Abstract class for batch embedding of a repository."""
20
 
21
  @abstractmethod
22
- def embed_repo(self, chunks_per_batch: int):
23
  """Issues batch embedding jobs for the entire repository."""
24
 
25
  @abstractmethod
@@ -62,7 +63,7 @@ class OpenAIBatchEmbedder(BatchEmbedder):
62
  openai_batch_id = self._issue_job_for_chunks(
63
  sub_batch, batch_id=f"{repo_name}/{len(self.openai_batch_ids)}"
64
  )
65
- self.openai_batch_ids[openai_batch_id] = self._metadata_for_chunks(sub_batch)
66
  if max_embedding_jobs and len(self.openai_batch_ids) >= max_embedding_jobs:
67
  logging.info("Reached the maximum number of embedding jobs. Stopping.")
68
  return
@@ -71,7 +72,7 @@ class OpenAIBatchEmbedder(BatchEmbedder):
71
  # Finally, commit the last batch.
72
  if batch:
73
  openai_batch_id = self._issue_job_for_chunks(batch, batch_id=f"{repo_name}/{len(self.openai_batch_ids)}")
74
- self.openai_batch_ids[openai_batch_id] = self._metadata_for_chunks(batch)
75
  logging.info("Issued %d jobs for %d chunks.", len(self.openai_batch_ids), chunk_count)
76
 
77
  # Save the job IDs to a file, just in case this script is terminated by mistake.
@@ -171,22 +172,70 @@ class OpenAIBatchEmbedder(BatchEmbedder):
171
  },
172
  }
173
 
174
- @staticmethod
175
- def _metadata_for_chunks(chunks):
176
- metadata = []
177
- for chunk in chunks:
178
- filename_ascii = chunk.filename.encode("ascii", "ignore").decode("ascii")
179
- metadata.append(
180
- {
181
- # Some vector stores require the IDs to be ASCII.
182
- "id": f"{filename_ascii}_{chunk.start_byte}_{chunk.end_byte}",
183
- "filename": chunk.filename,
184
- "start_byte": chunk.start_byte,
185
- "end_byte": chunk.end_byte,
186
- # Note to developer: When choosing a large chunk size, you might exceed the vector store's metadata
187
- # size limit. In that case, you can simply store the start/end bytes above, and fetch the content
188
- # directly from the repository when needed.
189
- "text": chunk.content,
190
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
191
  )
192
- return metadata
 
 
 
 
 
 
 
 
 
 
 
 
11
 
12
  from chunker import Chunk, Chunker
13
  from repo_manager import RepoManager
14
+ import marqo
15
 
16
  Vector = Tuple[Dict, List[float]] # (metadata, embedding)
17
 
 
20
  """Abstract class for batch embedding of a repository."""
21
 
22
  @abstractmethod
23
+ def embed_repo(self, chunks_per_batch: int, max_embedding_jobs: int = None):
24
  """Issues batch embedding jobs for the entire repository."""
25
 
26
  @abstractmethod
 
63
  openai_batch_id = self._issue_job_for_chunks(
64
  sub_batch, batch_id=f"{repo_name}/{len(self.openai_batch_ids)}"
65
  )
66
+ self.openai_batch_ids[openai_batch_id] = [chunk.to_dict for chunk in sub_batch]
67
  if max_embedding_jobs and len(self.openai_batch_ids) >= max_embedding_jobs:
68
  logging.info("Reached the maximum number of embedding jobs. Stopping.")
69
  return
 
72
  # Finally, commit the last batch.
73
  if batch:
74
  openai_batch_id = self._issue_job_for_chunks(batch, batch_id=f"{repo_name}/{len(self.openai_batch_ids)}")
75
+ self.openai_batch_ids[openai_batch_id] = [chunk.to_dict for chunk in batch]
76
  logging.info("Issued %d jobs for %d chunks.", len(self.openai_batch_ids), chunk_count)
77
 
78
  # Save the job IDs to a file, just in case this script is terminated by mistake.
 
172
  },
173
  }
174
 
175
+
176
class MarqoEmbedder(BatchEmbedder):
    """Embedder that uses the open-source Marqo vector search engine.

    Embeddings can be stored locally (in which case `url` in the constructor should point to
    localhost) or in the cloud.
    """

    def __init__(self,
                 repo_manager: RepoManager,
                 chunker: Chunker,
                 index_name: str,
                 url: str,
                 model="hf/e5-base-v2"):
        self.repo_manager = repo_manager
        self.chunker = chunker
        self.client = marqo.Client(url=url)
        self.index = self.client.index(index_name)

        # Only create the index if it doesn't already exist.
        all_index_names = [result["indexName"] for result in self.client.get_indexes()["results"]]
        if index_name not in all_index_names:
            self.client.create_index(index_name, model=model)

    def embed_repo(self, chunks_per_batch: int, max_embedding_jobs: int = None):
        """Issues batch embedding jobs for the entire repository.

        Args:
            chunks_per_batch: Number of chunks per `add_documents` call. Marqo caps this at 64.
            max_embedding_jobs: Optional cap on the number of indexing calls. Indexing stops once
                this many calls have been issued, which might leave part of the repository unindexed.
        """
        if chunks_per_batch > 64:
            raise ValueError("Marqo enforces a limit of 64 chunks per batch.")

        chunk_count = 0
        job_count = 0  # Number of add_documents() calls issued so far.
        batch = []

        for filepath, content in self.repo_manager.walk():
            chunks = self.chunker.chunk(filepath, content)
            chunk_count += len(chunks)
            batch.extend(chunks)

            if len(batch) > chunks_per_batch:
                for i in range(0, len(batch), chunks_per_batch):
                    sub_batch = batch[i : i + chunks_per_batch]
                    logging.info("Indexing %d chunks...", len(sub_batch))
                    self.index.add_documents(
                        documents=[chunk.to_dict for chunk in sub_batch],
                        tensor_fields=["text"],
                    )
                    job_count += 1
                    # Bug fix: the original checked `len(self.openai_batch_ids)`, an attribute that
                    # only exists on OpenAIBatchEmbedder — reaching this branch would have raised
                    # AttributeError. Track the job count locally instead.
                    if max_embedding_jobs and job_count >= max_embedding_jobs:
                        logging.info("Reached the maximum number of embedding jobs. Stopping.")
                        return
                batch = []

        # Finally, commit the last batch.
        if batch:
            self.index.add_documents(
                documents=[chunk.to_dict for chunk in batch],
                tensor_fields=["text"],
            )
        logging.info("Successfully embedded %d chunks.", chunk_count)

    def embeddings_are_ready(self) -> bool:
        """Checks whether the batch embedding jobs are done."""
        # Marqo indexes documents synchronously, so once embed_repo() returns, the embeddings are ready.
        return True

    def download_embeddings(self) -> Generator[Vector, None, None]:
        """Yields (chunk_metadata, embedding) pairs for each chunk in the repository."""
        # Marqo stores embeddings as they are created, so they're already in the vector store. No need
        # to download them as we would with e.g. OpenAI, Cohere, or another cloud embedding service.
        return []
src/index.py CHANGED
@@ -5,7 +5,7 @@ import logging
5
  import time
6
 
7
  from chunker import UniversalChunker
8
- from embedder import OpenAIBatchEmbedder
9
  from repo_manager import RepoManager
10
  from vector_store import PineconeVectorStore
11
 
@@ -29,6 +29,8 @@ def _read_extensions(path):
29
  def main():
30
  parser = argparse.ArgumentParser(description="Batch-embeds a repository")
31
  parser.add_argument("repo_id", help="The ID of the repository to index")
 
 
32
  parser.add_argument(
33
  "--local_dir",
34
  default="repos",
@@ -44,7 +46,7 @@ def main():
44
  "--chunks_per_batch", type=int, default=2000, help="Maximum chunks per batch"
45
  )
46
  parser.add_argument(
47
- "--pinecone_index_name", required=True, help="Pinecone index name"
48
  )
49
  parser.add_argument(
50
  "--include",
@@ -60,10 +62,25 @@ def main():
60
  help="Maximum number of embedding jobs to run. Specifying this might result in "
61
  "indexing only part of the repository, but prevents you from burning through OpenAI credits.",
62
  )
63
-
 
 
 
 
 
 
 
 
 
64
  args = parser.parse_args()
65
 
66
- # Validate the arguments.
 
 
 
 
 
 
67
  if args.tokens_per_chunk > MAX_TOKENS_PER_CHUNK:
68
  parser.error(
69
  f"The maximum number of tokens per chunk is {MAX_TOKENS_PER_CHUNK}."
@@ -91,9 +108,25 @@ def main():
91
 
92
  logging.info("Issuing embedding jobs...")
93
  chunker = UniversalChunker(max_tokens=args.tokens_per_chunk)
94
- embedder = OpenAIBatchEmbedder(repo_manager, chunker, args.local_dir)
 
 
 
 
 
 
 
 
 
 
 
95
  embedder.embed_repo(args.chunks_per_batch, args.max_embedding_jobs)
96
 
 
 
 
 
 
97
  logging.info("Waiting for embeddings to be ready...")
98
  while not embedder.embeddings_are_ready():
99
  logging.info("Sleeping for 30 seconds...")
@@ -102,7 +135,7 @@ def main():
102
  logging.info("Moving embeddings to the vector store...")
103
  # Note to developer: Replace this with your preferred vector store.
104
  vector_store = PineconeVectorStore(
105
- index_name=args.pinecone_index_name,
106
  dimension=OPENAI_EMBEDDING_SIZE,
107
  namespace=repo_manager.repo_id,
108
  )
 
5
  import time
6
 
7
  from chunker import UniversalChunker
8
+ from embedder import OpenAIBatchEmbedder, MarqoEmbedder
9
  from repo_manager import RepoManager
10
  from vector_store import PineconeVectorStore
11
 
 
29
  def main():
30
  parser = argparse.ArgumentParser(description="Batch-embeds a repository")
31
  parser.add_argument("repo_id", help="The ID of the repository to index")
32
+ parser.add_argument("--embedder_type", default="openai", choices=["openai", "marqo"])
33
+ parser.add_argument("--vector_store_type", default="pinecone", choices=["pinecone", "marqo"])
34
  parser.add_argument(
35
  "--local_dir",
36
  default="repos",
 
46
  "--chunks_per_batch", type=int, default=2000, help="Maximum chunks per batch"
47
  )
48
  parser.add_argument(
49
+ "--index_name", required=True, help="Vector store index name"
50
  )
51
  parser.add_argument(
52
  "--include",
 
62
  help="Maximum number of embedding jobs to run. Specifying this might result in "
63
  "indexing only part of the repository, but prevents you from burning through OpenAI credits.",
64
  )
65
+ parser.add_argument(
66
+ "--marqo_url",
67
+ default="http://localhost:8882",
68
+ help="URL for the Marqo server. Required if using Marqo as embedder or vector store.",
69
+ )
70
+ parser.add_argument(
71
+ "--marqo_embedding_model",
72
+ default="hf/e5-base-v2",
73
+ help="The embedding model to use for Marqo.",
74
+ )
75
  args = parser.parse_args()
76
 
77
+ # Validate embedder and vector store compatibility.
78
+ if args.embedder_type == "openai" and args.vector_store_type != "pinecone":
79
+ parser.error("When using OpenAI embedder, the vector store type must be Pinecone.")
80
+ if args.embedder_type == "marqo" and args.vector_store_type != "marqo":
81
+ parser.error("When using the marqo embedder, the vector store type must also be marqo.")
82
+
83
+ # Validate other arguments.
84
  if args.tokens_per_chunk > MAX_TOKENS_PER_CHUNK:
85
  parser.error(
86
  f"The maximum number of tokens per chunk is {MAX_TOKENS_PER_CHUNK}."
 
108
 
109
  logging.info("Issuing embedding jobs...")
110
  chunker = UniversalChunker(max_tokens=args.tokens_per_chunk)
111
+
112
+ if args.embedder_type == "openai":
113
+ embedder = OpenAIBatchEmbedder(repo_manager, chunker, args.local_dir)
114
+ elif args.embedder_type == "marqo":
115
+ embedder = MarqoEmbedder(repo_manager,
116
+ chunker,
117
+ index_name=args.index_name,
118
+ url=args.marqo_url,
119
+ model=args.marqo_embedding_model)
120
+ else:
121
+ raise ValueError(f"Unrecognized embedder type {args.embedder_type}")
122
+
123
  embedder.embed_repo(args.chunks_per_batch, args.max_embedding_jobs)
124
 
125
+ if args.vector_store_type == "marqo":
126
+ # Marqo computes embeddings and stores them in the vector store at once, so we're done.
127
+ logging.info("Done!")
128
+ return
129
+
130
  logging.info("Waiting for embeddings to be ready...")
131
  while not embedder.embeddings_are_ready():
132
  logging.info("Sleeping for 30 seconds...")
 
135
  logging.info("Moving embeddings to the vector store...")
136
  # Note to developer: Replace this with your preferred vector store.
137
  vector_store = PineconeVectorStore(
138
+ index_name=args.index_name,
139
  dimension=OPENAI_EMBEDDING_SIZE,
140
  namespace=repo_manager.repo_id,
141
  )