code-crawler / src /index.py
juliaturc's picture
Clean up the structure of the code.
5f9eeb4
raw
history blame
5.69 kB
"""Runs a batch job to compute embeddings for an entire repo and stores them into a vector store."""
import argparse
import logging
import time
from chunker import UniversalChunker
from embedder import MarqoEmbedder, OpenAIBatchEmbedder
from repo_manager import RepoManager
from vector_store import build_from_args
logging.basicConfig(level=logging.INFO)
MAX_TOKENS_PER_CHUNK = 8192 # The ADA embedder from OpenAI has a maximum of 8192 tokens.
MAX_CHUNKS_PER_BATCH = 2048 # The OpenAI batch embedding API enforces a maximum of 2048 chunks per batch.
MAX_TOKENS_PER_JOB = 3_000_000 # The OpenAI batch embedding API enforces a maximum of 3M tokens processed at once.
def _read_extensions(path):
with open(path, "r") as f:
return {line.strip().lower() for line in f}
def main():
parser = argparse.ArgumentParser(description="Batch-embeds a repository")
parser.add_argument("repo_id", help="The ID of the repository to index")
parser.add_argument("--embedder_type", default="openai", choices=["openai", "marqo"])
parser.add_argument("--vector_store_type", default="pinecone", choices=["pinecone", "marqo"])
parser.add_argument(
"--local_dir",
default="repos",
help="The local directory to store the repository",
)
parser.add_argument(
"--tokens_per_chunk",
type=int,
default=800,
help="https://arxiv.org/pdf/2406.14497 recommends a value between 200-800.",
)
parser.add_argument(
"--chunks_per_batch",
type=int,
default=2000,
help="Maximum chunks per batch. We recommend 2000 for the OpenAI embedder. Marqo enforces a limit of 64.",
)
parser.add_argument("--index_name", required=True, help="Vector store index name")
parser.add_argument(
"--include",
help="Path to a file containing a list of extensions to include. One extension per line.",
)
parser.add_argument(
"--exclude",
default="src/sample-exclude.txt",
help="Path to a file containing a list of extensions to exclude. One extension per line.",
)
parser.add_argument(
"--max_embedding_jobs",
type=int,
help="Maximum number of embedding jobs to run. Specifying this might result in "
"indexing only part of the repository, but prevents you from burning through OpenAI credits.",
)
parser.add_argument(
"--marqo_url",
default="http://localhost:8882",
help="URL for the Marqo server. Required if using Marqo as embedder or vector store.",
)
parser.add_argument(
"--marqo_embedding_model",
default="hf/e5-base-v2",
help="The embedding model to use for Marqo.",
)
args = parser.parse_args()
# Validate embedder and vector store compatibility.
if args.embedder_type == "openai" and args.vector_store_type != "pinecone":
parser.error("When using OpenAI embedder, the vector store type must be Pinecone.")
if args.embedder_type == "marqo" and args.vector_store_type != "marqo":
parser.error("When using the marqo embedder, the vector store type must also be marqo.")
if args.embedder_type == "marqo" and args.chunks_per_batch > 64:
args.chunks_per_batch = 64
logging.warning("Marqo enforces a limit of 64 chunks per batch. Setting --chunks_per_batch to 64.")
# Validate other arguments.
if args.tokens_per_chunk > MAX_TOKENS_PER_CHUNK:
parser.error(f"The maximum number of tokens per chunk is {MAX_TOKENS_PER_CHUNK}.")
if args.chunks_per_batch > MAX_CHUNKS_PER_BATCH:
parser.error(f"The maximum number of chunks per batch is {MAX_CHUNKS_PER_BATCH}.")
if args.tokens_per_chunk * args.chunks_per_batch >= MAX_TOKENS_PER_JOB:
parser.error(f"The maximum number of chunks per job is {MAX_TOKENS_PER_JOB}.")
if args.include and args.exclude:
parser.error("At most one of --include and --exclude can be specified.")
included_extensions = _read_extensions(args.include) if args.include else None
excluded_extensions = _read_extensions(args.exclude) if args.exclude else None
logging.info("Cloning the repository...")
repo_manager = RepoManager(
args.repo_id,
local_dir=args.local_dir,
included_extensions=included_extensions,
excluded_extensions=excluded_extensions,
)
repo_manager.clone()
logging.info("Issuing embedding jobs...")
chunker = UniversalChunker(max_tokens=args.tokens_per_chunk)
if args.embedder_type == "openai":
embedder = OpenAIBatchEmbedder(repo_manager, chunker, args.local_dir)
elif args.embedder_type == "marqo":
embedder = MarqoEmbedder(
repo_manager, chunker, index_name=args.index_name, url=args.marqo_url, model=args.marqo_embedding_model
)
else:
raise ValueError(f"Unrecognized embedder type {args.embedder_type}")
embedder.embed_repo(args.chunks_per_batch, args.max_embedding_jobs)
if args.vector_store_type == "marqo":
# Marqo computes embeddings and stores them in the vector store at once, so we're done.
logging.info("Done!")
return
logging.info("Waiting for embeddings to be ready...")
while not embedder.embeddings_are_ready():
logging.info("Sleeping for 30 seconds...")
time.sleep(30)
logging.info("Moving embeddings to the vector store...")
# Note to developer: Replace this with your preferred vector store.
vector_store = build_from_args(args)
vector_store.ensure_exists()
vector_store.upsert(embedder.download_embeddings())
logging.info("Done!")
if __name__ == "__main__":
main()