"""Runs a batch job to compute embeddings for an entire repo and stores them into a vector store."""
import argparse
import logging
import time
from chunker import UniversalChunker
from embedder import MarqoEmbedder, OpenAIBatchEmbedder
from repo_manager import RepoManager
from vector_store import build_from_args
# Configure root logging once at import time so the progress messages below are visible.
logging.basicConfig(level=logging.INFO)
# Hard limits imposed by the OpenAI embedding APIs; the CLI arguments are validated against these in main().
MAX_TOKENS_PER_CHUNK = 8192 # The ADA embedder from OpenAI has a maximum of 8192 tokens.
MAX_CHUNKS_PER_BATCH = 2048 # The OpenAI batch embedding API enforces a maximum of 2048 chunks per batch.
MAX_TOKENS_PER_JOB = 3_000_000 # The OpenAI batch embedding API enforces a maximum of 3M tokens processed at once.
def _read_extensions(path):
with open(path, "r") as f:
return {line.strip().lower() for line in f}
def main():
    """Parse CLI arguments, clone the repository, embed it, and upsert the embeddings.

    Pipeline: clone -> chunk -> issue embedding jobs -> (for OpenAI/Pinecone only)
    poll until embeddings are ready, then move them into the vector store.
    Exits via parser.error() on invalid argument combinations.
    """
    parser = argparse.ArgumentParser(description="Batch-embeds a repository")
    parser.add_argument("repo_id", help="The ID of the repository to index")
    parser.add_argument("--embedder_type", default="openai", choices=["openai", "marqo"])
    parser.add_argument("--vector_store_type", default="pinecone", choices=["pinecone", "marqo"])
    parser.add_argument(
        "--local_dir",
        default="repos",
        help="The local directory to store the repository",
    )
    parser.add_argument(
        "--tokens_per_chunk",
        type=int,
        default=800,
        help="https://arxiv.org/pdf/2406.14497 recommends a value between 200-800.",
    )
    parser.add_argument(
        "--chunks_per_batch",
        type=int,
        default=2000,
        help="Maximum chunks per batch. We recommend 2000 for the OpenAI embedder. Marqo enforces a limit of 64.",
    )
    parser.add_argument("--index_name", required=True, help="Vector store index name")
    parser.add_argument(
        "--include",
        help="Path to a file containing a list of extensions to include. One extension per line.",
    )
    parser.add_argument(
        "--exclude",
        # Bug fix: the default used to be "src/sample-exclude.txt" set here, which made
        # the include/exclude mutual-exclusion check below fire whenever --include was
        # passed (so --include was unusable). The default is now applied after that
        # check, only when neither flag was given.
        default=None,
        help="Path to a file containing a list of extensions to exclude. One extension per line. "
        "Defaults to src/sample-exclude.txt when --include is not specified.",
    )
    parser.add_argument(
        "--max_embedding_jobs",
        type=int,
        help="Maximum number of embedding jobs to run. Specifying this might result in "
        "indexing only part of the repository, but prevents you from burning through OpenAI credits.",
    )
    parser.add_argument(
        "--marqo_url",
        default="http://localhost:8882",
        help="URL for the Marqo server. Required if using Marqo as embedder or vector store.",
    )
    parser.add_argument(
        "--marqo_embedding_model",
        default="hf/e5-base-v2",
        help="The embedding model to use for Marqo.",
    )
    args = parser.parse_args()

    # Validate embedder and vector store compatibility.
    if args.embedder_type == "openai" and args.vector_store_type != "pinecone":
        parser.error("When using OpenAI embedder, the vector store type must be Pinecone.")
    if args.embedder_type == "marqo" and args.vector_store_type != "marqo":
        parser.error("When using the marqo embedder, the vector store type must also be marqo.")
    if args.embedder_type == "marqo" and args.chunks_per_batch > 64:
        args.chunks_per_batch = 64
        logging.warning("Marqo enforces a limit of 64 chunks per batch. Setting --chunks_per_batch to 64.")

    # Validate other arguments.
    if args.tokens_per_chunk > MAX_TOKENS_PER_CHUNK:
        parser.error(f"The maximum number of tokens per chunk is {MAX_TOKENS_PER_CHUNK}.")
    if args.chunks_per_batch > MAX_CHUNKS_PER_BATCH:
        parser.error(f"The maximum number of chunks per batch is {MAX_CHUNKS_PER_BATCH}.")
    if args.tokens_per_chunk * args.chunks_per_batch >= MAX_TOKENS_PER_JOB:
        # Bug fix: the message previously said "chunks per job", but this limit is on tokens.
        parser.error(f"The maximum number of tokens per job is {MAX_TOKENS_PER_JOB}.")
    if args.include and args.exclude:
        parser.error("At most one of --include and --exclude can be specified.")
    if not args.include and args.exclude is None:
        # Preserve the original default exclusion list when neither flag was given.
        args.exclude = "src/sample-exclude.txt"

    included_extensions = _read_extensions(args.include) if args.include else None
    excluded_extensions = _read_extensions(args.exclude) if args.exclude else None

    logging.info("Cloning the repository...")
    repo_manager = RepoManager(
        args.repo_id,
        local_dir=args.local_dir,
        included_extensions=included_extensions,
        excluded_extensions=excluded_extensions,
    )
    repo_manager.clone()

    logging.info("Issuing embedding jobs...")
    chunker = UniversalChunker(max_tokens=args.tokens_per_chunk)
    if args.embedder_type == "openai":
        embedder = OpenAIBatchEmbedder(repo_manager, chunker, args.local_dir)
    elif args.embedder_type == "marqo":
        embedder = MarqoEmbedder(
            repo_manager, chunker, index_name=args.index_name, url=args.marqo_url, model=args.marqo_embedding_model
        )
    else:
        # Unreachable while argparse restricts choices, but guards future edits.
        raise ValueError(f"Unrecognized embedder type {args.embedder_type}")
    embedder.embed_repo(args.chunks_per_batch, args.max_embedding_jobs)

    if args.vector_store_type == "marqo":
        # Marqo computes embeddings and stores them in the vector store at once, so we're done.
        logging.info("Done!")
        return

    logging.info("Waiting for embeddings to be ready...")
    while not embedder.embeddings_are_ready():
        logging.info("Sleeping for 30 seconds...")
        time.sleep(30)

    logging.info("Moving embeddings to the vector store...")
    # Note to developer: Replace this with your preferred vector store.
    vector_store = build_from_args(args)
    vector_store.ensure_exists()
    vector_store.upsert(embedder.download_embeddings())
    logging.info("Done!")


if __name__ == "__main__":
    main()
|