# PGC-AI-Chatbot / scripts / ingest_docs.py
# Jacooo's picture
# Deploy from GitHub: a6a371c
# 2c8b2df verified
"""
PGC Knowledge Base Ingestion Script (v3 — BGE-M3 1024-dim)
===========================================================
Reads pre-chunked semantic passages from data/vector_database.json,
generates BAAI/bge-m3 embeddings (GPU-accelerated locally), and uploads
to the Supabase knowledge_chunks table for pgvector similarity search.
Pipeline:
    data/vector_database.json
        |
        v
    Embed (BAAI/bge-m3 ONNX, 1024-dim, CUDA on RTX 3050)
        |
        v
    Supabase knowledge_chunks (pgvector, vector(1024))

Run this LOCALLY whenever vector_database.json is regenerated.
Run scripts/sql/2026-05-02-bge-m3-cutover.sql in Supabase first if
migrating from a previous embedding model.
Usage:
    cd "c:\\Users\\justi\\Jac ITB\\TA PGC Smartness\\AI Chatbot"
    pip install fastembed python-dotenv httpx
    python scripts/ingest_docs.py
IMPORTANT: If you change EMBEDDING_MODEL, run the cutover SQL in Supabase
and re-run this script — mixed-dimension vectors will silently corrupt
search results.
"""
import json
import os
import sys
import httpx
from pathlib import Path
from typing import List, Dict
from dotenv import load_dotenv
# Windows only: onnxruntime-gpu must be able to find the CUDA runtime DLLs
# (cublasLt64_12.dll, cudnn64_9.dll, ...) before it can load
# CUDAExecutionProvider. PyTorch bundles them in its "lib" folder, so that
# folder is added to the OS DLL search path instead of requiring a system-wide
# CUDA Toolkit install. This is best-effort: a missing torch install or a
# failing DLL-directory registration is ignored.
if sys.platform == "win32":
    try:
        import torch as _torch

        _torch_lib = Path(_torch.__file__).parent.joinpath("lib")
        if _torch_lib.exists():
            os.add_dll_directory(os.fspath(_torch_lib))
    except (ImportError, OSError):
        pass
from app.knowledge_chunking import build_normalized_child_chunks
from app.embedding_runtime import EMBEDDING_DIM, EMBEDDING_MODEL, get_runtime_config
# Read the .env file that sits two directory levels above this script
# (the project root) so the os.getenv() lookups below see its values.
_ENV_FILE = Path(__file__).parent.parent / ".env"
load_dotenv(_ENV_FILE)
# =============================================================================
# CONFIGURATION
# =============================================================================
# Supabase REST endpoint and API key. Both default to "" when unset; main()
# refuses to run with empty values.
SUPABASE_URL = os.environ.get("SUPABASE_URL", "")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY", "")
# Rows sent per upload request.
# NOTE(review): an earlier comment estimated "≈ 10MB" of JSON per batch; 20 rows
# of 1024 floats at roughly 20 characters each is closer to 0.4 MB plus the
# chunk text — confirm before tuning this value.
BATCH_SIZE = 20
# Attempts per batch before the batch is skipped.
MAX_RETRIES = 3
# Pre-chunked passages, expected at <project root>/data/vector_database.json.
VECTOR_DB_PATH = Path(__file__).parent.parent.joinpath("data", "vector_database.json")
# =============================================================================
# EMBEDDING
# =============================================================================
def load_embedding_model():
    """
    Build the fastembed TextEmbedding instance used for local ingestion.

    The model described by get_runtime_config("ingest") is registered as a
    custom fastembed model (CLS pooling, normalized output) and then
    instantiated with the configured execution providers. If instantiation
    fails with a ValueError that mentions CUDAExecutionProvider, the model is
    instantiated again with CPUExecutionProvider only.

    Returns:
        fastembed.TextEmbedding instance.

    Raises:
        SystemExit: With status 1 when fastembed cannot be imported.
        ValueError: Any registration or instantiation error other than the
            "already registered" and CUDAExecutionProvider cases handled here.
    """
    try:
        from fastembed import TextEmbedding
        from fastembed.common.model_description import PoolingType, ModelSource
    except ImportError:
        print("[ERROR] fastembed not installed. Run: pip install fastembed")
        sys.exit(1)

    runtime = get_runtime_config("ingest")
    print(f"Loading embedding model: {runtime.model_name} (source: {runtime.source})")
    print(f"Providers: {runtime.providers}")
    print("(Downloads ~1.2 GB on first run — please wait...)")

    # Registration is per-process; a repeat registration is reported by
    # fastembed as a ValueError, which is tolerated here.
    try:
        TextEmbedding.add_custom_model(
            model=runtime.model_name,
            pooling=PoolingType.CLS,
            normalization=True,
            sources=ModelSource(hf=runtime.source),
            dim=runtime.dim,
            model_file=runtime.model_file,
        )
    except ValueError as registration_error:
        if "already registered" not in str(registration_error).lower():
            raise
        print("[Info] Custom model already registered — skipping")

    # Try the configured providers first; only a CUDA-related ValueError
    # triggers the CPU-only retry.
    try:
        return TextEmbedding(model_name=runtime.model_name, providers=runtime.providers)
    except ValueError as provider_error:
        if "CUDAExecutionProvider" not in str(provider_error):
            raise
        print("[Warning] CUDAExecutionProvider unavailable — falling back to CPU")
        return TextEmbedding(model_name=runtime.model_name, providers=["CPUExecutionProvider"])
