"""Ingest pipeline: PDF -> chunks -> embeddings -> Chroma vector store.

For each PDF in rag/corpus/{insurer}/:
  1. Read with pdfplumber, keep per-page text + page numbers
  2. Chunk into overlapping character windows sized from
     settings.CHUNK_TOKENS / settings.CHUNK_OVERLAP_TOKENS (nominally
     800-token windows with 120-token overlap; page-aware: a chunk records
     the page range it spans)
  3. Embed via the ActiveEmbeddings provider (input_type=document)
  4. Store in Chroma with metadata: policy_id, insurer_slug, policy_name,
     doc_type, source_url, page_start, page_end, chunk_idx, local_path

Run from project root:
    python -m rag.ingest

Idempotent: chunk ids are "{policy_id}::chunk{chunk_idx}", and a PDF whose
policy_id already has at least one chunk in the collection is skipped, so
re-running won't dup.
"""
from __future__ import annotations

import asyncio
import bisect
import hashlib
import json
import re
import time
from pathlib import Path
from typing import Iterator

import chromadb
import pdfplumber
from chromadb.config import Settings as ChromaSettings

from backend.config import settings
from backend.providers.local_embeddings import LocalEmbeddings as ActiveEmbeddings

ROOT = settings.CORPUS_DIR.parent.parent  # project root

# Binary size units used when reporting file sizes.
_MIB = 1024 * 1024
_GIB = 1024 * _MIB

# Hard cap on HNSW link_lists.bin size — guards against the ChromaDB bloat
# pathology that filled the disk on 2026-05-14 (single file reached 277 GB
# logical / 136 GB on-disk for only ~5K chunks). At M=16 link_lists.bin
# should be ~1 MB for a corpus this size; 500 MB is 500× safety margin.
# When tripped we abort the ingest run loudly rather than letting the index
# keep growing into a disk-fill incident.
HNSW_BLOAT_THRESHOLD_BYTES = 500 * 1024 * 1024  # 500 MB


def _abort_if_hnsw_bloated() -> None:
    """Raise if any link_lists.bin under the vectors dir exceeds the cap.

    Does nothing when settings.VECTORS_DIR does not exist. Files that cannot
    be stat'ed are ignored.

    Raises:
        RuntimeError: a link_lists.bin file is larger than
            HNSW_BLOAT_THRESHOLD_BYTES.
    """
    if not settings.VECTORS_DIR.exists():
        return
    for f in settings.VECTORS_DIR.rglob("link_lists.bin"):
        try:
            sz = f.stat().st_size
        except OSError:
            # Could not read the file's size; nothing to compare.
            continue
        if sz > HNSW_BLOAT_THRESHOLD_BYTES:
            # Size and threshold are both reported in binary units so the
            # printed threshold matches the constant's definition (500).
            raise RuntimeError(
                "ChromaDB HNSW bloat tripwire: "
                f"{f} is {sz / _GIB:.2f} GiB (threshold "
                f"{HNSW_BLOAT_THRESHOLD_BYTES / _MIB:.0f} MiB). Aborting ingest. "
                "Delete rag/vectors and re-clone the dataset from HF Hub, then "
                "investigate the ChromaDB version / batch-size that triggered the bloat."
            )


# ---------- chunking ----------

# Rough token estimate: 1 token ~= 4 chars (English/legal text)
CHARS_PER_TOKEN = 4
CHUNK_CHARS = settings.CHUNK_TOKENS * CHARS_PER_TOKEN  # ~3200
OVERLAP_CHARS = settings.CHUNK_OVERLAP_TOKENS * CHARS_PER_TOKEN  # ~480
# How far either side of the nominal chunk end to look for a sentence boundary.
BOUNDARY_SLACK_CHARS = 200

_NON_ALNUM_RE = re.compile(r"[^a-zA-Z0-9]+")
_HORIZONTAL_WS_RE = re.compile(r"[ \t]+")
_EXCESS_NEWLINES_RE = re.compile(r"\n{3,}")


def slugify(s: str) -> str:
    """Lowercase *s* and replace each run of non-alphanumerics with one "-".

    Leading and trailing dashes are stripped.
    """
    # Each run of non-alphanumerics becomes a single dash, so no further
    # dash-collapsing pass is needed.
    return _NON_ALNUM_RE.sub("-", s.lower()).strip("-")


