File size: 12,188 Bytes
0669579
f2ddb73
 
330dc8c
f2ddb73
 
330dc8c
 
 
 
 
f2ddb73
330dc8c
 
f2ddb73
330dc8c
 
f2ddb73
 
0669579
 
330dc8c
 
0669579
 
f2ddb73
 
 
0669579
 
 
330dc8c
 
0669579
 
 
 
 
f2ddb73
 
 
 
 
 
 
 
 
 
 
 
0fe58ad
f2ddb73
0fe58ad
0669579
 
 
330dc8c
0669579
330dc8c
0669579
 
 
 
f2ddb73
330dc8c
0669579
330dc8c
0669579
 
330dc8c
 
 
0669579
330dc8c
 
f2ddb73
0669579
f2ddb73
 
 
 
0669579
330dc8c
 
0669579
330dc8c
 
 
0669579
 
c40364a
0669579
330dc8c
 
0669579
f2ddb73
 
 
 
c40364a
2c8b2df
 
 
 
 
 
 
 
 
 
 
 
 
 
f2ddb73
 
 
 
 
 
 
0669579
 
 
330dc8c
f2ddb73
ada6aad
f2ddb73
 
 
330dc8c
 
 
c40364a
330dc8c
 
f2ddb73
330dc8c
f2ddb73
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0669579
 
330dc8c
0669579
330dc8c
0669579
330dc8c
 
0669579
 
 
 
 
 
 
 
330dc8c
0669579
330dc8c
0669579
330dc8c
 
0669579
330dc8c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0669579
 
330dc8c
0669579
330dc8c
 
 
 
 
 
 
 
 
 
 
0669579
330dc8c
0669579
330dc8c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0669579
330dc8c
0669579
 
330dc8c
 
 
0669579
 
330dc8c
 
 
0669579
330dc8c
0669579
330dc8c
 
 
 
 
 
 
 
 
 
 
 
ada6aad
330dc8c
ada6aad
330dc8c
ada6aad
330dc8c
ada6aad
330dc8c
 
 
ada6aad
330dc8c
 
 
0fe58ad
 
f2ddb73
 
ada6aad
330dc8c
 
0669579
330dc8c
 
 
 
 
 
0fe58ad
 
330dc8c
 
 
f2ddb73
 
330dc8c
 
f2ddb73
330dc8c
 
 
 
 
 
 
 
 
 
 
0669579
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
"""
PGC Knowledge Base Ingestion Script (v3 — BGE-M3 1024-dim)
===========================================================
Reads pre-chunked semantic passages from data/vector_database.json,
generates BAAI/bge-m3 embeddings (GPU-accelerated locally), and uploads
to the Supabase knowledge_chunks table for pgvector similarity search.

Pipeline:
    data/vector_database.json
           |
           v
    Embed (BAAI/bge-m3 ONNX, 1024-dim, CUDA on RTX 3050)
           |
           v
    Supabase knowledge_chunks (pgvector, vector(1024))

Run this LOCALLY whenever vector_database.json is regenerated.
Run scripts/sql/2026-05-02-bge-m3-cutover.sql in Supabase first if
migrating from a previous embedding model.

Usage:
    cd "c:\\\\Users\\\\justi\\\\Jac ITB\\\\TA PGC Smartness\\\\AI Chatbot"
    pip install fastembed python-dotenv httpx
    python scripts/ingest_docs.py

IMPORTANT: If you change EMBEDDING_MODEL, run the cutover SQL in Supabase
and re-run this script — mixed-dimension vectors will silently corrupt
search results.
"""

import json
import os
import sys
import httpx
from pathlib import Path
from typing import List, Dict
from dotenv import load_dotenv

