# AlzDetectAI / vector_store/embedder.py
# (repo header residue — commit 6d0d692 by tpriyadata: "add changes in embedded py")
"""
vector_store/embedder.py
=========================
ALZDETECT-AI β€” Enterprise Embedder + Pinecone Upserter.
WHAT: Converts 19,771 chunks β†’ 768-dim vectors β†’ Pinecone.
WHY: PubMedBERT understands biomedical language β€”
"pTau217" and "phosphorylated tau" map to nearby vectors.
Keyword search would miss this entirely.
WHO: Run once from Colab T4 GPU notebook.
Called by scripts/run_pipeline.py
WHERE: Reads β†’ data/processed/chunks.json
Writes β†’ Pinecone index (alzdetect)
WHEN: Once per plan. After chunking. Resumes from checkpoint.
WORST-CASE DESIGN:
- Colab timeout mid-run β†’ checkpoint every 1,000 vectors
- Pinecone upsert fail β†’ retry 3 times with backoff
- Wrong embedding dim β†’ caught before first upsert
- Chunk text empty β†’ skipped, logged, counted
- GPU not available β†’ falls back to CPU with warning
"""
import json
import time
import os
from pathlib import Path
from typing import Optional
import numpy as np
from pydantic import BaseModel, Field
from sentence_transformers import SentenceTransformer
from pinecone import Pinecone
from loguru import logger
from tqdm import tqdm
from configs.settings import get_settings
# ── Embed diagnostic model ────────────────────────────────────────
class EmbedDiagnostic(BaseModel):
    """
    RE inspector for embedding stage.
    Analogy: Barcode printer quality report.
    How many barcodes printed, how many failed,
    how many are in the scanner (Pinecone).
    """
    total_chunks: int        # chunks loaded from chunks.json
    total_embedded: int      # chunks that produced an embedding
    total_upserted: int      # vectors confirmed written to Pinecone
    total_failed: int        # chunks that failed embedding or upsert
    embedding_model: str     # model identifier used for this run
    embedding_dim: int       # vector dimensionality
    pinecone_index: str      # target index name
    duration_secs: float     # wall-clock time of the run
    vectors_in_index: int    # index vector count after the run

    def log_summary(self) -> None:
        """Emit the fixed-format run report, one log line per metric."""
        rule = "=" * 60
        report = (
            rule,
            "[EMBED-DIAGNOSTIC] Run complete",
            f" Total chunks : {self.total_chunks:,}",
            f" Embedded : {self.total_embedded:,}",
            f" Upserted : {self.total_upserted:,}",
            f" Failed : {self.total_failed:,}",
            f" Model : {self.embedding_model}",
            f" Dimensions : {self.embedding_dim}",
            f" Pinecone index : {self.pinecone_index}",
            f" Vectors in index : {self.vectors_in_index:,}",
            f" Duration : {self.duration_secs:.1f}s",
            rule,
        )
        for line in report:
            logger.info(line)
