"""
PGC Knowledge Base Ingestion Script (v3 — BGE-M3 1024-dim)
===========================================================
Reads pre-chunked semantic passages from data/vector_database.json,
generates BAAI/bge-m3 embeddings (GPU-accelerated locally), and uploads
to the Supabase knowledge_chunks table for pgvector similarity search.

Pipeline:
    data/vector_database.json
        |
        v
    Embed (BAAI/bge-m3 ONNX, 1024-dim, CUDA on RTX 3050)
        |
        v
    Supabase knowledge_chunks (pgvector, vector(1024))

Run this LOCALLY whenever vector_database.json is regenerated.
Run scripts/sql/2026-05-02-bge-m3-cutover.sql in Supabase first if migrating
from a previous embedding model.

Usage:
    cd "c:\\\\Users\\\\justi\\\\Jac ITB\\\\TA PGC Smartness\\\\AI Chatbot"
    pip install fastembed python-dotenv httpx
    python scripts/ingest_docs.py

IMPORTANT: If you change EMBEDDING_MODEL, run the cutover SQL in Supabase and
re-run this script — mixed-dimension vectors will silently corrupt search results.
"""

import json
import os
import sys
import httpx
from pathlib import Path
from typing import List, Dict
from dotenv import load_dotenv

# On Windows, register PyTorch's bundled CUDA DLLs (cublasLt64_12.dll, cudnn64_9.dll,
# etc.) with the OS DLL search path so that onnxruntime-gpu can load CUDAExecutionProvider
# without requiring a system-wide CUDA Toolkit install.
if sys.platform == "win32":
    try:
        import torch as _torch

        _torch_lib = Path(_torch.__file__).parent / "lib"
        if _torch_lib.exists():
            os.add_dll_directory(str(_torch_lib))
    except (ImportError, OSError):
        # Deliberately best-effort: if torch is missing or the directory cannot
        # be registered we carry on; load_embedding_model() falls back to CPU
        # when the CUDA provider cannot be used.
        pass

# `python scripts/ingest_docs.py` puts this script's own directory (not the
# project root) at the front of sys.path, so the `app` imports below would not
# resolve without extra PYTHONPATH setup. Add the directory one level above
# this script's directory — the same one the .env file is loaded from.
# NOTE(review): assumes the `app/` package sits next to `.env` — confirm.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

from app.knowledge_chunking import build_normalized_child_chunks
from app.embedding_runtime import EMBEDDING_DIM, EMBEDDING_MODEL, get_runtime_config

# Load .env from project root
load_dotenv(Path(__file__).parent.parent / ".env")

# =============================================================================
# CONFIGURATION
# =============================================================================

SUPABASE_URL = os.getenv("SUPABASE_URL", "")
SUPABASE_KEY = os.getenv("SUPABASE_KEY", "")
# Rows per upload request. 20 rows x 1024 floats is roughly 0.4 MB of JSON for
# the vectors alone (about 20 characters per serialized float), plus chunk text.
BATCH_SIZE = 20
MAX_RETRIES = 3  # Total attempts per batch (with backoff between attempts)
VECTOR_DB_PATH = Path(__file__).parent.parent / "data" / "vector_database.json"

# =============================================================================
# EMBEDDING
# =============================================================================


