zeta / scripts /reembed.py
rodrigo-moonray
Deploy zeta-only embeddings (NV-Embed-v2 + E5-small)
9b457ed
#!/usr/bin/env python3
"""
Re-embed documents with a different embedding model.

This script re-processes all existing PDFs and stores them in a new
collection using the specified embedding model.

Usage:
    python scripts/reembed.py --model sentence-transformers/all-mpnet-base-v2
    python scripts/reembed.py --model BAAI/bge-base-en-v1.5 --force
    python scripts/reembed.py --list  # List available models
"""
import sys
from pathlib import Path
# Add project root to Python path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
import typer
from typing import Optional
from src.config.settings import (
get_settings,
EMBEDDING_MODELS,
get_collection_name_for_model,
)
from src.ingestion.pdf_loader import PDFLoader
from src.ingestion.chunker import HierarchicalChunker
from src.embedding.embedder import Embedder
from src.embedding.vector_store import VectorStore
from src.utils.logging import setup_logging, get_logger
from tqdm import tqdm
import time
# Typer application object; the `reembed` command below is registered on it.
# add_completion=False keeps the shell-completion install options off the CLI.
app = typer.Typer(
    add_completion=False,
    help="Re-embed documents with a different embedding model",
)

# Module-level logger, named after this module.
logger = get_logger(__name__)
def _print_banner(title: str) -> None:
    """Print *title* in bold cyan between two rules of 60 '=' characters."""
    rule = "=" * 60
    typer.secho(rule, fg=typer.colors.CYAN)
    typer.secho(title, fg=typer.colors.CYAN, bold=True)
    typer.secho(rule, fg=typer.colors.CYAN)


def _print_available_models(active_model) -> None:
    """List every entry of EMBEDDING_MODELS with its collection and chunk count.

    The entry whose ID equals *active_model* is tagged ``[ACTIVE]``.
    """
    typer.echo("\nAvailable embedding models:\n")
    for model_id, config in EMBEDDING_MODELS.items():
        collection = get_collection_name_for_model(model_id)

        # Best effort: a collection that cannot be opened is reported as empty.
        # Catch Exception rather than using a bare except so that Ctrl-C
        # (KeyboardInterrupt) and SystemExit still propagate.
        try:
            store = VectorStore(embedding_model=model_id)
            chunks = store.get_collection_stats().get("total_chunks", 0)
        except Exception:
            chunks = 0

        typer.echo(f" {config['name']:<15} ({config['dimensions']}d)")
        typer.echo(f" ID: {model_id}")
        typer.echo(f" Collection: {collection}")
        typer.echo(f" Chunks: {chunks:,}")
        if model_id == active_model:
            typer.secho(" [ACTIVE]", fg=typer.colors.GREEN)
        typer.echo()


def _embed_pdfs(pdf_files, pdf_loader, chunker, embedder, vector_store) -> dict:
    """Load, chunk, embed and store each PDF; return the run counters.

    Returns a dict with the integer keys ``pages``, ``chunks``, ``successful``,
    ``skipped`` (PDFs that produced no child chunks) and ``failed`` (PDFs whose
    processing raised). A failure on one file is logged and does not stop the
    remaining files.
    """
    totals = {"pages": 0, "chunks": 0, "successful": 0, "skipped": 0, "failed": 0}

    for pdf_path in tqdm(pdf_files, desc="Processing PDFs"):
        try:
            document = pdf_loader.load(pdf_path)
            # Pages are counted as soon as the PDF loads, even if a later
            # step fails for this file.
            totals["pages"] += document.num_pages

            # Only chunks whose chunk_type is "child" are embedded and stored.
            child_chunks = [
                chunk
                for chunk in chunker.chunk_document(document)
                if chunk.chunk_type == "child"
            ]
            if not child_chunks:
                logger.warning(f"No chunks created for {pdf_path.name}")
                totals["skipped"] += 1
                continue

            embeddings = embedder.encode_batch(child_chunks)
            vector_store.add_chunks(child_chunks, embeddings)
        except Exception as e:
            logger.error(f"Failed to process {pdf_path.name}: {e}")
            totals["failed"] += 1
        else:
            totals["chunks"] += len(child_chunks)
            totals["successful"] += 1

    return totals


