# NOTE(review): the lines "Spaces:" / "Sleeping" / "Sleeping" were residue from
# the hosting web page (scraper artifact), not part of this script; kept only
# as this comment so the file remains valid Python.
# Standard-library imports.
import asyncio   # async task scheduling and concurrency primitives
import json      # decoding the chunked-books JSON payload
import os        # reading the OpenAI API key from the environment

# Third-party imports.
import aiofiles  # non-blocking file reads
import chromadb  # persistent vector store
from chromadb.utils import embedding_functions  # OpenAI embedding wrapper
from termcolor import colored                   # colored progress output
# Load the chunked-books JSON file produced by the chunking step.
async def load_chunked_books(path='chunked_books.json'):
    """Read and parse a JSON file of chunked books asynchronously.

    Args:
        path: Path to the JSON file. Defaults to ``'chunked_books.json'``
            (the original hard-coded location), so existing callers are
            unaffected.

    Returns:
        The parsed JSON object — from the way callers use it, a mapping of
        book title -> list of text chunks.
    """
    # aiofiles keeps the read off the event loop; decode as UTF-8 text.
    async with aiofiles.open(path, 'r', encoding='utf-8') as file:
        return json.loads(await file.read())
# Embed a single chunk into the collection.
async def embed_chunk(collection, book_title, i, chunk, semaphore):
    """Add one text chunk to *collection* with identifying metadata.

    Args:
        collection: Chroma collection exposing a synchronous ``add`` method.
        book_title: Title of the source book; used in the metadata and id.
        i: Index of the chunk within its book.
        chunk: The chunk text to store/embed.
        semaphore: ``asyncio.Semaphore`` bounding concurrent embed calls.
    """
    # Limit how many embed operations run at once.
    async with semaphore:
        # BUGFIX: collection.add() is a synchronous call (it performs the
        # embedding work), so calling it directly would block the event loop
        # and serialize all "concurrent" tasks. Run it in a worker thread
        # so other tasks can make progress while it executes.
        await asyncio.to_thread(
            collection.add,
            documents=[chunk],
            metadatas=[{"book_title": book_title, "chunk_index": i}],
            ids=[f"{book_title}_chunk_{i}"],
        )
# Process all chunks from the books, fanning out one task per chunk.
async def process_chunks(chunked_books, collection, max_books=None,
                         max_concurrency=30000):
    """Create and await an embedding task for every chunk of every book.

    Args:
        chunked_books: Mapping of book title -> list of text chunks.
        collection: Target Chroma collection passed through to each task.
        max_books: Optional cap on how many books to process; ``None``
            processes all of them.
        max_concurrency: Upper bound on simultaneous embed operations.
            The default of 30000 preserves the original behavior, which
            was effectively unbounded; callers can now pass a realistic
            limit (e.g. API rate limits) without editing this function.
    """
    semaphore = asyncio.Semaphore(max_concurrency)
    tasks = []
    for book_index, (book_title, chunks) in enumerate(chunked_books.items()):
        # Stop once the optional book limit is reached.
        if max_books is not None and book_index >= max_books:
            break
        print(colored(f"Processing {book_title}", "green"))
        # One task per chunk; the semaphore inside embed_chunk throttles them.
        for chunk_index, chunk in enumerate(chunks):
            tasks.append(asyncio.create_task(
                embed_chunk(collection, book_title, chunk_index, chunk, semaphore)
            ))
    total_chunks = len(tasks)
    print(colored(f"Embedding {total_chunks} chunks", "blue"))
    # Wait for every embedding task to finish.
    await asyncio.gather(*tasks)
# Orchestrates the whole embedding pipeline: load, connect, embed, report.
async def main():
    """Embed every chunked book into a persistent Chroma collection."""
    # Pull the pre-chunked book data from disk.
    chunked_books = await load_chunked_books()

    # Chroma client persisted to a local directory.
    client = chromadb.PersistentClient(path="./chroma_db")

    # Embeddings come from OpenAI's large text-embedding model; the API key
    # is read from the environment.
    openai_ef = embedding_functions.OpenAIEmbeddingFunction(
        api_key=os.getenv("OPENAI_API_KEY"),
        model_name="text-embedding-3-large",
    )

    # Cosine-distance collection that will hold the embedded chunks.
    collection = client.get_or_create_collection(
        name="books_collection",
        embedding_function=openai_ef,
        metadata={"hnsw:space": "cosine"},
    )

    # Fan out the embedding work across all books (no book cap).
    await process_chunks(chunked_books, collection, max_books=None)

    # Report the total number of chunks that were scheduled for storage.
    total_chunks = sum(len(chunks) for chunks in chunked_books.values())
    print(f"Embedded and stored {total_chunks} chunks in the Chroma collection.")
# Script entry point: drive the async pipeline on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())