def load_embedding_model():
    """
    Load the fastembed embedding model described by get_runtime_config("ingest").

    The model is registered through TextEmbedding.add_custom_model (CLS
    pooling, normalization enabled) before it is instantiated. An
    "already registered" ValueError from the registration step is treated as
    informational so the function can be called more than once per process.

    The execution providers come from the runtime config. If constructing the
    model raises a ValueError that mentions CUDAExecutionProvider, the model is
    re-created on CPUExecutionProvider instead.

    Returns:
        fastembed.TextEmbedding instance.

    Raises:
        SystemExit: (status 1) if fastembed cannot be imported.
        ValueError: Any registration/instantiation error other than the two
            handled cases above is re-raised unchanged.
    """
    try:
        from fastembed import TextEmbedding
        from fastembed.common.model_description import PoolingType, ModelSource
    except ImportError:
        print("[ERROR] fastembed not installed. Run: pip install fastembed")
        sys.exit(1)

    cfg = get_runtime_config("ingest")
    print(f"Loading embedding model: {cfg.model_name} (source: {cfg.source})")
    print(f"Providers: {cfg.providers}")
    print("(Downloads ~1.2 GB on first run — please wait...)")

    try:
        TextEmbedding.add_custom_model(
            model=cfg.model_name,
            pooling=PoolingType.CLS,
            normalization=True,
            sources=ModelSource(hf=cfg.source),
            dim=cfg.dim,
            model_file=cfg.model_file,
        )
    except ValueError as exc:
        # Registration is process-global; a second call must not be fatal.
        if "already registered" in str(exc).lower():
            print("[Info] Custom model already registered — skipping")
        else:
            raise

    try:
        return TextEmbedding(model_name=cfg.model_name, providers=cfg.providers)
    except ValueError as exc:
        if "CUDAExecutionProvider" in str(exc):
            print("[Warning] CUDAExecutionProvider unavailable — falling back to CPU")
            return TextEmbedding(
                model_name=cfg.model_name, providers=["CPUExecutionProvider"]
            )
        raise


def embed_batch(model, texts: List[str]) -> List[List[float]]:
    """
    Embed a list of passage strings and return plain Python float lists.

    Calls model.passage_embed(), the document-side embedding entry point.
    Per the original author's note, the search side (app/vector_store.py)
    uses query_embed() as its counterpart.

    Args:
        model: fastembed.TextEmbedding instance.
        texts: List of raw chunk texts.

    Returns:
        One list of floats per input text, in input order.
    """
    # batch_size=32 keeps GPU memory under ~1 GB per batch on the RTX 3050 4 GB.
    # Default (256) causes OOM when model weights already occupy ~600 MB VRAM.
    return [emb.tolist() for emb in model.passage_embed(texts, batch_size=32)]


def validate_embedding_dimensions(embeddings: List[List[float]]) -> None:
    """
    Check that every embedding has exactly EMBEDDING_DIM elements.

    Args:
        embeddings: Vectors to check.

    Raises:
        ValueError: On the first vector whose length differs from
            EMBEDDING_DIM; the message names its index and length.
    """
    for index, embedding in enumerate(embeddings):
        if len(embedding) != EMBEDDING_DIM:
            raise ValueError(
                f"Embedding #{index} has length {len(embedding)}; expected {EMBEDDING_DIM}"
            )


def build_upload_rows(children, embeddings: List[List[float]]) -> List[Dict]:
    """
    Assemble Supabase row dicts preserving source/filename/page metadata.

    Args:
        children: Chunk objects exposing content, source, filename and
            page_number attributes.
        embeddings: One vector per chunk, in the same order as children.

    Returns:
        List of dicts with keys content, source, filename, page_number and
        embedding.

    Raises:
        ValueError: If the number of chunks and embeddings differ. zip() would
            otherwise drop the surplus silently and upload an incomplete set.
    """
    children = list(children)
    if len(children) != len(embeddings):
        raise ValueError(
            f"Got {len(children)} chunks but {len(embeddings)} embeddings; "
            "cannot pair them one-to-one"
        )
    return [
        {
            "content": child.content,
            "source": child.source,
            "filename": child.filename,
            "page_number": child.page_number,
            "embedding": embedding,
        }
        for child, embedding in zip(children, embeddings)
    ]


# =============================================================================
# SUPABASE UPLOAD
# =============================================================================


def _get_headers() -> Dict[str, str]:
    """Return Supabase REST API auth headers (built from SUPABASE_KEY)."""
    return {
        "apikey": SUPABASE_KEY,
        "Authorization": f"Bearer {SUPABASE_KEY}",
        "Content-Type": "application/json",
        "Prefer": "return=minimal",
    }