# On Windows, register PyTorch's bundled CUDA DLLs (cublasLt64_12.dll, cudnn64_9.dll,
# etc.) with the OS DLL search path so that onnxruntime-gpu can load CUDAExecutionProvider
# without requiring a system-wide CUDA Toolkit install.
# This runs at module import time, i.e. before load_embedding_model() performs its
# lazy `from fastembed import ...`.
if sys.platform == "win32":
    try:
        import torch as _torch
        # "lib" directory next to the installed torch package; only registered if present.
        _torch_lib = Path(_torch.__file__).parent / "lib"
        if _torch_lib.exists():
            os.add_dll_directory(str(_torch_lib))
    except (ImportError, OSError):
        # Best effort: torch is optional for this script. If it is missing or the
        # directory cannot be registered, carry on silently; load_embedding_model()
        # falls back to CPUExecutionProvider when the CUDA provider is unavailable.
        pass

from app.knowledge_chunking import build_normalized_child_chunks
from app.embedding_runtime import EMBEDDING_DIM, EMBEDDING_MODEL, get_runtime_config

# Load .env from project root
load_dotenv(Path(__file__).parent.parent / ".env")

# =============================================================================
# CONFIGURATION
# =============================================================================

# Supabase REST base URL and API key, read from the environment. Both default to
# an empty string when unset; main() refuses to run in that case.
SUPABASE_URL = os.getenv("SUPABASE_URL", "")
SUPABASE_KEY = os.getenv("SUPABASE_KEY", "")

# Rows per upload request (20 rows × 1024-dim floats). The original note estimated
# "≈ 10MB JSON payload" per batch -- NOTE(review): that figure looks high; confirm.
BATCH_SIZE = 20   # Upload batch size
MAX_RETRIES = 3   # Maximum upload attempts per batch (exponential backoff between attempts)

# Input file with the pre-chunked passages: <project root>/data/vector_database.json
VECTOR_DB_PATH = Path(__file__).parent.parent / "data" / "vector_database.json"


# =============================================================================
# EMBEDDING
# =============================================================================

def load_embedding_model():
    """
    Load the fastembed BGE-M3 ONNX embedding model for local ingestion.

    Runtime settings (model name, HuggingFace source, ONNX file name, vector
    dimension and execution providers) are taken from
    ``get_runtime_config("ingest")``. The model is registered through
    ``TextEmbedding.add_custom_model`` with CLS pooling and normalization
    before it is instantiated; a repeated registration is tolerated.

    If instantiation raises a ``ValueError`` mentioning
    ``CUDAExecutionProvider``, the model is created again with
    ``CPUExecutionProvider`` only. According to the progress message printed
    below, the first run downloads roughly 1.2 GB of model files.

    Returns:
        fastembed.TextEmbedding instance.

    Raises:
        SystemExit: With status 1 if fastembed (or the submodule providing
            ``PoolingType``/``ModelSource``) cannot be imported.
        ValueError: Any registration or instantiation error other than the
            two handled cases described above.
    """
    try:
        from fastembed import TextEmbedding
        from fastembed.common.model_description import PoolingType, ModelSource
    except ImportError as exc:
        print("[ERROR] fastembed not installed. Run: pip install fastembed")
        # Surface the real cause: an outdated fastembed that lacks the
        # model_description module also lands here.
        print(f"        Import error: {exc}")
        sys.exit(1)

    cfg = get_runtime_config("ingest")
    print(f"Loading embedding model: {cfg.model_name} (source: {cfg.source})")
    print(f"Providers: {cfg.providers}")
    print("(Downloads ~1.2 GB on first run — please wait...)")

    try:
        TextEmbedding.add_custom_model(
            model=cfg.model_name,
            pooling=PoolingType.CLS,
            normalization=True,
            sources=ModelSource(hf=cfg.source),
            dim=cfg.dim,
            model_file=cfg.model_file,
        )
    except ValueError as exc:
        # Registering the same custom model twice in one process is harmless.
        if "already registered" not in str(exc).lower():
            raise
        print("[Info] Custom model already registered — skipping")

    try:
        return TextEmbedding(model_name=cfg.model_name, providers=cfg.providers)
    except ValueError as exc:
        # Only a missing CUDA provider is recoverable; re-raise anything else.
        if "CUDAExecutionProvider" not in str(exc):
            raise
        print("[Warning] CUDAExecutionProvider unavailable — falling back to CPU")
        return TextEmbedding(model_name=cfg.model_name, providers=["CPUExecutionProvider"])


