Spaces:
Running
Running
| """ | |
| Orchestrates embedding generation for all chunks. | |
| FLOW: | |
| 1. Load all chunk files from data/chunks/ | |
| 2. Check cache - skip already-embedded chunks | |
| 3. Embed remaining chunks in batches | |
| 4. Save to cache | |
| 5. Report statistics | |
| """ | |
| import json | |
| import numpy as np | |
| from pathlib import Path | |
| from tqdm import tqdm | |
| from src.embeddings.embedding_model import EmbeddingModel | |
| from src.embeddings.embedding_cache import EmbeddingCache | |
| from src.utils.logger import get_logger | |
| from config.settings import CHUNKS_DIR, EMBEDDING_BATCH_SIZE | |
| logger = get_logger(__name__) | |
| class EmbeddingPipeline: | |
| """ | |
| Loads all semantic chunks and generates embeddings for them | |
| """ | |
| def __init__(self): | |
| self.model = EmbeddingModel() | |
| self.cache = EmbeddingCache() | |
| def load_all_chunks(self) -> tuple[list[str], list[str], list[dict]]: | |
| """ | |
| Load all chunk texts, IDs, and metadata from disk. | |
| Returns: | |
| texts: List of chunk text strings | |
| chunk_ids: List of chunk ID strings (same order) | |
| metadata: List of chunk metadata dicts (same order) | |
| """ | |
| chunk_file = list(CHUNKS_DIR.glob("*semantic.json")) | |
| logger.info(f"Loading chunks from {len(chunk_file)} files...") | |
| texts = [] | |
| chunk_ids = [] | |
| metadata = [] | |
| for cf in tqdm(chunk_file, desc = "Loading chunks"): | |
| with open(cf, "r", encoding = 'utf-8') as f: | |
| chunks = json.load(f) | |
| for chunk in chunks: | |
| texts.append(chunk["text"]) | |
| chunk_ids.append(chunk["chunk_id"]) | |
| metadata.append( | |
| { | |
| k: v for k, v in chunk.items() | |
| if k != "text" # Don't duplicate text in metadata | |
| } | |
| ) | |
| logger.info(f"Loaded {len(texts):,} chunks total") | |
| return texts, chunk_ids, metadata | |
| def run(self) -> dict: | |
| """ | |
| Main pipeline: embed all chunks and save to cache. | |
| Returns: | |
| Statistics dictionary | |
| """ | |
| # Load all chunks from disk | |
| texts, chunk_ids, metadata = self.load_all_chunks() | |
| if not texts: | |
| logger.error("No chunks found. Run run_chunking.py first.") | |
| return {} | |
| # Check if we already have a complete cache | |
| if self.cache.exists(): | |
| self.cache.load() | |
| if self.cache.size == len(texts): | |
| logger.info( | |
| f"Cache complete: {self.cache.size:,} embeddings already exist." | |
| f"Nothing to do." | |
| ) | |
| return { | |
| "total": len(texts), | |
| "embedded": 0, | |
| "from_cache": self.cache.size, | |
| "status": "cache_hit" | |
| } | |
| else: | |
| logger.info( | |
| f"Partial cache: {self.cache.size:,} / {len(texts):,} " | |
| f"Re-embedding all for consistency." | |
| ) | |
| # Embed all chunks | |
| logger.info(f"Embedding {len(texts):,} chunks with BGE-base-en-v1.5...") | |
| logger.info(f"Batch size: {EMBEDDING_BATCH_SIZE}") | |
| logger.info( | |
| f"Estimated time: " | |
| f"{len(texts) / EMBEDDING_BATCH_SIZE * 0.5:.0f} seconds on CPU" | |
| ) | |
| # embed_documents handles batching internally and shows progress bar | |
| embeddings = self.model.embed_documents( | |
| texts, | |
| batch_size = EMBEDDING_BATCH_SIZE, | |
| show_progress = True, | |
| ) | |
| # Verify shape | |
| assert embeddings.shape == (len(texts), 768), ( | |
| f"Expected ({len(texts)}, 768), got {embeddings.shape}" | |
| ) | |
| # Save to disk | |
| self.cache.save(embeddings, chunk_ids) | |
| # Also save metadata separately (needed for Qdrant in Phase 7) | |
| metadata_path = CHUNKS_DIR.parent / "embeddings" / "chunk_metadata.json" | |
| with open(metadata_path, "w", encoding = 'utf-8') as f: | |
| json.dump(metadata, f, ensure_ascii = False) | |
| logger.info(f"Metadata saved to {metadata_path}") | |
| stats = { | |
| "total_chunks": len(texts), | |
| "embedding_shape": list(embeddings.shape), | |
| "embedding_dim": embeddings.shape[1], | |
| "cache_size_mb": round( | |
| embeddings.nbytes / 1024 / 1024, 1 | |
| ), | |
| "status": "complete" | |
| } | |
| logger.info(f"Embedding pipeline completed: {stats}") | |
| return stats |