Spaces:
Running
Running
| """ | |
| PGC Knowledge Base Ingestion Script (v3 — BGE-M3 1024-dim) | |
| =========================================================== | |
| Reads pre-chunked semantic passages from data/vector_database.json, | |
| generates BAAI/bge-m3 embeddings (GPU-accelerated locally), and uploads | |
| to the Supabase knowledge_chunks table for pgvector similarity search. | |
| Pipeline: | |
| data/vector_database.json | |
| | | |
| v | |
| Embed (BAAI/bge-m3 ONNX, 1024-dim, CUDA on RTX 3050) | |
| | | |
| v | |
| Supabase knowledge_chunks (pgvector, vector(1024)) | |
| Run this LOCALLY whenever vector_database.json is regenerated. | |
| Run scripts/sql/2026-05-02-bge-m3-cutover.sql in Supabase first if | |
| migrating from a previous embedding model. | |
| Usage: | |
| cd "c:\\\\Users\\\\justi\\\\Jac ITB\\\\TA PGC Smartness\\\\AI Chatbot" | |
| pip install fastembed python-dotenv httpx | |
| python scripts/ingest_docs.py | |
| IMPORTANT: If you change EMBEDDING_MODEL, run the cutover SQL in Supabase | |
| and re-run this script — mixed-dimension vectors will silently corrupt | |
| search results. | |
| """ | |
| import json | |
| import os | |
| import sys | |
| import httpx | |
| from pathlib import Path | |
| from typing import List, Dict | |
| from dotenv import load_dotenv | |
# onnxruntime-gpu can only create a CUDAExecutionProvider session if the CUDA
# runtime DLLs (cublasLt64_12.dll, cudnn64_9.dll, ...) are on the Windows DLL
# search path. PyTorch wheels bundle those DLLs in torch/lib, so that folder is
# registered here instead of requiring a system-wide CUDA Toolkit install.
# Best effort only: if torch is not installed or the folder cannot be
# registered, the script carries on without it.
if sys.platform == "win32":
    try:
        import torch as _torch

        _torch_lib = Path(_torch.__file__).parent.joinpath("lib")
        if _torch_lib.exists():
            os.add_dll_directory(os.fspath(_torch_lib))
    except (OSError, ImportError):
        pass
| from app.knowledge_chunking import build_normalized_child_chunks | |
| from app.embedding_runtime import EMBEDDING_DIM, EMBEDDING_MODEL, get_runtime_config | |
# Load .env from the project root (the parent of this script's directory) so
# the Supabase credentials below can be supplied via that file.
load_dotenv(Path(__file__).parent.parent / ".env")
# =============================================================================
# CONFIGURATION
# =============================================================================
# A trailing slash is stripped so f"{SUPABASE_URL}/rest/v1/..." never becomes
# "https://<project>.supabase.co//rest/v1/...", which can fail to route.
SUPABASE_URL = os.getenv("SUPABASE_URL", "").rstrip("/")
SUPABASE_KEY = os.getenv("SUPABASE_KEY", "")
# Rows per upload request. One 1024-float embedding serializes to roughly
# 20 KB of JSON text, so a batch of 20 rows is on the order of 0.5 MB.
BATCH_SIZE = 20
MAX_RETRIES = 3  # Attempts per batch, with exponential backoff between them
VECTOR_DB_PATH = Path(__file__).parent.parent / "data" / "vector_database.json"
| # ============================================================================= | |
| # EMBEDDING | |
| # ============================================================================= | |
def load_embedding_model():
    """
    Load the fastembed BGE-M3 ONNX embedding model for local ingestion.

    The model name, HuggingFace source, vector dimension, ONNX file and
    execution providers all come from ``get_runtime_config("ingest")``.

    Steps:
      1. Register the model with ``TextEmbedding.add_custom_model``, using CLS
         pooling and normalized output.
      2. Instantiate it with the configured providers.
      3. If instantiation raises a ValueError that mentions
         ``CUDAExecutionProvider``, re-create it with
         ``CPUExecutionProvider`` only.

    Model files are downloaded from HuggingFace on first use (the console
    message estimates ~1.2 GB) and cached locally.

    Returns:
        fastembed.TextEmbedding instance.

    Raises:
        SystemExit: Status 1 when fastembed is not installed. An error
            message is printed first, and the ImportError itself is not
            propagated to the caller.
        ValueError: Re-raised from fastembed for registration or construction
            errors other than "already registered" / CUDA-provider failures.
    """
    try:
        from fastembed import TextEmbedding
        from fastembed.common.model_description import PoolingType, ModelSource
    except ImportError:
        print("[ERROR] fastembed not installed. Run: pip install fastembed")
        sys.exit(1)

    cfg = get_runtime_config("ingest")
    print(f"Loading embedding model: {cfg.model_name} (source: {cfg.source})")
    print(f"Providers: {cfg.providers}")
    print("(Downloads ~1.2 GB on first run — please wait...)")

    # Registration must happen before TextEmbedding(model_name=...) can resolve
    # cfg.model_name. BGE-M3's dense vector is the normalized [CLS] state, hence
    # CLS pooling + normalization. Re-registering an existing name raises a
    # ValueError containing "already registered"; that case is harmless.
    try:
        TextEmbedding.add_custom_model(
            model=cfg.model_name,
            pooling=PoolingType.CLS,
            normalization=True,
            sources=ModelSource(hf=cfg.source),
            dim=cfg.dim,
            model_file=cfg.model_file,
        )
    except ValueError as exc:
        if "already registered" in str(exc).lower():
            print("[Info] Custom model already registered — skipping")
        else:
            raise

    try:
        return TextEmbedding(model_name=cfg.model_name, providers=cfg.providers)
    except ValueError as exc:
        # A ValueError naming the CUDA provider is treated as "CUDA is not
        # usable on this machine": ingestion continues on CPU, only slower.
        if "CUDAExecutionProvider" in str(exc):
            print("[Warning] CUDAExecutionProvider unavailable — falling back to CPU")
            return TextEmbedding(model_name=cfg.model_name, providers=["CPUExecutionProvider"])
        raise