def embed_batch(model, texts: List[str]) -> List[List[float]]:
    """
    Turn passage strings into plain-Python embedding vectors.

    Calls model.passage_embed() (the document-side counterpart of the
    query-side embedding used at search time, per the original author's note)
    and converts every returned vector to a list via its tolist() method.

    Args:
        model: Object exposing passage_embed(texts, batch_size=...), e.g. a
            fastembed.TextEmbedding instance.
        texts: Raw chunk texts to embed.

    Returns:
        One list of floats per input text, in input order.
    """
    vectors: List[List[float]] = []
    # batch_size=32 was chosen by the original author to keep GPU memory in
    # check on a 4 GB card; the library default reportedly caused OOM.
    for vector in model.passage_embed(texts, batch_size=32):
        vectors.append(vector.tolist())
    return vectors
def validate_embedding_dimensions(embeddings: List[List[float]]) -> None:
    """
    Check that every embedding has exactly EMBEDDING_DIM components.

    Args:
        embeddings: Vectors to check; an empty list passes trivially.

    Raises:
        ValueError: At the first vector whose length differs from
            EMBEDDING_DIM; the message names its position and length.
    """
    for position, vector in enumerate(embeddings):
        actual = len(vector)
        if actual == EMBEDDING_DIM:
            continue
        raise ValueError(
            f"Embedding #{position} has length {actual}; expected {EMBEDDING_DIM}"
        )
def build_upload_rows(children, embeddings: List[List[float]]) -> List[Dict]:
    """
    Pair each child chunk with its embedding and build the row dicts to upload.

    Args:
        children: Chunk objects exposing content, source, filename and
            page_number attributes.
        embeddings: One vector per chunk, in the same order as children.

    Returns:
        List of dicts with keys content, source, filename, page_number and
        embedding, one per chunk.

    Raises:
        ValueError: If the number of chunks and embeddings differ. zip() would
            otherwise truncate silently and rows would be lost or misaligned.
    """
    # Materialise both inputs so their lengths can be compared even when a
    # caller passes a generator.
    children = list(children)
    embeddings = list(embeddings)
    if len(children) != len(embeddings):
        raise ValueError(
            f"Cannot pair {len(children)} chunks with {len(embeddings)} embeddings"
        )
    return [
        {
            "content": child.content,
            "source": child.source,
            "filename": child.filename,
            "page_number": child.page_number,
            "embedding": embedding,
        }
        for child, embedding in zip(children, embeddings)
    ]