def embed_batch(model, texts: List[str]) -> List[List[float]]:
    """
    Turn passage strings into plain-Python embedding vectors.

    Calls ``model.passage_embed(texts, batch_size=32)`` and converts each
    yielded vector to a list via ``.tolist()``. ``passage_embed`` is
    presumably the ingest-time counterpart of the ``query_embed()`` call used
    at search time (app/vector_store.py) -- verify against that module.

    Args:
        model: Object exposing ``passage_embed`` (a fastembed.TextEmbedding
            instance in this script).
        texts: Raw chunk texts, one per passage.

    Returns:
        One list of floats per input text, in input order.
    """
    # Author's note carried over from the original: batch_size=32 was chosen
    # to stay within GPU memory on a 4 GB RTX 3050; the library default (256)
    # ran out of memory there.
    vectors: List[List[float]] = []
    for vector in model.passage_embed(texts, batch_size=32):
        vectors.append(vector.tolist())
    return vectors


def validate_embedding_dimensions(embeddings: List[List[float]]) -> None:
    """
    Check that every embedding has exactly EMBEDDING_DIM components.

    Args:
        embeddings: Vectors to check, in upload order.

    Raises:
        ValueError: Naming the index and length of the first vector whose
            length differs from EMBEDDING_DIM.
    """
    # Locate the first offender (if any) lazily; later vectors are not scanned.
    first_bad = next(
        (
            (position, len(vector))
            for position, vector in enumerate(embeddings)
            if len(vector) != EMBEDDING_DIM
        ),
        None,
    )
    if first_bad is None:
        return

    position, actual_length = first_bad
    raise ValueError(
        f"Embedding #{position} has length {actual_length}; expected {EMBEDDING_DIM}"
    )


def build_upload_rows(children, embeddings: List[List[float]]) -> List[Dict]:
    """
    Pair each chunk with its embedding and shape it as a table row.

    Args:
        children: Chunk objects exposing ``content``, ``source``,
            ``filename`` and ``page_number`` attributes.
        embeddings: Vectors in the same order as ``children``.

    Returns:
        One dict per (chunk, embedding) pair with the keys ``content``,
        ``source``, ``filename``, ``page_number`` and ``embedding``. Pairing
        uses ``zip``, so it stops at the shorter of the two inputs.
    """
    rows: List[Dict] = []
    for chunk, vector in zip(children, embeddings):
        row = {
            "content": chunk.content,
            "source": chunk.source,
            "filename": chunk.filename,
            "page_number": chunk.page_number,
            "embedding": vector,
        }
        rows.append(row)
    return rows


# =============================================================================
# SUPABASE UPLOAD
# =============================================================================

def _get_headers() -> Dict[str, str]:
    """
    Build the headers sent with every Supabase REST request in this script.

    SUPABASE_KEY is supplied twice: as the ``apikey`` header and as a Bearer
    token in ``Authorization``. The body type is JSON and ``Prefer`` is set
    to ``return=minimal``.

    Returns:
        A fresh dict on every call, so callers may extend it freely.
    """
    bearer_token = f"Bearer {SUPABASE_KEY}"
    headers: Dict[str, str] = {}
    headers["apikey"] = SUPABASE_KEY
    headers["Authorization"] = bearer_token
    headers["Content-Type"] = "application/json"
    headers["Prefer"] = "return=minimal"
    return headers


