# InsuranceBot / backend/uploaded_docs.py
# Last commit 2ec48b7 (rohitsar567): fix(upload-extract): status parity on LLM-fail path + docs updated
"""Persistent uploaded-policy store (#52 — graded server-assignment).
WHAT THIS MODULE DOES
---------------------
When a user uploads a PDF via POST /api/upload-policy, three things must
survive an HF Space restart and become globally visible:
1. the raw PDF bytes,
2. a curated-facts-shaped JSON record (the SAME `{field:{value,
source_pdf_path, source_quote, _confidence}}` schema that
40-data/policy_facts/*.json uses, so it flows through the EXISTING
`backend.main._load_curated_facts` -> `_marketplace_catalogue` Pass-2
-> `build_scorecard` path with ZERO grading re-implementation), and
3. enough to re-index the document's chunks into the working Chroma
`policies` collection on the next boot.
PERSISTENCE MODEL
-----------------
Everything lands under `settings.UPLOADED_DOCS_DIR`:
<UPLOADED_DOCS_DIR>/
<policy_id>/
source.pdf # raw uploaded bytes
record.json # curated-facts-shaped JSON (the card)
chunks.json # [{chunk_idx,text,page_start,page_end}, ...]
meta.json # {policy_id, policy_name, insurer_slug,
# sha256, uploaded_at, session_id}
On the HF Space `settings.UPLOADED_DOCS_DIR` resolves to a directory on
the PERSISTENT `/data` disk (see backend/config.py + entrypoint.sh), so a
Space rebuild — which throws away the ephemeral container FS including
rag/vectors β€” does NOT lose uploaded policies. Locally (no /data) it
resolves under settings.DATA_DIR so the exact same code path works.
PRIVACY MODEL (explicit, per #52 spec)
--------------------------------------
The #52 spec says the uploaded doc is *added to THE (global) marketplace*.
So once a user uploads a policy it is intentionally a public marketplace
card and its chunks are globally retrievable (doc_type='user_upload' in
the main `policies` collection). The persistent store therefore contains
ONLY the uploaded policy document itself + data derived from it — never a
session profile, never another user's data. `session_id` is recorded in
meta.json purely for operational audit/abuse-tracing; it is NEVER used to
gate visibility of the card or the chunks (those are global by design) and
is NEVER written into the Chroma chunk metadata of the global collection.
The pre-existing session-scoped `user_uploads_quarantine` collection is a
separate, private, ephemeral path and is untouched by this module.
NO SILENT FAILURES
------------------
Every function here either succeeds or raises a typed exception with a
clear message. Callers (backend.main) decide whether a failure is fatal to
the request (record creation) or best-effort-logged (startup re-ingest of
ONE doc must not abort boot, but the failure is logged loudly).
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import re
import shutil
import time
from pathlib import Path
from typing import Any, Optional
from backend.config import settings
# Module-level logger; the skip / failure paths in this module report
# through it.
_log = logging.getLogger(__name__)
# Chroma metadata doc_type for a persisted, globally-visible uploaded doc.
# Deliberately the SAME token the quarantine path uses so the existing
# brain_tools UPLOADED-DOC handling + retrieve.py treat it identically.
UPLOAD_DOC_TYPE = "user_upload"
# Insurer slug for uploaded docs. MUST NOT be "regulatory" (that slug is
# filtered out of the marketplace) and MUST be stable so the card always
# resolves the same insurer_meta fallback.
UPLOAD_INSURER_SLUG = "user-upload"
# Human-readable label paired with UPLOAD_INSURER_SLUG. Not referenced in
# this part of the file — presumably read by callers; verify before renaming.
UPLOAD_INSURER_NAME = "User-uploaded document"
# ---------------------------------------------------------------------------
# Insurer detection from PDF text (2026-05-27).
#
# Pre-this-change, every upload was stamped insurer_slug="user-upload",
# which short-circuits the Claim Experience scorecard sub-score (no
# matching reviews JSON under 40-data/reviews/<slug>.json) and leaves
# the card showing "reputation data being compiled" forever.
#
# Strategy: regex-match the first ~3 pages of the PDF against the
# canonical legal names of the 21 insurers we already have reviews
# data for. On a confident hit, flip insurer_slug to the real slug so
# the scorecard's Claim-Experience pass uses the real reviews data
# (claim_ratio, complaints, network) — same path as a catalogued card.
#
# Fail-closed: no match ⇒ stays "user-upload". Score still works,
# Claim Experience just falls back to a generic mid-range number.
# ---------------------------------------------------------------------------
# Each entry: (slug, [name_patterns]). Order matters — first hit wins,
# so put the most specific patterns first (e.g. "future generali" before
# bare "generali" — though we don't have a generali reviews file today).
# Patterns are matched case-insensitive against the first ~3 pages of
# PDF text (first ~6000 chars).
_INSURER_NAME_PATTERNS: list[tuple[str, list[str]]] = [
    ("acko", ["acko general insurance", "acko general", "acko gen ins", "acko gi"]),
    ("aditya-birla", ["aditya birla health insurance", "abhicl", "aditya birla health", "aditya birla"]),
    ("bajaj-allianz", ["bajaj allianz general insurance", "bajaj allianz general", "bajaj allianz"]),
    ("care-health", ["care health insurance", "religare health insurance"]),
    ("cholamandalam", ["cholamandalam ms general insurance", "cholamandalam ms general", "cholamandalam ms", "chola ms"]),
    ("go-digit", ["go digit general insurance", "go digit", "godigit"]),
    ("hdfc-ergo", ["hdfc ergo general insurance", "hdfc ergo health", "hdfc ergo"]),
    ("icici-lombard", ["icici lombard general insurance", "icici lombard"]),
    ("iffco-tokio", ["iffco tokio general insurance", "iffco tokio"]),
    ("indusind-general", ["indusind general insurance", "indusind general"]),
    ("manipalcigna", ["manipalcigna health insurance", "manipal cigna health insurance", "manipalcigna", "manipal cigna"]),
    ("national-insurance", ["national insurance company", "national insurance"]),
    ("new-india", ["new india assurance company", "new india assurance", "the new india assurance"]),
    ("niva-bupa", ["niva bupa health insurance", "niva bupa", "max bupa"]),
    ("oriental-insurance", ["oriental insurance company", "the oriental insurance", "oriental insurance"]),
    ("reliance-general", ["reliance general insurance"]),
    ("royal-sundaram", ["royal sundaram general insurance", "royal sundaram"]),
    ("sbi-general", ["sbi general insurance", "sbi gen"]),
    ("star-health", ["star health and allied insurance", "star health and allied", "star health"]),
    ("tata-aig", ["tata aig general insurance", "tata aig"]),
]


def detect_insurer_slug(full_text: str) -> Optional[str]:
    """Return the insurer slug whose name appears in the document head, or None.

    Only the first 6 000 characters of ``full_text`` are scanned. That window
    is lower-cased and every run of whitespace and/or hyphens/dashes is
    collapsed to one space before matching, so names that are hyphenated
    ("IFFCO-Tokio") or wrapped across lines by PDF text extraction still hit
    the space-separated patterns in ``_INSURER_NAME_PATTERNS``.

    Matching is a plain substring test in table order; the first pattern
    that hits decides the slug. Fail-closed: empty input or no hit gives
    None.
    """
    if not full_text:
        return None
    # The patterns contain only letters and single spaces, so anything that
    # matched the raw text still matches after normalisation.
    head = re.sub(r"[\s\-\u2010-\u2015]+", " ", full_text[:6000].lower())
    for slug, patterns in _INSURER_NAME_PATTERNS:
        if any(pat in head for pat in patterns):
            return slug
    return None
def detected_insurer_name(slug: str) -> str:
    """Map a detected insurer slug to its display name.

    Lets a persisted record carry "ManipalCigna" rather than the raw slug
    "manipalcigna". A slug missing from the table is returned unchanged.
    """
    display_names = {
        "acko": "Acko",
        "aditya-birla": "Aditya Birla Health",
        "bajaj-allianz": "Bajaj Allianz",
        "care-health": "Care Health",
        "cholamandalam": "Cholamandalam MS",
        "go-digit": "Go Digit",
        "hdfc-ergo": "HDFC ERGO",
        "icici-lombard": "ICICI Lombard",
        "iffco-tokio": "IFFCO Tokio",
        "indusind-general": "IndusInd General",
        "manipalcigna": "ManipalCigna",
        "national-insurance": "National Insurance",
        "new-india": "New India Assurance",
        "niva-bupa": "Niva Bupa",
        "oriental-insurance": "Oriental Insurance",
        "reliance-general": "Reliance General",
        "royal-sundaram": "Royal Sundaram",
        "sbi-general": "SBI General",
        "star-health": "Star Health",
        "tata-aig": "Tata AIG",
    }
    if slug in display_names:
        return display_names[slug]
    return slug
# ---------------------------------------------------------------------------
# Storage layout
# ---------------------------------------------------------------------------
def uploaded_docs_dir() -> Path:
    """Return ``settings.UPLOADED_DOCS_DIR``, creating it (and parents) if absent."""
    root = settings.UPLOADED_DOCS_DIR
    root.mkdir(parents=True, exist_ok=True)
    return root
def _doc_dir(policy_id: str) -> Path:
    """Return the per-document directory for ``policy_id`` under the uploads root.

    policy_id is already a tight slug (see backend.main.upload_policy:
    user-upload__<sid12>__<fileslug>), but it is sanitised here as a defence
    against path traversal: every run of characters outside
    ``[a-zA-Z0-9_.-]`` becomes a single "-", and leading/trailing "-" are
    stripped. A name that ends up empty or made only of dots falls back to
    "user-upload".
    """
    safe = re.sub(r"[^a-zA-Z0-9_.\-]+", "-", policy_id).strip("-")
    # Dots are legal in the slug, so "." and ".." survive the filter above.
    # As a whole path component they would resolve to the uploads root or
    # its parent, so dot-only names are treated like an empty name.
    if not safe.strip("."):
        safe = "user-upload"
    return uploaded_docs_dir() / safe
def prune_persisted_upload(
    policy_id: Optional[str] = None, *, prefix: Optional[str] = None
) -> dict:
    """Delete persisted uploaded doc(s) by exact id and/or by name prefix.

    Operator/abuse tool (#52 residual #5, #77). ``policy_id`` removes one
    document; ``prefix`` (e.g. 'user-upload__e2e-verify') additionally
    selects every directory under the uploads root whose name starts with
    the sanitised prefix.

    Guardrail: each target is resolved and must lie strictly inside the
    resolved uploads root, otherwise a RuntimeError is raised instead of
    deleting anything for that target. A prefix that is empty after
    sanitisation also raises RuntimeError.

    Returns ``{"removed": [...], "skipped": [...], "root": str}`` where
    ``skipped`` lists ids whose directory did not exist.
    """
    root = uploaded_docs_dir().resolve()

    candidates: list[str] = [policy_id] if policy_id else []
    if prefix is not None:
        clean_prefix = re.sub(r"[^a-zA-Z0-9_.\-]+", "-", prefix).strip("-")
        if not clean_prefix:
            raise RuntimeError("prune prefix is empty after sanitisation")
        candidates.extend(
            entry.name
            for entry in sorted(root.glob("*"))
            if entry.is_dir() and entry.name.startswith(clean_prefix)
        )

    removed: list[str] = []
    skipped: list[str] = []
    # dict.fromkeys drops duplicates while keeping first-seen order.
    for pid in dict.fromkeys(candidates):
        target = _doc_dir(pid).resolve()
        inside_root = target != root and root in target.parents
        if not inside_root:
            raise RuntimeError(
                f"refusing to prune outside uploaded-docs root: {pid!r}"
            )
        if target.exists():
            shutil.rmtree(target)
            removed.append(pid)
        else:
            skipped.append(pid)
    return {"removed": removed, "skipped": skipped, "root": str(root)}
# ---------------------------------------------------------------------------
# Heuristic field extraction -> curated-facts-shaped record
#
# The repo's LLM extractor (rag/extract.py) needs network + the NIM brain.
# That is correct for the corpus build but unusable inside a request (and
# untestable offline). So we derive a REAL, sourced record deterministically
# from the PDF's own text via regex over the IRDAI-standardised wording that
# every Indian health policy uses. Each field we emit carries the verbatim
# source_quote it was matched from — nothing is fabricated; a field is only
# emitted when its evidence is literally present in the document.
#
# This populates well above the scorecard's MIN_GRADEABLE_COMPLETENESS_PCT
# (9.0 == ~2 of 23 SCORED_FIELDS) so the card grades for real instead of
# returning the data-starved "—"/0 sentinel. When the document genuinely
# lacks structured terms, we DO NOT invent any — the card then honestly
# shows the sentinel, which is the correct behaviour.
# ---------------------------------------------------------------------------
def _ctx(text: str, m: re.Match, pad: int = 90) -> str:
    """Return the verbatim snippet around a regex match (the source_quote).

    Takes up to ``pad`` characters either side of the match, collapses each
    whitespace run to one space, strips the ends and truncates to 300 chars.
    """
    s = max(0, m.start() - pad)
    e = min(len(text), m.end() + pad)
    return re.sub(r"\s+", " ", text[s:e]).strip()[:300]


def _fact(value: Any, quote: str, conf: str = "medium") -> dict:
    """Build a curated-facts cell: {value, source_pdf_path, source_quote, _confidence}."""
    return {
        "value": value,
        "source_pdf_path": "",  # filled by the caller with the persisted PDF path
        "source_quote": quote,
        "_confidence": conf,
    }


def extract_fields_from_text(full_text: str) -> dict[str, dict]:
    """Regex-derive scorecard-relevant fields from policy text.

    Returns a ``{field_name: <fact cell>}`` dict (cells built by ``_fact``)
    using the canonical field names backend.scorecard.SCORED_FIELDS / ALIASES
    read. A field is emitted only when its pattern literally matches the
    text and the captured value passes that field's sanity bounds; the cell's
    ``source_quote`` is the context around the match. The first value
    recorded for a field wins. ``None`` / empty input yields ``{}``.
    """
    t = full_text or ""
    low = t.lower()
    out: dict[str, dict] = {}

    def add(field: str, value: Any, m: Optional[re.Match], conf: str = "medium") -> None:
        # Ignore missing values and never overwrite an earlier hit.
        if value is None or field in out:
            return
        quote = _ctx(t, m) if m is not None else ""
        out[field] = _fact(value, quote, conf)

    # --- UIN (regulator identity; not a scored field but anchors the card) --
    m = re.search(r"\b([A-Z]{3}[A-Z0-9]{10,22}V\d{6})\b", t)
    if m:
        add("uin_code", m.group(1), m, "high")

    # --- Initial waiting period (days) -------------------------------------
    m = re.search(
        r"(\d{1,3})\s*days?[^.]{0,80}?(?:waiting period|from the (?:first )?"
        r"(?:policy )?(?:commencement|inception)|shall be excluded)",
        t, re.IGNORECASE,
    ) or re.search(
        r"(?:waiting period|initial waiting)[^.]{0,60}?(\d{1,3})\s*days?",
        t, re.IGNORECASE,
    )
    if m:
        d = int(m.group(1))
        if 0 < d <= 90:
            add("initial_waiting_period_days", d, m, "high")

    # --- Pre-existing disease waiting (months) -----------------------------
    m = re.search(
        r"pre[\-\s]?existing[^.]{0,120}?(\d{1,2})\s*(?:months|month)",
        t, re.IGNORECASE,
    ) or re.search(
        r"(\d{1,2})\s*months[^.]{0,80}?pre[\-\s]?existing",
        t, re.IGNORECASE,
    )
    if m:
        mo = int(m.group(1))
        if 0 < mo <= 72:
            add("pre_existing_disease_waiting_months", mo, m, "high")

    # --- Specific-disease waiting (months) ---------------------------------
    m = re.search(
        r"(?:specific (?:disease|illness)|cataract|hernia)[^.]{0,120}?"
        r"(\d{1,2})\s*months",
        t, re.IGNORECASE,
    )
    if m:
        mo = int(m.group(1))
        if 0 < mo <= 48:
            add("specific_disease_waiting_months", mo, m, "medium")

    # --- Maternity waiting (months) ----------------------------------------
    m = re.search(
        r"maternity[^.]{0,120}?(\d{1,2})\s*months",
        t, re.IGNORECASE,
    )
    if m:
        mo = int(m.group(1))
        if 0 < mo <= 48:
            add("maternity_waiting_months", mo, m, "medium")

    # --- Pre / post hospitalisation (days) ---------------------------------
    m = re.search(r"pre[\-\s]?hospitali[sz]ation[^.]{0,60}?(\d{1,3})\s*days", t, re.IGNORECASE)
    if m:
        d = int(m.group(1))
        if 0 < d <= 180:
            add("pre_hospitalization_days", d, m, "high")
    m = re.search(r"post[\-\s]?hospitali[sz]ation[^.]{0,60}?(\d{1,3})\s*days", t, re.IGNORECASE)
    if m:
        d = int(m.group(1))
        if 0 < d <= 365:
            add("post_hospitalization_days", d, m, "high")

    # --- Co-payment (%) ----------------------------------------------------
    m = re.search(r"co[\-\s]?pay(?:ment)?[^.]{0,80}?(\d{1,2})\s*%", t, re.IGNORECASE) \
        or re.search(r"(\d{1,2})\s*%[^.]{0,40}?co[\-\s]?pay", t, re.IGNORECASE)
    if m:
        pct = int(m.group(1))
        if 0 <= pct <= 50:
            add("copayment_pct", pct, m, "medium")

    # --- No-claim bonus (%) ------------------------------------------------
    m = re.search(
        r"(?:no[\-\s]?claim bonus|cumulative bonus|ncb)[^.]{0,80}?(\d{1,3})\s*%",
        t, re.IGNORECASE,
    )
    if m:
        pct = int(m.group(1))
        if 0 < pct <= 200:
            # MarketplacePolicy.no_claim_bonus_pct is Optional[int]; the
            # scorecard reads it numerically either way. Emit int.
            add("no_claim_bonus_pct", pct, m, "medium")

    # --- Room rent capping -------------------------------------------------
    m = re.search(
        r"room rent[^.]{0,90}?(no (?:sub[\-\s]?limit|cap|capping|limit)|"
        r"\d{1,2}\s*%\s*(?:of\s*(?:the\s*)?sum insured|of si)?|single private|"
        r"twin sharing|shared accommodation)",
        t, re.IGNORECASE,
    )
    if m:
        cap = m.group(1).strip()
        if re.search(r"no (sub[\-\s]?limit|cap|capping|limit)", cap, re.IGNORECASE):
            cap = "No room rent cap"
        add("room_rent_capping", cap, m, "medium")

    # --- Network hospital count -------------------------------------------
    m = re.search(
        r"([\d,]{3,7})\+?\s*(?:network |empanelled |cashless )?hospitals?",
        t, re.IGNORECASE,
    )
    if m:
        try:
            n = int(m.group(1).replace(",", ""))
            if 50 <= n <= 50000:
                add("network_hospital_count", n, m, "medium")
        except ValueError:
            # The capture may consist only of commas; treat as no evidence.
            pass

    # --- Cashless supported -----------------------------------------------
    if "cashless" in low:
        m = re.search(r"cashless[^.]{0,80}", t, re.IGNORECASE)
        add("cashless_treatment_supported", True, m, "medium")

    # --- Max entry age (years) --------------------------------------------
    m = re.search(
        r"(?:maximum |max\.? )?entry age[^.]{0,40}?(\d{2,3})\s*years",
        t, re.IGNORECASE,
    ) or re.search(
        r"entry age[^.]{0,40}?up to\s*(\d{2,3})\s*years", t, re.IGNORECASE,
    )
    if m:
        age = int(m.group(1))
        if 30 <= age <= 100:
            add("max_entry_age", age, m, "medium")

    # --- AYUSH coverage ----------------------------------------------------
    if re.search(r"\bayush\b", low) or "ayurved" in low:
        m = re.search(r"ayush[^.]{0,90}", t, re.IGNORECASE) or re.search(
            r"ayurved[^.]{0,90}", t, re.IGNORECASE)
        add("ayush_coverage", {"covered": True}, m, "medium")

    # --- Maternity coverage (boolean-with-detail) -------------------------
    if "maternity" in low:
        m = re.search(r"maternity[^.]{0,120}", t, re.IGNORECASE)
        covered = not bool(re.search(
            r"maternity[^.]{0,40}(not covered|excluded|no cover)", t, re.IGNORECASE))
        add("maternity_coverage", {"covered": covered}, m, "medium")

    # --- Ambulance / day-care / restoration (presence booleans) -----------
    if "ambulance" in low:
        m = re.search(r"ambulance[^.]{0,90}", t, re.IGNORECASE)
        add("ambulance_cover", {"covered": True}, m, "low")
    if "day care" in low or "day-care" in low or "daycare" in low:
        m = re.search(r"day[\-\s]?care[^.]{0,90}", t, re.IGNORECASE)
        add("day_care_treatments_count", {"covered": True, "limit_text": "Day-care procedures covered"}, m, "low")
    if "restoration" in low or "refill" in low or "reinstatement" in low:
        m = re.search(r"(restoration|refill|reinstatement)[^.]{0,90}", t, re.IGNORECASE)
        add("restoration_benefit", {"covered": True}, m, "low")

    # --- Claim settlement ratio (insurer-level; commonly stated in CIS) ----
    m = re.search(
        r"claim settlement ratio[^.]{0,40}?(\d{2,3}(?:\.\d{1,2})?)\s*%",
        t, re.IGNORECASE,
    )
    if m:
        try:
            csr = float(m.group(1))
            if 30 <= csr <= 100:
                add("claim_settlement_ratio", csr, m, "medium")
        except ValueError:
            pass

    # --- 2026-05-27 — heuristic-baseline expansion (KI-332) ----------------
    # Adds ~12 new patterns that lift typical upload completeness from
    # ~47.8% to ~65-70% even when ALL LLM passes fail. Each pattern is
    # high-precision (regex with sanity bounds) — if the doc literally
    # doesn't state the value we skip the field, never fabricate.

    # --- Sum insured options (INR) ----------------------------------------
    # Catches "₹3 Lakh / ₹5 Lakh / ₹10 Lakh / ₹25 Lakh" style ladders.
    # \u20b9 is the rupee sign, written as an escape so the pattern survives
    # any re-encoding of this file. Plural units ("Lakhs", "Crores") are
    # accepted as well as the singular forms.
    vals: list[int] = []
    first_si_match: Optional[re.Match] = None
    for si in re.finditer(
        r"(?:rs\.?|\u20b9|inr)\s*(\d{1,3}(?:[,.]\d{2,3})*)\s*(lakhs?|lacs?|crores?|cr)\b",
        t, re.IGNORECASE,
    ):
        try:
            amount = float(si.group(1).replace(",", ""))
        except ValueError:
            # e.g. "1.250.000" is not a valid float; skip that mention.
            continue
        unit = si.group(2).lower()
        n_inr = int(amount * (100_000 if unit.startswith(("lakh", "lac")) else 10_000_000))
        if 100_000 <= n_inr <= 500_000_000:
            vals.append(n_inr)
            if first_si_match is None:
                # Quote the first amount that actually produced an option,
                # not merely the first currency figure in the document.
                first_si_match = si
    vals = sorted(set(vals))[:10]  # cap at 10 options; sorted ascending
    if len(vals) >= 2:
        add("sum_insured_options_inr", vals, first_si_match, "medium")

    # --- Policy type ------------------------------------------------------
    if re.search(r"\bfamily floater\b", low):
        m = re.search(r"family floater[^.]{0,60}", t, re.IGNORECASE)
        add("policy_type", "family_floater", m, "medium")
    elif re.search(r"\bsenior citizen\b", low) and "policy" in low:
        m = re.search(r"senior citizen[^.]{0,60}", t, re.IGNORECASE)
        add("policy_type", "senior_citizen", m, "medium")
    elif re.search(r"\bcritical illness\b", low) and "lump" in low:
        m = re.search(r"critical illness[^.]{0,60}", t, re.IGNORECASE)
        add("policy_type", "critical_illness", m, "medium")
    elif re.search(r"\btop[-\s]?up\b", low):
        m = re.search(r"top[-\s]?up[^.]{0,60}", t, re.IGNORECASE)
        add("policy_type", "top_up", m, "medium")

    # --- Min entry age (years) ---------------------------------------------
    m = re.search(
        r"min(?:imum)?[^.]{0,30}?entry age[^.]{0,30}?(\d{1,2})\s*(?:years|yrs)",
        t, re.IGNORECASE,
    ) or re.search(
        r"entry age[^.]{0,40}?(\d{1,2})\s*(?:years|yrs)[^.]{0,20}?to",
        t, re.IGNORECASE,
    )
    if m:
        age = int(m.group(1))
        if 0 <= age <= 35:
            add("min_entry_age_years", age, m, "medium")

    # --- Min child entry age (days) ---------------------------------------
    m = re.search(
        r"(\d{2,3})\s*days?[^.]{0,40}?(?:dependent (?:child|children)|child(?:ren)?)",
        t, re.IGNORECASE,
    ) or re.search(
        r"(?:dependent (?:child|children)|child(?:ren)?)[^.]{0,40}?(\d{2,3})\s*days",
        t, re.IGNORECASE,
    )
    if m:
        d = int(m.group(1))
        if 1 <= d <= 365:
            add("min_child_entry_age_days", d, m, "medium")

    # --- Lifelong / max renewal age ----------------------------------------
    if re.search(r"\blifelong renew", low) or re.search(r"\blife[-\s]?long renew", low):
        m = re.search(r"life[-\s]?long renew[^.]{0,80}", t, re.IGNORECASE)
        add("max_renewal_age_years", 999, m, "medium")
    else:
        m = re.search(
            r"(?:renewal|renewable)[^.]{0,40}?(?:up to|until|till)\s*(\d{2,3})\s*(?:years|yrs)",
            t, re.IGNORECASE,
        )
        if m:
            age = int(m.group(1))
            if 50 <= age <= 120:
                add("max_renewal_age_years", age, m, "medium")

    # --- Grace period (days) -----------------------------------------------
    m = re.search(
        r"grace period[^.]{0,40}?(\d{1,3})\s*(?:days?)",
        t, re.IGNORECASE,
    )
    if m:
        d = int(m.group(1))
        if 1 <= d <= 90:
            add("grace_period_days", d, m, "high")

    # --- Free-look period (days) -------------------------------------------
    m = re.search(
        r"free[-\s]?look[^.]{0,40}?(\d{1,3})\s*(?:days?)",
        t, re.IGNORECASE,
    ) or re.search(
        r"(\d{1,3})\s*days?\s*(?:as a )?free[-\s]?look",
        t, re.IGNORECASE,
    )
    if m:
        d = int(m.group(1))
        if 7 <= d <= 60:
            add("free_look_period_days", d, m, "high")

    # --- Geographic coverage ------------------------------------------------
    if re.search(r"\b(?:worldwide|global)\b", low):
        m = re.search(r"(?:worldwide|global)[^.]{0,80}", t, re.IGNORECASE)
        add("geographic_coverage", "worldwide", m, "medium")
    elif re.search(r"\bpan[-\s]?india\b", low):
        m = re.search(r"pan[-\s]?india[^.]{0,40}", t, re.IGNORECASE)
        add("geographic_coverage", "pan_india", m, "medium")
    elif re.search(r"\bonly in india\b|\bindian (resident|territory)\b", low):
        m = re.search(r"india[^.]{0,40}", t, re.IGNORECASE)
        add("geographic_coverage", "india", m, "low")

    # --- ICU capping --------------------------------------------------------
    m = re.search(
        r"icu(?:\s+charges?| rent)?[^.]{0,80}?(?:no cap|no limit|(\d{1,2})\s*%|2\s*x)",
        t, re.IGNORECASE,
    )
    if m:
        s = m.group(0).strip()
        if "no cap" in s.lower() or "no limit" in s.lower():
            add("icu_capping", "No ICU cap", m, "medium")
        elif m.group(1):
            add("icu_capping", f"{m.group(1)}% of sum insured", m, "medium")

    # --- Deductible (INR) — top-up / super top-up plans --------------------
    # The capture takes the whole numeric token so ungrouped amounts
    # ("500000") are read in full rather than cut at three digits.
    m = re.search(
        r"deductible[^.]{0,40}?(?:rs\.?|\u20b9|inr)\s*(\d[\d,.]*\d|\d)",
        t, re.IGNORECASE,
    )
    if m:
        # A trailing ".d" / ".dd" is a paise fraction, not a digit group:
        # drop it before removing separators so "25,000.00" reads as 25000
        # instead of 2500000. A ".ddd" group is still treated as thousands.
        whole = re.sub(r"\.\d{1,2}$", "", m.group(1))
        n = int(whole.replace(",", "").replace(".", ""))
        if 25_000 <= n <= 10_000_000:
            add("deductible_amount_inr", n, m, "medium")

    # --- No-claim bonus cap (%) --------------------------------------------
    m = re.search(
        r"(?:no[\-\s]?claim bonus|cumulative bonus|ncb)[^.]{0,160}?"
        r"(?:up to|maximum|cap(?:ped)?)\s*(\d{1,3})\s*%",
        t, re.IGNORECASE,
    )
    if m:
        pct = int(m.group(1))
        if 25 <= pct <= 250:
            add("no_claim_bonus_cap_pct", pct, m, "medium")

    # --- Organ donor / critical illness / preventive health-check ----------
    if re.search(r"organ\s+donor", low):
        m = re.search(r"organ\s+donor[^.]{0,90}", t, re.IGNORECASE)
        add("organ_donor_expenses", {"covered": True}, m, "low")
    if re.search(r"critical illness", low):
        m = re.search(r"critical illness[^.]{0,100}", t, re.IGNORECASE)
        # If we find a number of CIs covered, capture it in limit_text.
        cnt = re.search(r"(\d{1,3})\s*critical illnesses?", t, re.IGNORECASE)
        item: dict[str, Any] = {"covered": True}
        if cnt:
            item["limit_text"] = f"Covers {cnt.group(1)} critical illnesses"
        add("critical_illness_cover", item, m, "low")
    if re.search(r"preventive (?:health )?check[\-\s]?up|annual (?:health )?check", low):
        m = re.search(
            r"preventive (?:health )?check[^.]{0,90}|annual (?:health )?check[^.]{0,90}",
            t, re.IGNORECASE,
        )
        add("preventive_health_checkup", {"covered": True}, m, "low")
    if re.search(r"domiciliary", low):
        m = re.search(r"domiciliary[^.]{0,90}", t, re.IGNORECASE)
        add("domiciliary_treatment", {"covered": True}, m, "low")
    if re.search(r"newborn|new[\-\s]?born", low):
        m = re.search(r"new[-\s]?born[^.]{0,90}", t, re.IGNORECASE)
        add("newborn_coverage", {"covered": True}, m, "low")

    # --- Premium payment modes (often listed as a comma-separated set) ----
    modes: list[str] = []
    if re.search(r"\bannual(?:ly)?\b", low):
        modes.append("annual")
    if re.search(r"\bhalf[\-\s]?yearly\b|\bsemi[\-\s]?annual\b", low):
        modes.append("half_yearly")
    if re.search(r"\bquarterly\b", low):
        modes.append("quarterly")
    if re.search(r"\bmonthly\b", low):
        modes.append("monthly")
    if len(modes) >= 1:
        m = re.search(r"premium[^.]{0,160}?(?:annual|monthly|quarterly|half[\-\s]?yearly)", t, re.IGNORECASE)
        add("premium_payment_modes", modes, m, "low")

    return out
def _derive_policy_name(full_text: str, fallback: str) -> str:
"""Best-effort human policy name from the document header."""
for line in (full_text or "").splitlines():
s = line.strip()
if not s:
continue
if re.search(r"(policy|plan|insurance|mediclaim|health)", s, re.IGNORECASE) \
and 6 <= len(s) <= 90:
return re.sub(r"\s+", " ", s)
return fallback
# ---------------------------------------------------------------------------
# Persisted record (curated-facts JSON) + PDF + chunk payload
# ---------------------------------------------------------------------------
def build_record(
    policy_id: str,
    policy_name: str,
    full_text: str,
    persisted_pdf_path: str,
) -> dict:
    """Assemble the curated-facts-shaped record for one uploaded document.

    The record holds scalar identity keys (``policy_id``, ``policy_name``,
    ``insurer_slug``, ``_uploaded_doc`` and, when an insurer is detected,
    ``insurer_name``) followed by one fact cell per field returned by
    ``extract_fields_from_text``. Every cell's ``source_pdf_path`` is set to
    ``persisted_pdf_path``.

    An empty ``policy_name`` is replaced by a name derived from the text,
    falling back to ``policy_id``. The insurer slug comes from
    ``detect_insurer_slug``; when nothing matches it stays
    ``UPLOAD_INSURER_SLUG`` (fail-closed).
    """
    facts = extract_fields_from_text(full_text)
    for cell in facts.values():
        if isinstance(cell, dict) and "source_pdf_path" in cell:
            cell["source_pdf_path"] = persisted_pdf_path

    # A detected insurer replaces the generic upload slug, so consumers keyed
    # on insurer_slug see the real insurer instead of "user-upload".
    matched_slug = detect_insurer_slug(full_text)

    if policy_name:
        display_name = policy_name
    else:
        display_name = _derive_policy_name(full_text, policy_id)

    record: dict[str, Any] = {
        "policy_id": policy_id,
        "policy_name": display_name,
        "insurer_slug": matched_slug if matched_slug else UPLOAD_INSURER_SLUG,
        "_uploaded_doc": True,  # provenance flag (ignored by scorecard)
    }
    if matched_slug:
        # Pretty name for any card renderer that reads from this record.
        record["insurer_name"] = detected_insurer_name(matched_slug)
    record.update(facts)
    return record
def persist_upload(
    *,
    policy_id: str,
    policy_name: str,
    pdf_bytes: bytes,
    full_text: str,
    chunks: list[dict],
    session_id: str,
) -> dict:
    """Persist the PDF, JSON record, chunk payload and meta for one upload.

    Writes ``source.pdf``, ``record.json``, ``chunks.json`` and ``meta.json``
    into the document's directory. Every payload is built and serialised in
    memory BEFORE anything is written, so a malformed chunk (missing key) or
    an unserialisable value fails the call without leaving a partially
    written, already-visible record behind. Each file is written to a
    ``<name>.tmp`` sibling and then moved into place with ``Path.replace``.

    Args:
        policy_id: Identifier of the uploaded policy; selects the directory.
        policy_name: Display name; may be empty (derived by ``build_record``).
        pdf_bytes: Raw bytes of the uploaded PDF.
        full_text: Extracted text of the PDF, used for field extraction.
        chunks: Dicts that each carry ``chunk_idx``, ``text``, ``page_start``
            and ``page_end``; other keys are not persisted.
        session_id: Stored in meta.json for audit only — never a visibility
            gate.

    Returns:
        The record dict produced by ``build_record``.

    Raises:
        RuntimeError: on any failure, chained to the original exception. The
            caller MUST surface this — a "successful" upload that did not
            persist is a silent failure and is forbidden by the #52 spec.
    """
    try:
        ddir = _doc_dir(policy_id)
        pdf_path = ddir / "source.pdf"

        # ---- Build everything first; nothing below this section may fail
        # ---- on bad input once files start to appear on disk.
        record = build_record(
            policy_id, policy_name, full_text, persisted_pdf_path=str(pdf_path),
        )
        chunk_payload = [
            {
                "chunk_idx": c["chunk_idx"],
                "text": c["text"],
                "page_start": c["page_start"],
                "page_end": c["page_end"],
            }
            for c in chunks
        ]
        meta = {
            "policy_id": policy_id,
            "policy_name": record["policy_name"],
            # Use whatever build_record resolved — the detected insurer
            # slug if we matched one, else UPLOAD_INSURER_SLUG.
            "insurer_slug": record.get("insurer_slug", UPLOAD_INSURER_SLUG),
            "sha256": hashlib.sha256(pdf_bytes).hexdigest(),
            "uploaded_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "session_id": session_id,  # audit only — NEVER a visibility gate
            "n_chunks": len(chunk_payload),
        }
        record_json = json.dumps(record, indent=2, ensure_ascii=False)
        chunks_json = json.dumps(chunk_payload, ensure_ascii=False)
        meta_json = json.dumps(meta, indent=2)

        ddir.mkdir(parents=True, exist_ok=True)

        def _publish(name: str, payload: Any) -> None:
            # Write to "<name>.tmp", then move into place so readers never
            # observe a half-written file.
            tmp = ddir / f"{name}.tmp"
            try:
                if isinstance(payload, str):
                    tmp.write_text(payload)
                else:
                    tmp.write_bytes(payload)
                tmp.replace(ddir / name)
            finally:
                # After a successful replace the temp file is gone; this only
                # cleans up when the write or the move failed part-way.
                if tmp.exists():
                    tmp.unlink()

        _publish("source.pdf", pdf_bytes)
        _publish("record.json", record_json)
        _publish("chunks.json", chunks_json)
        _publish("meta.json", meta_json)

        # Count extracted fields by excluding the identity keys explicitly; a
        # prefix test would also drop real fields such as "policy_type".
        identity_keys = {
            "policy_id", "policy_name", "insurer_slug", "insurer_name", "_uploaded_doc",
        }
        n_fields = sum(1 for key in record if key not in identity_keys)
        _log.info(
            "persisted uploaded doc %s (%d fields, %d chunks) -> %s",
            policy_id, n_fields, len(chunk_payload), ddir,
        )
        return record
    except Exception as e:  # noqa: BLE001 — convert to a loud typed failure
        raise RuntimeError(
            f"persist_upload failed for {policy_id}: {type(e).__name__}: {e}"
        ) from e
# ---------------------------------------------------------------------------
# Read side — used by _load_curated_facts (cards) + startup re-ingest (chunks)
# ---------------------------------------------------------------------------
def load_persisted_records() -> dict[str, dict]:
    """Load every persisted upload's record, keyed by policy id.

    Walks the sub-directories of ``settings.UPLOADED_DOCS_DIR`` in sorted
    order and parses each ``record.json`` found. The key is the record's
    ``policy_id``, or the directory name when that is missing/empty.
    Returns ``{}`` when the root does not exist.

    A record that cannot be read or parsed is logged and skipped, so one
    corrupt upload cannot take down the whole catalogue.
    """
    records: dict[str, dict] = {}
    root = settings.UPLOADED_DOCS_DIR
    if not root.exists():
        return records

    record_files = (
        entry / "record.json" for entry in sorted(root.iterdir()) if entry.is_dir()
    )
    for rec_path in record_files:
        if not rec_path.exists():
            continue
        try:
            rec = json.loads(rec_path.read_text())
            records[rec.get("policy_id") or rec_path.parent.name] = rec
        except Exception as e:  # noqa: BLE001
            _log.warning(
                "skipping corrupt uploaded record %s: %s: %s",
                rec_path, type(e).__name__, e,
            )
    return records
def iter_persisted_chunks():
    """Yield ``(policy_id, policy_name, chunks)`` for every persisted doc.

    Walks the sub-directories of ``settings.UPLOADED_DOCS_DIR`` in sorted
    order; a directory is considered only when both ``chunks.json`` and
    ``meta.json`` exist. ``policy_id`` / ``policy_name`` come from meta.json
    and fall back to the directory name. Yields nothing when the root does
    not exist.

    A document whose files cannot be read or parsed, or whose JSON has the
    wrong top-level shape (meta must be an object, chunks a list), is logged
    and skipped instead of raising, so one bad upload cannot break the
    consumer's loop.
    """
    root = settings.UPLOADED_DOCS_DIR
    if not root.exists():
        return
    for d in sorted(root.iterdir()):
        if not d.is_dir():
            continue
        ch_path = d / "chunks.json"
        meta_path = d / "meta.json"
        if not (ch_path.exists() and meta_path.exists()):
            continue
        try:
            meta = json.loads(meta_path.read_text())
            chunks = json.loads(ch_path.read_text())
            # Valid JSON of the wrong shape would otherwise fail at the yield
            # below, outside this handler, and propagate to the caller.
            if not isinstance(meta, dict):
                raise TypeError(
                    f"meta.json holds {type(meta).__name__}, expected an object"
                )
            if not isinstance(chunks, list):
                raise TypeError(
                    f"chunks.json holds {type(chunks).__name__}, expected a list"
                )
        except Exception as e:  # noqa: BLE001
            _log.warning(
                "skipping unreadable persisted chunks %s: %s: %s",
                ch_path, type(e).__name__, e,
            )
            continue
        yield (
            meta.get("policy_id") or d.name,
            meta.get("policy_name") or d.name,
            chunks,
        )
async def reingest_persisted_into_policies() -> dict:
    """Re-embed every persisted uploaded doc into the Chroma `policies` collection.

    For each document yielded by ``iter_persisted_chunks`` the chunk texts
    are embedded, any chunks already stored under the same ``policy_id`` are
    deleted, and the fresh chunks are added with ids of the form
    ``<policy_id>::chunk<chunk_idx>``. Chunk metadata deliberately carries no
    session_id: these are GLOBAL marketplace chunks by design (#52), not
    session-private quarantine.

    Returns:
        ``{"docs": int, "chunks": int, "skipped": int}``. ``skipped`` counts
        documents with no chunks plus documents whose re-ingest raised.

    Raises:
        Whatever ``get_chroma_collection()`` / ``LocalEmbeddings()`` raise
        when they cannot be constructed. A failure on a single document is
        logged and counted as skipped, so one corrupt upload cannot block
        boot.
    """
    from rag.ingest import get_chroma_collection
    from backend.providers.local_embeddings import LocalEmbeddings

    docs = list(iter_persisted_chunks())
    summary = {"docs": 0, "chunks": 0, "skipped": 0}
    if not docs:
        return summary
    collection = get_chroma_collection()
    embedder = LocalEmbeddings()
    for policy_id, policy_name, chunks in docs:
        if not chunks:
            summary["skipped"] += 1
            continue
        try:
            texts = [c["text"] for c in chunks]
            vectors = await embedder.embed(texts, input_type="document")
            ids = [f"{policy_id}::chunk{c['chunk_idx']}" for c in chunks]
            metadatas = [
                {
                    "policy_id": policy_id,
                    "insurer_slug": UPLOAD_INSURER_SLUG,
                    "policy_name": policy_name,
                    "doc_type": UPLOAD_DOC_TYPE,
                    "source_url": "",
                    "page_start": c["page_start"],
                    "page_end": c["page_end"],
                    "chunk_idx": c["chunk_idx"],
                    # NOTE: no session_id — these are GLOBAL marketplace
                    # chunks by design, not session-private quarantine.
                }
                for c in chunks
            ]
            try:
                collection.delete(where={"policy_id": policy_id})
            except Exception as e:  # noqa: BLE001 — best-effort pre-clean
                # Still best-effort (a first-ever ingest has nothing to
                # delete), but no longer silent: if the add below then
                # fails, this line explains why.
                _log.info(
                    "pre-delete of prior chunks for %s failed: %s: %s",
                    policy_id, type(e).__name__, e,
                )
            collection.add(
                ids=ids, documents=texts, embeddings=vectors, metadatas=metadatas,
            )
            summary["docs"] += 1
            summary["chunks"] += len(chunks)
            _log.info(
                "re-ingested uploaded doc %s (%d chunks) into policies",
                policy_id, len(chunks),
            )
        except Exception as e:  # noqa: BLE001 — one bad doc must not block boot
            summary["skipped"] += 1
            _log.warning(
                "startup re-ingest skipped %s: %s: %s",
                policy_id, type(e).__name__, e,
            )
    return summary
# ---------------------------------------------------------------------------
# LLM-assisted extraction for uploaded PDFs (2026-05-27, ADR-044).
#
# Parity with the catalogued 148: same LLM (get_brain_llm), same EXTRACT
# prompt, same HealthPolicy schema, same downstream merge into the
# marketplace catalogue. Pre-this-change the upload path only ran the
# deterministic-heuristic `extract_fields_from_text` over the PDF, which
# is why uploaded cards stalled at 13-48% data_completeness vs the
# 74% median for catalogued. After this change uploaded cards land in
# the same completeness band by construction.
#
# Runs as a background asyncio task fired from the upload endpoint.
# The upload's HTTP response returns immediately (sub-second) with the
# heuristic record; the LLM pass (~30-60s) lands in the background.
# A new GET /api/upload/extraction-status/{policy_id} endpoint exposes
# in-flight state to the frontend so the chat flow can wait for
# extraction → THEN render the card with full data (no partial render).
# ---------------------------------------------------------------------------
# In-memory status dict — one entry per uploaded policy_id.
# Shape:
#   {
#     "status": "pending" | "running" | "complete" | "failed",
#     "policy_id": str,
#     "policy_name": str,
#     "insurer_slug": str,
#     "started_at": ISO-8601 UTC,
#     "completed_at": ISO-8601 UTC | None,
#     "completeness_pct": float | None,   # populated on complete
#     "overall_grade": str | None,
#     "error": str | None,
#     "llm_used": str,                    # added once an extraction path reports in
#     "llm_response_chars": int,          # added together with llm_used
#   }
# Survives only the live process — fine for the UX use case (the
# frontend polls within ~120s of upload).
_UPLOAD_EXTRACTION_STATUS: dict[str, dict] = {}
# Held by _set_extraction_status while it updates an entry.
_UPLOAD_EXTRACTION_LOCK = asyncio.Lock()
async def _set_extraction_status(policy_id: str, **fields) -> None:
    """Merge ``fields`` into the status entry for ``policy_id``.

    The entry is created on first use, updated in place while holding
    ``_UPLOAD_EXTRACTION_LOCK``, and always ends up carrying its own
    ``policy_id``.
    """
    async with _UPLOAD_EXTRACTION_LOCK:
        entry = _UPLOAD_EXTRACTION_STATUS.setdefault(policy_id, {})
        entry.update(fields)
        entry["policy_id"] = policy_id
def get_extraction_status(policy_id: str) -> Optional[dict]:
    """Public read accessor used by the /api/upload/extraction-status endpoint.

    Returns a shallow snapshot of the status entry for ``policy_id``, or
    ``None`` when nothing has been recorded for it. A copy is returned
    because the stored entry is updated in place by the status setter; the
    snapshot keeps a reader from seeing it change mid-use and keeps callers
    from editing the store by accident.
    """
    entry = _UPLOAD_EXTRACTION_STATUS.get(policy_id)
    if entry is None:
        return None
    return dict(entry)
# ---------------------------------------------------------------------------
# Tier-2 optimisations (ADR-044, 2026-05-27):
#   - Content-hash cache: same sha256(pdf_bytes) → reuse the prior
#     extraction instead of re-running the LLM.
#   - Multi-pass per-section extraction: for big PDFs (≥25K chars) the
#     single-pass extractor truncates its JSON output. Split the schema
#     into 7 logical sections, run each as its own smaller Gemini call IN
#     PARALLEL via asyncio.gather(), and merge the results into one
#     HealthPolicy. Each section call carries ~15% of the schema → fits
#     comfortably in Gemini's output budget. Failure isolation: 6/7
#     sections landing produces a partial extraction far better than the
#     heuristic floor.
# ---------------------------------------------------------------------------
# Schema partition for multi-pass extraction. Each entry = (section_name,
# [field names from HealthPolicy]). Field membership mirrors the schema's
# own section comments (`# === 1. Identity`, `# === 4. Waiting periods`,
# etc.) so reasoning about which call missed what is mechanical.
#
# Total fields listed: 52 across 7 sections (intended to cover all
# non-derived HealthPolicy fields — TODO confirm against rag.schema). The
# downstream `HealthPolicy(**merged)` happily accepts a dict missing any
# Optional field; the four required identity fields (policy_id,
# insurer_name, insurer_slug, policy_name) are back-filled with
# `setdefault` by `_multipass_extract_with_gemini` from already-resolved
# upload state, so they never depend on the LLM supplying them.
_EXTRACT_SECTIONS: list[tuple[str, list[str]]] = [
    ("identity", [
        "policy_id", "insurer_name", "insurer_slug", "policy_name",
        "policy_type", "uin_code",
    ]),
    ("eligibility", [
        "min_entry_age_years", "max_entry_age_years",
        "max_renewal_age_years", "min_child_entry_age_days",
        "family_composition_allowed", "residency_requirement",
    ]),
    ("financial", [
        "sum_insured_options_inr", "premium_payment_modes",
        "premium_range_indicative_inr", "premium_payment_term_years",
        "grace_period_days", "free_look_period_days",
        "no_claim_bonus_pct", "no_claim_bonus_cap_pct",
        "deductible_amount_inr", "copayment_pct",
        "copayment_trigger_notes",
    ]),
    ("waiting_periods", [
        "initial_waiting_period_days",
        "pre_existing_disease_waiting_months",
        "specific_disease_waiting_months",
        "specific_diseases_listed",
        "maternity_waiting_months",
        "sub_limits_waiting_notes",
    ]),
    ("coverage", [
        "inpatient_hospitalization", "pre_hospitalization_days",
        "post_hospitalization_days", "day_care_treatments",
        "domiciliary_treatment", "ayush_coverage",
        "maternity_coverage", "newborn_coverage",
        "organ_donor_expenses", "ambulance_cover",
        "critical_illness_cover", "restoration_benefit",
        "preventive_health_checkup",
    ]),
    ("limits", [
        "room_rent_capping", "icu_capping",
        "disease_wise_sub_limits",
    ]),
    ("network_claims", [
        "geographic_coverage", "worldwide_emergency_cover",
        "network_hospital_count", "cashless_treatment_supported",
        "permanent_exclusions", "temporary_exclusions",
        "claim_settlement_ratio_pct",
    ]),
]
def _schema_excerpt_for_fields(field_names: list[str]) -> str:
    """Render a brace-wrapped ``name: type`` listing for the given fields.

    Like rag.extract.schema_excerpt() but filtered to just these fields, which
    keeps the LLM's per-section task tightly scoped and saves input tokens.
    Names that are not fields of ``HealthPolicy`` are silently left out. Each
    annotation is stringified and abbreviated: the ``typing.`` prefix is
    dropped, ``Optional[`` becomes ``?`` and every closing bracket is removed.
    """
    from rag.schema import HealthPolicy as _HP

    model_fields = _HP.model_fields
    # Applied in this order to the stringified annotation.
    abbreviations = (("typing.", ""), ("Optional[", "?"), ("]", ""))

    rendered = []
    for field_name in field_names:
        field_info = model_fields.get(field_name)
        if field_info is None:
            continue
        annotation = str(field_info.annotation)
        for old, new in abbreviations:
            annotation = annotation.replace(old, new)
        rendered.append(f" {field_name}: {annotation}")
    return "{\n" + "\n".join(rendered) + "\n}"
async def _multipass_extract_with_gemini(
    *,
    text: str,
    policy_id: str,
    insurer_slug: str,
    insurer_name: str,
    policy_name: str,
    llm_gemini,
    set_status,
    doc_dir: Path,
) -> Optional[dict]:
    """Multi-pass per-section LLM extraction.

    Runs one Gemini call per `_EXTRACT_SECTIONS` entry, all in parallel,
    each shown only that section's slice of the HealthPolicy schema. The
    non-empty values of every section that produced a JSON object are merged
    into a single dict suitable for `HealthPolicy(**out)`; identity fields
    the model did not supply are filled in from the already-resolved upload
    state.

    Args:
        text: Full document text sent with every section prompt.
        policy_id: Id of the upload; also used in prompts and log lines.
        insurer_slug, insurer_name, policy_name: Resolved upload identity,
            used to back-fill missing identity fields.
        llm_gemini: Client exposing an awaitable
            ``chat(messages=..., temperature=..., max_tokens=...)`` whose
            result has a ``.text`` attribute.
        set_status: Awaitable ``set_status(policy_id, **fields)`` used to
            publish progress.
        doc_dir: Directory that receives one ``llm_raw_multipass_<section>.txt``
            per section (best-effort).

    Returns:
        The merged dict when at least one section contributed a non-empty
        value, otherwise ``None`` so the caller can fall through to the
        single-pass path.
    """
    from rag.extract import (
        EXTRACT_SYSTEM,
        build_extract_prompt,
        json_from_llm_text,
    )
    from backend.providers.base import ChatMessage

    total_sections = len(_EXTRACT_SECTIONS)

    async def _one_section(name: str, fields: list[str]) -> tuple[str, Optional[dict]]:
        """Run one section's Gemini call. Returns (name, dict_or_None)."""
        wanted = frozenset(fields)  # built once, not once per returned key
        try:
            # Prompt construction lives inside the guard so that a failure
            # here is isolated to this section instead of escaping through
            # gather() while the sibling calls are still in flight.
            excerpt = _schema_excerpt_for_fields(fields)
            prompt = build_extract_prompt(text, excerpt, policy_id)
            # Soften the per-section prompt's required-fields stance: only the
            # IDENTITY section is shown the four required scalars, every other
            # section may legitimately return them as null without that being
            # a parse failure (they are back-filled after the merge anyway).
            section_hint = (
                f"\n\nIMPORTANT: For THIS call, only extract fields from the "
                f"'{name}' section above ({len(fields)} fields). Return JSON "
                f"containing ONLY these field names. Omit fields you can't infer."
            )
            messages = [
                ChatMessage(role="system", content=EXTRACT_SYSTEM),
                ChatMessage(role="user", content=prompt + section_hint),
            ]
            res = await asyncio.wait_for(
                llm_gemini.chat(
                    messages=messages,
                    temperature=0.0,
                    max_tokens=4096,  # ~half the single-pass budget; fits one section comfortably
                ),
                timeout=90.0,
            )
            raw = res.text or ""
            # Persist for ops visibility (one file per section). Best-effort:
            # a failed write must not cost us the section's data.
            try:
                (doc_dir / f"llm_raw_multipass_{name}.txt").write_text(raw)
            except Exception as write_err:  # noqa: BLE001
                _log.debug(
                    "[upload-extract] could not persist raw multipass section "
                    "'%s' for %s: %s", name, policy_id, write_err,
                )
            try:
                data = json_from_llm_text(raw)
            except Exception as parse_err:  # noqa: BLE001
                _log.warning(
                    "[upload-extract] multipass section '%s' for %s parse failed: %s",
                    name, policy_id, str(parse_err)[:160],
                )
                return name, None
            if data is not None and not isinstance(data, dict):
                # Valid JSON, but not an object we can pick fields from.
                _log.warning(
                    "[upload-extract] multipass section '%s' for %s parse failed: %s",
                    name, policy_id,
                    f"expected a JSON object, got {type(data).__name__}",
                )
                return name, None
            # Only keep keys this section was asked to fill — drops any
            # cross-section spill the model might emit.
            kept = {k: v for k, v in (data or {}).items() if k in wanted}
            _log.info(
                "[upload-extract] multipass section '%s' landed %d/%d fields "
                "(raw %d chars) for %s",
                name, len(kept), len(fields), len(raw), policy_id,
            )
            return name, kept
        except Exception as e:  # noqa: BLE001 — one section failing is fine
            _log.warning(
                "[upload-extract] multipass section '%s' for %s FAILED: %s: %s",
                name, policy_id, type(e).__name__, str(e)[:160],
            )
            return name, None

    # Surface that multi-pass started, before the first response, so an
    # operator polling the status endpoint sees the path was taken.
    await set_status(
        policy_id,
        llm_used="gemini-2.5-flash-multipass(starting)",
        llm_response_chars=0,
    )

    # Fire all sections in parallel.
    results = await asyncio.gather(
        *[_one_section(name, fields) for name, fields in _EXTRACT_SECTIONS],
        return_exceptions=False,
    )

    # Merge — LATER sections do NOT override earlier ones (no section
    # claims the same field as another by construction). Drop None /
    # empty.
    merged: dict = {}
    sections_landed: list[str] = []
    for name, partial in results:
        if not partial:
            continue
        sections_landed.append(name)
        for k, v in partial.items():
            if v in (None, "", [], {}):
                continue
            merged.setdefault(k, v)

    if not merged:
        _log.warning(
            "[upload-extract] multipass: 0/%d sections landed for %s — "
            "falling through to single-pass", total_sections, policy_id,
        )
        return None

    # Fill identity fields the caller has already resolved, wherever the
    # identity section did not provide a value.
    merged.setdefault("policy_id", policy_id)
    merged.setdefault("insurer_slug", insurer_slug)
    merged.setdefault("insurer_name", insurer_name)
    merged.setdefault("policy_name", policy_name)

    _log.info(
        "[upload-extract] multipass: merged %d/%d sections (%s) for %s — "
        "%d total fields populated",
        len(sections_landed), total_sections, ",".join(sections_landed),
        policy_id, len(merged),
    )
    return merged
# ---------------------------------------------------------------------------
def _find_cached_extraction(content_sha: str, current_policy_id: str) -> Optional[Path]:
    """Look for a prior successful extraction of the same content (sha256
    match) under a DIFFERENT policy_id. Returns the path to the existing
    rag/extracted/<other_pid>.json if found, else None.

    This handles the legitimate "user uploads the same PDF twice (maybe
    in two browser tabs / two sessions)" case — the second upload should
    get the identical extracted JSON without paying the LLM cost again.

    Candidates are scanned in sorted path order so the same upload wins on
    every run. A ``meta.json`` that cannot be read, cannot be parsed, or is
    not a JSON object is skipped without ending the scan.

    Fail-closed: any other error → None (caller runs a fresh extraction).
    """
    if not content_sha:
        return None
    try:
        from backend.config import settings as _settings

        for meta_path in sorted(uploaded_docs_dir().glob("*/meta.json")):
            try:
                meta = json.loads(meta_path.read_text())
            except Exception:  # noqa: BLE001 — unreadable meta: try the next one
                continue
            if not isinstance(meta, dict):
                # e.g. `[]` / `null`: `.get` would raise and abort the scan.
                continue
            if meta.get("sha256") != content_sha:
                continue
            other_pid = meta.get("policy_id") or meta_path.parent.name
            if other_pid == current_policy_id:
                continue  # ignore our own meta if it's already written
            cached = _settings.EXTRACTED_DIR / f"{other_pid}.json"
            if cached.exists():
                return cached
    except Exception as e:  # noqa: BLE001 — fail closed, but leave a trace
        _log.debug(
            "[upload-extract] hash-cache scan failed: %s: %s",
            type(e).__name__, e,
        )
    return None
def _upload_bust_grade_cache() -> None:
    """Reset backend.main's marketplace grade cache (`_MG_CACHE`) under `_MG_LOCK`.

    Raises whatever the import or the cache access raises; each caller
    decides whether that matters.
    """
    import backend.main as _bm

    with _bm._MG_LOCK:
        _bm._MG_CACHE["sig"] = None
        _bm._MG_CACHE["index"] = None


def _upload_insurer_reviews(insurer_slug: str):
    """Return the parsed DATA_DIR/reviews/<insurer_slug>.json, or None when the
    slug is empty, the file is absent, or it cannot be parsed."""
    if not insurer_slug:
        return None
    from backend.config import settings as _settings

    reviews_path = _settings.DATA_DIR / "reviews" / f"{insurer_slug}.json"
    if not reviews_path.exists():
        return None
    try:
        return json.loads(reviews_path.read_text())
    except Exception:  # noqa: BLE001 — reviews are optional scorecard input
        return None


def _upload_status_card(policy_id: str, insurer_slug: str, load_fallback_doc):
    """Return ``(data_completeness_pct, grade)`` for the status entry.

    PRIMARY: `backend.main._catalogue_scorecard(policy_id, None)`.
    FALLBACK (only when that returns None): `build_scorecard` over the doc
    produced by the zero-argument ``load_fallback_doc`` callable, together
    with the insurer's reviews; a loader returning None skips the fallback.
    Returns ``(None, None)`` when no scorecard could be produced. The grade
    is read from the scorecard's `.grade` attribute (NOT `.overall_grade`).
    Exceptions propagate to the caller.
    """
    import backend.main as _bm

    card = _bm._catalogue_scorecard(policy_id, None)
    if card is None:
        doc = load_fallback_doc()
        if doc is not None:
            from backend.scorecard import build_scorecard

            card = build_scorecard(
                doc,
                insurer_reviews=_upload_insurer_reviews(insurer_slug),
                profile=None,
            )
    if card is None:
        return None, None
    return float(card.data_completeness_pct), card.grade


def _upload_flat_heuristic_record(
    policy_id: str, insurer_slug: str, insurer_name: str, policy_name: str,
) -> Optional[dict]:
    """Load the upload's record.json with cell-shaped ``{value, ...}`` entries
    flattened to their scalar value and missing identity keys filled in.
    Returns None when record.json does not exist."""
    rec_path = _doc_dir(policy_id) / "record.json"
    if not rec_path.exists():
        return None
    rec = json.loads(rec_path.read_text())
    flat = {
        k: (v["value"] if isinstance(v, dict) and "value" in v else v)
        for k, v in rec.items()
    }
    flat.setdefault("policy_id", policy_id)
    flat.setdefault("insurer_slug", insurer_slug)
    flat.setdefault("insurer_name", insurer_name)
    flat.setdefault("policy_name", policy_name)
    return flat


def _upload_merge_llm_into_record(policy_id: str, policy) -> bool:
    """Overlay the LLM-extracted values onto the upload's heuristic record.json.

    Identity keys and empty LLM values are skipped, so the heuristic value
    survives wherever the LLM was silent. Where the existing entry is
    cell-shaped (a dict with "value") only its "value" is replaced, which
    preserves the heuristic's other cell keys. The file is rewritten through
    a temporary sibling and a rename. Returns False when record.json does
    not exist, True after a successful rewrite.
    """
    rec_path = _doc_dir(policy_id) / "record.json"
    if not rec_path.exists():
        return False
    existing = json.loads(rec_path.read_text())
    for k, v in policy.model_dump().items():
        if k in ("policy_id", "policy_name", "insurer_slug", "insurer_name"):
            continue
        if v in (None, "", [], {}):
            continue
        cell = existing.get(k)
        if isinstance(cell, dict) and "value" in cell:
            existing[k] = {**cell, "value": v}
        else:
            existing[k] = v
    # Carry the LLM's confidence along for downstream provenance.
    confidence = getattr(policy, "extraction_confidence_pct", None)
    if confidence is not None:
        existing["_llm_extraction_confidence_pct"] = confidence
    tmp = rec_path.with_suffix(".json.tmp")
    tmp.write_text(json.dumps(existing, indent=2, ensure_ascii=False, default=str))
    tmp.replace(rec_path)
    return True


async def _upload_single_pass_extract(
    *,
    text: str,
    policy_id: str,
    policy_name: str,
    insurer_slug: str,
    insurer_name: str,
):
    """Single-pass extraction: up to three Gemini attempts, then one NIM attempt.

    Tier-1 Gemini-stability hardening (ADR-044, 2026-05-27):
      1. Gemini retries are preceded by a jittered backoff (2s / 4s, ±25%).
         The first attempt and the NIM fallback start immediately.
      2. NIM is the FINAL fallback after Gemini exhausts its attempts.
      3. The raw response of every attempt is written to
         <doc dir>/llm_raw_<n>_<label>.txt so an operator can see why an
         attempt failed.

    Clients are built lazily inside each attempt's guard, so a provider
    whose client cannot even be constructed counts as a failed attempt
    rather than aborting the chain before the fallback is tried.

    Returns the validated HealthPolicy, or None when every attempt failed.
    """
    import random as _random

    from rag.extract import (
        EXTRACT_SYSTEM,
        build_extract_prompt,
        schema_excerpt,
        json_from_llm_text,
    )
    from rag.schema import HealthPolicy
    from backend.providers.base import ChatMessage
    from backend.providers.google_gemini_llm import GoogleGeminiLLM
    from backend.providers.nvidia_nim_llm import get_brain_llm

    prompt = build_extract_prompt(text, schema_excerpt(), policy_id)
    messages = [
        ChatMessage(role="system", content=EXTRACT_SYSTEM),
        ChatMessage(role="user", content=prompt),
    ]

    _GEMINI_BACKOFFS = (2.0, 4.0, 8.0)
    _GEMINI_JITTER = 0.25

    def _jit(b: float) -> float:
        return b * _random.uniform(1 - _GEMINI_JITTER, 1 + _GEMINI_JITTER)

    clients: dict = {}

    def _client(kind: str):
        # One client per provider, shared by that provider's attempts.
        if kind not in clients:
            clients[kind] = (
                GoogleGeminiLLM(timeout=180.0) if kind == "gemini" else get_brain_llm()
            )
        return clients[kind]

    attempts = [
        ("gemini", "gemini-2.5-flash#1"),
        ("gemini", "gemini-2.5-flash#2"),
        ("gemini", "gemini-2.5-flash#3"),
        ("nim", "nim-fallback"),
    ]
    for attempt, (kind, label) in enumerate(attempts):
        is_gemini = kind == "gemini"
        try:
            # Backoff only before Gemini RETRIES. The NIM fallback must not
            # inherit a backoff slot: Gemini has already given up by then.
            if is_gemini and attempt >= 1:
                _bo = _jit(_GEMINI_BACKOFFS[min(attempt, len(_GEMINI_BACKOFFS)) - 1])
                _log.info(
                    "[upload-extract] sleeping %.1fs before %s retry",
                    _bo, label,
                )
                await asyncio.sleep(_bo)
            llm = _client(kind)
            res = await asyncio.wait_for(
                llm.chat(messages=messages, temperature=0.0, max_tokens=8192),
                timeout=180 if is_gemini else 120,
            )
            raw = res.text
            _log.info(
                "[upload-extract] %s returned %d chars; parsing JSON…",
                label, len(raw or ""),
            )
            # Record provider + response size on the status dict so the
            # operator can prove WHICH LLM landed the extraction without
            # needing HF Space stdout access.
            await _set_extraction_status(
                policy_id,
                llm_used=label,
                llm_response_chars=len(raw or ""),
            )
            # Persist raw response for ops visibility (always — both
            # successful + failed parses get a copy). Best-effort.
            try:
                raw_name = f"llm_raw_{attempt + 1}_{label.replace('#', '-')}.txt"
                (_doc_dir(policy_id) / raw_name).write_text(raw or "")
            except Exception as write_err:  # noqa: BLE001
                _log.debug(
                    "[upload-extract] could not persist raw response of %s "
                    "for %s: %s", label, policy_id, write_err,
                )
            data = json_from_llm_text(raw)
            # Fill identity fields (REQUIRED by the schema; the LLM
            # frequently emits null for them) from what the upload path
            # already resolved.
            if not data.get("policy_id"):
                data["policy_id"] = policy_id
            if not data.get("insurer_slug"):
                data["insurer_slug"] = insurer_slug
            if not data.get("insurer_name"):
                data["insurer_name"] = insurer_name
            if not data.get("policy_name"):
                data["policy_name"] = policy_name
            return HealthPolicy(**data)
        except Exception as e:  # noqa: BLE001 — try the next attempt
            _log.warning(
                "[upload-extract] attempt %d (%s) failed for %s: %s: %s",
                attempt + 1, label, policy_id, type(e).__name__, str(e)[:200],
            )
    return None


async def extract_one_for_upload(
    policy_id: str,
    pdf_path: Path,
    policy_name: str,
    insurer_slug: str,
    insurer_name: str,
) -> bool:
    """Run the same LLM extractor used for the catalogued 148 against an
    uploaded PDF. On success, writes `rag/extracted/<policy_id>.json` and
    invalidates the marketplace grade cache so the next /api/policies/all
    + /api/policies/{id}/scorecard call returns the LLM-graded card.

    Status is mirrored to `_UPLOAD_EXTRACTION_STATUS[policy_id]` at every
    phase change so the frontend's poll loop sees progress in real time.

    Phases:
      1. Hash-cache short-circuit (Tier-2 ADR-044): if a prior upload with
         the same `sha256(pdf_bytes)` already has a `rag/extracted/
         <other_pid>.json`, it is reused (re-keyed to this policy_id)
         without running the LLM.
      2. Multi-pass per-section extraction for large texts (≥25K chars).
      3. Single-pass extraction (Gemini with retries, then NIM) — only when
         phase 2 was skipped or did not yield a valid HealthPolicy.
      4. Persist: extracted JSON, overlay onto record.json, DuckDB upsert,
         grade-cache invalidation, final status with the completeness and
         grade the chat card will show.

    Returns True iff a HealthPolicy was successfully extracted (or reused
    from the hash cache) and written. Errors raised during extraction are
    swallowed (returns False) — a failed LLM pass must NEVER affect the
    upload's HTTP response, which has already returned.
    """

    def _now() -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    await _set_extraction_status(
        policy_id,
        status="running",
        policy_name=policy_name,
        insurer_slug=insurer_slug,
        started_at=_now(),
        completed_at=None,
        completeness_pct=None,
        overall_grade=None,
        error=None,
    )

    # ─── Tier-2 hash-cache short-circuit (ADR-044) ─────────────────
    # If we've previously extracted a PDF with the same content hash,
    # reuse that extraction. Saves the LLM round-trips.
    try:
        meta_path = _doc_dir(policy_id) / "meta.json"
        if meta_path.exists():
            meta = json.loads(meta_path.read_text())
            content_sha = meta.get("sha256") or ""
            cached_path = (
                _find_cached_extraction(content_sha, policy_id)
                if content_sha else None
            )
            if cached_path is not None:
                _log.info(
                    "[upload-extract] hash-cache HIT for %s "
                    "(reusing %s) — skipping LLM",
                    policy_id, cached_path.name,
                )
                # Store the prior extraction under this policy's path so
                # /api/policies/{id}/scorecard finds it under the right id.
                # The payload's own policy_id is re-keyed as well; a verbatim
                # copy would still name the OTHER upload inside the file.
                # An unparseable cached file raises here and we fall through
                # to a fresh extraction.
                from backend.config import settings as _settings_a

                cached_text = cached_path.read_text()
                cached_doc = json.loads(cached_text)
                if isinstance(cached_doc, dict) and cached_doc.get("policy_id") != policy_id:
                    cached_doc["policy_id"] = policy_id
                    cached_text = json.dumps(cached_doc, indent=2, ensure_ascii=False)
                _settings_a.EXTRACTED_DIR.mkdir(parents=True, exist_ok=True)
                dest = _settings_a.EXTRACTED_DIR / f"{policy_id}.json"
                dest.write_text(cached_text)
                # Bust the #40 grade cache so /api/policies/all picks up the
                # new card BEFORE we resolve the scorecard for the status.
                try:
                    _upload_bust_grade_cache()
                except Exception as bust_err:  # noqa: BLE001 — best-effort
                    _log.debug(
                        "[upload-extract] could not invalidate _MG_CACHE for %s: %s",
                        policy_id, bust_err,
                    )
                # Report the completeness + grade the chat card will render,
                # using the same resolution order as the live scorecard
                # endpoint (catalogue primary → bare-policy fallback with
                # insurer reviews).
                _final_comp = None
                _final_grade = None
                try:
                    _final_comp, _final_grade = _upload_status_card(
                        policy_id, insurer_slug,
                        lambda: json.loads(dest.read_text()),
                    )
                except Exception as _sc_err:  # noqa: BLE001
                    _log.warning(
                        "[upload-extract] cache-hit status resolve failed for %s: %s",
                        policy_id, _sc_err,
                    )
                await _set_extraction_status(
                    policy_id, status="complete",
                    completed_at=_now(),
                    completeness_pct=_final_comp,
                    overall_grade=_final_grade,
                    # Provenance — operator sees WHY no LLM ran: the SHA256
                    # of pdf_bytes matched a prior extraction so we reused
                    # it. Distinct from the gemini-2.5-flash#N /
                    # nim-fallback labels.
                    llm_used="hash-cache",
                    llm_response_chars=len(cached_text),
                )
                return True
    except Exception as e:  # noqa: BLE001 — cache miss is fine, run fresh
        _log.debug("[upload-extract] hash-cache lookup failed: %s", e)

    try:
        # Lazy imports — these touch the LLM client + DuckDB; we don't
        # want to pay that cost at module import time.
        from rag.extract import read_full_text, upsert_policy
        from rag.schema import HealthPolicy
        from backend.providers.google_gemini_llm import GoogleGeminiLLM

        _log.info(
            "[upload-extract] starting LLM extraction for %s (insurer=%s)",
            policy_id, insurer_slug,
        )

        # Read text from the persisted PDF (same as extract_one).
        try:
            text = read_full_text(pdf_path)
        except Exception as e:  # noqa: BLE001
            _log.warning(
                "[upload-extract] read_full_text failed %s: %s: %s",
                policy_id, type(e).__name__, e,
            )
            await _set_extraction_status(
                policy_id, status="failed",
                completed_at=_now(),
                error=f"read_full_text: {type(e).__name__}: {str(e)[:160]}",
            )
            return False

        policy: Optional[HealthPolicy] = None

        # ─── Multi-pass per-section extraction (ADR-044 §D6, 2026-05-27) ───
        # For large texts the single-pass extraction reliably truncates its
        # JSON, so the schema is split into sections that run as parallel,
        # smaller Gemini calls (see _multipass_extract_with_gemini).
        _MULTIPASS_THRESHOLD_CHARS = 25_000
        if len(text) >= _MULTIPASS_THRESHOLD_CHARS:
            try:
                _mp_data = await _multipass_extract_with_gemini(
                    text=text,
                    policy_id=policy_id,
                    insurer_slug=insurer_slug,
                    insurer_name=insurer_name,
                    policy_name=policy_name,
                    llm_gemini=GoogleGeminiLLM(timeout=120.0),
                    set_status=_set_extraction_status,
                    doc_dir=_doc_dir(policy_id),
                )
                if _mp_data:
                    try:
                        policy = HealthPolicy(**_mp_data)
                        raw = json.dumps(_mp_data, ensure_ascii=False)
                        await _set_extraction_status(
                            policy_id,
                            llm_used="gemini-2.5-flash-multipass",
                            llm_response_chars=len(raw),
                        )
                        _log.info(
                            "[upload-extract] multi-pass produced valid HealthPolicy "
                            "for %s (%d fields in payload)", policy_id, len(_mp_data),
                        )
                    except Exception as _mp_parse_err:  # noqa: BLE001
                        _log.warning(
                            "[upload-extract] multi-pass parse failed for %s — "
                            "falling through to single-pass: %s",
                            policy_id, _mp_parse_err,
                        )
                        policy = None
            except Exception as _mp_err:  # noqa: BLE001 — fall through to single-pass
                _log.warning(
                    "[upload-extract] multi-pass extraction errored for %s "
                    "(falling through to single-pass): %s: %s",
                    policy_id, type(_mp_err).__name__, str(_mp_err)[:200],
                )
                policy = None

        # Single-pass runs ONLY when multi-pass was skipped or yielded
        # nothing usable. Running it unconditionally would throw away a
        # valid multi-pass result and pay for the LLM a second time.
        if policy is None:
            policy = await _upload_single_pass_extract(
                text=text,
                policy_id=policy_id,
                policy_name=policy_name,
                insurer_slug=insurer_slug,
                insurer_name=insurer_name,
            )

        if policy is None:
            _log.warning(
                "[upload-extract] no policy extracted for %s after retries; "
                "card stays on heuristic record", policy_id,
            )
            # KI-333 (2026-05-27) — even on total LLM failure the heuristic
            # record.json already produced a card. Surface the SAME
            # completeness + grade the chat card will show, so the
            # operator-visible status matches the user-visible card.
            _final_comp = None
            _final_grade = None
            try:
                # Bust the marketplace grade cache so _catalogue_scorecard
                # rebuilds from the heuristic record.
                _upload_bust_grade_cache()
                _final_comp, _final_grade = _upload_status_card(
                    policy_id, insurer_slug,
                    lambda: _upload_flat_heuristic_record(
                        policy_id, insurer_slug, insurer_name, policy_name,
                    ),
                )
            except Exception as _sc_err_f:  # noqa: BLE001
                _log.warning(
                    "[upload-extract] fail-path heuristic-scorecard resolve "
                    "failed for %s: %s", policy_id, _sc_err_f,
                )
            await _set_extraction_status(
                policy_id, status="failed",
                completed_at=_now(),
                completeness_pct=_final_comp,
                overall_grade=_final_grade,
                error="LLM returned no valid HealthPolicy after primary + fallback retries",
            )
            return False

        # Write rag/extracted/<policy_id>.json — same shape as catalogued.
        from backend.config import settings as _settings

        _settings.EXTRACTED_DIR.mkdir(parents=True, exist_ok=True)
        out_json = _settings.EXTRACTED_DIR / f"{policy_id}.json"
        out_json.write_text(policy.model_dump_json(indent=2))

        # ALSO merge the LLM output INTO the persisted record.json so the
        # marketplace catalogue's _load_curated_facts() pass sees the
        # combined heuristic-baseline + LLM-extracted fields. Heuristic
        # stays as the fallback; LLM values override where present and
        # non-empty.
        try:
            if _upload_merge_llm_into_record(policy_id, policy):
                _log.info(
                    "[upload-extract] merged LLM extraction into record.json for %s",
                    policy_id,
                )
        except Exception as _merge_err:  # noqa: BLE001
            _log.warning(
                "[upload-extract] record.json merge failed for %s: %s",
                policy_id, _merge_err,
            )

        # Persist into DuckDB so admin / re-render paths see the new card.
        try:
            upsert_policy(
                policy,
                source_pdf_path=str(pdf_path),
                source_pdf_url="",
            )
        except Exception as e:  # noqa: BLE001 — DB write is best-effort
            _log.warning(
                "[upload-extract] upsert_policy failed for %s: %s: %s",
                policy_id, type(e).__name__, e,
            )

        # Invalidate the #40 marketplace grade cache so the next
        # /api/policies/all / scorecard call returns the LLM-graded card.
        try:
            _upload_bust_grade_cache()
        except Exception as e:  # noqa: BLE001 — cache miss is fine
            _log.debug(
                "[upload-extract] could not invalidate _MG_CACHE for %s: %s",
                policy_id, e,
            )

        _log.info(
            "[upload-extract] OK %s (extraction_confidence_pct=%s)",
            policy_id, getattr(policy, "extraction_confidence_pct", "n/a"),
        )

        # Resolve the freshly-graded card so the status reports the
        # completeness + grade the CHAT CARD will show: catalogue scorecard
        # first, bare extracted JSON + insurer reviews as the defensive
        # fallback.
        _final_completeness = None
        _final_grade = None
        try:
            _final_completeness, _final_grade = _upload_status_card(
                policy_id, insurer_slug,
                lambda: json.loads(out_json.read_text()),
            )
        except Exception as _sc_err:  # noqa: BLE001
            _log.warning(
                "[upload-extract] status-card resolve failed for %s: %s: %s",
                policy_id, type(_sc_err).__name__, str(_sc_err)[:160],
            )

        await _set_extraction_status(
            policy_id, status="complete",
            completed_at=_now(),
            completeness_pct=_final_completeness,
            overall_grade=_final_grade,
        )
        return True
    except Exception as e:  # noqa: BLE001 — top-level catch-all
        try:
            await _set_extraction_status(
                policy_id, status="failed",
                completed_at=_now(),
                error=f"{type(e).__name__}: {str(e)[:200]}",
            )
        except Exception:  # noqa: BLE001 — status is best-effort on this path
            pass
        _log.warning(
            "[upload-extract] unexpected failure for %s: %s: %s",
            policy_id, type(e).__name__, str(e)[:400],
        )
        return False
async def backfill_extractions(*, force: bool = False) -> dict:
    """Run LLM-assisted extraction for every persisted upload that doesn't
    yet have a corresponding `rag/extracted/<policy_id>.json` (or force=True
    to re-extract every upload). Fires sequentially so we don't fan-out the
    LLM chain.

    Designed to be called once at server startup (to upgrade old uploads
    that were persisted before the LLM-extraction pipeline was wired) AND
    as the backing for POST /api/admin/upload/reextract.

    Returns:
        ``{"processed": int, "skipped": int, "failed": int, "policies":
        [policy_id, ...]}`` where "policies" lists the ids that were
        processed successfully. Uploads with an existing extraction (unless
        ``force``) or without a ``source.pdf`` count as skipped.
    """
    from backend.config import settings as _settings

    def _scalar(record: dict, key: str):
        # record.json entries may be plain scalars or cell-shaped
        # ``{"value": ..., ...}`` dicts; hand the extractor the scalar.
        value = record.get(key)
        if isinstance(value, dict) and "value" in value:
            value = value["value"]
        return value

    summary: dict = {"processed": 0, "skipped": 0, "failed": 0, "policies": []}
    records = load_persisted_records()
    for policy_id, record in records.items():
        try:
            out_json = _settings.EXTRACTED_DIR / f"{policy_id}.json"
            if out_json.exists() and not force:
                summary["skipped"] += 1
                continue
            pdf_path = _doc_dir(policy_id) / "source.pdf"
            if not pdf_path.exists():
                _log.warning(
                    "[backfill] missing source.pdf for %s — skipping", policy_id,
                )
                summary["skipped"] += 1
                continue
            policy_name = _scalar(record, "policy_name") or policy_id
            insurer_slug = _scalar(record, "insurer_slug") or UPLOAD_INSURER_SLUG
            insurer_name = (
                _scalar(record, "insurer_name")
                or detected_insurer_name(insurer_slug)
            )
            ok = await extract_one_for_upload(
                policy_id=policy_id,
                pdf_path=pdf_path,
                policy_name=policy_name,
                insurer_slug=insurer_slug,
                insurer_name=insurer_name,
            )
            if ok:
                summary["processed"] += 1
                summary["policies"].append(policy_id)
            else:
                summary["failed"] += 1
        except Exception as e:  # noqa: BLE001 — one upload must not stop the sweep
            _log.warning(
                "[backfill] failed for %s: %s: %s",
                policy_id, type(e).__name__, str(e)[:200],
            )
            summary["failed"] += 1
    _log.info(
        "[backfill] done: processed=%d skipped=%d failed=%d",
        summary["processed"], summary["skipped"], summary["failed"],
    )
    return summary