# =============================================================================
# SUPABASE UPLOAD
# =============================================================================
def _get_headers() -> Dict[str, str]:
    """
    Build the HTTP headers sent with every Supabase REST request.

    SUPABASE_KEY is used both as the apikey header and as the bearer token.
    "Prefer: return=minimal" is included so responses carry no row bodies.
    """
    bearer_token = f"Bearer {SUPABASE_KEY}"
    headers: Dict[str, str] = {
        "apikey": SUPABASE_KEY,
        "Authorization": bearer_token,
        "Content-Type": "application/json",
        "Prefer": "return=minimal",
    }
    return headers
def clear_existing_chunks() -> bool:
    """
    Delete all existing rows from knowledge_chunks before re-ingestion.

    This prevents duplicate chunks from accumulating across multiple runs.
    The request filters on id > 0, which is intended to match every row
    (NOTE(review): this assumes a positive numeric id column — confirm
    against the table schema).

    Failure is non-fatal: a non-2xx response or a transport error (connection
    failure, timeout) is reported and False is returned so the caller can
    decide whether to continue.

    Returns:
        True if the delete returned 200 or 204, False otherwise.
    """
    print("\n[...] Clearing existing knowledge_chunks from Supabase...")
    try:
        with httpx.Client(timeout=30.0) as client:
            response = client.delete(
                f"{SUPABASE_URL}/rest/v1/knowledge_chunks",
                headers={**_get_headers(), "Prefer": "return=minimal"},
                params={"id": "gt.0"},  # DELETE WHERE id > 0 (all rows)
            )
    except httpx.HTTPError as exc:
        # Honour the documented "False otherwise" contract instead of letting
        # a network problem abort the script with a traceback.
        print(f"[WARN] Clear failed: {exc}")
        print(" Proceeding with upload (may create duplicates).")
        return False

    if response.status_code in (200, 204):
        print("[OK] Existing chunks cleared.")
        return True

    # Separator added: the status code and body used to run together.
    print(f"[WARN] Clear failed: {response.status_code} -- {response.text}")
    print(" Proceeding with upload (may create duplicates).")
    return False
def upload_chunks_to_supabase(rows: List[Dict]) -> int:
    """
    Batch-upload embedded chunks to the Supabase knowledge_chunks table.

    Rows are sent in batches of BATCH_SIZE over a single shared HTTP client,
    so connections are reused across batches and retries. A batch that fails
    is retried up to MAX_RETRIES times, waiting 2 ** attempt seconds between
    attempts, and is skipped if every attempt fails.

    Args:
        rows: List of dicts with keys:
            content, source, filename, page_number, embedding.

    Returns:
        Number of successfully uploaded rows.
    """
    import time

    if not rows:
        return 0

    uploaded = 0
    total_batches = (len(rows) + BATCH_SIZE - 1) // BATCH_SIZE
    endpoint = f"{SUPABASE_URL}/rest/v1/knowledge_chunks"
    headers = _get_headers()

    # One client for the whole upload: creating a client per attempt threw
    # away the connection pool and forced a new connection every request.
    with httpx.Client(timeout=60.0) as client:
        for batch_num, start in enumerate(range(0, len(rows), BATCH_SIZE), start=1):
            batch = rows[start: start + BATCH_SIZE]
            success = False
            for attempt in range(1, MAX_RETRIES + 1):
                # Deliberately broad: any failure of one batch is logged and
                # retried rather than aborting the remaining batches.
                try:
                    response = client.post(endpoint, headers=headers, json=batch)
                    if response.status_code in (200, 201):
                        uploaded += len(batch)
                        print(f" Batch {batch_num}/{total_batches}: {len(batch)} chunks uploaded")
                        success = True
                        break
                    print(
                        f" [WARN] Batch {batch_num} attempt {attempt}: "
                        f"{response.status_code} -- {response.text[:80]}"
                    )
                except Exception as e:
                    print(f" [WARN] Batch {batch_num} attempt {attempt} error: {e}")
                # No sleep after the final attempt.
                if attempt < MAX_RETRIES:
                    wait = 2 ** attempt
                    print(f" Retrying in {wait}s...")
                    time.sleep(wait)
            if not success:
                print(f" [FAIL] Batch {batch_num}/{total_batches} failed after {MAX_RETRIES} attempts — skipping")
    return uploaded