def clear_existing_chunks() -> bool:
    """
    Delete all existing rows from knowledge_chunks before re-ingestion.

    This prevents duplicate chunks from accumulating across multiple runs.
    The DELETE request carries the filter ``id=gt.0`` (``id > 0``), which is
    meant to match every row -- this assumes ids are positive numbers.

    Failures never raise: both a non-2xx response and a transport-level
    error (connection refused, timeout, ...) are reported as a warning and
    signalled through the return value, matching how the upload step reports
    its own failures.

    Returns:
        True if Supabase answered 200 or 204, False otherwise.
    """
    print("\n[...] Clearing existing knowledge_chunks from Supabase...")
    try:
        with httpx.Client(timeout=30.0) as client:
            response = client.delete(
                f"{SUPABASE_URL}/rest/v1/knowledge_chunks",
                headers={**_get_headers(), "Prefer": "return=minimal"},
                params={"id": "gt.0"},  # DELETE WHERE id > 0  (all rows)
            )
    except httpx.HTTPError as exc:
        # Network/timeout problems used to escape as an unhandled traceback.
        print(f"[WARN] Clear failed: {exc}")
        print("       Proceeding with upload (may create duplicates).")
        return False

    if response.status_code in (200, 204):
        print("[OK] Existing chunks cleared.")
        return True

    print(f"[WARN] Clear failed: {response.status_code} — {response.text}")
    print("       Proceeding with upload (may create duplicates).")
    return False


def upload_chunks_to_supabase(rows: List[Dict]) -> int:
    """
    Batch-upload embedded chunks to the Supabase knowledge_chunks table.

    Rows are sent in batches of BATCH_SIZE over a single reused HTTP client.
    Each batch gets up to MAX_RETRIES attempts, waiting ``2 ** attempt``
    seconds between attempts; a batch that never succeeds is reported and
    skipped. Any exception raised while posting a batch is caught and
    treated as a failed attempt.

    Args:
        rows: List of dicts with keys:
              content, source, filename, page_number, embedding.

    Returns:
        Number of successfully uploaded rows (0 for an empty input).
    """
    import time

    if not rows:
        return 0

    uploaded = 0
    total_batches = (len(rows) + BATCH_SIZE - 1) // BATCH_SIZE
    # Loop-invariant request parts are built once instead of once per attempt.
    endpoint = f"{SUPABASE_URL}/rest/v1/knowledge_chunks"
    headers = _get_headers()

    # One client for the whole upload so connections are reused across batches.
    with httpx.Client(timeout=60.0) as client:
        for batch_num, start in enumerate(range(0, len(rows), BATCH_SIZE), start=1):
            batch = rows[start:start + BATCH_SIZE]

            for attempt in range(1, MAX_RETRIES + 1):
                try:
                    response = client.post(endpoint, headers=headers, json=batch)
                except Exception as e:
                    print(f"  [WARN] Batch {batch_num} attempt {attempt} error: {e}")
                else:
                    if response.status_code in (200, 201):
                        uploaded += len(batch)
                        print(f"  Batch {batch_num}/{total_batches}: {len(batch)} chunks uploaded")
                        break
                    print(
                        f"  [WARN] Batch {batch_num} attempt {attempt}: "
                        f"{response.status_code} -- {response.text[:80]}"
                    )

                if attempt < MAX_RETRIES:
                    wait = 2 ** attempt
                    print(f"  Retrying in {wait}s...")
                    time.sleep(wait)
            else:
                # Reached only when no attempt hit `break`, i.e. every attempt failed.
                print(
                    f"  [FAIL] Batch {batch_num}/{total_batches} failed after "
                    f"{MAX_RETRIES} attempts — skipping"
                )

    return uploaded


# =============================================================================
# MAIN INGESTION PIPELINE
# =============================================================================