def policy_id_for(pdf_path: Path) -> str:
    """Derive a stable policy_id from path: {parent dir name}__{file stem}."""
    insurer = pdf_path.parent.name
    stem = pdf_path.stem  # e.g. family-health-optima__wordings
    return f"{insurer}__{stem}"


def read_pdf_pages(pdf_path: Path) -> list[tuple[int, str]]:
    """Return [(page_number, page_text), ...]. Page numbers are 1-indexed.

    Pages with no extractable text are kept with an empty string so page
    numbering stays aligned with the PDF.
    """
    out: list[tuple[int, str]] = []
    with pdfplumber.open(pdf_path) as pdf:
        for page_no, page in enumerate(pdf.pages, start=1):
            text = page.extract_text() or ""
            # Normalize whitespace lightly
            text = _HORIZONTAL_WS_RE.sub(" ", text)
            text = _EXCESS_NEWLINES_RE.sub("\n\n", text)
            out.append((page_no, text))
    return out


def chunk_pages(
    pages: list[tuple[int, str]],
    target_chars: int = CHUNK_CHARS,
    overlap_chars: int = OVERLAP_CHARS,
    boundary_slack_chars: int = BOUNDARY_SLACK_CHARS,
) -> Iterator[dict]:
    """Yield chunks with page-range metadata.

    Strategy:
      - Concatenate all page texts (joined by a blank line), remembering the
        char offset where each page starts so positions map back to pages
      - Slide a window of target_chars across the joined text with overlap,
        preferring to end each chunk just after a ". " found within
        boundary_slack_chars of the nominal end

    Args:
        pages: (page_number, page_text) pairs in document order.
        target_chars: nominal chunk length in characters; must be > 0.
        overlap_chars: characters shared by consecutive chunks; must satisfy
            0 <= overlap_chars < target_chars.
        boundary_slack_chars: sentence-boundary search radius; 0 disables
            the boundary adjustment.

    Yields:
        Dicts with keys chunk_idx, text, page_start, page_end, char_start,
        char_end. Nothing is yielded when the pages contain no text.

    Raises:
        ValueError: on an invalid target_chars / overlap_chars combination
            (raised when iteration starts, since this is a generator).
    """
    if target_chars <= 0:
        raise ValueError(f"target_chars must be positive, got {target_chars}")
    if not 0 <= overlap_chars < target_chars:
        # An overlap >= the window degenerates into ~one chunk per character.
        raise ValueError(
            "overlap_chars must satisfy 0 <= overlap_chars < target_chars, "
            f"got overlap_chars={overlap_chars}, target_chars={target_chars}"
        )

    # Build the joined text with page boundaries we can recover later.
    page_starts: list[int] = []  # char offset where each page begins
    page_numbers: list[int] = []  # page number, parallel to page_starts
    parts: list[str] = []
    pos = 0
    for page_no, text in pages:
        page_starts.append(pos)
        page_numbers.append(page_no)
        parts.append(text)
        parts.append("\n\n")
        pos += len(text) + 2
    full_text = "".join(parts)

    if not full_text.strip():
        return

    def page_at(char_pos: int) -> int:
        # Last page whose start offset is <= char_pos. The blank-line
        # separator after a page therefore counts as part of that page.
        idx = bisect.bisect_right(page_starts, char_pos) - 1
        return page_numbers[max(idx, 0)]

    n = len(full_text)
    start = 0
    chunk_idx = 0
    while start < n:
        end = min(start + target_chars, n)
        # Prefer to end on a sentence boundary if one is near the nominal end.
        if end < n and boundary_slack_chars > 0:
            # Clamp the window start: a negative slice index would wrap to
            # the end of the text, and a boundary at or before `start` would
            # produce an empty or backwards chunk.
            window_start = max(end - boundary_slack_chars, start + 1)
            window = full_text[window_start : end + boundary_slack_chars]
            local_dot = window.rfind(". ")
            if local_dot != -1:
                # Shift end to just past that boundary
                end = window_start + local_dot + 2
        text = full_text[start:end].strip()
        if text:
            yield {
                "chunk_idx": chunk_idx,
                "text": text,
                "page_start": page_at(start),
                "page_end": page_at(min(end - 1, n - 1)),
                "char_start": start,
                "char_end": end,
            }
            chunk_idx += 1
        if end >= n:
            break
        # Always advance by at least one char so the loop terminates.
        start = max(end - overlap_chars, start + 1)