def clear_existing_chunks() -> bool:
    """
    Delete all existing rows from knowledge_chunks before re-ingestion.

    This prevents duplicate chunks from accumulating across multiple runs.
    The request filters on id > 0, which is intended to match every row
    (assumes a positive integer id column — confirm against the table schema).

    Failures never raise: both a non-2xx response and a transport-level error
    (timeout, connection failure) are reported on stdout and turned into a
    False return value so the caller decides whether to continue.

    Returns:
        True if the delete returned 200 or 204, False otherwise.
    """
    print("\n[...] Clearing existing knowledge_chunks from Supabase...")
    try:
        with httpx.Client(timeout=30.0) as client:
            response = client.delete(
                f"{SUPABASE_URL}/rest/v1/knowledge_chunks",
                headers=_get_headers(),  # already carries Prefer: return=minimal
                params={"id": "gt.0"},  # DELETE WHERE id > 0 (all rows)
            )
    except httpx.HTTPError as exc:
        print(f"[WARN] Clear failed: {exc}")
        print(" Proceeding with upload (may create duplicates).")
        return False

    if response.status_code in (200, 204):
        print("[OK] Existing chunks cleared.")
        return True

    print(f"[WARN] Clear failed: {response.status_code} — {response.text}")
    print(" Proceeding with upload (may create duplicates).")
    return False


def upload_chunks_to_supabase(rows: List[Dict]) -> int:
    """
    Batch-upload embedded chunks to the Supabase knowledge_chunks table.

    Rows are sent in batches of BATCH_SIZE over a single shared HTTP client.
    Each batch gets up to MAX_RETRIES attempts, sleeping 2**attempt seconds
    between attempts (2s, 4s, ...). A batch that still fails is reported and
    skipped; later batches are still attempted.

    Only HTTP-level failures (non-200/201 responses and httpx.HTTPError
    transport errors) are retried. Other exceptions, such as a row that
    cannot be serialized to JSON, propagate immediately because retrying
    cannot fix them.

    Args:
        rows: List of dicts with keys: content, source, filename,
            page_number, embedding.

    Returns:
        Number of successfully uploaded rows.
    """
    import time

    uploaded = 0
    total_batches = (len(rows) + BATCH_SIZE - 1) // BATCH_SIZE
    endpoint = f"{SUPABASE_URL}/rest/v1/knowledge_chunks"
    headers = _get_headers()

    # One client for the whole run so connections are reused across batches
    # and retries instead of being re-established for every attempt.
    with httpx.Client(timeout=60.0) as client:
        for batch_num, start in enumerate(range(0, len(rows), BATCH_SIZE), start=1):
            batch = rows[start:start + BATCH_SIZE]
            success = False

            for attempt in range(1, MAX_RETRIES + 1):
                try:
                    response = client.post(endpoint, headers=headers, json=batch)
                except httpx.HTTPError as e:
                    print(f" [WARN] Batch {batch_num} attempt {attempt} error: {e}")
                else:
                    if response.status_code in (200, 201):
                        uploaded += len(batch)
                        print(f" Batch {batch_num}/{total_batches}: {len(batch)} chunks uploaded")
                        success = True
                        break
                    print(
                        f" [WARN] Batch {batch_num} attempt {attempt}: "
                        f"{response.status_code} -- {response.text[:80]}"
                    )

                # No sleep after the final attempt: nothing is left to retry.
                if attempt < MAX_RETRIES:
                    wait = 2 ** attempt
                    print(f" Retrying in {wait}s...")
                    time.sleep(wait)

            if not success:
                print(f" [FAIL] Batch {batch_num}/{total_batches} failed after {MAX_RETRIES} attempts — skipping")

    return uploaded


# =============================================================================
# MAIN INGESTION PIPELINE
# =============================================================================


