"""Embedding pipeline: load, preprocess, embed, and store documents."""
| import asyncio | |
| import logging | |
| from typing import List, Dict, Any | |
| from document_loader import DocumentLoader | |
| from embedder import Embedder | |
| from vector_store import VectorStore | |
| from preprocessor import TextPreprocessor | |
# Set up logging
# basicConfig is a no-op if the host application already configured
# logging; the module-level logger is named after this module so
# records can be filtered per-module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EmbeddingPipeline:
    """
    Main class to manage the entire embedding pipeline:

    1. Load documents
    2. Preprocess text
    3. Create embeddings
    4. Store in vector database
    """

    def __init__(self):
        # One collaborator per pipeline stage.
        self.document_loader = DocumentLoader()
        self.embedder = Embedder()
        self.vector_store = VectorStore()
        self.preprocessor = TextPreprocessor()

    async def process_directory(self, directory_path: str, chunk_size: int = 512, overlap: int = 50) -> int:
        """
        Process all documents in a directory: load, embed, and store.

        Args:
            directory_path: Path to the directory containing documents
            chunk_size: Size of text chunks
            overlap: Overlap between chunks

        Returns:
            Number of documents successfully embedded and stored
        """
        # Create the collection if it doesn't exist
        self.vector_store.create_collection()

        # Load documents from the directory
        logger.info(f"Loading documents from {directory_path}")
        documents = self.document_loader.load_documents_from_directory(
            directory_path,
            chunk_size=chunk_size,
            overlap=overlap
        )
        logger.info(f"Loaded {len(documents)} documents")

        if not documents:
            logger.warning("No documents found to process")
            return 0

        # Embed the documents
        logger.info("Creating embeddings...")
        embedded_documents = await self.embedder.embed_documents(documents)

        # Filter out any documents that failed to embed.
        # Use an explicit `is not None` check instead of truthiness:
        # truthiness raises ValueError when the embedding is a numpy
        # array, and the separate length check already rejects empties.
        valid_documents = [
            doc for doc in embedded_documents
            if doc.get('embedding') is not None and len(doc['embedding']) > 0
        ]
        logger.info(f"Successfully embedded {len(valid_documents)} documents")

        # Add documents to vector store
        if valid_documents:
            self.vector_store.add_documents(valid_documents)
            logger.info(f"Added {len(valid_documents)} documents to vector store")
        return len(valid_documents)

    def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Search for documents similar to the query.

        Args:
            query: The search query
            top_k: Number of results to return

        Returns:
            List of matching documents with scores

        Note:
            Uses asyncio.run(), so this must NOT be called from inside a
            running event loop; in async contexts await
            ``self.embedder.create_embedding`` directly instead.
        """
        # Create embedding for the query
        query_embedding = asyncio.run(self.embedder.create_embedding(query))
        # Search in the vector store
        return self.vector_store.search_similar(query_embedding, top_k)
def main():
    """
    Command-line entry point for the embedding pipeline.

    Default mode ingests a directory of documents; with --search, runs a
    similarity query against the existing vector store instead.
    """
    # Local import keeps the CLI dependency out of library usage.
    import argparse

    parser = argparse.ArgumentParser(description="Physical AI Textbook Embedding Pipeline")
    # --directory is only needed for ingest mode, so it is no longer
    # hard-required; ingest mode without it is rejected explicitly below.
    # (Backward compatible: every previously valid invocation still works.)
    parser.add_argument("--directory", type=str,
                        help="Directory containing documents to process")
    parser.add_argument("--chunk-size", type=int, default=512,
                        help="Size of text chunks")
    parser.add_argument("--overlap", type=int, default=50,
                        help="Overlap between chunks")
    parser.add_argument("--search", type=str,
                        help="Search query to test the vector store")
    args = parser.parse_args()

    pipeline = EmbeddingPipeline()

    if args.search:
        # Search mode: embed the query and print the top matches.
        logger.info(f"Searching for: {args.search}")
        results = pipeline.search(args.search)
        for i, result in enumerate(results):
            print(f"\nResult {i+1} (Score: {result['score']:.4f}):")
            print(f"Source: {result['source']}")
            print(f"Content preview: {result['content'][:200]}...")
    else:
        # Ingest mode requires a source directory.
        if not args.directory:
            parser.error("--directory is required unless --search is given")
        logger.info("Starting embedding pipeline...")
        processed_count = asyncio.run(
            pipeline.process_directory(
                args.directory,
                chunk_size=args.chunk_size,
                overlap=args.overlap
            )
        )
        logger.info(f"Processed {processed_count} documents")
        # Show document count in the collection
        doc_count = pipeline.vector_store.get_all_documents_count()
        logger.info(f"Total documents in vector store: {doc_count}")
# Run the CLI entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()