# ── Core embedder class ───────────────────────────────────────────
class ChunkEmbedder:
"""
Enterprise embedder + Pinecone upserter.
Analogy: The barcode printing and filing team.
Takes index cards (chunks), prints barcodes (embeddings),
files them in the barcode library (Pinecone).
Usage:
embedder = ChunkEmbedder()
diagnostic = embedder.run()
"""
_MAX_RETRIES: int = 3
_RETRY_BACKOFF: float = 2.0
_UPSERT_BATCH: int = 100 # Pinecone max upsert batch
_CHECKPOINT_EVERY: int = 1000 # save progress every N vectors
def __init__(self) -> None:
self.settings = get_settings()
self._setup_paths()
self._setup_model()
self._setup_pinecone()
def _setup_paths(self) -> None:
"""Paths for chunks input and checkpoint."""
self.chunks_path = self.settings.processed_data_path
self.checkpoint_path = self.chunks_path.parent / "embed_checkpoint.json"
logger.info(f"[EMBEDDER] Chunks path: {self.chunks_path}")
def _setup_model(self) -> None:
"""
Load PubMedBERT embedding model.
Worst-case: GPU not available β†’ CPU fallback with warning.
"""
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
if device == "cpu":
logger.warning(
"[EMBEDDER] GPU not available β€” running on CPU. "
"This will be slow. Use Colab T4 for faster embedding."
)
else:
logger.info(f"[EMBEDDER] GPU detected: {torch.cuda.get_device_name(0)}")
logger.info(f"[EMBEDDER] Loading model: {self.settings.embedding_model}")
self.model = SentenceTransformer(
self.settings.embedding_model,
device=device
)
self.device = device
logger.info(f"[EMBEDDER] Model loaded on {device}")
def _setup_pinecone(self) -> None:
"""Connect to Pinecone and verify index exists."""
pc = Pinecone(api_key=self.settings.pinecone_api_key)
self.index = pc.Index(self.settings.pinecone_index_name)
stats = self.index.describe_index_stats()
logger.info(
f"[EMBEDDER] Pinecone connected | "
f"index: {self.settings.pinecone_index_name} | "
f"existing vectors: {stats.total_vector_count:,}"
)
# Verify dimension matches β€” worst-case: wrong model used before
if stats.dimension != self.settings.embedding_dim:
raise ValueError(
f"[EMBEDDER] Dimension mismatch! "
f"Index has {stats.dimension} dims, "
f"model produces {self.settings.embedding_dim} dims. "
f"Delete index and recreate with correct dimensions."
)
def _load_chunks(self) -> list[dict]:
"""Load chunks from JSON β€” fail fast if missing."""
if not self.chunks_path.exists():
logger.error(f"[EMBEDDER] Chunks not found: {self.chunks_path}")
raise FileNotFoundError(
f"Run chunker first. No file at: {self.chunks_path}"
)
with open(self.chunks_path, encoding="utf-8") as f:
chunks = json.load(f)
logger.info(f"[EMBEDDER] Loaded {len(chunks):,} chunks")
return chunks
def _load_checkpoint(self) -> set[str]:
"""
Load set of already-upserted chunk IDs.
Allows resuming after Colab timeout.
"""
if self.checkpoint_path.exists():
try:
with open(self.checkpoint_path, encoding="utf-8") as f:
done = set(json.load(f))
logger.info(
f"[EMBEDDER] Checkpoint found β€” "
f"{len(done):,} chunks already upserted"
)
return done
except Exception as e:
logger.warning(f"[EMBEDDER] Checkpoint corrupt: {e} β€” starting fresh")
logger.info("[EMBEDDER] No checkpoint β€” starting fresh")
return set()
def _save_checkpoint(self, done_ids: set[str]) -> None:
"""Save checkpoint β€” called every 1,000 upserted chunks."""
try:
with open(self.checkpoint_path, "w", encoding="utf-8") as f:
json.dump(list(done_ids), f)
logger.debug(f"[EMBEDDER] Checkpoint saved: {len(done_ids):,} done")
except Exception as e:
logger.error(f"[EMBEDDER] Checkpoint save failed: {e}")
def _upsert_batch(
self,
batch: list[dict],
embeddings: np.ndarray,
) -> int:
"""
Upsert one batch to Pinecone with retry.
Worst-case: network error β†’ retry 3 times with backoff.
Returns number of successfully upserted vectors.
"""
vectors = []
for chunk, embedding in zip(batch, embeddings):
vectors.append({
"id": chunk["chunk_id"],
"values": embedding.tolist(),
"metadata": {
"pmid": chunk.get("pmid", ""),
"chunk_idx": chunk.get("chunk_idx", 0),
"text": chunk.get("text", "")[:1000],
"title": chunk.get("title", "")[:200],
"year": chunk.get("year"),
"keywords": chunk.get("keywords", [])[:10],
"journal": chunk.get("journal", ""),
"source_query": chunk.get("source_query", "unknown"),
"source": chunk.get("source", "pubmed"),
"word_count": chunk.get("word_count", 0),
}
})
for attempt in range(1, self._MAX_RETRIES + 1):
try:
self.index.upsert(vectors=vectors)
return len(vectors)
except Exception as e:
logger.warning(
f"[EMBEDDER] Upsert attempt {attempt}/{self._MAX_RETRIES} "
f"failed: {e}"
)
if attempt < self._MAX_RETRIES:
time.sleep(self._RETRY_BACKOFF * attempt)
else:
logger.error("[EMBEDDER] Upsert failed after max retries")
return 0
def run(self) -> EmbedDiagnostic:
"""
Main entry point β€” embed all chunks and upsert to Pinecone.
Flow:
1. Load chunks
2. Load checkpoint (resume if crashed)
3. Filter already-done chunks
4. Embed in batches using GPU
5. Upsert each batch to Pinecone
6. Checkpoint every 1,000 vectors
7. Return EmbedDiagnostic
"""
start_time = time.time()
logger.info("[EMBEDDER] Starting enterprise embedding + upsert")
chunks = self._load_chunks()
done_ids = self._load_checkpoint()
# Filter already upserted chunks
remaining = [c for c in chunks if c["chunk_id"] not in done_ids]
logger.info(
f"[EMBEDDER] {len(remaining):,} chunks to embed "
f"({len(done_ids):,} already done)"
)
total_upserted = len(done_ids)
total_failed = 0
# Process in embedding batches
batch_size = self.settings.embedding_batch_size
for i in tqdm(
range(0, len(remaining), batch_size),
desc="Embedding",
unit="batch"
):
batch = remaining[i : i + batch_size]
texts = [c["text"] for c in batch]
# Embed batch
try:
embeddings = self.model.encode(
texts,
batch_size=batch_size,
show_progress_bar=False,
convert_to_numpy=True,
device=self.device,
)
except Exception as e:
logger.error(f"[EMBEDDER] Embedding failed for batch {i}: {e}")
total_failed += len(batch)
continue
# Upsert to Pinecone in sub-batches of 100
for j in range(0, len(batch), self._UPSERT_BATCH):
sub_batch = batch[j : j + self._UPSERT_BATCH]
sub_embeds = embeddings[j : j + self._UPSERT_BATCH]
upserted = self._upsert_batch(sub_batch, sub_embeds)
total_upserted += upserted
if upserted == 0:
total_failed += len(sub_batch)
else:
for chunk in sub_batch:
done_ids.add(chunk["chunk_id"])
# Checkpoint every 1,000
if total_upserted % self._CHECKPOINT_EVERY < batch_size:
self._save_checkpoint(done_ids)
logger.info(
f"[EMBEDDER] Progress: {total_upserted:,} / "
f"{len(chunks):,} upserted"
)
# Final checkpoint
self._save_checkpoint(done_ids)
# Get final vector count from Pinecone
final_stats = self.index.describe_index_stats()
duration = round(time.time() - start_time, 1)
diagnostic = EmbedDiagnostic(
total_chunks = len(chunks),
total_embedded = len(chunks) - total_failed,
total_upserted = total_upserted,
total_failed = total_failed,
embedding_model = self.settings.embedding_model,
embedding_dim = self.settings.embedding_dim,
pinecone_index = self.settings.pinecone_index_name,
duration_secs = duration,
vectors_in_index = final_stats.total_vector_count,
)
diagnostic.log_summary()
return diagnostic
# ── RE probe ──────────────────────────────────────────────────────
def diagnose_embedder() -> None:
    """
    RE probe — check Pinecone index health without re-embedding.
    Connects to the configured index, fetches its stats, and logs
    a short fixed-format health report.
    Usage:
        python -c "from vector_store.embedder import diagnose_embedder; diagnose_embedder()"
    """
    settings = get_settings()
    client = Pinecone(api_key=settings.pinecone_api_key)
    index = client.Index(settings.pinecone_index_name)
    stats = index.describe_index_stats()
    rule = "=" * 60
    report = [
        rule,
        "[RE-EMBEDDER] Pinecone index health check",
        f" Index name : {settings.pinecone_index_name}",
        f" Total vectors : {stats.total_vector_count:,}",
        f" Dimension : {stats.dimension}",
        f" Namespaces : {dict(stats.namespaces)}",
        rule,
    ]
    for line in report:
        logger.info(line)