# =============================================================================
# MAIN INGESTION PIPELINE
# =============================================================================
def main():
    """
    Run the ingestion pipeline end to end.

    Steps: check credentials and the source file, load the raw chunks, print a
    per-source breakdown, normalise them into child chunks, embed and validate
    the vectors, then clear the table and upload.

    The destructive clear runs only after embedding and validation have
    succeeded, so a model or dimension failure cannot leave knowledge_chunks
    empty. An empty chunk set aborts before anything is deleted.

    Exits with status 1 on missing configuration, a missing source file, an
    empty chunk set, or when not every row was uploaded.
    """
    from collections import Counter  # function-scope import; stdlib only

    print("=" * 60)
    print(f" PGC Knowledge Base Ingestion (v3 - {EMBEDDING_MODEL}, {EMBEDDING_DIM}-dim)")
    print("=" * 60)

    # Validate credentials
    if not SUPABASE_URL or not SUPABASE_KEY:
        print("\n[ERROR] SUPABASE_URL and SUPABASE_KEY must be set in .env")
        sys.exit(1)

    # Validate source file
    if not VECTOR_DB_PATH.exists():
        print(f"\n[ERROR] vector_database.json not found at: {VECTOR_DB_PATH}")
        print("Run scripts/md_to_json_chunker.py first to generate it.")
        sys.exit(1)

    # Load chunks
    print(f"\nSource: {VECTOR_DB_PATH}")
    with open(VECTOR_DB_PATH, "r", encoding="utf-8") as f:
        raw_chunks: List[Dict] = json.load(f)
    print(f"Loaded {len(raw_chunks)} raw chunks from vector_database.json")

    # Show source breakdown (before filtering). A missing or null "metadata"
    # entry is counted under "unknown" instead of raising.
    source_counts = Counter(
        (chunk.get("metadata") or {}).get("source_file", "unknown")
        for chunk in raw_chunks
    )
    print("\nSource breakdown (raw):")
    for src, count in sorted(source_counts.items()):
        print(f" - {src}: {count} chunks")

    # Apply chunk quality filter + splitter (via shared normalized pipeline)
    children, stats = build_normalized_child_chunks(raw_chunks)
    print(f"[Chunks] {stats.total_raw} raw -> {stats.skipped} skipped (too short) "
          f"+ {stats.split_count} split (too long) -> {stats.total_final} final chunks")

    # Nothing to ingest: stop before the table is cleared.
    if not children:
        print("\n[ERROR] No chunks left after filtering — existing rows were left untouched.")
        sys.exit(1)

    # Load embedding model once
    print()
    model = load_embedding_model()
    print("[OK] Embedding model loaded\n")

    # Embed in one call (embed_batch passes batch_size to the model)
    print(f"\n[...] Embedding {len(children)} chunks...")
    texts = [child.content for child in children]
    embeddings = embed_batch(model, texts)
    validate_embedding_dimensions(embeddings)
    print(f"[OK] Embeddings generated ({len(embeddings)} x {EMBEDDING_DIM}-dim)")

    # Build upload rows
    rows = build_upload_rows(children, embeddings)

    # Clear existing rows to prevent duplicates — only now that the new rows
    # are ready to be uploaded.
    clear_existing_chunks()

    # Upload to Supabase
    print(f"\n[...] Uploading {len(rows)} rows to Supabase (batch size: {BATCH_SIZE})...")
    uploaded = upload_chunks_to_supabase(rows)
    complete = uploaded == len(rows)

    # Summary
    print(f"\n{'=' * 60}")
    if complete:
        print(f" [OK] Ingestion complete: {uploaded}/{len(rows)} chunks uploaded")
    else:
        print(f" [WARN] Ingestion incomplete: {uploaded}/{len(rows)} chunks uploaded")
    print(f" Model: {EMBEDDING_MODEL}")
    print(" Table: knowledge_chunks (Supabase pgvector)")
    print(f"{'=' * 60}\n")

    # Signal partial uploads to callers and CI through the exit status.
    if not complete:
        sys.exit(1)


if __name__ == "__main__":
    main()