# library_agent / 4_embed_chromadb.py
# (Hugging Face listing residue preserved as a comment: uploaded by
# Narayana02, commit c769427 "Upload 10 files" — not executable code.)
# Import necessary libraries
import json # For handling JSON data
import os # For interacting with the operating system
import asyncio # For asynchronous programming
import aiofiles # For asynchronous file operations
import chromadb # For vector database operations
from chromadb.utils import embedding_functions # For creating embedding functions
from termcolor import colored # For colored console output
# Function to load chunked books from a JSON file
async def load_chunked_books(path='chunked_books.json'):
    """Asynchronously read and parse a JSON file of chunked books.

    Args:
        path: Path of the JSON file to read. Defaults to
            'chunked_books.json' so existing callers are unaffected.

    Returns:
        The parsed JSON object — presumably a mapping of book title to a
        list of text chunks, given how process_chunks iterates it
        (``.items()`` yielding ``(book_title, chunks)``).
    """
    # Read asynchronously so disk I/O does not block the event loop.
    async with aiofiles.open(path, 'r', encoding='utf-8') as file:
        return json.loads(await file.read())
# Function to embed a single chunk into the collection
async def embed_chunk(collection, book_title, i, chunk, semaphore):
    """Embed one text chunk into the Chroma collection.

    Args:
        collection: Chroma collection exposing a synchronous ``add()``.
        book_title: Title of the book this chunk came from.
        i: Chunk index within the book; used in both metadata and the id.
        chunk: The chunk text to embed and store.
        semaphore: asyncio.Semaphore bounding concurrent additions.
    """
    # Use a semaphore to limit concurrent operations
    async with semaphore:
        # collection.add() is a blocking synchronous call; awaiting it
        # directly would stall the event loop and make the semaphore
        # pointless. Run it in the default thread-pool executor so other
        # tasks can make progress while this one embeds.
        # NOTE(review): assumes the Chroma client tolerates calls from
        # worker threads — confirm before raising the concurrency limit.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            lambda: collection.add(
                documents=[chunk],
                metadatas=[{"book_title": book_title, "chunk_index": i}],
                ids=[f"{book_title}_chunk_{i}"],
            ),
        )
# Function to process all chunks from the books
async def process_chunks(chunked_books, collection, max_books=None):
    """Schedule an embedding task for every chunk and await them all.

    Args:
        chunked_books: Mapping of book title -> list of text chunks.
        collection: Chroma collection handed through to embed_chunk.
        max_books: Optional cap on how many books to process; None = all.
    """
    # Cap the number of embed operations in flight at once.
    gate = asyncio.Semaphore(30000)
    pending = []
    for book_number, (title, chunks) in enumerate(chunked_books.items()):
        # Stop scheduling once the optional book limit is hit.
        if max_books is not None and book_number >= max_books:
            break
        print(colored(f"Processing {title}", "green"))
        # One task per chunk of the current book.
        pending.extend(
            asyncio.create_task(embed_chunk(collection, title, idx, text, gate))
            for idx, text in enumerate(chunks)
        )
    print(colored(f"Embedding {len(pending)} chunks", "blue"))
    # Block until every scheduled embedding has completed.
    await asyncio.gather(*pending)
# Main function to orchestrate the embedding process
async def main():
    """Load chunked books, set up Chroma + embedder, embed every chunk."""
    # Read the pre-chunked books produced by the earlier pipeline step.
    books = await load_chunked_books()
    # Persistent Chroma client; vectors are stored under ./chroma_db.
    client = chromadb.PersistentClient(path="./chroma_db")
    # Embeddings come from OpenAI's text-embedding-3-large model; the API
    # key is read from the environment rather than hard-coded.
    embedder = embedding_functions.OpenAIEmbeddingFunction(
        api_key=os.getenv("OPENAI_API_KEY"),
        model_name="text-embedding-3-large"
    )
    # Cosine-distance collection that holds all book embeddings.
    collection = client.get_or_create_collection(
        name="books_collection",
        embedding_function=embedder,
        metadata={"hnsw:space": "cosine"}
    )
    # Fan the embedding work out over every book and chunk.
    await process_chunks(books, collection, max_books=None)
    # Report the grand total of chunks that were embedded.
    chunk_count = sum(len(chunks) for chunks in books.values())
    print(f"Embedded and stored {chunk_count} chunks in the Chroma collection.")
# Entry point: runs only when the file is executed directly, not imported.
if __name__ == "__main__":
    # asyncio.run creates an event loop, drives main() to completion,
    # and closes the loop when done.
    asyncio.run(main())