# ---------- Chroma persistence ----------

def _open_chroma_client():
    """Open the persistent Chroma client rooted at settings.VECTORS_DIR."""
    return chromadb.PersistentClient(
        path=str(settings.VECTORS_DIR),
        settings=ChromaSettings(anonymized_telemetry=False),
    )


def get_chroma_collection():
    """Return the main `policies` collection (cosine space), creating it if absent."""
    return _open_chroma_client().get_or_create_collection(
        name="policies",
        metadata={"hnsw:space": "cosine"},
    )


def get_quarantine_collection():
    """SEPARATE Chroma collection for user-uploaded PDFs.

    Isolation is by collection, not by metadata tag: user uploads live in
    `user_uploads_quarantine` rather than `policies`, so a query against the
    main collection can never surface them. Both collections are opened from
    the same settings.VECTORS_DIR. Retrieval is expected to query this
    collection only when the request supplies a session_id matching the
    upload's session (enforced by the retrieval code, not here).
    """
    return _open_chroma_client().get_or_create_collection(
        name="user_uploads_quarantine",
        metadata={"hnsw:space": "cosine"},
    )


# Top-level corpus directories that the main `policies` collection ingests.
# Anything outside this allowlist must be rejected loudly — prevents a stray
# regulatory PDF landing in an insurer folder, or vice-versa.
ALLOWED_TOP_LEVEL_DIRS = {
    "regulatory",  # IRDAI / Govt mandates → doc_type='regulatory'
    "acko",
    "aditya-birla",
    "bajaj-allianz",
    "care-health",
    "cholamandalam",
    "go-digit",
    "hdfc-ergo",
    "icici-lombard",
    "iffco-tokio",
    "manipalcigna",
    "national-insurance",
    "new-india",
    "niva-bupa",
    "oriental-insurance",
    "reliance-general",
    "royal-sundaram",
    "sbi-general",
    "star-health",
    "tata-aig",
    "united-india",
    # `user-upload/` is intentionally NOT in this set — user uploads go to
    # the quarantine collection, never the main `policies` collection.
}


# ---------- pipeline ----------

def discover_pdfs() -> list[Path]:
    """All PDFs under rag/corpus/*/*.pdf, in deterministic order.

    Hardened: if any top-level directory other than `user-upload` is not in
    ALLOWED_TOP_LEVEL_DIRS, the whole discovery fails with a loud
    RuntimeError. Prevents accidental cross-contamination of the main
    corpus. Non-directory entries in the corpus dir are ignored, and
    `user-upload` is silently skipped.

    Raises:
        RuntimeError: one or more unknown top-level directories exist.
    """
    pdfs: list[Path] = []
    rejected: list[str] = []
    for insurer_dir in sorted(settings.CORPUS_DIR.iterdir()):
        if not insurer_dir.is_dir():
            continue
        if insurer_dir.name == "user-upload":
            continue  # quarantine path — handled by /api/upload-policy
        if insurer_dir.name not in ALLOWED_TOP_LEVEL_DIRS:
            rejected.append(insurer_dir.name)
            continue
        pdfs.extend(sorted(insurer_dir.glob("*.pdf")))
    if rejected:
        raise RuntimeError(
            "Unknown top-level corpus dirs found (not in ALLOWED_TOP_LEVEL_DIRS): "
            f"{rejected}. Either add them to the allowlist or move them out of "
            f"{settings.CORPUS_DIR}."
        )
    return pdfs


def load_manifest() -> dict:
    """Map local_path -> manifest record, read from _manifest.json.

    Only records with a truthy "ok" flag and a "local_path" are kept; each
    value is the full record (ingest reads policy_name, doc_type and url
    from it). Returns an empty dict when the manifest file does not exist.
    """
    mf = settings.CORPUS_DIR / "_manifest.json"
    if not mf.exists():
        return {}
    # Decode explicitly as UTF-8 rather than the platform default encoding.
    data = json.loads(mf.read_text(encoding="utf-8"))
    # local_path is relative to project root
    return {
        r["local_path"]: r
        for r in data.get("results", [])
        if r.get("ok") and r.get("local_path")
    }