# CLI command. Exit status: 0 after --list or when no PDF failed; 1 on invalid
# arguments, a missing/empty PDF directory, a non-empty target collection
# without --force, a model that fails to load, or at least one failed PDF.
# The docstring below is kept short because Typer shows it as the --help text.
@app.command()
def reembed(
    model: Optional[str] = typer.Option(
        None,
        "--model", "-m",
        help="Embedding model to use (e.g., sentence-transformers/all-mpnet-base-v2)"
    ),
    force: bool = typer.Option(
        False,
        "--force", "-f",
        help="Force re-embedding even if collection already has data"
    ),
    list_models: bool = typer.Option(
        False,
        "--list", "-l",
        help="List available embedding models"
    ),
    pdf_dir: Optional[Path] = typer.Option(
        None,
        "--pdf-dir", "-d",
        help="Directory containing PDFs (defaults to data/pdfs, scans subdirectories)"
    ),
):
    """
    Re-embed all PDF documents with a specified embedding model.

    This creates a new ChromaDB collection with embeddings from the specified model,
    allowing you to test different embedding models side-by-side.
    """
    settings = get_settings()
    setup_logging(log_level=settings.log_level)

    # --list short-circuits everything else.
    if list_models:
        _print_available_models(settings.embedding_model)
        raise typer.Exit(0)

    # Validate model
    if not model:
        typer.secho("Error: --model is required", fg=typer.colors.RED, err=True)
        typer.echo("\nUse --list to see available models")
        raise typer.Exit(1)

    if model not in EMBEDDING_MODELS:
        typer.secho(f"Error: Unknown model: {model}", fg=typer.colors.RED, err=True)
        typer.echo("\nAvailable models:")
        for model_id in EMBEDDING_MODELS:
            typer.echo(f" - {model_id}")
        raise typer.Exit(1)

    # Resolve the PDF directory (default: <project_root>/data/pdfs).
    if pdf_dir is None:
        pdf_dir = project_root / "data" / "pdfs"
    if not pdf_dir.exists():
        typer.secho(f"Error: PDF directory not found: {pdf_dir}", fg=typer.colors.RED, err=True)
        raise typer.Exit(1)

    # Find all PDFs (including subdirectories)
    pdf_files = list(pdf_dir.rglob("*.pdf"))
    if not pdf_files:
        typer.secho(f"Error: No PDF files found in {pdf_dir}", fg=typer.colors.RED, err=True)
        raise typer.Exit(1)

    # Refuse to touch a non-empty collection unless --force was given.
    collection_name = get_collection_name_for_model(model)
    vector_store = VectorStore(embedding_model=model)
    existing_chunks = vector_store.get_collection_stats().get("total_chunks", 0)
    if existing_chunks > 0 and not force:
        typer.secho(
            f"\nCollection '{collection_name}' already has {existing_chunks:,} chunks.",
            fg=typer.colors.YELLOW
        )
        typer.echo("Use --force to overwrite existing embeddings.\n")
        raise typer.Exit(1)
    # NOTE(review): with --force nothing here clears the existing collection;
    # chunks are simply passed to add_chunks(). Whether that replaces or
    # duplicates existing entries depends on VectorStore - confirm.

    # Print header
    model_config = EMBEDDING_MODELS[model]
    typer.echo()
    _print_banner(" ZETA RESEARCHER - RE-EMBEDDING")
    typer.echo()
    typer.echo(f"Model: {model_config['name']} ({model_config['dimensions']}d)")
    typer.echo(f"Model ID: {model}")
    typer.echo(f"Collection: {collection_name}")
    typer.echo(f"PDF Dir: {pdf_dir}")
    typer.echo(f"PDF Files: {len(pdf_files)}")
    typer.echo()

    # Initialize components with specified model
    typer.echo("Loading embedding model...")
    pdf_loader = PDFLoader()
    chunker = HierarchicalChunker()
    embedder = Embedder(model_name=model)

    # Touch the model attribute up front so a broken model aborts the run
    # before any PDF is processed.
    try:
        _ = embedder.model
        typer.secho(f"Model loaded successfully on {embedder.device}", fg=typer.colors.GREEN)
    except Exception as e:
        typer.secho(f"Error loading model: {e}", fg=typer.colors.RED, err=True)
        raise typer.Exit(1) from e
    typer.echo()

    # Process each PDF. perf_counter() is monotonic, so the measured duration
    # cannot go negative if the system clock is adjusted mid-run.
    start_time = time.perf_counter()
    totals = _embed_pdfs(pdf_files, pdf_loader, chunker, embedder, vector_store)
    duration = time.perf_counter() - start_time

    successful = totals["successful"]
    skipped = totals["skipped"]
    failed = totals["failed"]
    total_pages = totals["pages"]
    total_chunks = totals["chunks"]

    # Print summary
    typer.echo()
    _print_banner(" RE-EMBEDDING COMPLETE")
    typer.echo()
    typer.echo(f"Model: {model_config['name']}")
    typer.echo(f"Files: {successful}/{len(pdf_files)} successful")
    if skipped > 0:
        typer.secho(f"Skipped: {skipped} (no chunks created)", fg=typer.colors.YELLOW)
    if failed > 0:
        typer.secho(f"Failed: {failed}", fg=typer.colors.RED)
    typer.echo(f"Total pages: {total_pages:,}")
    typer.echo(f"Total chunks: {total_chunks:,}")
    typer.echo(f"Duration: {duration:.1f}s")
    # Guard the division: a zero duration would otherwise raise.
    if total_pages > 0 and duration > 0:
        pages_per_min = (total_pages / duration) * 60
        typer.echo(f"Performance: {pages_per_min:.1f} pages/minute")
    typer.secho("=" * 60, fg=typer.colors.CYAN)
    typer.echo()

    # The closing message follows the same condition as the exit code:
    # skipped (chunk-less) PDFs are reported above but are not failures.
    if failed > 0:
        typer.secho("Re-embedding completed with some failures", fg=typer.colors.YELLOW, bold=True)
    elif successful == 0:
        typer.secho("No chunks were embedded: every PDF was skipped", fg=typer.colors.YELLOW, bold=True)
    else:
        typer.secho("Re-embedding completed successfully!", fg=typer.colors.GREEN, bold=True)
        typer.echo("\nYou can now switch to this model in the UI or via API:")
        typer.echo(" POST /api/embedding-models/switch")
        typer.echo(f' {{"model_id": "{model}"}}')

    raise typer.Exit(1 if failed > 0 else 0)
# Run the Typer CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    app()