#!/usr/bin/env python3
"""
Re-embed documents with a different embedding model.

This script re-processes all existing PDFs and stores them in a new collection
using the specified embedding model.

Usage:
    python scripts/reembed.py --model sentence-transformers/all-mpnet-base-v2
    python scripts/reembed.py --model BAAI/bge-base-en-v1.5 --force
    python scripts/reembed.py --list  # List available models
"""

import sys
from pathlib import Path

# Add project root to Python path (must happen before the `src.*` imports below)
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

import typer
from typing import Optional

from src.config.settings import (
    get_settings,
    EMBEDDING_MODELS,
    get_collection_name_for_model,
)
from src.ingestion.pdf_loader import PDFLoader
from src.ingestion.chunker import HierarchicalChunker
from src.embedding.embedder import Embedder
from src.embedding.vector_store import VectorStore
from src.utils.logging import setup_logging, get_logger
from tqdm import tqdm
import time

app = typer.Typer(
    help="Re-embed documents with a different embedding model",
    add_completion=False
)

logger = get_logger(__name__)

# Horizontal rule used by the header and summary banners.
_RULE = "=" * 60


def _print_banner(title: str) -> None:
    """Print a blank line, a cyan rule/title/rule block, then a blank line."""
    typer.echo()
    typer.secho(_RULE, fg=typer.colors.CYAN)
    typer.secho(title, fg=typer.colors.CYAN, bold=True)
    typer.secho(_RULE, fg=typer.colors.CYAN)
    typer.echo()


def _count_chunks(model_id: str) -> int:
    """Return the ``total_chunks`` stat of the model's collection, or 0 on error.

    This is a best-effort lookup for the ``--list`` view: any ordinary error
    while opening the store or reading its stats is reported as 0 chunks.
    """
    try:
        store = VectorStore(embedding_model=model_id)
        return store.get_collection_stats().get("total_chunks", 0)
    except Exception as exc:
        # Deliberately not a bare `except:` so Ctrl-C / SystemExit still
        # propagate; only ordinary errors fall back to 0.
        logger.debug(f"Could not read collection stats for {model_id}: {exc}")
        return 0


def _print_available_models(active_model: str) -> None:
    """Print every entry of EMBEDDING_MODELS, marking ``active_model``."""
    typer.echo("\nAvailable embedding models:\n")

    for model_id, config in EMBEDDING_MODELS.items():
        collection = get_collection_name_for_model(model_id)
        chunks = _count_chunks(model_id)

        typer.echo(f" {config['name']:<15} ({config['dimensions']}d)")
        typer.echo(f" ID: {model_id}")
        typer.echo(f" Collection: {collection}")
        typer.echo(f" Chunks: {chunks:,}")
        if model_id == active_model:
            typer.secho(" [ACTIVE]", fg=typer.colors.GREEN)
        typer.echo()


