Spaces:
Running
Running
"""
vector_store/embedder.py
=========================
ALZDETECT-AI — Enterprise Embedder + Pinecone Upserter.
WHAT:  Converts 19,771 chunks → 768-dim vectors → Pinecone.
WHY:   PubMedBERT understands biomedical language —
       "pTau217" and "phosphorylated tau" map to nearby vectors.
       Keyword search would miss this entirely.
WHO:   Run once from Colab T4 GPU notebook.
       Called by scripts/run_pipeline.py
WHERE: Reads  → data/processed/chunks.json
       Writes → Pinecone index (alzdetect)
WHEN:  Once per plan. After chunking. Resumes from checkpoint.
WORST-CASE DESIGN:
    - Colab timeout mid-run   → checkpoint every 1,000 vectors
    - Pinecone upsert fail    → retry 3 times with backoff
    - Wrong embedding dim     → caught before first upsert
    - Chunk text empty        → skipped, logged, counted
    - GPU not available       → falls back to CPU with warning
"""
| import json | |
| import time | |
| import os | |
| from pathlib import Path | |
| from typing import Optional | |
| import numpy as np | |
| from pydantic import BaseModel, Field | |
| from sentence_transformers import SentenceTransformer | |
| from pinecone import Pinecone | |
| from loguru import logger | |
| from tqdm import tqdm | |
| from configs.settings import get_settings | |
| # ββ Embed diagnostic model ββββββββββββββββββββββββββββββββββββββββ | |
class EmbedDiagnostic(BaseModel):
    """
    RE inspector for embedding stage.

    Analogy: Barcode printer quality report — how many barcodes
    printed, how many failed, how many are in the scanner (Pinecone).
    """

    total_chunks: int      # chunks loaded from disk
    total_embedded: int    # chunks successfully embedded
    total_upserted: int    # vectors confirmed written to Pinecone
    total_failed: int      # chunks that failed embedding or upsert
    embedding_model: str   # model identifier used for encoding
    embedding_dim: int     # vector dimensionality
    pinecone_index: str    # target index name
    duration_secs: float   # wall-clock time of the run
    vectors_in_index: int  # index-reported total after the run

    def log_summary(self) -> None:
        """Emit a human-readable summary of the run to the log."""
        banner = "=" * 60
        rows = [
            f" Total chunks : {self.total_chunks:,}",
            f" Embedded : {self.total_embedded:,}",
            f" Upserted : {self.total_upserted:,}",
            f" Failed : {self.total_failed:,}",
            f" Model : {self.embedding_model}",
            f" Dimensions : {self.embedding_dim}",
            f" Pinecone index : {self.pinecone_index}",
            f" Vectors in index : {self.vectors_in_index:,}",
            f" Duration : {self.duration_secs:.1f}s",
        ]
        logger.info(banner)
        logger.info("[EMBED-DIAGNOSTIC] Run complete")
        for row in rows:
            logger.info(row)
        logger.info(banner)