async def ingest_one(
    pdf_path: Path,
    manifest_entry: dict,
    embedder: ActiveEmbeddings,
    collection,
) -> int:
    """Read, chunk, embed and store one PDF.

    Args:
        pdf_path: PDF to ingest; must live under ROOT.
        manifest_entry: manifest record for this PDF, or {} if none.
        embedder: provider whose async embed(texts, input_type=...) returns
            one vector per text.
        collection: Chroma collection to add the chunks to.

    Returns:
        Number of chunks added. 0 when the PDF was already ingested, yielded
        no text, or failed to read or embed (failures are printed, not raised).

    Raises:
        RuntimeError: the HNSW bloat tripwire fired after the add.
    """
    policy_id = policy_id_for(pdf_path)
    insurer_slug = pdf_path.parent.name
    policy_name = manifest_entry.get("policy_name", pdf_path.stem)
    doc_type = manifest_entry.get("doc_type", "unknown")
    source_url = manifest_entry.get("url", "")

    # PDFs under rag/corpus/regulatory/ are IRDAI / Govt mandates. Tag them
    # so retrieve.py can apply the regulatory-intent boost.
    if insurer_slug == "regulatory":
        doc_type = "regulatory"

    # Skip if already ingested
    existing = collection.get(where={"policy_id": policy_id}, limit=1)
    if existing and existing.get("ids"):
        print(f" SKIP (already ingested): {policy_id}")
        return 0

    # Best-effort per file: a bad PDF is reported and skipped so the rest of
    # the run continues.
    try:
        pages = read_pdf_pages(pdf_path)
    except Exception as e:
        print(f" FAIL pdfplumber: {policy_id} | {type(e).__name__}: {e}")
        return 0

    chunks = list(chunk_pages(pages))
    if not chunks:
        print(f" EMPTY: {policy_id} (no text extracted)")
        return 0

    texts = [c["text"] for c in chunks]
    try:
        vectors = await embedder.embed(texts, input_type="document")
    except Exception as e:
        print(f" FAIL embed: {policy_id} | {type(e).__name__}: {e}")
        return 0

    local_path = str(pdf_path.relative_to(ROOT))  # same for every chunk
    ids = [f"{policy_id}::chunk{c['chunk_idx']}" for c in chunks]
    metadatas = [
        {
            "policy_id": policy_id,
            "insurer_slug": insurer_slug,
            "policy_name": policy_name,
            "doc_type": doc_type,
            "source_url": source_url,
            "page_start": c["page_start"],
            "page_end": c["page_end"],
            "chunk_idx": c["chunk_idx"],
            "local_path": local_path,
        }
        for c in chunks
    ]
    collection.add(
        ids=ids,
        documents=texts,
        embeddings=vectors,
        metadatas=metadatas,
    )
    # Check after every write so a runaway index is caught within one PDF.
    _abort_if_hnsw_bloated()
    return len(chunks)


async def main():
    """Ingest every discovered PDF into the main collection and print a summary."""
    settings.VECTORS_DIR.mkdir(parents=True, exist_ok=True)
    _abort_if_hnsw_bloated()  # fail fast if a prior run left a bloated index
    pdfs = discover_pdfs()
    manifest = load_manifest()
    collection = get_chroma_collection()
    embedder = ActiveEmbeddings()

    print(f"Ingesting {len(pdfs)} PDFs into Chroma at {settings.VECTORS_DIR}\n")
    total_chunks = 0
    # perf_counter is monotonic, so the elapsed time is immune to clock changes.
    t0 = time.perf_counter()
    for i, pdf in enumerate(pdfs, 1):
        rel = str(pdf.relative_to(ROOT))
        entry = manifest.get(rel, {})
        print(f"[{i}/{len(pdfs)}] {pdf.parent.name} | {pdf.stem[:50]}")
        n = await ingest_one(pdf, entry, embedder, collection)
        total_chunks += n
        if n:
            print(f" -> {n} chunks")

    elapsed = time.perf_counter() - t0
    final_count = collection.count()
    print(
        f"\nDone in {elapsed:.1f}s. {total_chunks} new chunks added. "
        f"Collection now has {final_count} chunks total."
    )


if __name__ == "__main__":
    asyncio.run(main())