def embed_batch(model, texts: List[str], batch_size: int = 32) -> List[List[float]]:
    """
    Embed passage strings into float vectors ready for JSON upload.

    Calls ``model.passage_embed`` (the document-side counterpart of the
    ``query_embed`` call used at search time) and converts each returned
    vector to a plain Python list so it can be serialized into the Supabase
    request body. The texts are passed through unchanged.
    NOTE(review): BGE-M3 is normally used without instruction prefixes;
    confirm whether fastembed adds any for this custom-registered model.

    Args:
        model: fastembed.TextEmbedding instance (any object with a compatible
            ``passage_embed(texts, batch_size=...)`` works).
        texts: Raw chunk texts to embed.
        batch_size: Texts per internal inference batch. The default of 32
            keeps GPU memory under ~1 GB per batch on an RTX 3050 4 GB.
            fastembed's default (256) caused OOM there, because the model
            weights already occupy ~600 MB of VRAM.

    Returns:
        One list of floats per input text. Dimensions are not checked here;
        use validate_embedding_dimensions for that.
    """
    if not texts:
        # Nothing to embed: skip the model call entirely.
        return []
    return [emb.tolist() for emb in model.passage_embed(texts, batch_size=batch_size)]
def validate_embedding_dimensions(embeddings: List[List[float]]) -> None:
    """
    Check that every embedding has exactly EMBEDDING_DIM components.

    Raises:
        ValueError: For the first embedding whose length differs, naming its
            position, its actual length and the expected length.
    """
    first_bad = next(
        (
            (position, len(vector))
            for position, vector in enumerate(embeddings)
            if len(vector) != EMBEDDING_DIM
        ),
        None,
    )
    if first_bad is None:
        return
    position, actual_length = first_bad
    raise ValueError(
        f"Embedding #{position} has length {actual_length}; expected {EMBEDDING_DIM}"
    )
def build_upload_rows(children, embeddings: List[List[float]]) -> List[Dict]:
    """
    Pair each child chunk with its embedding as a Supabase row dict.

    Args:
        children: Chunk objects exposing ``content``, ``source``, ``filename``
            and ``page_number`` attributes.
        embeddings: One vector per child, in the same order as ``children``.

    Returns:
        Dicts with keys content, source, filename, page_number and embedding,
        one per chunk and in input order.

    Raises:
        ValueError: If the two inputs differ in length. Without this check,
            ``zip`` would silently drop the unmatched tail and those chunks
            would never be uploaded.
    """
    children = list(children)
    embeddings = list(embeddings)
    if len(children) != len(embeddings):
        raise ValueError(
            f"Got {len(children)} chunks but {len(embeddings)} embeddings; "
            "they must match one-to-one"
        )
    return [
        {
            "content": child.content,
            "source": child.source,
            "filename": child.filename,
            "page_number": child.page_number,
            "embedding": embedding,
        }
        for child, embedding in zip(children, embeddings)
    ]
