# Docker_Deploy/src/python/embedding_pipeline.py
# Author: Shaheryar Shah
# Commit bec06d9: Add backend files for RAG Chatbot Docker deployment
import asyncio
import logging
from typing import List, Dict, Any
from document_loader import DocumentLoader
from embedder import Embedder
from vector_store import VectorStore
from preprocessor import TextPreprocessor
# Configure default logging (INFO level) and create the module-level logger,
# following the stdlib convention of one logger per module via __name__.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EmbeddingPipeline:
    """
    Main class to manage the entire embedding pipeline:

    1. Load documents
    2. Preprocess text
    3. Create embeddings
    4. Store in vector database
    """

    def __init__(self):
        self.document_loader = DocumentLoader()
        self.embedder = Embedder()
        self.vector_store = VectorStore()
        # NOTE(review): preprocessor is constructed but never used by this
        # class — presumably the loader/embedder preprocess internally;
        # confirm before removing.
        self.preprocessor = TextPreprocessor()

    async def process_directory(self, directory_path: str, chunk_size: int = 512, overlap: int = 50) -> int:
        """
        Process all documents in a directory: load, embed, and store.

        Args:
            directory_path: Path to the directory containing documents
            chunk_size: Size of text chunks
            overlap: Overlap between chunks

        Returns:
            Number of documents successfully embedded and stored
            (0 when the directory yields no documents).
        """
        # Idempotent setup: create the collection if it doesn't exist.
        self.vector_store.create_collection()

        # Lazy %-style args: the message is only formatted when the
        # level is enabled (stdlib logging best practice).
        logger.info("Loading documents from %s", directory_path)
        documents = self.document_loader.load_documents_from_directory(
            directory_path,
            chunk_size=chunk_size,
            overlap=overlap,
        )
        logger.info("Loaded %d documents", len(documents))

        if not documents:
            logger.warning("No documents found to process")
            return 0

        logger.info("Creating embeddings...")
        embedded_documents = await self.embedder.embed_documents(documents)

        # Filter out documents that failed to embed (missing or empty
        # 'embedding' value).
        valid_documents = [
            doc for doc in embedded_documents
            if doc.get('embedding') and len(doc['embedding']) > 0
        ]
        logger.info("Successfully embedded %d documents", len(valid_documents))

        if valid_documents:
            self.vector_store.add_documents(valid_documents)
            logger.info("Added %d documents to vector store", len(valid_documents))

        return len(valid_documents)

    def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Search for documents similar to the query.

        Args:
            query: The search query
            top_k: Number of results to return

        Returns:
            List of matching documents with scores

        Note:
            asyncio.run() spins up a fresh event loop to drive the async
            embedder from synchronous code; it raises RuntimeError if this
            method is called from within an already-running event loop —
            in async contexts, await the embedder directly instead.
        """
        query_embedding = asyncio.run(self.embedder.create_embedding(query))
        return self.vector_store.search_similar(query_embedding, top_k)
def main():
    """
    CLI entry point: embed a directory of documents, or run a test search.

    Flags:
        --directory   (required) directory containing documents to process
        --chunk-size  size of text chunks (default 512)
        --overlap     overlap between consecutive chunks (default 50)
        --search      if given, run this query against the vector store
                      instead of processing documents
    """
    import argparse  # removed unused `import os`

    parser = argparse.ArgumentParser(description="Physical AI Textbook Embedding Pipeline")
    # NOTE(review): --directory is required even in --search mode, where it
    # is never read — consider relaxing; kept as-is to preserve the CLI.
    parser.add_argument("--directory", type=str, required=True,
                        help="Directory containing documents to process")
    parser.add_argument("--chunk-size", type=int, default=512,
                        help="Size of text chunks")
    parser.add_argument("--overlap", type=int, default=50,
                        help="Overlap between chunks")
    parser.add_argument("--search", type=str,
                        help="Search query to test the vector store")
    args = parser.parse_args()

    pipeline = EmbeddingPipeline()

    if args.search:
        # Search mode: embed the query and print the top matches.
        logger.info("Searching for: %s", args.search)
        results = pipeline.search(args.search)
        for i, result in enumerate(results):
            print(f"\nResult {i+1} (Score: {result['score']:.4f}):")
            print(f"Source: {result['source']}")
            print(f"Content preview: {result['content'][:200]}...")
    else:
        # Ingest mode: process every document in the directory.
        logger.info("Starting embedding pipeline...")
        processed_count = asyncio.run(
            pipeline.process_directory(
                args.directory,
                chunk_size=args.chunk_size,
                overlap=args.overlap,
            )
        )
        logger.info("Processed %d documents", processed_count)

        # Report the total collection size after ingestion.
        doc_count = pipeline.vector_store.get_all_documents_count()
        logger.info("Total documents in vector store: %d", doc_count)
# Run the CLI entry point only when executed as a script (not on import).
if __name__ == "__main__":
    main()