InsuranceBot / rag /ingest.py
rohitsar567's picture
fix: KI-018..022 — routing, profile telemetry, NIM timeouts, judge JSON, chat clear
bcb7079
Raw
History Blame Contribute Delete
11.8 kB
"""Ingest pipeline: PDF -> chunks -> embeddings -> Chroma vector store.
For each PDF in rag/corpus/<insurer>/:
1. Read with pdfplumber, keep per-page text + page numbers
2. Chunk into 800-token windows with 120-token overlap (page-aware:
a chunk records the page range it spans)
3. Embed via the active embeddings provider (currently LocalEmbeddings; input_type=document)
4. Store in Chroma with metadata: policy_id, insurer_slug, doc_type,
policy_name, page_start, page_end, chunk_idx
Run from project root:
python -m rag.ingest
Idempotent: a chunk is keyed by (policy_id, chunk_idx); re-running won't dup.
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import re
import time
from pathlib import Path
from typing import Iterator
import chromadb
import pdfplumber
from chromadb.config import Settings as ChromaSettings
from backend.config import settings
from backend.providers.local_embeddings import LocalEmbeddings as ActiveEmbeddings
ROOT = settings.CORPUS_DIR.parent.parent  # project root (two levels above the corpus dir)
# Hard cap on HNSW link_lists.bin size — guards against the ChromaDB bloat
# pathology that filled the disk on 2026-05-14 (single file reached 277 GB
# logical / 136 GB on-disk for only ~5K chunks). At M=16 link_lists.bin
# should be ~1 MB for a corpus this size; 500 MB is a 500× safety margin.
# When tripped we abort the ingest run loudly rather than letting the index
# keep growing into a disk-fill incident.
HNSW_BLOAT_THRESHOLD_BYTES = 500 * 1024 * 1024  # 500 MiB (= 524,288,000 bytes)
def _abort_if_hnsw_bloated() -> None:
    """Raise if any HNSW ``link_lists.bin`` in the vector store is oversized.

    Recursively scans ``settings.VECTORS_DIR`` for files named
    ``link_lists.bin`` and compares each file's size against
    ``HNSW_BLOAT_THRESHOLD_BYTES``. A missing vectors directory is a no-op,
    and files whose size cannot be read (``OSError``) are skipped.

    Raises:
        RuntimeError: for the first file found that exceeds the threshold.
    """
    vectors_dir = settings.VECTORS_DIR
    if vectors_dir.exists():
        for f in vectors_dir.rglob("link_lists.bin"):
            try:
                sz = f.stat().st_size
            except OSError:
                # Unreadable or vanished file: nothing to measure, move on.
                continue
            if sz <= HNSW_BLOAT_THRESHOLD_BYTES:
                continue
            raise RuntimeError(
                "ChromaDB HNSW bloat tripwire: "
                f"{f} is {sz / 1e9:.2f} GB (threshold "
                f"{HNSW_BLOAT_THRESHOLD_BYTES / 1e6:.0f} MB). Aborting ingest. "
                "Delete rag/vectors and re-clone the dataset from HF Hub, then "
                "investigate the ChromaDB version / batch-size that triggered the bloat."
            )
# ---------- chunking ----------
# Chunk sizes are configured in tokens but applied in characters.
# Rough token estimate: 1 token ~= 4 chars (English/legal text)
CHARS_PER_TOKEN = 4
# Target chunk length in characters (~3200 with the 800-token setting).
CHUNK_CHARS = settings.CHUNK_TOKENS * CHARS_PER_TOKEN # ~3200
# Overlap between consecutive chunks in characters (~480 with the 120-token setting).
OVERLAP_CHARS = settings.CHUNK_OVERLAP_TOKENS * CHARS_PER_TOKEN # ~480
def slugify(s: str) -> str:
    """Turn *s* into a lowercase, dash-separated slug.

    Every run of characters other than ASCII letters and digits becomes a
    single ``-``; leading and trailing dashes are removed.
    """
    lowered = s.lower()
    dashed = re.sub(r"[^a-zA-Z0-9]+", "-", lowered)
    trimmed = dashed.strip("-")
    # Second pass collapses any remaining dash runs (none are expected after
    # the first substitution, but the pass is kept for identical behaviour).
    return re.sub(r"-+", "-", trimmed)
def policy_id_for(pdf_path: Path) -> str:
    """Build the stable policy id ``<parent-dir-name>__<file-stem>`` for a PDF.

    The first part is the name of the directory that directly contains the
    file (the insurer folder); the second is the file name without its final
    suffix, e.g. ``family-health-optima__wordings``.
    """
    stem = pdf_path.stem
    insurer = pdf_path.parent.name
    return f"{insurer}__{stem}"
def read_pdf_pages(pdf_path: Path) -> list[tuple[int, str]]:
    """Extract the text of every page of a PDF.

    Returns:
        A list of ``(page_number, page_text)`` tuples in document order,
        with page numbers starting at 1. Pages with no extractable text
        contribute an empty string. Runs of spaces/tabs are collapsed to a
        single space and three or more consecutive newlines to two.
    """
    pages: list[tuple[int, str]] = []
    with pdfplumber.open(pdf_path) as document:
        page_no = 0
        for page in document.pages:
            page_no += 1
            raw = page.extract_text()
            cleaned = raw if raw else ""
            # Light whitespace normalisation only; content is left untouched.
            cleaned = re.sub(r"[ \t]+", " ", cleaned)
            cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
            pages.append((page_no, cleaned))
    return pages
def chunk_pages(
    pages: list[tuple[int, str]],
    target_chars: int = CHUNK_CHARS,
    overlap_chars: int = OVERLAP_CHARS,
) -> Iterator[dict]:
    """Yield overlapping text chunks annotated with the page range they span.

    All page texts are joined (each followed by a blank-line separator) and a
    window of about ``target_chars`` characters slides across the result,
    stepping back ``overlap_chars`` between consecutive windows. When a
    window does not reach the end of the text, its end is moved to just after
    the last ``". "`` found within 200 characters either side of the nominal
    end, so chunks prefer to finish on a sentence boundary.

    Args:
        pages: ``(page_number, page_text)`` tuples in document order.
        target_chars: Nominal chunk length in characters.
        overlap_chars: Characters shared between consecutive chunks.

    Yields:
        Dicts with keys ``chunk_idx`` (0-based, consecutive), ``text`` (the
        stripped chunk text), ``page_start`` / ``page_end`` (page numbers of
        the first and last character of the window), and ``char_start`` /
        ``char_end`` (window offsets into the joined text, before stripping).
        Nothing is yielded when the pages contain no non-whitespace text.
    """
    # Search radius around the nominal window end for a sentence boundary.
    boundary_slack = 200

    # Join the pages, remembering the offset at which each page begins so
    # character positions can be mapped back to page numbers.
    page_markers: list[tuple[int, int]] = []  # (start_char, page_no)
    joined: list[str] = []
    pos = 0
    for page_no, text in pages:
        page_markers.append((pos, page_no))
        joined.append(text)
        joined.append("\n\n")
        pos += len(text) + 2
    full_text = "".join(joined)

    if not full_text.strip():
        return

    def page_at(char_pos: int) -> int:
        """Return the page whose text (or trailing separator) holds ``char_pos``."""
        # Linear scan: the marker list has one entry per page, which is small.
        last = page_markers[0][1]
        for marker_start, marker_page in page_markers:
            if marker_start > char_pos:
                break
            last = marker_page
        return last

    n = len(full_text)
    start = 0
    chunk_idx = 0
    while start < n:
        end = min(start + target_chars, n)
        if end < n:
            # Clamp the lower edge of the search window to `start`. Without
            # the clamp a small `target_chars` makes `end - 200` negative
            # (the slice then wraps around to the tail of the text) or lets
            # the window begin before this chunk, so `end` could land at or
            # before `start`. With target_chars >= 200 this is a no-op.
            window_start = max(start, end - boundary_slack)
            window = full_text[window_start : end + boundary_slack]
            local_dot = window.rfind(". ")
            if local_dot != -1:
                # End just after the ". " so the next chunk starts cleanly.
                end = window_start + local_dot + 2
        text = full_text[start:end].strip()
        if text:
            yield {
                "chunk_idx": chunk_idx,
                "text": text,
                "page_start": page_at(start),
                "page_end": page_at(min(end - 1, n - 1)),
                "char_start": start,
                "char_end": end,
            }
            chunk_idx += 1
        if end >= n:
            break
        # Step back by the overlap, but always advance by at least one
        # character so the loop terminates even if overlap >= chunk length.
        start = max(end - overlap_chars, start + 1)
# ---------- Chroma persistence ----------
def get_chroma_collection():
    """Open the persistent Chroma store and return the ``policies`` collection.

    The store lives at ``settings.VECTORS_DIR`` and is opened with anonymized
    telemetry disabled. The collection is created on first use with cosine
    as its HNSW distance space.
    """
    store_path = str(settings.VECTORS_DIR)
    chroma_settings = ChromaSettings(anonymized_telemetry=False)
    client = chromadb.PersistentClient(path=store_path, settings=chroma_settings)
    collection = client.get_or_create_collection(
        name="policies",
        metadata={"hnsw:space": "cosine"},
    )
    return collection
def get_quarantine_collection():
    """Return the ``user_uploads_quarantine`` collection for user-uploaded PDFs.

    This is a collection distinct from ``policies``, opened from the same
    persistent store path (``settings.VECTORS_DIR``) with telemetry disabled
    and cosine as the HNSW distance space. Keeping uploads in their own
    collection, rather than tagging them inside ``policies``, is meant to
    stop one user's upload from surfacing in another user's policy retrieval.

    NOTE(review): the session_id gating of this collection is implemented by
    the retrieval code, not here; confirm against that module.
    """
    store_path = str(settings.VECTORS_DIR)
    chroma_settings = ChromaSettings(anonymized_telemetry=False)
    client = chromadb.PersistentClient(path=store_path, settings=chroma_settings)
    collection = client.get_or_create_collection(
        name="user_uploads_quarantine",
        metadata={"hnsw:space": "cosine"},
    )
    return collection
# Top-level corpus directories that the main `policies` collection ingests.
# Anything outside this allowlist must be rejected loudly — prevents a stray
# regulatory PDF landing in an insurer folder, or vice-versa.
ALLOWED_TOP_LEVEL_DIRS = {
    "regulatory", # IRDAI / Govt mandates → doc_type='regulatory'
    "acko", "aditya-birla", "bajaj-allianz", "care-health", "cholamandalam",
    "go-digit", "hdfc-ergo", "icici-lombard", "iffco-tokio", "manipalcigna",
    "national-insurance", "new-india", "niva-bupa", "oriental-insurance",
    "reliance-general", "royal-sundaram", "sbi-general", "star-health",
    "tata-aig", "united-india",
    # `user-upload/` is intentionally NOT in this set — user uploads go to
    # the quarantine collection, never the main `policies` collection.
}
# ---------- pipeline ----------
def discover_pdfs() -> list[Path]:
    """Collect every ``*.pdf`` directly inside an allowed corpus directory.

    Iterates the entries of ``settings.CORPUS_DIR`` in sorted order. Plain
    files and the ``user-upload`` directory are ignored. Directories listed
    in ``ALLOWED_TOP_LEVEL_DIRS`` contribute their PDFs (sorted within each
    directory); any other directory name is recorded as unknown.

    Returns:
        The PDF paths in deterministic order.

    Raises:
        RuntimeError: if one or more unknown directories were found. The
            whole corpus is scanned first so the message lists all of them.
    """
    pdfs: list[Path] = []
    rejected: list[str] = []
    for entry in sorted(settings.CORPUS_DIR.iterdir()):
        if not entry.is_dir():
            continue
        dir_name = entry.name
        if dir_name == "user-upload":
            # Quarantine path — handled by /api/upload-policy, not here.
            continue
        if dir_name in ALLOWED_TOP_LEVEL_DIRS:
            pdfs.extend(sorted(entry.glob("*.pdf")))
        else:
            rejected.append(dir_name)
    if not rejected:
        return pdfs
    raise RuntimeError(
        "Unknown top-level corpus dirs found (not in ALLOWED_TOP_LEVEL_DIRS): "
        f"{rejected}. Either add them to the allowlist or move them out of "
        f"{settings.CORPUS_DIR}."
    )
def load_manifest() -> dict:
    """Load ``_manifest.json`` and index its successful entries by local path.

    Returns:
        A dict mapping each entry's ``local_path`` (relative to the project
        root) to the full manifest record, which carries fields such as
        ``policy_name``, ``doc_type`` and ``url``. Entries whose ``ok`` flag
        is falsy are dropped. An empty dict is returned when the manifest
        file does not exist.

    Raises:
        json.JSONDecodeError: if the manifest is not valid JSON.
        KeyError: if an ``ok`` entry has no ``local_path``.
    """
    mf = settings.CORPUS_DIR / "_manifest.json"
    if not mf.exists():
        return {}
    # JSON is UTF-8 by specification; decode explicitly instead of relying on
    # the platform's locale encoding, which breaks on non-ASCII policy names.
    data = json.loads(mf.read_text(encoding="utf-8"))
    return {r["local_path"]: r for r in data.get("results", []) if r.get("ok")}
async def ingest_one(
    pdf_path: Path,
    manifest_entry: dict,
    embedder: ActiveEmbeddings,
    collection,
):
    """Ingest a single PDF: extract text, chunk, embed, and add to ``collection``.

    Args:
        pdf_path: Path of the PDF; its parent directory name is used as the
            insurer slug.
        manifest_entry: Manifest record for this file (may be empty).
            ``policy_name``, ``doc_type`` and ``url`` are read from it, with
            fallbacks to the file stem, ``"unknown"`` and ``""``.
        embedder: Object whose awaitable ``embed(texts, input_type=...)``
            returns one vector per text.
        collection: Chroma collection to query and add to.

    Returns:
        The number of chunks added. ``0`` when the policy is already present,
        when PDF reading or embedding fails (the failure is printed, not
        raised), or when no text could be extracted.
    """
    policy_id = policy_id_for(pdf_path)
    insurer_slug = pdf_path.parent.name
    policy_name = manifest_entry.get("policy_name", pdf_path.stem)
    manifest_doc_type = manifest_entry.get("doc_type", "unknown")
    source_url = manifest_entry.get("url", "")

    # PDFs under rag/corpus/regulatory/ are IRDAI / Govt mandates. They are
    # force-tagged so retrieval can recognise them regardless of the manifest.
    doc_type = "regulatory" if insurer_slug == "regulatory" else manifest_doc_type

    # A single stored chunk for this policy is enough to treat it as ingested.
    already_stored = collection.get(where={"policy_id": policy_id}, limit=1)
    if already_stored and already_stored.get("ids"):
        print(f" SKIP (already ingested): {policy_id}")
        return 0

    try:
        pages = read_pdf_pages(pdf_path)
    except Exception as e:
        # Best effort: one unreadable PDF must not abort the whole run.
        print(f" FAIL pdfplumber: {policy_id} | {type(e).__name__}: {e}")
        return 0

    chunks = list(chunk_pages(pages))
    if not chunks:
        print(f" EMPTY: {policy_id} (no text extracted)")
        return 0

    texts = [c["text"] for c in chunks]
    try:
        vectors = await embedder.embed(texts, input_type="document")
    except Exception as e:
        # Best effort, as above: report and move on to the next PDF.
        print(f" FAIL embed: {policy_id} | {type(e).__name__}: {e}")
        return 0

    ids = [f"{policy_id}::chunk{c['chunk_idx']}" for c in chunks]

    # The project-relative path is the same for every chunk of this PDF.
    local_path = str(pdf_path.relative_to(ROOT))
    metadatas = []
    for c in chunks:
        metadatas.append(
            {
                "policy_id": policy_id,
                "insurer_slug": insurer_slug,
                "policy_name": policy_name,
                "doc_type": doc_type,
                "source_url": source_url,
                "page_start": c["page_start"],
                "page_end": c["page_end"],
                "chunk_idx": c["chunk_idx"],
                "local_path": local_path,
            }
        )

    collection.add(
        ids=ids,
        documents=texts,
        embeddings=vectors,
        metadatas=metadatas,
    )
    # Re-check the index size after every write so bloat is caught early.
    _abort_if_hnsw_bloated()
    return len(chunks)
async def main():
    """Run the full ingest: discover PDFs, ingest each one, print a summary.

    Creates ``settings.VECTORS_DIR`` if needed and runs the HNSW bloat check
    before any work starts. PDFs are processed one at a time in discovery
    order; progress and the final totals are written to stdout.
    """
    settings.VECTORS_DIR.mkdir(parents=True, exist_ok=True)
    # Fail fast if a prior run left a bloated index behind.
    _abort_if_hnsw_bloated()

    pdfs = discover_pdfs()
    manifest = load_manifest()
    collection = get_chroma_collection()
    embedder = ActiveEmbeddings()
    print(f"Ingesting {len(pdfs)} PDFs into Chroma at {settings.VECTORS_DIR}\n")

    t0 = time.time()
    total_chunks = 0
    for i, pdf in enumerate(pdfs, 1):
        # Manifest records are keyed by path relative to the project root.
        rel = str(pdf.relative_to(ROOT))
        entry = manifest.get(rel, {})
        print(f"[{i}/{len(pdfs)}] {pdf.parent.name} | {pdf.stem[:50]}")
        n = await ingest_one(pdf, entry, embedder, collection)
        if n:
            print(f" -> {n} chunks")
        total_chunks += n

    elapsed = time.time() - t0
    final_count = collection.count()
    print(f"\nDone in {elapsed:.1f}s. {total_chunks} new chunks added. Collection now has {final_count} chunks total.")
if __name__ == "__main__":
    # Script entry point (python -m rag.ingest): run the async pipeline.
    asyncio.run(main())