InsuranceBot / rag /extract.py
rohitsar567's picture
chore(cleanup): purge stale narrative/tombstones/dead code — codebase reads as the current standard
23b8fad
Raw
History Blame Contribute Delete
13.6 kB
"""Structured extraction: PDF -> 48-field HealthPolicy JSON -> DuckDB.
For each PDF in rag/corpus/:
1. Read full text via pdfplumber (already have per-page text from ingest)
2. Pass to the brain LLM (get_brain_llm) with the Pydantic schema as a
structured-output target. Sarvam-M is not in the reasoning path (it
is used for Indic translation only).
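3. Self-critique pass — LLM scores per-field confidence vs source text
(NOTE: this module makes no separate critique call; any
extraction_confidence_pct comes from the extraction response itself)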
4. Validate via Pydantic
5. Upsert into DuckDB `policies` table
Each extracted policy is also written to rag/extracted/<policy_id>.json for
reproducibility / debugging.
Run:
python -m rag.extract # extract all PDFs not yet extracted
python -m rag.extract --policy <policy_id> # one specific
"""
from __future__ import annotations
import argparse
import asyncio
import json
import re
import time
from pathlib import Path
from typing import Optional
import duckdb
import pdfplumber
from backend.config import settings
from backend.providers.base import ChatMessage
from backend.providers.nvidia_nim_llm import (
NvidiaNimLLM,
get_brain_llm,
)
from rag.ingest import policy_id_for
from rag.schema import HealthPolicy
# Directory two levels above the corpus dir; PDF and DB paths are printed and
# stored relative to it (see the relative_to(ROOT) calls in this module).
ROOT = settings.CORPUS_DIR.parent.parent
# Where each extracted policy is written as <policy_id>.json, and where the
# raw LLM output is dumped when every attempt fails.
EXTRACTED_DIR = settings.EXTRACTED_DIR
# Path of the DuckDB database file that holds the `policies` table.
DB_PATH = settings.STRUCTURED_DB
# ---------- LLM extraction prompts ----------
# System prompt for the extraction call. It pushes the model to populate as many
# fields as the text supports (the prompt itself states that null is treated as
# "unknown" downstream) while forbidding invented numbers. The dashes and arrows
# below are real Unicode characters (U+2014, U+2192); mis-decoded byte sequences
# in their place would be sent to the model verbatim as garbage.
EXTRACT_SYSTEM = """You extract structured fields from Indian health insurance policy documents and output a compact JSON object. Strict instructions:
1. **OUTPUT ONLY THE JSON.** No markdown fences, no commentary, no <think> tags, no preface. Start your response with `{` and end with `}`. Nothing else.
2. **EMIT EVERY FIELD YOU CAN INFER, EVEN PARTIALLY.** This is critical — be GENEROUS in field population, not conservative. For each field:
- If the document **states** it: extract the verbatim value.
- If the document **implies** it (e.g., "this is a family floater plan" implies policy_type='family_floater'; UIN starting with IHIP usually means individual indemnity): set the field with the inferred value.
- If the document is **partially silent but the structure is standard** (e.g., maternity not mentioned → assume not covered, so omit; grace period not stated for renewals → IRDAI mandates 30 days, so set 30): apply the IRDAI default when reasonable.
- Only OMIT a field when there is **zero textual signal** AND no industry default applies.
The downstream consumer treats null as "unknown" and won't show it. Sparse extractions hurt the user — they make the comparison + filter UI useless. Lean toward EMITTING.
3. **NORMALIZE VALUES.**
- Waiting periods in months as integer; days separately.
- Sum insured as list of INR integers, no commas: [500000, 1000000].
- Booleans: true / false (lowercase).
- Percentages: numeric (50 for 50%).
- Coverage items (CoverageItem): {"covered": bool, "limit_inr": int?, "limit_text": str?, "notes": str?} — drop null sub-keys inside.
- Enums use the canonical lowercase underscore form: 'family_floater', 'pan_india', 'critical_illness'. The schema normalises common variants automatically, but emitting canonical saves a normalisation hop.
4. **NO HALLUCINATIONS.** Do not invent specific NUMBERS (waiting months, sub-limits, sum-insured tiers) that aren't in the text. Inferring an ENUM from clear textual cues is fine — that's extraction, not hallucination.
5. **COMPACT.** No whitespace beyond what's needed. Single object."""
def build_extract_prompt(
    policy_text: str,
    schema_excerpt: str,
    policy_id: str,
    max_chars: int = 12_000,
) -> str:
    """Build the user-turn prompt: policy text, schema listing, closing reminder.

    Args:
        policy_text: Full text of the policy document.
        schema_excerpt: Field-name/type listing the model must follow.
        policy_id: Identifier echoed in the prompt header.
        max_chars: Maximum number of leading characters of ``policy_text`` to
            keep. Defaults to 12k chars (~3k tokens).

    Returns:
        The prompt string. When the text exceeds ``max_chars`` it is cut and a
        visible truncation marker is appended so the model knows the document
        continues.
    """
    if len(policy_text) > max_chars:
        # Front-bias: the schedule, definitions, waiting periods, UIN and
        # sum-insured tables sit at the start of the document. The back half
        # (exclusions, boilerplate, grievance procedures) yields few structured
        # fields, so that is the part dropped.
        policy_text = (
            policy_text[:max_chars]
            + "\n\n[...truncated for length — extract from above only...]"
        )
    # The closing reminder must agree with the system prompt, which asks the
    # model to populate every field it can state or infer; telling it here to
    # null anything "not explicitly stated" would contradict that.
    return (
        f"POLICY DOCUMENT (policy_id = {policy_id}):\n"
        "'''\n"
        f"{policy_text}\n"
        "'''\n"
        "JSON SCHEMA (field names + types you must use):\n"
        f"{schema_excerpt}\n"
        "Now produce the JSON object. Remember: populate every field the "
        "document states or clearly implies; omit a field only when nothing "
        "in the text supports it.\n"
    )
# ---------- helpers ----------
def schema_excerpt() -> str:
    """Render HealthPolicy's fields as a compact ``name: type`` listing.

    Only field names and stringified annotations are emitted; field
    descriptions are left out to keep the prompt short (~6.7k -> ~2.5k chars
    per the original author's measurement).
    """
    # Textual clean-ups applied, in order, to each stringified annotation.
    # NOTE(review): removing every "]" also strips the closing bracket of
    # nested generics, so those render unbalanced - confirm this is intended.
    substitutions = (("typing.", ""), ("Optional[", "?"), ("]", ""))
    rendered = []
    for field_name, field_info in HealthPolicy.model_fields.items():
        type_text = str(field_info.annotation)
        for old, new in substitutions:
            type_text = type_text.replace(old, new)
        rendered.append(f" {field_name}: {type_text}")
    body = "\n".join(rendered)
    return "{\n" + body + "\n}"
def read_full_text(pdf_path: Path) -> str:
    """Return the text of every page of a PDF, each prefixed with ``[Page N]``.

    Runs of spaces/tabs are collapsed to a single space; pages with no
    extractable text contribute an empty body. Pages are joined by a blank line.
    """
    sections = []
    with pdfplumber.open(pdf_path) as document:
        for page_no, page in enumerate(document.pages, start=1):
            # extract_text() may yield None; treat that as an empty page.
            page_text = page.extract_text() or ""
            collapsed = re.sub(r"[ \t]+", " ", page_text)
            sections.append(f"[Page {page_no}]\n{collapsed}")
    return "\n\n".join(sections)
def find_pdfs() -> list[Path]:
    """List every ``*.pdf`` one level below the corpus directory.

    Sub-directories are visited in sorted order and the PDFs inside each are
    sorted too; non-directory entries in the corpus dir are ignored.
    """
    return [
        pdf_file
        for entry in sorted(settings.CORPUS_DIR.iterdir())
        if entry.is_dir()
        for pdf_file in sorted(entry.glob("*.pdf"))
    ]
def load_manifest() -> dict:
    """Load ``_manifest.json`` from the corpus dir, keyed by ``local_path``.

    Returns:
        A mapping from each successful result's ``local_path`` to its full
        record. Empty when the manifest file does not exist. Records that are
        not marked ``ok`` or that carry no ``local_path`` are skipped.
    """
    mf = settings.CORPUS_DIR / "_manifest.json"
    if not mf.exists():
        return {}
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's default encoding.
    data = json.loads(mf.read_text(encoding="utf-8"))
    return {
        record["local_path"]: record
        for record in data.get("results", [])
        if record.get("ok") and record.get("local_path")
    }
def json_from_llm_text(text: str) -> dict:
    """Parse the first balanced ``{...}`` block out of raw LLM output.

    Reasoning blocks and markdown code fences are removed first.

    Args:
        text: Raw completion text.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If no ``{`` is present, the braces never balance, or the
            balanced block is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError`` subclass).
    """
    text = text.strip()
    # Closed <think>...</think> blocks.
    text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
    # Orphan closing tag: the output can contain only "</think>" when the
    # opening tag is not part of the completion. Everything before it is
    # reasoning prose (which may itself contain braces), so drop it.
    if "</think>" in text:
        text = text.rsplit("</think>", 1)[1]
    # Unterminated block: output was truncated mid-thought.
    text = re.sub(r"<think>.*", "", text, flags=re.DOTALL)
    text = text.strip()
    # Remove fenced markdown
    text = re.sub(r"^```(?:json)?\s*", "", text)
    text = re.sub(r"\s*```$", "", text)
    text = text.strip()

    start = text.find("{")
    if start == -1:
        raise ValueError("no JSON object found in LLM output")

    # Walk forward tracking brace depth, ignoring braces inside JSON strings
    # (and honouring backslash escapes within those strings).
    depth = 0
    in_str = False
    esc = False
    for i in range(start, len(text)):
        c = text[i]
        if in_str:
            if esc:
                esc = False
            elif c == "\\":
                esc = True
            elif c == '"':
                in_str = False
            continue
        if c == '"':
            in_str = True
        elif c == "{":
            depth += 1
        elif c == "}":
            depth -= 1
            if depth == 0:
                return json.loads(text[start : i + 1])
    raise ValueError("unbalanced braces in LLM JSON")
# ---------- DuckDB store ----------
def init_db():
    """Create the DuckDB file's parent directory and the `policies` table.

    Safe to call repeatedly: the table is created only if it does not exist.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    con = duckdb.connect(str(DB_PATH))
    try:
        con.execute("""
            CREATE TABLE IF NOT EXISTS policies (
                policy_id TEXT PRIMARY KEY,
                insurer_slug TEXT,
                insurer_name TEXT,
                policy_name TEXT,
                policy_type TEXT,
                uin_code TEXT,
                extraction_confidence_pct DOUBLE,
                extracted_at TEXT,
                source_pdf_path TEXT,
                source_pdf_url TEXT,
                data_json TEXT -- full HealthPolicy JSON for retrieval
            )
        """)
    finally:
        # Always release the connection, even if the DDL fails.
        con.close()
def upsert_policy(policy: HealthPolicy, source_pdf_path: str, source_pdf_url: str):
    """Insert a policy row, or overwrite the existing row with the same id.

    Args:
        policy: Validated policy; its full JSON dump is stored in `data_json`
            and a few fields are copied into their own columns.
        source_pdf_path: Path of the source PDF, stored as given.
        source_pdf_url: URL the PDF came from, stored as given.
    """
    # Store the enum's underlying value when there is one, the raw value
    # otherwise, and NULL when the type is unset.
    policy_type = policy.policy_type
    if policy_type:
        policy_type = getattr(policy_type, "value", policy_type)
    else:
        policy_type = None

    row = [
        policy.policy_id,
        policy.insurer_slug,
        policy.insurer_name,
        policy.policy_name,
        policy_type,
        policy.uin_code,
        policy.extraction_confidence_pct,
        time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),  # UTC timestamp
        source_pdf_path,
        source_pdf_url,
        policy.model_dump_json(),
    ]

    con = duckdb.connect(str(DB_PATH))
    try:
        # Columns are named explicitly so the insert does not depend on the
        # table's physical column order.
        con.execute(
            """
            INSERT INTO policies (
                policy_id, insurer_slug, insurer_name, policy_name, policy_type,
                uin_code, extraction_confidence_pct, extracted_at,
                source_pdf_path, source_pdf_url, data_json
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (policy_id) DO UPDATE SET
                insurer_slug = excluded.insurer_slug,
                insurer_name = excluded.insurer_name,
                policy_name = excluded.policy_name,
                policy_type = excluded.policy_type,
                uin_code = excluded.uin_code,
                extraction_confidence_pct = excluded.extraction_confidence_pct,
                extracted_at = excluded.extracted_at,
                source_pdf_path = excluded.source_pdf_path,
                source_pdf_url = excluded.source_pdf_url,
                data_json = excluded.data_json
            """,
            row,
        )
    finally:
        # Always release the connection, even if the statement fails.
        con.close()
# ---------- pipeline ----------
async def extract_one(pdf_path: Path, manifest_entry: dict, llm_primary, llm_fallback) -> Optional[HealthPolicy]:
    """Extract one PDF into a HealthPolicy, persisting it to DuckDB and JSON.

    Args:
        pdf_path: PDF to extract; its parent directory name is used as the
            insurer slug fallback.
        manifest_entry: Manifest record for this PDF (may be empty); supplies
            fallback insurer/policy names and the source URL.
        llm_primary: LLM tried first.
        llm_fallback: LLM tried if the first attempt fails for any reason.

    Returns:
        The validated policy, or None if the PDF could not be read or both
        attempts failed. If a cached ``<policy_id>.json`` already exists it is
        loaded and returned without calling any LLM.
    """
    policy_id = policy_id_for(pdf_path)
    out_json = EXTRACTED_DIR / f"{policy_id}.json"
    if out_json.exists():
        print(f" SKIP (already extracted): {policy_id}")
        with open(out_json, encoding="utf-8") as f:
            return HealthPolicy(**json.load(f))
    try:
        text = read_full_text(pdf_path)
    except Exception as e:
        print(f" FAIL read: {policy_id} | {type(e).__name__}: {e}")
        return None
    prompt = build_extract_prompt(text, schema_excerpt(), policy_id)
    messages = [
        ChatMessage(role="system", content=EXTRACT_SYSTEM),
        ChatMessage(role="user", content=prompt),
    ]
    # Try primary, fall back on failure or empty result
    raw = ""
    for attempt, llm in enumerate([llm_primary, llm_fallback]):
        # Hard per-attempt timeout so a hung connection can't stall the whole
        # sweep. The first attempt gets a larger ceiling than the second; this
        # is keyed on the attempt index so it still holds when the caller
        # passes the same object for both LLMs.
        attempt_timeout = 180 if attempt == 0 else 120
        try:
            res = await asyncio.wait_for(
                llm.chat(messages=messages, temperature=0.0, max_tokens=2048),
                timeout=attempt_timeout,
            )
            raw = res.text
            data = json_from_llm_text(raw)
            # policy_id is derived from the filename and is what the cache
            # file and the --policy filter are keyed on, so it always wins
            # over whatever the model emitted.
            data["policy_id"] = policy_id
            # Remaining identity fields are required by the schema but are
            # often emitted as null; fill them when missing or empty.
            if not data.get("insurer_slug"):
                data["insurer_slug"] = pdf_path.parent.name
            if not data.get("insurer_name"):
                data["insurer_name"] = manifest_entry.get("insurer_name") or pdf_path.parent.name
            if not data.get("policy_name"):
                data["policy_name"] = manifest_entry.get("policy_name") or pdf_path.stem
            policy = HealthPolicy(**data)
            # Write to the DB before the JSON cache: the cache file is what
            # makes later runs skip this policy, so it must only exist once
            # the row is actually stored.
            upsert_policy(
                policy,
                source_pdf_path=str(pdf_path.relative_to(ROOT)),
                source_pdf_url=manifest_entry.get("url", ""),
            )
            EXTRACTED_DIR.mkdir(parents=True, exist_ok=True)
            out_json.write_text(policy.model_dump_json(indent=2), encoding="utf-8")
            print(f" OK | provider={llm.name} | conf={policy.extraction_confidence_pct or 'n/a'}")
            return policy
        except Exception as e:
            # Best-effort: any failure (timeout, bad JSON, validation, DB)
            # moves on to the next LLM.
            print(f" attempt {attempt+1} FAIL: {type(e).__name__}: {e!s:.200s}")
            continue
    # Save the raw output for inspection on total failure. `raw` holds the
    # text of the last attempt that returned a response (empty if none did).
    EXTRACTED_DIR.mkdir(parents=True, exist_ok=True)
    (EXTRACTED_DIR / f"{policy_id}._raw.txt").write_text(raw, encoding="utf-8")
    return None
async def main():
    """CLI entry point: extract every corpus PDF (or a filtered subset).

    Flags:
        --policy  Only extract the PDF whose policy_id matches.
        --limit   Process at most this many PDFs (0 means none).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--policy", help="Specific policy_id to extract", default=None)
    parser.add_argument("--limit", type=int, default=None, help="Cap on number of policies")
    args = parser.parse_args()
    init_db()
    EXTRACTED_DIR.mkdir(parents=True, exist_ok=True)
    pdfs = find_pdfs()
    manifest = load_manifest()
    if args.policy:
        pdfs = [p for p in pdfs if policy_id_for(p) == args.policy]
        if not pdfs:
            print(f"No PDF matches policy_id={args.policy}")
            return
    # Compare against None so an explicit `--limit 0` is honoured instead of
    # being treated as "no limit".
    if args.limit is not None:
        pdfs = pdfs[: args.limit]
    # get_brain_llm() returns a NimChainLLM that already does internal
    # multi-candidate fallback, so primary and fallback both resolve to it.
    primary = get_brain_llm()
    fallback = get_brain_llm()
    print(f"Extracting {len(pdfs)} policies. via the brain chain (get_brain_llm, internal fallback).\n")
    t0 = time.time()
    ok = 0
    for i, pdf in enumerate(pdfs, 1):
        # Manifest records are looked up by the PDF's path relative to ROOT.
        rel = str(pdf.relative_to(ROOT))
        entry = manifest.get(rel, {})
        print(f"[{i}/{len(pdfs)}] {pdf.parent.name} | {pdf.stem[:50]}")
        result = await extract_one(pdf, entry, primary, fallback)
        if result is not None:
            ok += 1
    elapsed = time.time() - t0
    print(f"\nDone in {elapsed:.1f}s. {ok}/{len(pdfs)} extracted.")
    # The DB location is configurable and need not live under ROOT; fall back
    # to the full path rather than crashing after the sweep has finished.
    try:
        db_display = DB_PATH.relative_to(ROOT)
    except ValueError:
        db_display = DB_PATH
    print(f"DuckDB: {db_display}")
# Script entry point (`python -m rag.extract`): run the async pipeline to
# completion on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())