""" vector_store/embedder.py ========================= ALZDETECT-AI — Enterprise Embedder + Pinecone Upserter. WHAT: Converts 19,771 chunks → 768-dim vectors → Pinecone. WHY: PubMedBERT understands biomedical language — "pTau217" and "phosphorylated tau" map to nearby vectors. Keyword search would miss this entirely. WHO: Run once from Colab T4 GPU notebook. Called by scripts/run_pipeline.py WHERE: Reads → data/processed/chunks.json Writes → Pinecone index (alzdetect) WHEN: Once per plan. After chunking. Resumes from checkpoint. WORST-CASE DESIGN: - Colab timeout mid-run → checkpoint every 1,000 vectors - Pinecone upsert fail → retry 3 times with backoff - Wrong embedding dim → caught before first upsert - Chunk text empty → skipped, logged, counted - GPU not available → falls back to CPU with warning """ import json import time import os from pathlib import Path from typing import Optional import numpy as np from pydantic import BaseModel, Field from sentence_transformers import SentenceTransformer from pinecone import Pinecone from loguru import logger from tqdm import tqdm from configs.settings import get_settings # ── Embed diagnostic model ──────────────────────────────────────── class EmbedDiagnostic(BaseModel): """ RE inspector for embedding stage. Analogy: Barcode printer quality report. How many barcodes printed, how many failed, how many are in the scanner (Pinecone). """ total_chunks: int total_embedded: int total_upserted: int total_failed: int embedding_model: str embedding_dim: int pinecone_index: str duration_secs: float vectors_in_index: int def log_summary(self) -> None: logger.info("=" * 60) logger.info("[EMBED-DIAGNOSTIC] Run complete") logger.info(f" Total chunks : {self.total_chunks:,}") logger.info(f" Embedded : {self.total_embedded:,}") logger.info(f" Upserted : {self.total_upserted:,}") logger.info(f" Failed : {self.total_failed:,}") logger.info(f" Model : {self.embedding_model}") logger.info(f" Dimensions : {self.embedding_dim}") logger.info(f" Pinecone index : {self.pinecone_index}") logger.info(f" Vectors in index : {self.vectors_in_index:,}") logger.info(f" Duration : {self.duration_secs:.1f}s") logger.info("=" * 60) # ── Core embedder class ─────────────────────────────────────────── class ChunkEmbedder: """ Enterprise embedder + Pinecone upserter. Analogy: The barcode printing and filing team. Takes index cards (chunks), prints barcodes (embeddings), files them in the barcode library (Pinecone). Usage: embedder = ChunkEmbedder() diagnostic = embedder.run() """ _MAX_RETRIES: int = 3 _RETRY_BACKOFF: float = 2.0 _UPSERT_BATCH: int = 100 # Pinecone max upsert batch _CHECKPOINT_EVERY: int = 1000 # save progress every N vectors def __init__(self) -> None: self.settings = get_settings() self._setup_paths() self._setup_model() self._setup_pinecone() def _setup_paths(self) -> None: """Paths for chunks input and checkpoint.""" self.chunks_path = self.settings.processed_data_path self.checkpoint_path = self.chunks_path.parent / "embed_checkpoint.json" logger.info(f"[EMBEDDER] Chunks path: {self.chunks_path}") def _setup_model(self) -> None: """ Load PubMedBERT embedding model. Worst-case: GPU not available → CPU fallback with warning. """ import torch device = "cuda" if torch.cuda.is_available() else "cpu" if device == "cpu": logger.warning( "[EMBEDDER] GPU not available — running on CPU. " "This will be slow. Use Colab T4 for faster embedding." ) else: logger.info(f"[EMBEDDER] GPU detected: {torch.cuda.get_device_name(0)}") logger.info(f"[EMBEDDER] Loading model: {self.settings.embedding_model}") self.model = SentenceTransformer( self.settings.embedding_model, device=device ) self.device = device logger.info(f"[EMBEDDER] Model loaded on {device}") def _setup_pinecone(self) -> None: """Connect to Pinecone and verify index exists.""" pc = Pinecone(api_key=self.settings.pinecone_api_key) self.index = pc.Index(self.settings.pinecone_index_name) stats = self.index.describe_index_stats() logger.info( f"[EMBEDDER] Pinecone connected | " f"index: {self.settings.pinecone_index_name} | " f"existing vectors: {stats.total_vector_count:,}" ) # Verify dimension matches — worst-case: wrong model used before if stats.dimension != self.settings.embedding_dim: raise ValueError( f"[EMBEDDER] Dimension mismatch! " f"Index has {stats.dimension} dims, " f"model produces {self.settings.embedding_dim} dims. " f"Delete index and recreate with correct dimensions." ) def _load_chunks(self) -> list[dict]: """Load chunks from JSON — fail fast if missing.""" if not self.chunks_path.exists(): logger.error(f"[EMBEDDER] Chunks not found: {self.chunks_path}") raise FileNotFoundError( f"Run chunker first. No file at: {self.chunks_path}" ) with open(self.chunks_path, encoding="utf-8") as f: chunks = json.load(f) logger.info(f"[EMBEDDER] Loaded {len(chunks):,} chunks") return chunks def _load_checkpoint(self) -> set[str]: """ Load set of already-upserted chunk IDs. Allows resuming after Colab timeout. """ if self.checkpoint_path.exists(): try: with open(self.checkpoint_path, encoding="utf-8") as f: done = set(json.load(f)) logger.info( f"[EMBEDDER] Checkpoint found — " f"{len(done):,} chunks already upserted" ) return done except Exception as e: logger.warning(f"[EMBEDDER] Checkpoint corrupt: {e} — starting fresh") logger.info("[EMBEDDER] No checkpoint — starting fresh") return set() def _save_checkpoint(self, done_ids: set[str]) -> None: """Save checkpoint — called every 1,000 upserted chunks.""" try: with open(self.checkpoint_path, "w", encoding="utf-8") as f: json.dump(list(done_ids), f) logger.debug(f"[EMBEDDER] Checkpoint saved: {len(done_ids):,} done") except Exception as e: logger.error(f"[EMBEDDER] Checkpoint save failed: {e}") def _upsert_batch( self, batch: list[dict], embeddings: np.ndarray, ) -> int: """ Upsert one batch to Pinecone with retry. Worst-case: network error → retry 3 times with backoff. Returns number of successfully upserted vectors. """ vectors = [] for chunk, embedding in zip(batch, embeddings): vectors.append({ "id": chunk["chunk_id"], "values": embedding.tolist(), "metadata": { "pmid": chunk.get("pmid", ""), "chunk_idx": chunk.get("chunk_idx", 0), "text": chunk.get("text", "")[:1000], "title": chunk.get("title", "")[:200], "year": chunk.get("year"), "keywords": chunk.get("keywords", [])[:10], "journal": chunk.get("journal", ""), "source_query": chunk.get("source_query", "unknown"), "source": chunk.get("source", "pubmed"), "word_count": chunk.get("word_count", 0), } }) for attempt in range(1, self._MAX_RETRIES + 1): try: self.index.upsert(vectors=vectors) return len(vectors) except Exception as e: logger.warning( f"[EMBEDDER] Upsert attempt {attempt}/{self._MAX_RETRIES} " f"failed: {e}" ) if attempt < self._MAX_RETRIES: time.sleep(self._RETRY_BACKOFF * attempt) else: logger.error("[EMBEDDER] Upsert failed after max retries") return 0 def run(self) -> EmbedDiagnostic: """ Main entry point — embed all chunks and upsert to Pinecone. Flow: 1. Load chunks 2. Load checkpoint (resume if crashed) 3. Filter already-done chunks 4. Embed in batches using GPU 5. Upsert each batch to Pinecone 6. Checkpoint every 1,000 vectors 7. Return EmbedDiagnostic """ start_time = time.time() logger.info("[EMBEDDER] Starting enterprise embedding + upsert") chunks = self._load_chunks() done_ids = self._load_checkpoint() # Filter already upserted chunks remaining = [c for c in chunks if c["chunk_id"] not in done_ids] logger.info( f"[EMBEDDER] {len(remaining):,} chunks to embed " f"({len(done_ids):,} already done)" ) total_upserted = len(done_ids) total_failed = 0 # Process in embedding batches batch_size = self.settings.embedding_batch_size for i in tqdm( range(0, len(remaining), batch_size), desc="Embedding", unit="batch" ): batch = remaining[i : i + batch_size] texts = [c["text"] for c in batch] # Embed batch try: embeddings = self.model.encode( texts, batch_size=batch_size, show_progress_bar=False, convert_to_numpy=True, device=self.device, ) except Exception as e: logger.error(f"[EMBEDDER] Embedding failed for batch {i}: {e}") total_failed += len(batch) continue # Upsert to Pinecone in sub-batches of 100 for j in range(0, len(batch), self._UPSERT_BATCH): sub_batch = batch[j : j + self._UPSERT_BATCH] sub_embeds = embeddings[j : j + self._UPSERT_BATCH] upserted = self._upsert_batch(sub_batch, sub_embeds) total_upserted += upserted if upserted == 0: total_failed += len(sub_batch) else: for chunk in sub_batch: done_ids.add(chunk["chunk_id"]) # Checkpoint every 1,000 if total_upserted % self._CHECKPOINT_EVERY < batch_size: self._save_checkpoint(done_ids) logger.info( f"[EMBEDDER] Progress: {total_upserted:,} / " f"{len(chunks):,} upserted" ) # Final checkpoint self._save_checkpoint(done_ids) # Get final vector count from Pinecone final_stats = self.index.describe_index_stats() duration = round(time.time() - start_time, 1) diagnostic = EmbedDiagnostic( total_chunks = len(chunks), total_embedded = len(chunks) - total_failed, total_upserted = total_upserted, total_failed = total_failed, embedding_model = self.settings.embedding_model, embedding_dim = self.settings.embedding_dim, pinecone_index = self.settings.pinecone_index_name, duration_secs = duration, vectors_in_index = final_stats.total_vector_count, ) diagnostic.log_summary() return diagnostic # ── RE probe ────────────────────────────────────────────────────── def diagnose_embedder() -> None: """ RE probe — check Pinecone index health without re-embedding. Usage: python -c "from vector_store.embedder import diagnose_embedder; diagnose_embedder()" """ settings = get_settings() pc = Pinecone(api_key=settings.pinecone_api_key) index = pc.Index(settings.pinecone_index_name) stats = index.describe_index_stats() logger.info("=" * 60) logger.info("[RE-EMBEDDER] Pinecone index health check") logger.info(f" Index name : {settings.pinecone_index_name}") logger.info(f" Total vectors : {stats.total_vector_count:,}") logger.info(f" Dimension : {stats.dimension}") logger.info(f" Namespaces : {dict(stats.namespaces)}") logger.info("=" * 60)