| # ββ Core embedder class βββββββββββββββββββββββββββββββββββββββββββ | |
class ChunkEmbedder:
    """
    Enterprise embedder + Pinecone upserter.

    Analogy: The barcode printing and filing team.
    Takes index cards (chunks), prints barcodes (embeddings),
    files them in the barcode library (Pinecone).

    Usage:
        embedder = ChunkEmbedder()
        diagnostic = embedder.run()
    """

    _MAX_RETRIES: int = 3          # upsert attempts per batch before giving up
    _RETRY_BACKOFF: float = 2.0    # base seconds; multiplied by attempt number
    _UPSERT_BATCH: int = 100       # Pinecone max upsert batch
    _CHECKPOINT_EVERY: int = 1000  # save progress every N upserted vectors

    def __init__(self) -> None:
        self.settings = get_settings()
        self._setup_paths()
        self._setup_model()
        self._setup_pinecone()

    def _setup_paths(self) -> None:
        """Resolve paths for the chunks input file and the resume checkpoint."""
        self.chunks_path = self.settings.processed_data_path
        self.checkpoint_path = self.chunks_path.parent / "embed_checkpoint.json"
        logger.info(f"[EMBEDDER] Chunks path: {self.chunks_path}")

    def _setup_model(self) -> None:
        """
        Load the PubMedBERT embedding model.

        Worst-case: GPU not available → CPU fallback with warning.
        """
        # Local import keeps torch out of module import time for callers
        # that only need diagnose_embedder().
        import torch
        device = "cuda" if torch.cuda.is_available() else "cpu"
        if device == "cpu":
            logger.warning(
                "[EMBEDDER] GPU not available — running on CPU. "
                "This will be slow. Use Colab T4 for faster embedding."
            )
        else:
            logger.info(f"[EMBEDDER] GPU detected: {torch.cuda.get_device_name(0)}")
        logger.info(f"[EMBEDDER] Loading model: {self.settings.embedding_model}")
        self.model = SentenceTransformer(
            self.settings.embedding_model,
            device=device,
        )
        self.device = device
        logger.info(f"[EMBEDDER] Model loaded on {device}")

    def _setup_pinecone(self) -> None:
        """Connect to Pinecone and verify the index dimension matches the model."""
        pc = Pinecone(api_key=self.settings.pinecone_api_key)
        self.index = pc.Index(self.settings.pinecone_index_name)
        stats = self.index.describe_index_stats()
        logger.info(
            f"[EMBEDDER] Pinecone connected | "
            f"index: {self.settings.pinecone_index_name} | "
            f"existing vectors: {stats.total_vector_count:,}"
        )
        # Fail fast before the first upsert — worst-case: wrong model used before.
        if stats.dimension != self.settings.embedding_dim:
            raise ValueError(
                f"[EMBEDDER] Dimension mismatch! "
                f"Index has {stats.dimension} dims, "
                f"model produces {self.settings.embedding_dim} dims. "
                f"Delete index and recreate with correct dimensions."
            )

    def _load_chunks(self) -> list[dict]:
        """Load chunks from JSON — fail fast if the chunker hasn't run."""
        if not self.chunks_path.exists():
            logger.error(f"[EMBEDDER] Chunks not found: {self.chunks_path}")
            raise FileNotFoundError(
                f"Run chunker first. No file at: {self.chunks_path}"
            )
        with open(self.chunks_path, encoding="utf-8") as f:
            chunks = json.load(f)
        logger.info(f"[EMBEDDER] Loaded {len(chunks):,} chunks")
        return chunks

    def _load_checkpoint(self) -> set[str]:
        """
        Load the set of already-upserted chunk IDs.

        Allows resuming after a Colab timeout; a corrupt checkpoint file
        is logged and treated as a fresh start rather than a crash.
        """
        if self.checkpoint_path.exists():
            try:
                with open(self.checkpoint_path, encoding="utf-8") as f:
                    done = set(json.load(f))
                logger.info(
                    f"[EMBEDDER] Checkpoint found — "
                    f"{len(done):,} chunks already upserted"
                )
                return done
            except Exception as e:
                logger.warning(f"[EMBEDDER] Checkpoint corrupt: {e} — starting fresh")
        logger.info("[EMBEDDER] No checkpoint — starting fresh")
        return set()

    def _save_checkpoint(self, done_ids: set[str]) -> None:
        """Persist the done-ID set; a save failure is logged, never fatal."""
        try:
            with open(self.checkpoint_path, "w", encoding="utf-8") as f:
                json.dump(list(done_ids), f)
            logger.debug(f"[EMBEDDER] Checkpoint saved: {len(done_ids):,} done")
        except Exception as e:
            logger.error(f"[EMBEDDER] Checkpoint save failed: {e}")

    @staticmethod
    def _build_metadata(chunk: dict) -> dict:
        """
        Build the Pinecone metadata payload for one chunk.

        FIX: drop None values — Pinecone rejects null metadata (e.g. a
        missing "year" would previously fail the entire batch upsert).
        Long fields are truncated to keep metadata under size limits.
        """
        metadata = {
            "pmid": chunk.get("pmid", ""),
            "chunk_idx": chunk.get("chunk_idx", 0),
            "text": chunk.get("text", "")[:1000],
            "title": chunk.get("title", "")[:200],
            "year": chunk.get("year"),
            "keywords": chunk.get("keywords", [])[:10],
            "journal": chunk.get("journal", ""),
            "source_query": chunk.get("source_query", "unknown"),
            "source": chunk.get("source", "pubmed"),
            "word_count": chunk.get("word_count", 0),
        }
        return {k: v for k, v in metadata.items() if v is not None}

    def _upsert_batch(
        self,
        batch: list[dict],
        embeddings: np.ndarray,
    ) -> int:
        """
        Upsert one batch to Pinecone with retry.

        Worst-case: network error → retry _MAX_RETRIES times with
        linear backoff (_RETRY_BACKOFF * attempt seconds).

        Returns:
            Number of successfully upserted vectors (0 on total failure).
        """
        vectors = [
            {
                "id": chunk["chunk_id"],
                "values": embedding.tolist(),
                "metadata": self._build_metadata(chunk),
            }
            for chunk, embedding in zip(batch, embeddings)
        ]
        for attempt in range(1, self._MAX_RETRIES + 1):
            try:
                self.index.upsert(vectors=vectors)
                return len(vectors)
            except Exception as e:
                logger.warning(
                    f"[EMBEDDER] Upsert attempt {attempt}/{self._MAX_RETRIES} "
                    f"failed: {e}"
                )
                if attempt < self._MAX_RETRIES:
                    time.sleep(self._RETRY_BACKOFF * attempt)
                else:
                    logger.error("[EMBEDDER] Upsert failed after max retries")
        return 0

    def run(self) -> EmbedDiagnostic:
        """
        Main entry point — embed all chunks and upsert to Pinecone.

        Flow:
            1. Load chunks
            2. Load checkpoint (resume if crashed)
            3. Filter already-done and empty-text chunks
            4. Embed in batches using GPU
            5. Upsert each batch to Pinecone
            6. Checkpoint every _CHECKPOINT_EVERY vectors
            7. Return EmbedDiagnostic
        """
        start_time = time.time()
        logger.info("[EMBEDDER] Starting enterprise embedding + upsert")
        chunks = self._load_chunks()
        done_ids = self._load_checkpoint()

        # Filter already-upserted chunks and — FIX, as promised by the module
        # header — skip chunks with empty text instead of embedding "".
        remaining: list[dict] = []
        skipped_empty = 0
        for chunk in chunks:
            if chunk["chunk_id"] in done_ids:
                continue
            if not chunk.get("text", "").strip():
                logger.warning(
                    f"[EMBEDDER] Empty text in chunk {chunk['chunk_id']} — skipped"
                )
                skipped_empty += 1
                continue
            remaining.append(chunk)
        logger.info(
            f"[EMBEDDER] {len(remaining):,} chunks to embed "
            f"({len(done_ids):,} already done)"
        )
        if skipped_empty:
            logger.warning(f"[EMBEDDER] Skipped {skipped_empty:,} empty-text chunks")

        total_upserted = len(done_ids)
        total_failed = skipped_empty  # empty-text chunks are counted as failed
        since_checkpoint = 0          # FIX: explicit counter replaces modulo heuristic

        batch_size = self.settings.embedding_batch_size
        for i in tqdm(
            range(0, len(remaining), batch_size),
            desc="Embedding",
            unit="batch",
        ):
            batch = remaining[i : i + batch_size]
            texts = [c["text"] for c in batch]
            # Embed batch — a failure here skips the batch, never aborts the run.
            try:
                embeddings = self.model.encode(
                    texts,
                    batch_size=batch_size,
                    show_progress_bar=False,
                    convert_to_numpy=True,
                    device=self.device,
                )
            except Exception as e:
                logger.error(f"[EMBEDDER] Embedding failed for batch {i}: {e}")
                total_failed += len(batch)
                continue
            # Upsert to Pinecone in sub-batches of _UPSERT_BATCH (Pinecone limit).
            for j in range(0, len(batch), self._UPSERT_BATCH):
                sub_batch = batch[j : j + self._UPSERT_BATCH]
                sub_embeds = embeddings[j : j + self._UPSERT_BATCH]
                upserted = self._upsert_batch(sub_batch, sub_embeds)
                total_upserted += upserted
                if upserted == 0:
                    total_failed += len(sub_batch)
                else:
                    for chunk in sub_batch:
                        done_ids.add(chunk["chunk_id"])
                    since_checkpoint += upserted
                # Checkpoint after every _CHECKPOINT_EVERY new vectors.
                if since_checkpoint >= self._CHECKPOINT_EVERY:
                    self._save_checkpoint(done_ids)
                    since_checkpoint = 0
                    logger.info(
                        f"[EMBEDDER] Progress: {total_upserted:,} / "
                        f"{len(chunks):,} upserted"
                    )

        # Final checkpoint so a re-run resumes from the true end state.
        self._save_checkpoint(done_ids)
        # Ask Pinecone for the authoritative post-run vector count.
        final_stats = self.index.describe_index_stats()
        duration = round(time.time() - start_time, 1)
        diagnostic = EmbedDiagnostic(
            total_chunks=len(chunks),
            total_embedded=len(chunks) - total_failed,
            total_upserted=total_upserted,
            total_failed=total_failed,
            embedding_model=self.settings.embedding_model,
            embedding_dim=self.settings.embedding_dim,
            pinecone_index=self.settings.pinecone_index_name,
            duration_secs=duration,
            vectors_in_index=final_stats.total_vector_count,
        )
        diagnostic.log_summary()
        return diagnostic
| # ββ RE probe ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def diagnose_embedder() -> None:
    """
    RE probe — check Pinecone index health without re-embedding.

    Usage:
        python -c "from vector_store.embedder import diagnose_embedder; diagnose_embedder()"
    """
    settings = get_settings()
    client = Pinecone(api_key=settings.pinecone_api_key)
    stats = client.Index(settings.pinecone_index_name).describe_index_stats()
    banner = "=" * 60
    report = [
        banner,
        "[RE-EMBEDDER] Pinecone index health check",
        f" Index name : {settings.pinecone_index_name}",
        f" Total vectors : {stats.total_vector_count:,}",
        f" Dimension : {stats.dimension}",
        f" Namespaces : {dict(stats.namespaces)}",
        banner,
    ]
    for line in report:
        logger.info(line)