def main():
    """
    Run the ingestion pipeline end to end.

    Steps:
        1. Check that Supabase credentials and the source JSON file exist.
        2. Load the raw chunks and print a per-source-file breakdown.
        3. Normalize them with build_normalized_child_chunks().
        4. Load the embedding model, embed every chunk, validate dimensions.
        5. Delete the existing rows, then upload the new ones in batches.

    The existing table contents are deleted only after the embeddings have
    been generated and validated, so a failure while loading the model or
    embedding leaves the current knowledge base untouched.

    Raises:
        SystemExit: With status 1 when credentials or the source file are
            missing, when no chunks remain after normalization, or when
            some rows could not be uploaded.
    """
    from collections import Counter

    print("=" * 60)
    # Derive the banner from the active model so it cannot go stale again.
    print(f"  PGC Knowledge Base Ingestion ({EMBEDDING_MODEL}, {EMBEDDING_DIM}-dim)")
    print("=" * 60)

    # Validate credentials
    if not SUPABASE_URL or not SUPABASE_KEY:
        print("\n[ERROR] SUPABASE_URL and SUPABASE_KEY must be set in .env")
        sys.exit(1)

    # Validate source file
    if not VECTOR_DB_PATH.exists():
        print(f"\n[ERROR] vector_database.json not found at: {VECTOR_DB_PATH}")
        print("Run scripts/md_to_json_chunker.py first to generate it.")
        sys.exit(1)

    # Load chunks
    print(f"\nSource: {VECTOR_DB_PATH}")
    with open(VECTOR_DB_PATH, "r", encoding="utf-8") as f:
        raw_chunks: List[Dict] = json.load(f)

    print(f"Loaded {len(raw_chunks)} raw chunks from vector_database.json")

    # Show source breakdown (before filtering). `or {}` tolerates chunks whose
    # "metadata" key is present but null.
    source_counts = Counter(
        (chunk.get("metadata") or {}).get("source_file", "unknown")
        for chunk in raw_chunks
    )

    print("\nSource breakdown (raw):")
    for src, count in sorted(source_counts.items()):
        print(f"  - {src}: {count} chunks")

    # Apply chunk quality filter + splitter (via shared normalized pipeline)
    children, stats = build_normalized_child_chunks(raw_chunks)
    print(f"[Chunks] {stats.total_raw} raw -> {stats.skipped} skipped (too short) "
          f"+ {stats.split_count} split (too long) -> {stats.total_final} final chunks")

    # Never wipe the table when there is nothing to put back.
    if not children:
        print("\n[ERROR] No chunks left after filtering; existing rows were not touched.")
        sys.exit(1)

    # Load embedding model once
    print()
    model = load_embedding_model()
    print("[OK] Embedding model loaded\n")

    # Embed + format rows
    print(f"[...] Embedding {len(children)} chunks...")
    texts = [child.content for child in children]

    # Embed in one pass (fastembed handles internal batching efficiently)
    embeddings = embed_batch(model, texts)
    validate_embedding_dimensions(embeddings)
    print(f"[OK] Embeddings generated ({len(embeddings)} x {EMBEDDING_DIM}-dim)")

    # Build upload rows
    rows = build_upload_rows(children, embeddings)

    # Clear existing rows to prevent duplicates. Done only now, after the
    # fallible embedding work has succeeded. A failed clear is reported by
    # clear_existing_chunks() itself and the upload proceeds regardless.
    clear_existing_chunks()

    # Upload to Supabase
    print(f"\n[...] Uploading {len(rows)} rows to Supabase (batch size: {BATCH_SIZE})...")
    uploaded = upload_chunks_to_supabase(rows)

    # Summary
    complete = uploaded == len(rows)
    status = "[OK] Ingestion complete" if complete else "[WARN] Ingestion incomplete"
    print(f"\n{'=' * 60}")
    print(f"  {status}: {uploaded}/{len(rows)} chunks uploaded")
    print(f"  Model: {EMBEDDING_MODEL}")
    print("  Table: knowledge_chunks (Supabase pgvector)")
    print(f"{'=' * 60}\n")

    # Signal partial uploads to calling shells/CI instead of exiting 0.
    if not complete:
        sys.exit(1)


if __name__ == "__main__":
    main()