| # ============================================================================= | |
| # SUPABASE UPLOAD | |
| # ============================================================================= | |
def _get_headers() -> Dict[str, str]:
    """
    Build the headers sent with every Supabase REST request.

    SUPABASE_KEY is sent both as ``apikey`` and as a Bearer token. The body
    is declared as JSON, and ``Prefer: return=minimal`` asks the REST layer
    not to echo written rows back.
    """
    key = SUPABASE_KEY
    header_pairs = [
        ("apikey", key),
        ("Authorization", f"Bearer {key}"),
        ("Content-Type", "application/json"),
        ("Prefer", "return=minimal"),
    ]
    return dict(header_pairs)
def clear_existing_chunks() -> bool:
    """
    Delete all existing rows from knowledge_chunks before re-ingestion.

    This prevents duplicate chunks from accumulating across runs. The request
    carries the filter ``id > 0``, intended to match every row.
    NOTE(review): this assumes ids are positive integers; confirm against
    the table schema.

    Returns:
        True if the server acknowledged the delete (HTTP 200/204). False on
        any other status or on a network/timeout error. Failures are printed
        rather than raised, so the caller decides whether to continue.
    """
    print("\n[...] Clearing existing knowledge_chunks from Supabase...")
    try:
        with httpx.Client(timeout=30.0) as client:
            response = client.delete(
                f"{SUPABASE_URL}/rest/v1/knowledge_chunks",
                headers=_get_headers(),  # already includes Prefer: return=minimal
                params={"id": "gt.0"},  # DELETE WHERE id > 0 (all rows)
            )
    except httpx.HTTPError as exc:
        # Connection refused, DNS failure, timeout, ...: report it the same
        # way as an error status instead of crashing the whole ingestion.
        print(f"[WARN] Clear failed: {exc}")
        print(" Proceeding with upload (may create duplicates).")
        return False

    if response.status_code in (200, 204):
        print("[OK] Existing chunks cleared.")
        return True
    print(f"[WARN] Clear failed: {response.status_code} — {response.text}")
    print(" Proceeding with upload (may create duplicates).")
    return False
def upload_chunks_to_supabase(rows: List[Dict]) -> int:
    """
    Batch-upload embedded chunks to the Supabase knowledge_chunks table.

    Rows are POSTed in batches of BATCH_SIZE over a single pooled HTTP client.

    Retry policy:
      - Network errors, 408/429 and 5xx responses are retried up to
        MAX_RETRIES attempts, with exponential backoff (2 s, 4 s, ...).
      - Any other error status (e.g. 400 for a schema or vector-dimension
        mismatch, 401/403 for a bad key) would fail identically every time,
        so that batch is skipped immediately.

    Args:
        rows: List of dicts with keys:
            content, source, filename, page_number, embedding.

    Returns:
        Number of successfully uploaded rows.
    """
    import time

    def _is_retryable(status_code: int) -> bool:
        # Timeouts, rate limiting and server-side errors are usually transient.
        return status_code in (408, 429) or status_code >= 500

    uploaded = 0
    total_batches = (len(rows) + BATCH_SIZE - 1) // BATCH_SIZE
    endpoint = f"{SUPABASE_URL}/rest/v1/knowledge_chunks"
    headers = _get_headers()

    # One client for the whole upload, so the TCP/TLS connection is reused
    # across batches instead of being re-established for every request.
    with httpx.Client(timeout=60.0) as client:
        for i in range(0, len(rows), BATCH_SIZE):
            batch = rows[i: i + BATCH_SIZE]
            batch_num = i // BATCH_SIZE + 1
            success = False
            attempt = 0
            for attempt in range(1, MAX_RETRIES + 1):
                retryable = True
                try:
                    response = client.post(endpoint, headers=headers, json=batch)
                except httpx.HTTPError as e:
                    print(f" [WARN] Batch {batch_num} attempt {attempt} error: {e}")
                else:
                    if response.status_code in (200, 201):
                        uploaded += len(batch)
                        print(f" Batch {batch_num}/{total_batches}: {len(batch)} chunks uploaded")
                        success = True
                        break
                    print(
                        f" [WARN] Batch {batch_num} attempt {attempt}: "
                        f"{response.status_code} -- {response.text[:80]}"
                    )
                    retryable = _is_retryable(response.status_code)
                if not retryable:
                    break
                if attempt < MAX_RETRIES:
                    wait = 2 ** attempt
                    print(f" Retrying in {wait}s...")
                    time.sleep(wait)
            if not success:
                print(
                    f" [FAIL] Batch {batch_num}/{total_batches} failed after "
                    f"{attempt} attempt(s) — skipping"
                )
    return uploaded