# NOTE: the docstring of `reembed` is left as it was so the CLI help text
# derived from it does not change; maintainer-facing notes are kept in
# comments instead.
#
# Exit codes: 0 when listing models or when no PDF failed; 1 on invalid
# arguments, a missing/empty PDF directory, an already-populated collection
# without --force, a model load error, or at least one failed PDF.
@app.command()
def reembed(
    model: Optional[str] = typer.Option(
        None,
        "--model", "-m",
        help="Embedding model to use (e.g., sentence-transformers/all-mpnet-base-v2)"
    ),
    force: bool = typer.Option(
        False,
        "--force", "-f",
        help="Force re-embedding even if collection already has data"
    ),
    list_models: bool = typer.Option(
        False,
        "--list", "-l",
        help="List available embedding models"
    ),
    pdf_dir: Optional[Path] = typer.Option(
        None,
        "--pdf-dir", "-d",
        help="Directory containing PDFs (defaults to data/pdfs, scans subdirectories)"
    ),
):
    """
    Re-embed all PDF documents with a specified embedding model.

    This creates a new ChromaDB collection with embeddings from the specified
    model, allowing you to test different embedding models side-by-side.
    """
    settings = get_settings()
    setup_logging(log_level=settings.log_level)

    # List models if requested
    if list_models:
        _print_available_models(settings.embedding_model)
        raise typer.Exit(0)

    # Validate model
    if not model:
        typer.secho("Error: --model is required", fg=typer.colors.RED, err=True)
        typer.echo("\nUse --list to see available models")
        raise typer.Exit(1)

    if model not in EMBEDDING_MODELS:
        typer.secho(f"Error: Unknown model: {model}", fg=typer.colors.RED, err=True)
        typer.echo("\nAvailable models:")
        for model_id in EMBEDDING_MODELS:
            typer.echo(f" - {model_id}")
        raise typer.Exit(1)

    # Get PDF directory
    if pdf_dir is None:
        pdf_dir = project_root / "data" / "pdfs"

    if not pdf_dir.exists():
        typer.secho(f"Error: PDF directory not found: {pdf_dir}", fg=typer.colors.RED, err=True)
        raise typer.Exit(1)

    # Find all PDFs (including subdirectories)
    pdf_files = list(pdf_dir.rglob("*.pdf"))
    if not pdf_files:
        typer.secho(f"Error: No PDF files found in {pdf_dir}", fg=typer.colors.RED, err=True)
        raise typer.Exit(1)

    # Check if collection already has data
    collection_name = get_collection_name_for_model(model)
    vector_store = VectorStore(embedding_model=model)
    existing_stats = vector_store.get_collection_stats()
    existing_chunks = existing_stats.get("total_chunks", 0)

    # NOTE(review): with --force nothing in this script clears the existing
    # collection before new chunks are added; whether add_chunks replaces or
    # duplicates existing entries depends on VectorStore - confirm.
    if existing_chunks > 0 and not force:
        typer.secho(
            f"\nCollection '{collection_name}' already has {existing_chunks:,} chunks.",
            fg=typer.colors.YELLOW
        )
        typer.echo("Use --force to overwrite existing embeddings.\n")
        raise typer.Exit(1)

    # Print header
    model_config = EMBEDDING_MODELS[model]
    _print_banner(" ZETA RESEARCHER - RE-EMBEDDING")
    typer.echo(f"Model: {model_config['name']} ({model_config['dimensions']}d)")
    typer.echo(f"Model ID: {model}")
    typer.echo(f"Collection: {collection_name}")
    typer.echo(f"PDF Dir: {pdf_dir}")
    typer.echo(f"PDF Files: {len(pdf_files)}")
    typer.echo()

    # Initialize components with specified model
    typer.echo("Loading embedding model...")
    pdf_loader = PDFLoader()
    chunker = HierarchicalChunker()
    embedder = Embedder(model_name=model)

    # Force model load to verify it works
    try:
        _ = embedder.model
        typer.secho(f"Model loaded successfully on {embedder.device}", fg=typer.colors.GREEN)
    except Exception as e:
        typer.secho(f"Error loading model: {e}", fg=typer.colors.RED, err=True)
        raise typer.Exit(1) from e

    typer.echo()

    # Process each PDF
    total_chunks = 0
    total_pages = 0
    successful = 0
    failed = 0
    skipped = 0  # PDFs that loaded fine but produced no child chunks
    start_time = time.time()

    for pdf_path in tqdm(pdf_files, desc="Processing PDFs"):
        try:
            # Load PDF
            document = pdf_loader.load(pdf_path)
            total_pages += document.num_pages

            # Chunk document; only chunks typed "child" are embedded and stored
            all_chunks = chunker.chunk_document(document)
            child_chunks = [c for c in all_chunks if c.chunk_type == "child"]

            if not child_chunks:
                logger.warning(f"No chunks created for {pdf_path.name}")
                skipped += 1
                continue

            # Generate embeddings
            embeddings = embedder.encode_batch(child_chunks)

            # Store in collection
            vector_store.add_chunks(child_chunks, embeddings)

            total_chunks += len(child_chunks)
            successful += 1

        except Exception as e:
            # One bad PDF must not abort the whole run; record it and move on.
            logger.error(f"Failed to process {pdf_path.name}: {e}")
            failed += 1
            continue

    # Print summary
    duration = time.time() - start_time

    _print_banner(" RE-EMBEDDING COMPLETE")
    typer.echo(f"Model: {model_config['name']}")
    typer.echo(f"Files: {successful}/{len(pdf_files)} successful")
    if failed > 0:
        typer.secho(f"Failed: {failed}", fg=typer.colors.RED)
    if skipped > 0:
        typer.secho(f"Skipped: {skipped} (no chunks produced)", fg=typer.colors.YELLOW)
    typer.echo(f"Total pages: {total_pages:,}")
    typer.echo(f"Total chunks: {total_chunks:,}")
    typer.echo(f"Duration: {duration:.1f}s")
    # Guard against a zero elapsed time (coarse clocks) before dividing.
    if total_pages > 0 and duration > 0:
        pages_per_min = (total_pages / duration) * 60
        typer.echo(f"Performance: {pages_per_min:.1f} pages/minute")
    typer.secho(_RULE, fg=typer.colors.CYAN)
    typer.echo()

    if successful == len(pdf_files):
        typer.secho("Re-embedding completed successfully!", fg=typer.colors.GREEN, bold=True)
        typer.echo("\nYou can now switch to this model in the UI or via API:")
        typer.echo(" POST /api/embedding-models/switch")
        typer.echo(f' {{"model_id": "{model}"}}')
    elif failed == 0:
        # Nothing failed, but at least one PDF yielded no chunks; the exit
        # code stays 0, so do not call this a failure.
        typer.secho(
            "Re-embedding completed, but some files produced no chunks",
            fg=typer.colors.YELLOW,
            bold=True,
        )
    else:
        typer.secho("Re-embedding completed with some failures", fg=typer.colors.YELLOW, bold=True)

    raise typer.Exit(0 if failed == 0 else 1)


if __name__ == "__main__":
    app()