def main():
    """
    Run the full ingestion: load chunks, embed, replace table contents, upload.

    Steps, in order:
      1. Check SUPABASE_URL / SUPABASE_KEY and that VECTOR_DB_PATH exists.
      2. Load the raw chunks and print a per-source breakdown.
      3. Normalize them with build_normalized_child_chunks().
      4. Load the embedding model, embed every chunk, validate dimensions and
         build the upload rows.
      5. Clear the existing knowledge_chunks rows.
      6. Upload the new rows and print a summary.

    The destructive clear (step 5) runs only after the embeddings have been
    generated and validated, so a model-loading, embedding or validation
    failure leaves the existing table contents untouched. A failed clear does
    not stop the upload (clear_existing_chunks() prints the duplicate-row
    warning itself).

    Raises:
        SystemExit: (status 1) on missing credentials, a missing source file,
            an empty chunk set after filtering, or when fewer rows were
            uploaded than were prepared.
    """
    # NOTE(review): the banner still says "v2" while the module docstring
    # describes v3 — confirm which label is intended before changing it.
    print("=" * 60)
    print(" PGC Knowledge Base Ingestion (v2 - Hybrid Paragraph-Page)")
    print("=" * 60)

    # Validate credentials
    if not SUPABASE_URL or not SUPABASE_KEY:
        print("\n[ERROR] SUPABASE_URL and SUPABASE_KEY must be set in .env")
        sys.exit(1)

    # Validate source file
    if not VECTOR_DB_PATH.exists():
        print(f"\n[ERROR] vector_database.json not found at: {VECTOR_DB_PATH}")
        print("Run scripts/md_to_json_chunker.py first to generate it.")
        sys.exit(1)

    # Load chunks
    print(f"\nSource: {VECTOR_DB_PATH}")
    with open(VECTOR_DB_PATH, "r", encoding="utf-8") as f:
        raw_chunks: List[Dict] = json.load(f)
    print(f"Loaded {len(raw_chunks)} raw chunks from vector_database.json")

    # Show source breakdown (before filtering)
    source_counts: Dict[str, int] = {}
    for chunk in raw_chunks:
        src = chunk.get("metadata", {}).get("source_file", "unknown")
        source_counts[src] = source_counts.get(src, 0) + 1
    print("\nSource breakdown (raw):")
    for src, count in sorted(source_counts.items()):
        print(f" - {src}: {count} chunks")

    # Apply chunk quality filter + splitter (via shared normalized pipeline)
    children, stats = build_normalized_child_chunks(raw_chunks)
    print(f"[Chunks] {stats.total_raw} raw -> {stats.skipped} skipped (too short) "
          f"+ {stats.split_count} split (too long) -> {stats.total_final} final chunks")

    # Nothing to ingest: stop before the destructive clear so an empty or
    # fully-filtered input file cannot wipe the existing table.
    if len(children) == 0:
        print("\n[ERROR] No chunks left after filtering — nothing to ingest.")
        print(" Existing knowledge_chunks rows were left untouched.")
        sys.exit(1)

    # Load embedding model once
    print()
    model = load_embedding_model()
    print("[OK] Embedding model loaded\n")

    # Embed + format rows (done before clearing so failures here are harmless)
    print(f"\n[...] Embedding {len(children)} chunks...")
    texts = [child.content for child in children]
    # Single call; embed_batch() passes its own batch_size to the model.
    embeddings = embed_batch(model, texts)
    validate_embedding_dimensions(embeddings)
    print(f"[OK] Embeddings generated ({len(embeddings)} x {EMBEDDING_DIM}-dim)")

    # Build upload rows
    rows = build_upload_rows(children, embeddings)

    # Clear existing rows to prevent duplicates — only now that the
    # replacement rows are ready to go.
    clear_existing_chunks()

    # Upload to Supabase
    print(f"\n[...] Uploading {len(rows)} rows to Supabase (batch size: {BATCH_SIZE})...")
    uploaded = upload_chunks_to_supabase(rows)
    complete = uploaded == len(rows)

    # Summary
    print(f"\n{'=' * 60}")
    if complete:
        print(f" [OK] Ingestion complete: {uploaded}/{len(rows)} chunks uploaded")
    else:
        print(f" [WARN] Ingestion incomplete: {uploaded}/{len(rows)} chunks uploaded")
    print(f" Model: {EMBEDDING_MODEL}")
    print(" Table: knowledge_chunks (Supabase pgvector)")
    print(f"{'=' * 60}\n")

    # Skipped batches mean the table is missing chunks; surface that to
    # callers/CI through the exit status instead of reporting success.
    if not complete:
        sys.exit(1)


if __name__ == "__main__":
    main()