| # ============================================================================= | |
| # MAIN INGESTION PIPELINE | |
| # ============================================================================= | |
def _load_raw_chunks() -> List[Dict]:
    """Read the pre-chunked passages from VECTOR_DB_PATH, exiting with status 1 if the file is missing."""
    if not VECTOR_DB_PATH.exists():
        print(f"\n[ERROR] vector_database.json not found at: {VECTOR_DB_PATH}")
        print("Run scripts/md_to_json_chunker.py first to generate it.")
        sys.exit(1)
    print(f"\nSource: {VECTOR_DB_PATH}")
    with open(VECTOR_DB_PATH, "r", encoding="utf-8") as f:
        raw_chunks: List[Dict] = json.load(f)
    print(f"Loaded {len(raw_chunks)} raw chunks from vector_database.json")
    return raw_chunks


def _print_source_breakdown(raw_chunks: List[Dict]) -> None:
    """Print how many raw chunks came from each metadata.source_file (before filtering)."""
    from collections import Counter

    source_counts = Counter(
        chunk.get("metadata", {}).get("source_file", "unknown") for chunk in raw_chunks
    )
    print("\nSource breakdown (raw):")
    for src, count in sorted(source_counts.items()):
        print(f" - {src}: {count} chunks")


def main():
    """
    Run the ingestion: load chunks, embed and validate them, then replace the table.

    The existing knowledge_chunks rows are cleared only AFTER embeddings have
    been generated and validated. A crash while embedding (e.g. GPU OOM or a
    dimension mismatch) therefore leaves the live knowledge base untouched
    instead of empty.

    Exits with status 1 in these cases:
      - credentials are missing;
      - the input file is missing;
      - no chunks survive filtering (the table is not wiped);
      - some batches failed to upload, so scripts/CI can detect a partial
        ingestion.
    """
    print("=" * 60)
    print(" PGC Knowledge Base Ingestion (v3 - BGE-M3 1024-dim)")
    print("=" * 60)

    # Validate credentials
    if not SUPABASE_URL or not SUPABASE_KEY:
        print("\n[ERROR] SUPABASE_URL and SUPABASE_KEY must be set in .env")
        sys.exit(1)

    raw_chunks = _load_raw_chunks()
    _print_source_breakdown(raw_chunks)

    # Apply chunk quality filter + splitter (via shared normalized pipeline)
    children, stats = build_normalized_child_chunks(raw_chunks)
    print(f"[Chunks] {stats.total_raw} raw -> {stats.skipped} skipped (too short) "
          f"+ {stats.split_count} split (too long) -> {stats.total_final} final chunks")

    # Load embedding model once
    print()
    model = load_embedding_model()
    print("[OK] Embedding model loaded\n")

    # Embed + validate BEFORE touching Supabase (see docstring).
    print(f"\n[...] Embedding {len(children)} chunks...")
    texts = [child.content for child in children]
    # Embed in one pass (fastembed handles internal batching)
    embeddings = embed_batch(model, texts)
    validate_embedding_dimensions(embeddings)
    print(f"[OK] Embeddings generated ({len(embeddings)} x {EMBEDDING_DIM}-dim)")

    rows = build_upload_rows(children, embeddings)
    if not rows:
        print("\n[ERROR] No chunks left after filtering — refusing to clear the existing knowledge base.")
        sys.exit(1)

    # Only now replace the live data: clear, then upload the new rows.
    clear_existing_chunks()
    print(f"\n[...] Uploading {len(rows)} rows to Supabase (batch size: {BATCH_SIZE})...")
    uploaded = upload_chunks_to_supabase(rows)

    # Summary
    print(f"\n{'=' * 60}")
    print(f" [OK] Ingestion complete: {uploaded}/{len(rows)} chunks uploaded")
    print(f" Model: {EMBEDDING_MODEL}")
    print(" Table: knowledge_chunks (Supabase pgvector)")
    print(f"{'=' * 60}\n")

    if uploaded < len(rows):
        print(f"[ERROR] {len(rows) - uploaded} chunks failed to upload — knowledge base is incomplete.")
        sys.exit(1)


if __name__ == "__main__":
    main()