InsuranceBot / backend /brain_tools.py
rohitsar567's picture
feat!: remove cross-session profile recall (ADR-043) β€” net βˆ’3700 LOC
6d5684e
Raw
History Blame Contribute Delete
93.7 kB
"""Tool functions for single_brain.py (Path B β€” single-LLM architecture).
The Gemini Flash model exposed in `backend/single_brain.py` calls these
three tools to (a) persist captured profile fields, (b) retrieve policy
chunks from Chroma, and (c) mark which policies it has recommended on
the current turn so follow-ups like "tell me more about #2" can resolve.
Each function:
* Takes plain JSON-serialisable inputs (str / int / list[str]).
* Returns a plain JSON-serialisable dict that gets fed back to the LLM
on the next iteration of the function-calling loop.
* Never raises β€” failures are surfaced via {"ok": False, "error": "..."}
so the LLM can decide whether to retry or recover.
The Gemini function-calling DSL (JSON Schema-flavoured) for these three
tools is generated by `single_brain.TOOL_SCHEMAS` from this module's
metadata.
═══════════════════════════════════════════════════════════════════════════
SLOT_UNION β€” SINGLE SOURCE OF TRUTH FOR CAPTURED FIELDS (B6, 2026-05-15)
═══════════════════════════════════════════════════════════════════════════
SLOT_UNION enumerates every captured field that influences EITHER the
recommendation pipeline (retrieval query + scorecard) OR the pricing
pipeline (premium_calculator.estimate / bulk_estimate). It is the contract
between brain_tools (capture surface), single_brain (LLM tool calls),
premium_calculator (pricing inputs) and scorecard (match scoring).
Slot β†’ consumer matrix:
RECOMMENDATION SLOTS (in _REQUIRED_FOR_READY β€” hard gate)
name β†’ profile identity (no pricing influence)
age β†’ retrieval query + scorecard + pricing (age band)
dependents β†’ retrieval query + scorecard + pricing (family loading)
location_tier β†’ retrieval query + scorecard + pricing (location loading)
income_band β†’ retrieval query + scorecard
primary_goal β†’ retrieval query + scorecard
health_conditions β†’ retrieval query + scorecard + pricing (health loading)
PRICING-ONLY SLOTS (B5 + B6 additions β€” SOFT capture, post-recap)
budget_band β†’ pricing band match
desired_sum_insured_inr β†’ pricing SI override (per-policy estimate)
existing_cover_inr β†’ pricing (existing-cover discount loading)
FAMILY-DETAIL SLOTS (used by pricing if dependents includes parents)
parents_to_insure β†’ triggers parents_loading branch
parents_age_max β†’ pricing (parents age loading 1.0Γ— / 1.4Γ— / 1.8Γ—)
parents_has_ped β†’ pricing (PED loading inflation for parents)
D2 ADDITIONS (2026-05-15 β€” copay + family medical history)
copay_pct β†’ pricing (copay discount 1.0Γ— / 0.95Γ— / 0.88Γ— / 0.80Γ—)
family_medical_history β†’ pricing (family-history loading) + retrieval boost
KI-275 (2026-05-15 β€” smoker / tobacco)
smoker β†’ pricing (smoker_loading 1.0Γ— / 1.40Γ—, +30-50%)
Total: 16 slots. `gender` is tolerated by save_profile_field for forward
compat but is NOT on the Profile dataclass today; it does not appear in
SLOT_UNION because no consumer reads it.
"""
from __future__ import annotations
import json as _json
import logging
import time
from pathlib import Path
from typing import Any, Optional
from backend.config import settings
from backend.policy_identity import clean_display_policy_name
_log = logging.getLogger(__name__)
# ───────────────────────────────────────────────────────────────────────────
# KI-278 (2026-05-16) β€” policy-fact enrichment for the eligibility/ranking
# filter. The Chroma chunk only carries citation metadata; the structural
# facts the eligibility filter needs (top-up signal, sum-insured options,
# mandatory co-pay) live in 40-data/policy_facts/*.json.
#
# IMPORTANT: this code only READS those JSON files and only reads STABLE
# SCHEMA KEYS (policy_type_indemnity_or_fixed / deductible_amount /
# co_payment_pct / sum_insured_options). The concurrent scorecard-repair
# process rewrites the *values*; the *field names* are the contract and do
# not change. We never write to policy_facts/*.json.
#
# A short in-process cache keeps this off the hot path (the catalog is ~250
# files; we only ever touch the ≀8 retrieved per turn, and memoise stems).
# ───────────────────────────────────────────────────────────────────────────
_FACTS_DIR = settings.DATA_DIR / "policy_facts"
_DOCTYPE_SUFFIXES = ("__wordings", "__brochure", "__cis", "__prospectus")
# Width of the Chroma candidate pool pulled before eligibility +
# scorecard-aware ranking β€” wide enough that a strong policy ranked only
# mid-cosine for a generic query still enters contention. The LLM still
# only sees the top-k best-fit survivors.
_RECALL_POOL = 40
_FACT_KEYS = (
"policy_type_indemnity_or_fixed",
# Raw catalog type key β€” some curated files carry the product type
# only under `policy_type` (e.g. "hospital_cash") with no
# `policy_type_indemnity_or_fixed`; the fixed-benefit gate needs it.
"policy_type",
"deductible_amount",
"co_payment_pct",
"sum_insured_options",
# Eligibility-gate keys (read-only):
# uin_code β†’ canonical dedup identity (1 UIN = 1 product;
# collapses doctype-sibling/rename duplicates).
# max_entry_age β†’ hard eligibility for the insured person; the
# curated value is authoritative and overrides
# a missing/None Chroma chunk field.
# maternity_coverage β†’ required-feature gate: an explicit maternity /
# newborn_coverage newborn need ranks confirmed plans first.
"uin_code",
"max_entry_age",
"maternity_coverage",
"newborn_coverage",
)
# Bug #44 (2026-05-19) β€” DECISION-CRITICAL coverage fields that MUST be
# resolved from the SAME canonical curated entry the marketplace
# scorecard / #31 profile-summary path uses, so a verbal answer (or a
# comparison TABLE the LLM builds from get_policy_facts) can never
# contradict the scorecard card for the same policy.
#
# Root cause: each product has a base 40-data/policy_facts/<id>.json AND
# doctype siblings (__wordings/__cis/__brochure/__prospectus). The
# scorecard path resolves via main._load_curated_facts() (KI-219/KI-251
# canonical precedence). get_policy_facts previously surfaced facts via
# _load_policy_facts() β€” a DIFFERENT _candidate_stems resolver whose
# 7-key _FACT_KEYS doesn't even include PED β€” so the two paths could
# read different files and disagree on PED waiting (a live audit found
# the comparison table said 24mo while the #31 card said "0 months").
#
# Fix: these specific fields are taken from the canonical curated entry
# (`_curated_facts_all()[pid]`, the EXACT dict main._load_curated_facts()
# feeds build_scorecard) so both surfaces agree BY CONSTRUCTION. This
# mirrors the existing _scorecard_signal β†’ marketplace_grade single-
# source pattern (#40 / KI-219). The guard
# tests/test_policy_facts_source_consistency.py asserts catalogue-wide
# agreement.
_DECISION_CRITICAL_FACT_KEYS = (
"pre_existing_disease_waiting_months",
"initial_waiting_period_days",
"copayment_pct",
"room_rent_capping",
"claim_settlement_ratio",
)
def canonical_decision_facts(policy_id: str) -> dict:
"""Decision-critical coverage facts for `policy_id`, resolved from the
SAME canonical curated entry the marketplace scorecard path uses
(main._load_curated_facts via _curated_facts_all). Returns only the
keys in _DECISION_CRITICAL_FACT_KEYS that have a non-empty value.
Single source of truth (Bug #44): the scorecard / #31 path and
get_policy_facts BOTH read these fields from this one canonical entry,
so they cannot drift. Read-only; never raises (returns {} on any
failure so the tool degrades gracefully)."""
pid = (policy_id or "").strip()
if not pid:
return {}
try:
all_cur = _curated_facts_all() or {}
except Exception: # noqa: BLE001 β€” curated layer optional
return {}
entry = all_cur.get(pid)
if entry is None:
# Curated layer registers every doctype-suffix permutation +
# each sibling stem/policy_id pointing at the canonical entry
# (main._load_curated_facts Pass-2). Fall back to the canonical
# (doctype-stripped) form for callers holding a suffixed id.
for suf in _DOCTYPE_SUFFIXES:
if pid.endswith(suf):
entry = all_cur.get(pid[: -len(suf)])
break
if not isinstance(entry, dict):
return {}
out: dict = {}
for k in _DECISION_CRITICAL_FACT_KEYS:
v = entry.get(k)
# _load_curated_facts already unwraps {value, source_*} to scalar;
# accept the wrapped shape too, defensively.
if isinstance(v, dict) and "value" in v:
v = v.get("value")
if v in (None, "", []):
continue
out[k] = v
return out
_facts_cache: dict[str, dict] = {}
def _unwrap(v: Any) -> Any:
"""policy_facts values are `{value, source_pdf_path, source_quote}`
wrappers OR bare scalars. Return the underlying value either way."""
if isinstance(v, dict) and "value" in v:
return v["value"]
return v
def _candidate_stems(policy_id: str) -> list[str]:
"""A retrieved chunk's policy_id may be the canonical stem
(`insurer__product`) OR a doctype-suffixed stem
(`insurer__product__cis`). Try the exact id first, then the canonical
form, then every doctype sibling β€” first existing file wins.
"""
pid = (policy_id or "").strip()
if not pid:
return []
stems = [pid]
base = pid
for suf in _DOCTYPE_SUFFIXES:
if base.endswith(suf):
base = base[: -len(suf)]
break
if base != pid:
stems.append(base)
for suf in _DOCTYPE_SUFFIXES:
cand = base + suf
if cand not in stems:
stems.append(cand)
return stems
_extraction_cache: dict = {}
def _has_extraction(policy_id: str) -> bool:
"""True iff this policy has an LLM-extracted corpus file β€” the EXACT
renderability rule the marketplace uses (main builds its card set from
settings.EXTRACTED_DIR/*.json). A policy with curated facts but no
extracted file renders as a BROKEN card: raw policy_id as the title,
grade "N/A", "No extraction available for this policy.", "Data not
indexed" (/api/bulk-scorecard). Such a policy must NEVER be quality-
seeded or cited. Canonical-stem aware (doctype siblings count); cached
incl. negatives."""
pid = (policy_id or "").strip()
if not pid:
return False
if pid in _extraction_cache:
return _extraction_cache[pid]
ok = False
try:
for stem in _candidate_stems(pid):
if (settings.EXTRACTED_DIR / f"{stem}.json").exists():
ok = True
break
except Exception: # noqa: BLE001 β€” predicate must never break retrieval
ok = False
_extraction_cache[pid] = ok
return ok
def _load_policy_facts(policy_id: str) -> dict:
"""Return {fact_key: value} for a policy_id, or {} when no facts file
exists / is unreadable. Cached per policy_id (incl. negative cache)."""
if policy_id in _facts_cache:
return _facts_cache[policy_id]
out: dict = {}
try:
for stem in _candidate_stems(policy_id):
fp = _FACTS_DIR / f"{stem}.json"
if not fp.exists():
continue
try:
d = _json.loads(fp.read_text())
except Exception: # noqa: BLE001 β€” corrupt mid-repair β†’ skip
continue
for k in _FACT_KEYS:
if k in d and k not in out:
out[k] = _unwrap(d[k])
# Stop once we have the decision-critical signals. KI-279: a
# type signal is "have it" if EITHER the canonical key OR the
# raw `policy_type` key is present (some files only carry the
# latter), so the fixed-benefit gate is never starved.
# KI-280: also require the unified-gate signals (uin_code +
# max_entry_age) before breaking so the eligibility/dedup rules
# are not starved when a sibling doctype file carries them. The
# candidate-stem list is short (≀5) so iterating it fully is
# cheap; this just stops us breaking after stem #1 when the UIN
# / entry-age lives in stem #2.
_have_type = (
"policy_type_indemnity_or_fixed" in out
or "policy_type" in out
)
if (
_have_type
and "sum_insured_options" in out
and "uin_code" in out
and "max_entry_age" in out
):
break
except Exception as e: # noqa: BLE001 β€” enrichment must never break retrieval
_log.warning("KI-278 _load_policy_facts(%s) failed: %s", policy_id, e)
out = {}
_facts_cache[policy_id] = out
return out
_curated_all_cache: dict = {} # single-slot cache for the full curated layer
_reviews_cache: dict = {}
def _curated_facts_all() -> dict:
"""The full curated-facts layer β€” the same source the marketplace
scores from (main._load_curated_facts, KI-219/251 canonical
precedence; lazy import since main is loaded at request time). Using
it here makes a policy's recommendation grade identical to its
marketplace grade. The 7-key `_load_policy_facts` is the input for the
eligibility/dedup gate, not for grading."""
if "d" not in _curated_all_cache:
from backend.main import _load_curated_facts # lazy: avoids cycle
_curated_all_cache["d"] = _load_curated_facts()
return _curated_all_cache["d"]
def _insurer_reviews(slug: str) -> Optional[dict]:
"""Insurer reviews (40-data/reviews/<slug>.json) passed to
build_scorecard β€” drives the claim-experience sub-score; the same
source the marketplace uses."""
if not slug:
return None
if slug in _reviews_cache:
return _reviews_cache[slug]
ir = None
try:
rp = settings.DATA_DIR / "reviews" / f"{slug}.json"
if rp.exists():
ir = _json.loads(rp.read_text())
except Exception: # noqa: BLE001 β€” reviews optional
ir = None
_reviews_cache[slug] = ir
return ir
def _scorecard_signal(policy_id: str, profile=None) -> dict:
"""{_grade, _overall_score} for policy_id β€” the SAME grade its
marketplace card shows (#40 SINGLE SOURCE OF TRUTH, 2026-05-18).
Delegates to backend.main.marketplace_grade, which derives the grade
from the ONE marketplace catalogue computation
(backend.main._marketplace_catalogue) using UIN-canonical resolution:
a marketing-rename / KI-145 variant id resolves onto its canonical
card. There is no longer a parallel doctype-rank/_merge
re-implementation here, so the cited-card grade cannot drift from the
marketplace card grade β€” for ALL 148 by construction, asserted by
tests/test_scorecard_parity.py.
`profile` is accepted for call-site compatibility. The parity-relevant
grade LETTER (what _recommendation_fit gates on) is the profile-neutral
marketplace grade, identical to the /api/policies/all catalogue view.
Read-only; returns {} only when the policy is unknown to the
marketplace (ranking still works off co-pay + SI headroom + cosine)."""
try:
from backend.main import marketplace_grade # lazy: avoids cycle
sig = marketplace_grade(policy_id)
if sig and sig.get("_grade"):
return {
"_grade": sig.get("_grade"),
"_overall_score": sig.get("_overall_score"),
}
return {}
except Exception: # noqa: BLE001 β€” scorecard optional; ranking degrades gracefully
return {}
# ---- accepted fields for save_profile_field --------------------------------
# Mirrors the slot list in sales_brain._REQUIRED_FOR_READY plus the
# nice-to-have fields the LLM may capture opportunistically. `gender`
# is listed in the spec but is NOT a Profile dataclass field today β€”
# we silently no-op on it rather than rejecting (forward-compat).
_ACCEPTED_FIELDS = {
"name",
"age",
"dependents",
"location_tier",
"income_band",
"primary_goal",
"health_conditions",
"existing_cover_inr",
"budget_band",
"budget_inr", # #64 β€” exact β‚Ή/yr; set as a side-effect of budget_band
"desired_sum_insured_inr", # SOFT capture (pricing input, post-recap)
# Family-detail pricing inputs (B6) β€” already on the Profile dataclass
# via needs_finder.Profile (parents_to_insure / parents_age_max /
# parents_has_ped). Listed here so save_profile_field can persist them
# when Gemini extracts them post-recap.
"parents_to_insure",
"parents_age_max",
"parents_has_ped",
# D2 (2026-05-15) β€” coupled additions: co-pay tolerance + family medical history
"copay_pct",
"family_medical_history",
# KI-275 (2026-05-15) β€” smoker / tobacco use, +30-50% premium loading.
"smoker",
"gender", # tolerated; not persisted unless Profile gains the field
}
# These are the slots the brain MUST capture before recommending β€” same
# list as sales_brain._REQUIRED_FOR_READY. Kept inline (not imported)
# to avoid coupling single_brain to sales_brain.
_REQUIRED_FOR_READY = (
"name",
"age",
"dependents",
"location_tier",
"income_band",
"primary_goal",
"health_conditions",
)
# ───────────────────────────────────────────────────────────────────────────
# Bug #108 + #110 (2026-05-16) β€” POST-RECAP pricing & family-history bundle.
#
# Bug #108: the RULE 2.5 bundle (SI / budget / co-pay / family history /
# smoker) is asked in ONE prompt. When the user answers SOME but
# not all, the LLM treated the bundle as fully satisfied (it's
# SOFT capture) and recommended with the unanswered slot blank β€”
# e.g. asked SI/budget/co-pay/smoker, user gave SI/budget/co-pay,
# smoker never re-asked, bot recommended.
# Bug #110: family_medical_history (item 5 of the bundle) was the slot most
# often silently dropped β€” the fact-find effectively NEVER asked it.
#
# FIX: a deterministic ONE-SHOT re-ask gate. After the hard 7-slot gate
# passes, on a recommendation retrieve_policies call, if any UNRESOLVED
# bundle slot remains we return a directive `pricing_inputs_incomplete`
# response (re-ask exactly the unresolved ones, verbatim) β€” but ONLY once
# per session, and ONLY when the user has not explicitly skipped. After the
# single re-ask we proceed (SOFT-capture semantics preserved: the user can
# still skip; we never hard-loop).
#
# A bundle slot is RESOLVED when it is either captured on the profile OR the
# user explicitly declined/skipped it (session.pricing_bundle_skipped β€” set
# by single_brain when the user says "just show me options" / "skip" / "you
# decide"). parents_age_max is conditional: only part of the bundle when the
# user is covering parents and it isn't captured yet.
_PRICING_BUNDLE_CORE: tuple[str, ...] = (
"desired_sum_insured_inr",
"budget_band",
"existing_cover_inr",
"copay_pct",
"family_medical_history", # Bug #110 β€” must be asked, same skip handling
"smoker",
)
# Exact verbatim re-ask phrasing per bundle slot (mirrors RULE 2.5 wording so
# the re-ask reads identically to the first ask). family_medical_history is
# first so a forgotten Bug #110 slot leads the re-ask.
_PRICING_BUNDLE_QUESTIONS: dict[str, str] = {
"family_medical_history": (
"Any major conditions running in your blood family "
"(parents/siblings) β€” cancer / diabetes / heart disease / "
"hypertension? (say 'none' if not)"
),
"desired_sum_insured_inr": (
"How much sum insured would you like? (e.g., β‚Ή5L / β‚Ή10L / "
"β‚Ή25L / β‚Ή1Cr)"
),
"budget_band": (
"What's your annual premium budget? (e.g., β‚Ή10-15K/year, or "
"β‚Ή50K+ for premium covers)"
),
"existing_cover_inr": (
"Any existing health cover from work or otherwise? "
"(e.g., '5L through employer', or 'no')"
),
"copay_pct": (
"Are you OK with a co-pay β€” sharing 10-30% of every claim β€” to "
"lower the premium? Or do you want zero co-pay?"
),
"smoker": "Do you smoke or use tobacco products? (yes / no)",
"parents_age_max": (
"Roughly what's the age of the eldest parent you'd cover?"
),
}
def _profile_has_parents(profile) -> bool:
"""True when the captured dependents indicate parents are covered (so
parents_age_max becomes part of the post-recap bundle)."""
dep = getattr(profile, "dependents", None)
if isinstance(dep, str) and "parent" in dep.lower():
return True
return bool(getattr(profile, "parents_to_insure", None))
def _unresolved_pricing_bundle(profile, session) -> list[str]:
"""Return the bundle slots that are NEITHER captured on the profile NOR
explicitly skipped by the user this session. Empty list β‡’ nothing left
to re-ask (recommend may proceed)."""
if session is not None and bool(
getattr(session, "pricing_bundle_skipped", False)
):
# User explicitly said "just show me options / you decide / skip".
return []
bundle = list(_PRICING_BUNDLE_CORE)
if _profile_has_parents(profile):
bundle.append("parents_age_max")
# #41 (2026-05-21) β€” a slot the user has ANSWERED is resolved even if
# the answer coerces to an empty value (family_medical_history="none"
# β†’ []). profile.asked records answered slots, so an empty-but-asked
# slot is NOT re-asked.
asked = set(getattr(profile, "asked", None) or [])
unresolved: list[str] = []
for slot in bundle:
if getattr(profile, slot, None) in (None, "", []) and slot not in asked:
unresolved.append(slot)
return unresolved
# ───────────────────────────────────────────────────────────────────────────
# SLOT_UNION (B6, 2026-05-15)
# ───────────────────────────────────────────────────────────────────────────
# Single source of truth for every captured field that drives EITHER the
# profile (recommendation match + scorecard) OR the pricing pipeline
# (premium_calculator.estimate / bulk_estimate). See module docstring above
# for the full slot→consumer matrix.
#
# Ordering convention (do not re-order without auditing union_snapshot
# callers): required slots first, then pricing-only slots, then
# family-detail slots. Total = 13.
SLOT_UNION: tuple[str, ...] = (
# Recommendation slots (in _REQUIRED_FOR_READY)
"name",
"age",
"dependents",
"location_tier",
"income_band",
"primary_goal",
"health_conditions",
# Pricing slots (B5 + B6 additions)
"budget_band",
"budget_inr", # #64 β€” exact β‚Ή/yr (lossless companion to budget_band)
"desired_sum_insured_inr",
"existing_cover_inr",
# Family-detail slots (used by pricing if applicable)
"parents_to_insure",
"parents_age_max",
"parents_has_ped",
# D2 additions (2026-05-15)
"copay_pct",
"family_medical_history",
# KI-275 (2026-05-15) β€” smoker / tobacco use, +30-50% premium loading.
"smoker",
)
# Invariant: every SLOT_UNION field must be accepted by save_profile_field
# (otherwise the LLM can never capture it). Validated at import time so a
# future delete here is loud, not silent.
assert all(_s in _ACCEPTED_FIELDS for _s in SLOT_UNION), (
"SLOT_UNION contains a field not in _ACCEPTED_FIELDS: "
f"{[s for s in SLOT_UNION if s not in _ACCEPTED_FIELDS]}"
)
def union_snapshot(profile) -> dict:
"""Return a JSON-safe dict of every SLOT_UNION field currently captured
on `profile`. Empty / None / [] slots are EXCLUDED so the pricing
pipeline can safely treat presence as "captured" (KI-091 null-overwrite
rule β€” never pass None where 0 is meaningful).
Used by premium_calculator.estimate / bulk_estimate to read pricing
inputs without re-implementing the field-name list on each side.
"""
snap: dict = {}
for fld in SLOT_UNION:
try:
v = getattr(profile, fld, None)
except Exception: # noqa: BLE001
v = None
if v in (None, "", []):
continue
snap[fld] = v
return snap
def _profile_complete(profile) -> bool:
"""Return True when every slot in _REQUIRED_FOR_READY is non-empty on
the live Profile dataclass."""
for slot in _REQUIRED_FOR_READY:
val = getattr(profile, slot, None)
if val in (None, "", []):
return False
return True
# ---- save_profile_field ----------------------------------------------------
def save_profile_field(session, field: str, value: Any) -> dict:
"""Validate + persist a captured profile field on session.profile.
Accepted fields: name, age, dependents, location_tier, income_band,
primary_goal, health_conditions, existing_cover_inr,
budget_band, gender (tolerated, may no-op).
Lightweight normalization:
- age: int, clamp to [0, 110]
- dependents: pass through `_normalize_dependents` when available,
else stringify.
- existing_cover_inr: parse via `_parse_inr_amount` when available,
else int-coerce.
- health_conditions: coerce to list[str], lowercase, strip empties.
- everything else: string pass-through (Gemini already emits
canonical values when prompted correctly).
Returns: {"saved": True, "field": ..., "value": ...,
"profile_complete": bool}
On unknown field: {"saved": False, "error": "unknown_field"}
"""
if not isinstance(field, str) or not field:
return {"saved": False, "error": "missing_field_name"}
fld = field.strip().lower()
if fld not in _ACCEPTED_FIELDS:
return {"saved": False, "error": f"unknown_field:{field}"}
profile = session.profile
normalized: Any = value
try:
if fld == "age":
normalized = _coerce_age(value)
elif fld == "dependents":
normalized = _coerce_dependents(value)
elif fld == "health_conditions":
normalized = _coerce_health_conditions(value)
elif fld == "existing_cover_inr":
normalized = _coerce_existing_cover(value)
elif fld == "desired_sum_insured_inr":
normalized = _coerce_desired_sum_insured(value)
elif fld in ("parents_to_insure", "parents_has_ped"):
normalized = _coerce_bool(value)
elif fld == "parents_age_max":
normalized = _coerce_age(value)
elif fld == "copay_pct":
normalized = _coerce_copay_pct(value)
elif fld == "family_medical_history":
normalized = _coerce_family_medical_history(value)
elif fld == "smoker":
normalized = _coerce_smoker(value)
elif fld == "budget_band":
normalized = _coerce_budget_band(value)
elif fld == "name":
normalized = (str(value).strip() if value is not None else None) or None
elif fld == "gender":
# Profile dataclass has no gender field today; silently
# accept + skip persistence so the LLM doesn't loop trying
# to save it. Forward-compat: when Profile gains gender,
# this branch just becomes `normalized = str(value).strip()`.
return {
"saved": False,
"field": fld,
"value": value,
"error": "field_not_on_profile_dataclass",
"profile_complete": _profile_complete(profile),
}
else:
# location_tier / income_band / primary_goal β€” pass through
# (budget_band is normalised above via _coerce_budget_band)
normalized = (str(value).strip() if value is not None else None) or None
except (TypeError, ValueError) as e:
return {
"saved": False,
"field": fld,
"value": value,
"error": f"normalize_failed:{type(e).__name__}:{e}",
}
# Drop None/empty so we don't overwrite a previously-captured slot
# with a Gemini turn that "didn't extract anything". Universal rule
# from KI-091/094 (extractor null overwrite).
if normalized in (None, "", []):
# #41 (2026-05-21) β€” an explicit NEGATIVE family-history answer
# ("none") legitimately coerces to []. Do NOT silently drop it as
# "extracted nothing": record the field on profile.asked so the
# post-recap pricing-bundle gate (_unresolved_pricing_bundle)
# treats it RESOLVED and never re-asks a slot the user answered.
if fld == "family_medical_history" and value is not None and str(value).strip().lower() in (
"none", "no", "nil", "nothing", "n/a", "na", "none.",
"no family history", "no family medical history",
"no family medical history.", "no family history.",
):
try:
if fld not in profile.asked:
profile.asked.append(fld)
except Exception: # noqa: BLE001
pass
return {
"saved": True,
"field": fld,
"value": [],
"profile_complete": _profile_complete(profile),
}
return {
"saved": False,
"field": fld,
"value": value,
"error": "normalized_empty",
"profile_complete": _profile_complete(profile),
}
if not hasattr(profile, fld):
return {
"saved": False,
"field": fld,
"value": value,
"error": "field_not_on_profile_dataclass",
"profile_complete": _profile_complete(profile),
}
setattr(profile, fld, normalized)
# #64 β€” when the user states a budget, ALSO preserve the EXACT β‚Ή amount
# losslessly (not just the 4-bucket band) so the slider shows what they
# actually said ("β‚Ή15,000"), never a band representative ("β‚Ή12k"). The
# band stays the pricing contract; budget_inr is the display truth.
if fld == "budget_band":
try:
if isinstance(value, bool):
_exact = None
elif isinstance(value, (int, float)):
_exact = int(value)
else:
from backend.needs_finder import _parse_inr_amount as _pinr
import re as _re2
# Strip per-annum qualifiers FIRST β€” same KI-161 guard
# _coerce_budget_band handles, so "β‚Ή15,000/yr" / "15000 a
# year" / "more than 15000 a year" yield the exact β‚Ή, not
# None (the parser otherwise reads "year" as age context).
_s = str(value)
_cleaned = _re2.sub(
r"\b(?:per\s*(?:year|annum)|p\.?\s*a\.?|/\s*(?:yr|year|"
r"annum)|a\s*year|annually|yearly|/\s*yr)\b",
" ",
_s,
flags=_re2.IGNORECASE,
)
_exact = _pinr(_cleaned) or _pinr(_s)
if _exact and _exact > 0 and hasattr(profile, "budget_inr"):
profile.budget_inr = int(_exact)
if "budget_inr" not in getattr(profile, "asked", []):
profile.asked.append("budget_inr")
except Exception: # noqa: BLE001 β€” exact β‚Ή is best-effort; band still saved
pass
# Track that the brain has "asked" this field so the rest of the
# codebase's helpers (which inspect profile.asked) stay in sync.
try:
if fld not in getattr(profile, "asked", []):
profile.asked.append(fld)
except Exception: # noqa: BLE001 β€” best-effort bookkeeping
pass
return {
"saved": True,
"field": fld,
"value": normalized,
"profile_complete": _profile_complete(profile),
}
_qseed_cache: dict = {} # (profile_sig) -> [seed chunk dicts]
# BUG #30 (B1-c) β€” per-signature count of TRAILING existing-cover top-up
# seeds in `_qseed_cache[sig]`. They sit AFTER the primary window and must
# survive the final `[:limit]` slice so a relevant super-top-up is never
# truncated out of contention for a user who already holds base cover.
_qseed_topup_n: dict = {}
def _qseed_slice(rows: list[dict], sig: str, limit: int) -> list[dict]:
"""Return up to `limit` primary seeds PLUS all existing-cover top-up
seeds (which trail the list), so the top-up seeds are never cut."""
n_top = _qseed_topup_n.get(sig, 0)
if n_top <= 0:
return rows[:limit]
primaries = rows[:-n_top] if n_top < len(rows) else []
topups = rows[-n_top:]
return primaries[:limit] + topups
def _quality_seed_candidates(profile, limit: int = 25) -> list[dict]:
"""Seed the candidate pool with the catalogue's top policies by the
profile-tuned scorecard overall, so a strong policy that isn't
cosine-similar to a generic query still enters contention. This only
ADDS candidates β€” filter_pipeline still applies precise eligibility +
profile-fit ranking on the union; it never bypasses eligibility or
fabricates a recommendation. Cached per profile-signature."""
out: list[dict] = []
try:
prof_sig = repr(sorted(
(s, getattr(profile, s, None))
for s in (
"age", "income_band", "primary_goal", "existing_cover_inr",
"copay_pct", "dependents", "health_conditions",
"budget_band", "location_tier", "parents_to_insure",
"parents_age_max",
)
)) if profile is not None else "none"
if prof_sig in _qseed_cache:
return _qseed_slice(_qseed_cache[prof_sig], prof_sig, limit)
cur = _curated_facts_all()
scored: list[tuple[float, str, dict, dict]] = []
seen: set[str] = set()
for key, data in cur.items():
if not isinstance(data, dict):
continue
pid = (data.get("policy_id") or key or "").strip()
if not pid or pid in seen:
continue
seen.add(pid)
if not _has_extraction(pid):
# Curated-graded but no extracted corpus β†’ its card would
# render as N/A / "No extraction available for this policy"
# / "Data not indexed". Never seed a non-renderable policy.
continue
sig = _scorecard_signal(pid, profile=profile)
ov = sig.get("_overall_score")
if ov is None:
continue
try:
ovf = float(ov)
except (TypeError, ValueError):
continue
scored.append((ovf, pid, data, sig))
scored.sort(key=lambda t: -t[0])
# BUG #30 (B1-c) β€” when the user already holds ANY base cover, also
# union-in the top-N top-up / super-top-up policies even if they fall
# outside the profile-neutral top-25 window, so a directly relevant
# super-top-up is seeded into contention (filter_pipeline then ranks
# the union via _fit_score, which now carries the existing-cover term).
# Deterministic: drawn from the already-sorted `scored` list, no RNG.
_existing = getattr(profile, "existing_cover_inr", None) \
if profile is not None else None
try:
_existing_int = int(str(_existing).replace(",", "").strip()) \
if _existing not in (None, "") else 0
except (TypeError, ValueError):
_existing_int = 0
window = max(limit, 25)
primary_rows = scored[:window]
extra: list[tuple[float, str, dict, dict]] = []
if _existing_int > 0:
from backend.retrieval_filters import _is_top_up as _rf_is_top_up
in_window = {pid for _, pid, _, _ in primary_rows}
_TOPUP_SEED_MAX = 3 # bounded extra seeds; keeps pool deterministic
for ovf, pid, data, sig in scored[window:]:
if pid in in_window:
continue
probe = {
"policy_name": data.get("policy_name", pid),
**_load_policy_facts(pid),
}
if _rf_is_top_up(probe):
extra.append((ovf, pid, data, sig))
if len(extra) >= _TOPUP_SEED_MAX:
break
# `_n_topup_seeds` is preserved on the cached list so the final
# slice keeps the existing-cover top-up seeds (they sit AFTER the
# primary window and would otherwise be cut by `out[:limit]`).
for ovf, pid, data, sig in primary_rows + extra:
ch = {
"chunk_id": f"{pid}__qseed",
"policy_id": pid,
"policy_name": clean_display_policy_name(
data.get("policy_name", pid)
),
"insurer_slug": data.get("insurer_slug")
or (pid.split("__", 1)[0] if "__" in pid else ""),
"doc_type": "policy",
"source_url": data.get("_primary_source_pdf") or "",
# No cosine text β€” filter_pipeline ranks the union by
# _fit_score (scorecard grade + profile fit), not cosine,
# so a 0.0 cosine score does not bury a seeded A-policy.
"chunk_text": "",
"score": 0.0,
}
ch.update(_load_policy_facts(pid))
ch.update(sig)
out.append(ch)
_qseed_cache[prof_sig] = out
_qseed_topup_n[prof_sig] = len(extra)
return _qseed_slice(out, prof_sig, limit)
except Exception as e: # noqa: BLE001 β€” seeding must never break retrieval
_log.warning("quality-seed failed: %s", e)
return out[:limit]
# ---- uploaded-doc (quarantine) bypass helpers ------------------------------
def _session_has_quarantine_docs(session_id: str) -> bool:
"""True iff this session has at least one chunk in the SEPARATE
`user_uploads_quarantine` Chroma collection.
Strictly session-scoped (where={"session_id": session_id}); cheap
metadata-only `.get(limit=1)`. Never raises β€” a Chroma hiccup just
means "treat as no upload" so the normal profile-gate path runs.
"""
if not session_id:
return False
try:
from rag.ingest import get_quarantine_collection
q = get_quarantine_collection().get(
where={"session_id": session_id},
limit=1,
include=[],
)
return bool(q and q.get("ids"))
except Exception as e: # noqa: BLE001 β€” best-effort probe
_log.warning(
"quarantine presence probe failed (sid=%s): %s: %s",
session_id, type(e).__name__, str(e)[:160],
)
return False
async def _retrieve_uploaded_only(
query: str, session_id: str, top_k: int
) -> dict:
"""Quarantine-scoped retrieval that returns ONLY this session's
uploaded-doc chunks, bypassing the recommendation profile-gate.
Reuses `rag.retrieve.retrieve(session_id=...)` (whose quarantine boost
pass already prepends this session's uploaded chunks), then filters the
result down to `doc_type == "user_upload"` so the bypass response can
NEVER contain general-corpus policy chunks against an incomplete
profile. Returns the same dict shape as `retrieve_policies`.
"""
try:
from rag.retrieve import retrieve as _retrieve
chunks = await _retrieve(
query=query,
top_k=max(int(top_k) if top_k else 8, 3),
session_id=session_id,
)
except Exception as e: # noqa: BLE001 β€” graceful empty
_log.warning(
"uploaded-only retrieve failed (sid=%s): %s: %s",
session_id, type(e).__name__, str(e)[:160],
)
return {"chunks": [], "count": 0, "error": f"{type(e).__name__}"}
uploaded = []
for c in chunks or []:
if (getattr(c, "doc_type", "") or "").lower() != "user_upload":
continue
# Triple-check session ownership before exposing the chunk β€”
# belt + suspenders on top of the where={"session_id"} filter.
if (getattr(c, "session_id", None) or session_id) != session_id and \
getattr(c, "session_id", None) not in (None, ""):
continue
uploaded.append({
"chunk_id": getattr(c, "chunk_id", ""),
"policy_id": getattr(c, "policy_id", ""),
"policy_name": clean_display_policy_name(
getattr(c, "policy_name", "")
),
"insurer_slug": getattr(c, "insurer_slug", "") or "user-upload",
"doc_type": "user_upload",
"source_url": getattr(c, "source_url", ""),
"chunk_text": (getattr(c, "text", "") or "")[:1200],
"score": float(getattr(c, "score", 0.0) or 0.0),
})
if not uploaded:
return {"chunks": [], "count": 0}
return {
"chunks": uploaded,
"count": len(uploaded),
"query": query,
"source": "uploaded_doc_quarantine",
"note": (
"These chunks come from the user's OWN uploaded policy PDF "
"(session-scoped quarantine). Answer the user's question "
"about THIS document directly. Do NOT treat this as a "
"general recommendation and do NOT block on profile "
"completeness β€” the user explicitly asked about their "
"uploaded file."
),
}
# ---- retrieve_policies -----------------------------------------------------
_NP_STOP = {
"health", "insurance", "insurances", "policy", "policies", "plan",
"plans", "the", "and", "of", "for", "cover", "covers", "coverage",
"india", "general", "life", "assurance", "co", "ltd", "limited",
"company", "scheme", "what", "is", "are", "tell", "me", "about",
"detail", "details", "benefit", "benefits",
}
def _resolve_named_policy(query: str) -> Optional[str]:
"""#61 β€” if the question UNAMBIGUOUSLY names a specific catalogue
policy, return its policy_id so a factual Q&A can retrieve it without
being blocked by the recommendation profile-gate. Conservative: >=2
significant name tokens present, >=60% of the policy_name's significant
tokens in the query, and the top match strictly ahead of the runner-up
(no ambiguous grab). Returns None otherwise. Best-effort; never raises."""
import re as _re
try:
q = " " + _re.sub(r"[^a-z0-9 ]", " ", (query or "").lower()) + " "
if len(q) < 8:
return None
from backend.main import _marketplace_catalogue # lazy: avoids cycle
scored: list[tuple[int, float, str, str]] = []
for c in _marketplace_catalogue(None):
name = (getattr(c, "policy_name", "") or "").lower()
toks = {
t for t in _re.sub(r"[^a-z0-9 ]", " ", name).split()
if len(t) > 2 and t not in _NP_STOP
}
if len(toks) < 2:
continue
hit = sum(1 for t in toks if f" {t} " in q)
cov = hit / len(toks)
if hit >= 2 and cov >= 0.6:
pid = getattr(c, "policy_id", None)
if pid:
scored.append((hit, cov, pid, name))
if not scored:
return None
# VERSION DISAMBIGUATION (#61) β€” when the user explicitly states a
# version ("...Health Companion V2022" / "V22"), the matching card
# that carries that exact version token MUST win, even if a shorter
# base-name card scores more generic tokens. Without this the base
# "Niva Bupa Health Companion" (0 corpus chunks) was chosen over
# "Health Companion V2022" (the card the user named, which HAS
# chunks) β†’ retrieval returned nothing β†’ "I couldn't find it".
_ver = set(_re.findall(r"\bv?\d{2,4}\b", q))
if _ver:
_vmatch = [
s for s in scored
if any(v in _re.sub(r"[^a-z0-9 ]", " ",
s[3]).split() for v in _ver)
]
if len(_vmatch) == 1:
return _vmatch[0][2]
if _vmatch:
_vmatch.sort(key=lambda s: (s[0], s[1]), reverse=True)
if len(_vmatch) == 1 or _vmatch[0][0] > _vmatch[1][0]:
return _vmatch[0][2]
scored.sort(key=lambda s: (s[0], s[1]), reverse=True)
if len(scored) == 1 or scored[0][0] > scored[1][0]:
return scored[0][2]
except Exception: # noqa: BLE001 β€” name resolution is best-effort
pass
return None
async def retrieve_policies(
query: str,
top_k: int = 8,
policy_filter_ids: Optional[list[str]] = None,
profile=None,
intent: str = "recommendation",
session=None,
session_id: Optional[str] = None,
) -> dict:
"""Call the existing Chroma retriever and return policy chunks.
Returns:
{"chunks": [{policy_id, policy_name, insurer_slug, chunk_text,
doc_type, source_url, score, ...}, ...],
"count": N,
"query": query,
"guard": optional {reason, fallback} if filter pipeline says abort}
On failure: {"chunks": [], "count": 0, "error": "..."}.
QUARANTINE-RETRIEVAL FIX (2026-05-16) β€” `session_id` is threaded all the
way down to `rag.retrieve.retrieve(...)` so a PDF the caller uploaded via
POST /api/upload-policy (indexed into the SEPARATE
`user_uploads_quarantine` Chroma collection, tagged with this session's
id) becomes retrievable BY THE CHAT BRAIN FOR THAT SESSION ONLY. Before
this fix the upload was embedded but the brain never forwarded
session_id, so the quarantine boost pass in retrieve.py never fired and
an uploaded policy could never surface in the conversation. Resolution
order: explicit `session_id` arg > `session.session_id` attribute >
None (no quarantine lookup). User A's session_id never leaks into user
B's retrieval because the quarantine `where={"session_id": ...}` filter
is strictly equality-scoped (see rag/retrieve.py).
"""
# Resolve the effective session id. The single_brain dispatcher passes
# the live SessionState as `session`; older callers may pass session_id
# explicitly. Never raise β€” a missing id just means "no quarantine".
eff_session_id = session_id
if eff_session_id is None and session is not None:
eff_session_id = getattr(session, "session_id", None)
if isinstance(eff_session_id, str):
eff_session_id = eff_session_id.strip() or None
elif eff_session_id is not None:
eff_session_id = None
if not isinstance(query, str) or not query.strip():
return {"chunks": [], "count": 0, "error": "empty_query"}
# NAMED-POLICY Q&A BYPASS (#61, 2026-05-18) β€” a direct factual question
# about a SPECIFIC catalogue policy ("what is the PED waiting period for
# HDFC ERGO Optima Restore?") must be answerable on a cold /
# incomplete-profile session. The profile-complete gate below blocks
# ALL policy_filter_ids=None retrieval until the 7-slot fact-find is
# done β€” so the bot replied "I couldn't find that policy" for policies
# that ARE indexed (#26/#28). The gate exists to not RECOMMEND before
# fact-find, not to refuse a factual lookup. When the query
# unambiguously names a known policy, resolve it and reuse the existing
# TRUSTED known-policy path (legitimately gate-bypassing + getting the
# #61 canonical-family $in expansion in rag/retrieve.py).
#
# NOT gated on `intent`: single_brain._execute_tool HARDCODES
# intent="recommendation" for EVERY tool call (single_brain.py:1529),
# so an intent-conditioned bypass is dead code on the live path β€” the
# proven (2026-05-18) #61 root cause: every named-policy Q&A arrived
# with intent="recommendation" β†’ the bypass never fired β†’ the gate
# blocked β†’ "I couldn't find that policy". Safety is the RESOLVER's
# conservatism, not the intent: a broad "recommend me a plan" / generic
# profile request names NO specific policy β†’ _resolve_named_policy
# returns None β†’ still fully gated (verified). Only an explicit,
# unambiguous name (>=2 significant tokens, >=60% coverage,
# version-aware, top strictly ahead of runner-up) triggers it.
if not policy_filter_ids:
_np = _resolve_named_policy(query)
if _np:
policy_filter_ids = [_np]
# UPLOADED-DOC BYPASS (2026-05-18) β€” the user is explicitly promised in
# the UI ("βœ“ Indexed … It's now searchable in this chat. Ask me about
# it.") that an uploaded policy PDF is immediately queryable. But the
# profile-complete gate below blocks ALL retrieval until the 7-slot
# fact-find is done (unless policy_filter_ids is set, which the LLM
# can't know for a freshly-uploaded doc). An uploaded doc is, by
# definition, a known-doc-FOR-THIS-SESSION β€” semantically identical to
# the policy_filter_ids follow-up branch that already legitimately
# bypasses the gate. So: when this session has chunks in the SEPARATE
# `user_uploads_quarantine` collection, run a quarantine-ONLY retrieval
# that bypasses the recommendation profile-gate. Strictly session-
# scoped (where={"session_id": eff_session_id}) so user A's upload
# never leaks into user B. The full recommendation flow (general
# corpus + eligibility + scorecard) still requires the complete
# profile β€” this bypass only surfaces the user's OWN uploaded doc.
if (
eff_session_id
and not policy_filter_ids
and _session_has_quarantine_docs(eff_session_id)
):
up = await _retrieve_uploaded_only(query, eff_session_id, top_k)
# Only short-circuit if the gate would otherwise block (incomplete
# profile) AND we actually found uploaded chunks. If the profile is
# already complete we fall through so the upload is folded into the
# normal ranked recommendation pool by the quarantine boost pass.
_profile_incomplete = profile is not None and any(
getattr(profile, slot, None) in (None, "", [])
for slot in _REQUIRED_FOR_READY
)
if up.get("count", 0) > 0 and _profile_incomplete:
return up
# Profile-complete gate (skip if caller supplied policy_filter_ids β€” that
# branch is a known-policy follow-up). Defense-in-depth: even if the LLM
# ignores RULE 2 of the system prompt, this refuses to return chunks
# against an incomplete profile.
if profile is not None and not policy_filter_ids:
missing = [
slot for slot in _REQUIRED_FOR_READY
if getattr(profile, slot, None) in (None, "", [])
]
if missing:
# KI-Z6-NONE follow-up (2026-05-15): make the response
# extremely directive so Gemini doesn't burn an iteration
# re-trying retrieve_policies on the same incomplete profile.
# Provide an exact_question string the model can literally
# relay to the user for the first missing slot.
_SLOT_QUESTIONS = {
"name": "What's your name?",
"age": "How old are you?",
"dependents": (
"Who would you like the cover to include β€” just you, "
"or spouse / kids / parents?"
),
"location_tier": "Which city do you live in?",
"income_band": (
"Roughly what's your annual household income β€” "
"under 10 lakh, 10-25 lakh, or above 25 lakh?"
),
"primary_goal": (
"Is this your first health policy, an upgrade, for "
"tax planning, or to find a cheaper option?"
),
"health_conditions": (
"Do you or your family have any pre-existing health "
"conditions like diabetes, BP, or thyroid? If none, "
"just say no."
),
}
first = missing[0]
return {
"chunks": [],
"count": 0,
"error": "profile_incomplete",
"missing_slots": missing,
"action_required": "ask_user_for",
"field": first,
"exact_question": _SLOT_QUESTIONS.get(
first,
f"Could you share your {first.replace('_', ' ')}?",
),
"instruction": (
f"Profile is incomplete β€” missing: {', '.join(missing)}. "
"Do NOT call retrieve_policies again this turn. Do NOT "
"retry save_profile_field for the same field. Emit a "
"TEXT reply that asks the user the `exact_question` "
"above verbatim."
),
}
# Bug #108 + #110 (2026-05-16) β€” POST-RECAP pricing & family-history
# bundle re-ask gate. The hard 7-slot gate above is satisfied; before we
# return chunks for a RECOMMENDATION, ensure every bundle item (SI /
# budget / existing cover / co-pay / FAMILY MEDICAL HISTORY / smoker /
# parents-age-if-applicable) has been RESOLVED β€” captured OR explicitly
# skipped. If the user answered some but not all, re-ask ONLY the
# unresolved ones β€” but exactly ONCE per session (one-shot guard) so we
# never hard-loop and SOFT-capture semantics survive (the user can still
# skip on the re-ask). Follow-ups (policy_filter_ids) bypass this β€” it's
# purely the first recommendation path.
if (
profile is not None
and not policy_filter_ids
and (intent or "").lower() == "recommendation"
and session is not None
and not bool(getattr(session, "pricing_bundle_reasked", False))
):
unresolved = _unresolved_pricing_bundle(profile, session)
if unresolved:
# One-shot: mark so the NEXT recommendation retrieve proceeds
# even if the user skips on the re-ask (SOFT capture, not a hard
# gate β€” Bug #108 fix must re-ask, not loop forever).
try:
session.pricing_bundle_reasked = True
except Exception: # noqa: BLE001 β€” bookkeeping must not break
pass
_qs = [
_PRICING_BUNDLE_QUESTIONS.get(
s, f"Could you share your {s.replace('_', ' ')}?"
)
for s in unresolved
]
_numbered = "\n".join(
f"{i}. {q}" for i, q in enumerate(_qs, start=1)
)
return {
"chunks": [],
"count": 0,
"error": "pricing_inputs_incomplete",
"missing_slots": unresolved,
"action_required": "ask_user_for",
"field": unresolved[0],
"exact_question": (
"Before I pull your recommendations, just a couple more "
"(you can skip any):\n" + _numbered
),
"instruction": (
"The user answered some but not all of the pricing / "
"family-history questions. Do NOT call retrieve_policies "
"again this turn and do NOT recommend yet. Emit a TEXT "
"reply that asks ONLY the still-missing items in "
"`exact_question` verbatim. If the user then provides "
"them, save each via save_profile_field; if they skip, "
"proceed to retrieve_policies on the next turn."
),
}
try:
from rag.retrieve import retrieve as _retrieve
# Decouple the Chroma recall pool from the caller's top_k: pull a
# wide pool so eligibility + scorecard-aware ranking choose from a
# broad set, then return the top-k best-fit survivors (truncated
# after filter_pipeline below). For an explicit known-policy
# follow-up (policy_filter_ids) keep the narrow top_k.
_recall_pool = (
(int(top_k) if top_k else 8)
if policy_filter_ids
else _RECALL_POOL
)
chunks = await _retrieve(
query=query,
top_k=_recall_pool,
policy_ids=policy_filter_ids or None,
session_id=eff_session_id,
)
except Exception as e: # noqa: BLE001 β€” return graceful empty
_log.warning(
"retrieve_policies failed (q=%r): %s: %s",
query[:120], type(e).__name__, str(e)[:200],
)
return {
"chunks": [],
"count": 0,
"error": f"{type(e).__name__}:{str(e)[:200]}",
}
raw: list[dict] = []
for c in chunks or []:
pid = getattr(c, "policy_id", "")
doc_type = getattr(c, "doc_type", "")
chunk_dict = {
"chunk_id": getattr(c, "chunk_id", ""),
"policy_id": pid,
"policy_name": clean_display_policy_name(
getattr(c, "policy_name", "")
),
"insurer_slug": getattr(c, "insurer_slug", ""),
"doc_type": doc_type,
"source_url": getattr(c, "source_url", ""),
"chunk_text": (getattr(c, "text", "") or "")[:1200],
"score": float(getattr(c, "score", 0.0) or 0.0),
"min_entry_age": getattr(c, "min_entry_age", None),
"max_entry_age": getattr(c, "max_entry_age", None),
}
# KI-278 β€” enrich policy chunks with the structural facts the
# eligibility/profile-fit filter needs (top-up signal, SI options,
# mandatory co-pay, scorecard grade). Skip non-policy chunks
# (profile/regulatory/review) β€” they're never recommendable policies
# and the eligibility rules don't apply to them.
if pid and (doc_type or "").lower() not in (
"profile", "regulatory", "review"
):
try:
chunk_dict.update(_load_policy_facts(pid))
chunk_dict.update(_scorecard_signal(pid, profile=profile))
except Exception as e: # noqa: BLE001 β€” never break retrieval
_log.warning("KI-278 enrich(%s) failed: %s", pid, e)
raw.append(chunk_dict)
# X5 sidecar: apply profile-fit + citation-grounding + dedup. Skip when
# caller supplied an explicit policy_filter_ids (we already know which
# policies they want).
guard_signal = None
filtered = raw
if not policy_filter_ids:
# Union the cosine pool with the catalogue's top profile-graded
# policies so a strong policy that isn't cosine-similar to a
# generic needs-query still enters contention. filter_pipeline
# then applies eligibility + profile-fit ranking on the union. A
# cosine chunk wins on a dup (it carries real retrieved text);
# seeds only add missing candidates.
if profile is not None:
_seen_pids = {
(c.get("policy_id") or "").strip() for c in raw
}
for _seed in _quality_seed_candidates(profile, limit=25):
_sp = (_seed.get("policy_id") or "").strip()
if _sp and _sp not in _seen_pids:
_seen_pids.add(_sp)
raw.append(_seed)
try:
from backend.retrieval_filters import filter_pipeline
filtered, guard_signal = filter_pipeline(
raw, profile=profile, query=query, intent=intent,
)
except Exception as e: # noqa: BLE001 β€” pipeline must never break retrieval
_log.warning("retrieval_filters.filter_pipeline failed: %s", e)
filtered = raw
# filter_pipeline has eligibility-filtered + ranked the wide pool
# best-fit-first (scorecard-aware, deduped to ~1 chunk/policy).
# Return only the top-N best-fit survivors β€” enough for the LLM to
# pick 2-4 recommendations and for the citation builder to select
# from, without the wide tail as context. This is the only
# truncation, done after quality ranking, so the LLM and the cards
# see the same best-fit set.
filtered = filtered[: max((int(top_k) if top_k else 8), 12)]
# For a recommendation turn, apply the same fitness floor the
# citation builder uses (_recommendation_fit) here, so the LLM only
# sees cardable policies β€” prose and cards stay 1:1 (it cannot name
# a policy it won't card). If nothing clears the floor the set is
# empty and the LLM says there is no strong match rather than
# padding with a sub-floor policy. qa / follow-up intents are
# untouched (they cite supporting source chunks regardless of
# recommendation grade).
if (intent or "").lower() == "recommendation" and filtered:
try:
from backend.single_brain import _recommendation_fit
filtered = [c for c in filtered if _recommendation_fit(c)[0]]
except Exception as e: # noqa: BLE001 β€” never break retrieval
_log.warning("rec-fit 1:1 gate failed: %s", e)
# X7 — cache slug→insurer lookups on session so a subsequent
# mark_recommendation call (same turn) can stamp the right insurer on the
# shown_policies event. Each retrieve_policies call MERGES into the cache
# rather than overwriting, so an LLM that hits multiple retrieves before
# marking still resolves every cited slug. Keep last_retrieved_chunks too
# for parity with future tools that need the full chunk objects.
if session is not None:
try:
slug_to_insurer = dict(getattr(session, "slug_to_insurer", {}) or {})
for c in filtered:
slug = (c.get("policy_id") or "").strip()
insurer = (c.get("insurer_slug") or "").strip()
if slug and insurer:
slug_to_insurer[slug] = insurer
session.slug_to_insurer = slug_to_insurer
session.last_retrieved_chunks = list(filtered)
except Exception: # noqa: BLE001 β€” bookkeeping must never fail retrieval
pass
out = {
"chunks": filtered,
"count": len(filtered),
"query": query,
}
if guard_signal is not None:
out["guard"] = guard_signal
return out
# ---- mark_recommendation ---------------------------------------------------
def get_policy_facts(session, policy_ids: Optional[list[str]] = None) -> dict:
"""Return AUTHORITATIVE claim / reputation / scorecard / coverage facts
for one or more policy_ids β€” the SAME data the frontend detail-modal
shows (insurer reviews `claim_metrics` + scorecard grade + the curated
coverage facts). This is the tool the brain calls for ANY follow-up
about claim-settlement ratio, claim denials/rejections, complaints,
incurred-claim ratio, insurer reputation, or to COMPARE two policies
the user already saw.
Root cause (2026-05-18): the brain previously had NO way to reach
claim/denial/complaint/review data β€” retrieve_policies returns policy
WORDING chunks only β€” so it falsely answered "I don't have enough
information" for claims questions and could not back a verbal
comparison. This tool closes that gap; it reuses the existing
`_insurer_reviews` / `_scorecard_signal` / `_load_policy_facts` /
`_curated_facts_all` loaders so a verbal answer matches the modal.
`policy_ids` empty/None β†’ falls back to the active shortlist
(`session.last_recommendation_ids`) so "compare the ones you showed"
works without the model re-deriving ids.
"""
ids = [str(p).strip() for p in (policy_ids or []) if str(p).strip()]
if not ids:
ids = [
str(p).strip()
for p in (getattr(session, "last_recommendation_ids", []) or [])
if str(p).strip()
]
if not ids:
return {
"ok": False,
"error": (
"no_policy_ids β€” pass policy_ids, or recommend policies "
"first so there is an active shortlist"
),
}
# Resolve policy_id β†’ (policy_name, insurer_slug) from the caches
# retrieve_policies stashed this session, then the curated catalogue.
slug_map = dict(getattr(session, "slug_to_insurer", {}) or {})
name_by_id: dict[str, str] = {}
for c in getattr(session, "last_retrieved_chunks", []) or []:
pid = (c.get("policy_id") or "").strip()
if not pid:
continue
name_by_id.setdefault(pid, c.get("policy_name") or pid)
if pid not in slug_map and c.get("insurer_slug"):
slug_map[pid] = c.get("insurer_slug")
try:
curated = _curated_facts_all()
except Exception: # noqa: BLE001 β€” curated layer optional
curated = {}
out: list[dict] = []
for pid in ids:
cur = curated.get(pid) or {}
slug = (slug_map.get(pid) or cur.get("insurer_slug") or "").strip()
pname = clean_display_policy_name(
name_by_id.get(pid) or cur.get("policy_name") or pid
)
rv = _insurer_reviews(slug) or {}
cm = rv.get("claim_metrics") or {}
agg = rv.get("aggregate_score") or {}
sig = _scorecard_signal(pid) or {}
try:
facts = _load_policy_facts(pid) or {}
except Exception: # noqa: BLE001 β€” facts optional
facts = {}
out.append(
{
"policy_id": pid,
"policy_name": pname,
"insurer_slug": slug,
"insurer_name": rv.get("insurer_name")
or (slug.replace("-", " ").title() if slug else ""),
"scorecard_grade": sig.get("_grade"),
"scorecard_overall_0_100": sig.get("_overall_score"),
"claim_settlement_ratio_pct": cm.get(
"claim_settlement_ratio_pct"
),
"claim_settlement_ratio_year": cm.get(
"claim_settlement_ratio_year"
),
"three_year_avg_csr_pct": cm.get("three_year_avg_csr_pct"),
"complaints_per_10k_policies": cm.get(
"complaints_per_10k_policies"
),
"complaints_year": cm.get("complaints_year"),
"claims_rejected_fy24": cm.get("claims_rejected_fy24"),
"incurred_claim_ratio_pct": cm.get("incurred_claim_ratio_pct"),
"reputation_headline": agg.get("headline"),
"reputation_grade": agg.get("letter_grade"),
"claim_data_source_url": (
cm.get("source_irdai_url")
or cm.get("source_secondary_url")
or cm.get("source_complaints_url")
),
# Bug #44 β€” decision-critical fields (PED waiting, initial
# waiting, copay, room-rent cap, CSR) are resolved from the
# SAME canonical curated entry the scorecard / #31 path
# uses, so a verbal answer / comparison table built from
# this tool can never contradict the policy's scorecard
# card. They OVERRIDE the divergent _load_policy_facts
# 7-key resolver for these keys, agreeing by construction.
"key_coverage_facts": {
**{k: v for k, v in facts.items() if v not in (None, "", [])},
**canonical_decision_facts(pid),
},
"reviews_available": bool(rv),
}
)
return {
"ok": True,
"count": len(out),
"policies": out,
"note": (
"This is the authoritative claim-settlement / complaint / "
"denial / scorecard data (IRDAI + scorecard) β€” answer the "
"user's question directly from it; do NOT say you lack this "
"information. Cite as "
"[Source: <insurer> claim data (IRDAI), <claim_data_source_url>]."
),
}
def mark_recommendation(
session,
policy_ids: list[str],
is_final: bool = False,
) -> dict:
"""Persist the policies just recommended so follow-up turns can resolve
references like "tell me about #2".
Sets `session.last_recommendation_ids = policy_ids` (the same field the
orchestrator already maintains for follow-up routing β€” KI-224 / KI-228).
`is_final` is accepted for forward-compat (when the session grows a
`closed` field); today it's logged but not persisted.
Returns: {"recorded": True, "policy_ids": [...], "is_final": bool}
"""
if not isinstance(policy_ids, list):
return {"recorded": False, "error": "policy_ids_not_list"}
# Coerce + dedupe while preserving order.
seen: set[str] = set()
cleaned: list[str] = []
for pid in policy_ids:
s = str(pid).strip()
if s and s not in seen:
seen.add(s)
cleaned.append(s)
# Z2 fix β€” Issue 2 (hallucinated closure). Vikram T6 saw the LLM emit
# mark_recommendation with an empty policy_ids list, the tool silently
# returned {"recorded": True, "policy_ids": []}, and the bot then said
# "I'm glad we found a good fit" despite ZERO cards shown. Two
# preconditions, gated BEFORE any session write so we don't poison
# last_recommendation_ids / shown_policies events with junk:
# (a) empty (after dedup) β†’ no_policies_supplied
# (b) non-empty BUT no retrieval history this session β†’ caller
# must run retrieve_policies first (Y2 cache check via
# session.last_retrieved_chunks)
if not cleaned:
return {"recorded": False, "error": "no_policies_supplied"}
_retrieval_cache = getattr(session, "last_retrieved_chunks", None)
if not _retrieval_cache:
return {
"recorded": False,
"error": "no_retrieval_history β€” call retrieve_policies first",
}
try:
session.last_recommendation_ids = cleaned
except Exception as e: # noqa: BLE001
return {
"recorded": False,
"error": f"setattr_failed:{type(e).__name__}:{e}",
}
if is_final and hasattr(session, "closed"):
try:
session.closed = True # type: ignore[attr-defined]
except Exception: # noqa: BLE001
pass
# X7 β€” write a shown_policies event per cited policy so the admin
# Recommendation History panel needs single_brain turns logged the same
# way the (now-removed) orchestrator did its `_log_shown_policies`
# (KI-063): dedupe by slug, resolve insurer via the slug→insurer
# cache `retrieve_policies` stashed on the session this turn, and stamp
# `turn_idx=session.turn_idx` so the frontend "Conversation turn" column
# has a real value instead of "β€”".
#
# No profile name β†’ no JSON file to write to (anonymous session). No
# insurer resolution for a slug β†’ skip that slug. All errors swallowed
# so a logging failure never breaks the tool reply back to Gemini.
# ADR-043 (2026-05-27) β€” record_policy_event used to write the shown
# policy onto the named-profile JSON for cross-session "have I shown
# this before" tracking. Cross-session persistence is gone; the
# in-memory equivalent (avoid re-pitching within the same session)
# is handled by session.last_recommendation_ids / shown_policies on
# the live Profile dataclass.
try:
profile = getattr(session, "profile", None)
if profile is not None and cleaned:
shown = list(getattr(profile, "shown_policies", None) or [])
existing_slugs = {(e or {}).get("policy_slug") for e in shown}
slug_to_insurer = dict(getattr(session, "slug_to_insurer", {}) or {})
turn_idx = int(getattr(session, "turn_idx", 0) or 0)
session_id = getattr(session, "session_id", None)
now_iso = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
for slug in cleaned:
if not slug or slug in existing_slugs:
continue
insurer = slug_to_insurer.get(slug)
if not insurer:
continue
shown.append({
"policy_slug": slug,
"insurer": insurer,
"event_at": now_iso,
"session_id": session_id,
"reason": "shown_in_recommendation",
"turn_idx": turn_idx,
})
existing_slugs.add(slug)
profile.shown_policies = shown
except Exception as e: # noqa: BLE001 β€” never break the tool reply
_log.warning(
"mark_recommendation shown-event logging (in-memory) failed: "
"%s: %s",
type(e).__name__, str(e)[:200],
)
return {
"recorded": True,
"policy_ids": cleaned,
"is_final": bool(is_final),
}
# ---- private normalizers ---------------------------------------------------
def _coerce_age(value: Any) -> Optional[int]:
"""int(value), clamped to [0, 110]. Empty / non-numeric β†’ None."""
if value is None:
return None
try:
if isinstance(value, bool): # bool is an int subclass β€” block first
return None
n = int(value)
except (TypeError, ValueError):
# Try string parse β€” Gemini sometimes emits "29" as a JSON string
try:
n = int(str(value).strip())
except (TypeError, ValueError):
return None
if n < 0:
n = 0
if n > 110:
n = 110
return n
# Ported from sales_brain_normalizer.py (B6 cleanup, 2026-05-15) β€” the legacy
# normalizer module is being deleted; this is the only consumer left.
_DEPENDENT_VAGUE_TERMS = (
"family", "everyone", "all of us", "everybody",
"whole family", "joint family",
)
def _normalize_dependents_inline(value: Any) -> Optional[str]:
"""Port of sales_brain_normalizer._normalize_dependents.
Schema-free variant β€” the original required a `schema["values"]` lookup
for direct enum hits; here we hard-code the canonical bucket set used
by the bot (kept in sync with the previous fact-find schema).
"""
if not isinstance(value, str):
return None
s = value.strip().lower()
if not s:
return None
# Canonical enum values (formerly schema["values"]).
_CANONICAL_VALUES = (
"self",
"self+spouse",
"self+spouse+kids",
"self+spouse+parents",
"self+spouse+kids+parents",
"self+kids",
"self+parents",
)
if s in _CANONICAL_VALUES:
return s
has_spouse = any(k in s for k in ("spouse", "wife", "husband", "partner"))
has_kids = ("kid" in s) or ("child" in s) or ("children" in s) or ("son" in s) or ("daughter" in s)
has_parents = "parent" in s
# Vague terms with no specific signals β†’ cannot coerce
if any(v in s for v in _DEPENDENT_VAGUE_TERMS) and not (has_spouse or has_kids or has_parents):
return None
if has_spouse and has_kids and has_parents:
return "self+spouse+kids+parents"
if has_spouse and has_parents:
return "self+spouse+parents"
if has_spouse and has_kids:
return "self+spouse+kids"
if has_spouse:
return "self+spouse"
if has_kids and has_parents:
# No canonical bucket β€” fold parents into the wider bundle
return "self+spouse+kids+parents"
if has_kids:
return "self+kids"
if has_parents:
return "self+parents"
# KI-222 β€” expand the "self" alias set. Live captures showed users
# answering "single", "unmarried", "no dependents", "just myself" etc.,
# which previously fell through to None and got silently dropped β€” the
# bot then re-asked the same slot on the next turn.
_SELF_ALIASES = (
"self", "me", "just me", "only me", "myself", "only self",
"single", "unmarried", "alone", "bachelor", "no dependents",
"just myself", "nobody else", "no one else", "nobody",
"myself only", "by myself", "solo",
)
if s in _SELF_ALIASES:
return "self"
# Substring fall-through for the same intents when wrapped in extra prose
# (e.g. "i'm single right now", "just myself for now").
if any(alias in s for alias in (
"single", "unmarried", "no dependents", "just myself",
"by myself", "nobody else", "no one else", "myself only",
"bachelor", "solo",
)):
return "self"
return None
def _coerce_dependents(value: Any) -> Optional[str]:
"""Normalize raw dependents text to a canonical bucket; see
`_normalize_dependents_inline` (ported from sales_brain_normalizer)."""
if value is None:
return None
try:
return _normalize_dependents_inline(value)
except Exception: # noqa: BLE001 β€” best-effort
s = str(value).strip()
return s or None
def _coerce_existing_cover(value: Any) -> Optional[int]:
"""Parse INR amounts like "5L" / "5 lakh" / 500000 via the canonical
parser. Numeric pass-throughs are clamped to >= 0.
"""
if value is None:
return None
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
n = int(value)
return max(0, n)
try:
from backend.needs_finder import _parse_inr_amount
parsed = _parse_inr_amount(str(value))
if parsed is not None:
return max(0, int(parsed))
except Exception: # noqa: BLE001
pass
# Last-ditch: strip non-digits
try:
digits = "".join(ch for ch in str(value) if ch.isdigit())
if digits:
return max(0, int(digits))
except Exception: # noqa: BLE001
pass
return None
# Canonical budget bands β€” the EXACT string contract the frontend
# ProfileBuilderPanel round-trips via budgetBandToInr / budgetInrToBand
# (frontend/src/app/page.tsx) and that needs_finder._parse_budget_band
# emits. Keeping this set inline makes the contract greppable from the
# capture surface.
_CANONICAL_BUDGET_BANDS: frozenset[str] = frozenset(
{"under_15k", "15k_30k", "30k_60k", "60k+"}
)
def _coerce_budget_band(value: Any) -> Optional[str]:
"""Normalise a budget capture to the documented `budget_band` contract.
Bug #109 (2026-05-16). The user states a premium budget in chat
("max β‚Ή15,000/yr", "around 30k", "15000"). Gemini calls
save_profile_field(field="budget_band", value=...) but often passes the
NUMERIC the user said, not a canonical band. The old code let
budget_band fall through the generic string pass-through, so
profile.budget_band was stored as the raw "15000" β€” which the frontend
ProfileBuilderPanel's budgetBandToInr() switch can't map, so the panel
never pre-filled the budget slider even though the bot's summary showed
it.
Mapping (matches frontend budgetInrToBand + needs_finder bands):
β€’ already a canonical band ("15k_30k") β†’ passed through unchanged
β€’ free-text / numeric ("max β‚Ή15,000/yr", "30k", 22000, "10-15K")
β†’ delegated to needs_finder._parse_budget_band β†’ canonical band
β€’ unrecognised β†’ None (KI-091 null-overwrite guard then refuses to
clobber a previously-captured band)
"""
if value is None:
return None
# Numeric β†’ bucket directly (β‚Ή amount per year).
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
v = int(value)
if v < 15_000:
return "under_15k"
if v < 30_000:
return "15k_30k"
if v < 60_000:
return "30k_60k"
return "60k+"
s = str(value).strip()
if not s:
return None
# Already canonical β€” accept verbatim (case/space tolerant).
norm = s.lower().replace(" ", "")
if norm in _CANONICAL_BUDGET_BANDS:
return norm
# Strip ANNUAL-budget qualifiers FIRST. needs_finder._parse_inr_amount
# treats a bare "yr" / "year" as an AGE context and refuses to read any
# number as currency (the KI-161 age guard) β€” so the canonical live
# phrasing "max β‚Ή15,000/yr" / "β‚Ή15000 per year" parsed to None and the
# band was never captured (Bug #109). These suffixes are unambiguous
# PER-ANNUM budget markers here (the field is explicitly the premium
# budget), not an age, so we drop them before delegating.
import re as _re
cleaned = _re.sub(
r"\b(?:per\s*(?:year|annum)|p\.?\s*a\.?|/\s*(?:yr|year|annum)|"
r"a\s*year|annually|yearly|/\s*yr)\b",
" ",
s,
flags=_re.IGNORECASE,
)
# Free-text / amount β†’ canonical band via the shared parser. This
# handles "max β‚Ή15,000", "30k", "around 25000", "15-30k", "1 lakh".
try:
from backend.needs_finder import _parse_budget_band
band = _parse_budget_band(cleaned)
if band in _CANONICAL_BUDGET_BANDS:
return band
# Last-resort: if the qualifier strip left only the amount, try the
# raw string too (covers phrasings the regex didn't anticipate).
if cleaned != s:
band = _parse_budget_band(s)
if band in _CANONICAL_BUDGET_BANDS:
return band
except Exception: # noqa: BLE001 β€” parser optional; fall through to None
pass
return None
def _coerce_bool(value: Any) -> Optional[bool]:
"""Tri-state bool coercion for parents_to_insure / parents_has_ped.
Accepts: True / False / "yes" / "no" / "y" / "n" / "true" / "false"
/ 1 / 0. Anything else β†’ None (so the KI-091 null-overwrite guard
refuses to clobber a previously-captured value).
"""
if value is None:
return None
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
s = str(value).strip().lower()
if s in ("true", "yes", "y", "1"):
return True
if s in ("false", "no", "n", "0"):
return False
return None
def _coerce_smoker(value: Any) -> Optional[bool]:
"""KI-275 (2026-05-15) β€” tri-state bool for smoker / tobacco use.
Accepts:
- True / "yes" / "true" / "smoker" / "smokes" / "tobacco" / 1 β†’ True
- False / "no" / "false" / "non-smoker" / "doesn't smoke" / 0 β†’ False
- None / "" / unclear β†’ None
Returning None lets the KI-091 null-overwrite guard in
save_profile_field refuse to clobber a previously-captured value.
"""
if value is None:
return None
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
s = str(value).strip().lower()
if not s:
return None
_YES = {
"yes", "y", "true", "1",
"smoker", "smokes", "smoke", "i smoke",
"tobacco", "tobacco user", "uses tobacco",
"i do", "yep", "yeah", "yup",
}
_NO = {
"no", "n", "false", "0",
"non-smoker", "nonsmoker", "non smoker",
"doesn't smoke", "does not smoke", "dont smoke", "don't smoke",
"i don't", "i do not", "nope", "never", "no tobacco",
}
if s in _YES:
return True
if s in _NO:
return False
# Substring fall-through for prose ("I'm a non-smoker", "I smoke daily").
if any(tok in s for tok in ("non-smoker", "nonsmoker", "non smoker", "don't smoke",
"doesn't smoke", "do not smoke", "no tobacco")):
return False
if any(tok in s for tok in ("smoker", "smokes", "tobacco")):
return True
return None
def _coerce_desired_sum_insured(value: Any) -> Optional[int]:
"""Parse desired sum insured (cover amount) as integer rupees.
Accepts: "10L" / "10 lakh" / "1 crore" / "1Cr" / 1000000 /
"β‚Ή10,00,000" / "five lakh" (rejected β€” words not numerals).
Delegates to `_parse_inr_amount` from needs_finder for the heavy lift,
falls back to bare-digit extraction. Clamps to [50_000, 500_000_000]
(β‚Ή50K floor, β‚Ή50Cr ceiling) β€” anything outside is implausible for a
health-insurance sum insured and likely a parse error.
"""
if value is None:
return None
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
n = int(value)
return max(50_000, min(500_000_000, n))
try:
from backend.needs_finder import _parse_inr_amount
parsed = _parse_inr_amount(str(value))
if parsed is not None:
return max(50_000, min(500_000_000, int(parsed)))
except Exception: # noqa: BLE001
pass
# Last-ditch: strip non-digits (handles "β‚Ή10,00,000" if parser missed).
try:
digits = "".join(ch for ch in str(value) if ch.isdigit())
if digits:
n = int(digits)
if n >= 50_000:
return min(500_000_000, n)
except Exception: # noqa: BLE001
pass
return None
def _coerce_health_conditions(value: Any) -> Optional[list[str]]:
"""Always return list[str] lowercase, stripped, empties dropped.
KI-Z6-NONE (2026-05-15): "none" / "no" / "n/a" β€” used to be stripped to
`[]`, but downstream `save_profile_field` then hits the KI-091 null-
overwrite guard (`normalized in (None, "", [])`) and refuses to persist
the slot. Result: profile.health_conditions stays empty forever,
`_profile_complete` returns False, retrieve_policies returns
profile_incomplete, the brain loops, MAX_ITERATIONS exhausts, the bot
emits "Sorry β€” I lost my train of thought" (W1 Turn 3 live blocker).
Fix: keep the explicit-negation sentinel `["none"]` so:
β€’ the slot is non-empty β†’ _profile_complete=True β†’ retrieve fires
β€’ downstream consumers can still detect "no PED" via the literal
token `"none"` in the list (callers already lowercase-compare).
"""
if value is None:
return None
if isinstance(value, str):
# Gemini sometimes emits comma-joined strings instead of a list.
items = [t.strip() for t in value.split(",")]
elif isinstance(value, (list, tuple)):
items = [str(t).strip() for t in value]
else:
items = [str(value).strip()]
cleaned = [t.lower() for t in items if t]
# Explicit-negation tokens β€” collapse to the canonical sentinel
# `["none"]` rather than `[]` so the slot is captured, not blanked.
_NEGATION = {"none", "no", "n/a", "na", "nil", "nothing", "healthy"}
if cleaned and all(t in _NEGATION for t in cleaned):
return ["none"]
# Mixed input ("diabetes, none") β€” drop the negation noise, keep real
# conditions.
real = [t for t in cleaned if t not in _NEGATION]
return real
# ---------------------------------------------------------------------------
# D2 (2026-05-15) β€” copay_pct + family_medical_history coercers
# ---------------------------------------------------------------------------
# Word-number map for "twenty", "ten" etc. (RULE 2.5 asks the user in
# multiples of 10; Gemini sometimes echoes the user's word verbatim).
_COPAY_WORD_TO_INT: dict[str, int] = {
"zero": 0, "none": 0, "no": 0,
"ten": 10, "fifteen": 15, "twenty": 20,
"twenty five": 25, "twenty-five": 25,
"thirty": 30, "forty": 40, "fifty": 50,
}
def _coerce_copay_pct(value: Any) -> Optional[int]:
"""Parse a co-pay tolerance percent, clamped to [0, 50].
Accepts:
- int / float β†’ int + clamp
- "20", "20%", " 20 % ", "20 percent" β†’ 20
- "no copay" / "zero" / "none" β†’ 0
- word numbers like "twenty" β†’ 20
- bool β†’ blocked (KI-091 null-overwrite caution: bool is an int subclass)
Returns None for unrecognised input so the null-overwrite guard in
save_profile_field can refuse to clobber a previously-captured slot.
"""
if value is None:
return None
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
n = int(value)
return max(0, min(50, n))
s = str(value).strip().lower()
if not s:
return None
# Explicit zero phrasings.
if s in ("no", "none", "nil", "zero", "no copay", "no co-pay", "no co pay"):
return 0
# Word-number lookup (exact match).
if s in _COPAY_WORD_TO_INT:
return _COPAY_WORD_TO_INT[s]
# Strip "%" + "percent" + "pct".
cleaned = (
s.replace("%", " ")
.replace("percent", " ")
.replace("pct", " ")
.replace("copay", " ")
.replace("co-pay", " ")
.replace("co pay", " ")
)
# Digit run.
import re as _re
m = _re.search(r"\d+(?:\.\d+)?", cleaned)
if m:
try:
n = int(float(m.group(0)))
return max(0, min(50, n))
except ValueError:
return None
# Word-number fall-through (substring on cleaned text).
for word, num in _COPAY_WORD_TO_INT.items():
if word in cleaned.split():
return num
return None
# Alias map for family medical history β€” same canonicalisation logic as
# health_conditions but kept inline so this slot stays self-contained.
_FAMILY_HISTORY_ALIASES: dict[str, str] = {
"bp": "hypertension",
"high bp": "hypertension",
"high-bp": "hypertension",
"hi-bp": "hypertension",
"high blood pressure": "hypertension",
"blood pressure": "hypertension",
"sugar": "diabetes",
"diabetic": "diabetes",
"type 2 diabetes": "diabetes",
"type 1 diabetes": "diabetes",
"heart attack": "heart",
"heart disease": "heart",
"cardiac": "heart",
"cardiac disease": "heart",
"stroke": "heart",
"tumor": "cancer",
"tumour": "cancer",
"carcinoma": "cancer",
}
_FAMILY_HISTORY_NEGATION = {
"none", "no", "n/a", "na", "nil", "nothing", "healthy",
"no family history", "no history", "no medical history",
}
def _coerce_family_medical_history(value: Any) -> Optional[list[str]]:
"""Return list[str] lowercase canonical conditions running in BLOOD family.
Accepts:
- list / tuple of strings
- comma-joined string ("cancer, diabetes")
- "none" / "no family history" β†’ []
Alias map collapses BP/sugar/cardiac/tumor β†’ hypertension/diabetes/heart/
cancer respectively (same family as _coerce_health_conditions). Negation
sentinels return `[]` since downstream pricing & retrieval BOTH treat an
empty list as the "no family history" branch (different from health_
conditions where the explicit `["none"]` sentinel is needed for the
profile-completeness gate).
"""
if value is None:
return None
if isinstance(value, str):
items = [t.strip() for t in value.split(",")]
elif isinstance(value, (list, tuple)):
items = [str(t).strip() for t in value]
else:
items = [str(value).strip()]
cleaned = [t.lower() for t in items if t]
# Full-string negation collapses to [].
if cleaned and all(t in _FAMILY_HISTORY_NEGATION for t in cleaned):
return []
# Drop negation noise from mixed input ("cancer, none").
cleaned = [t for t in cleaned if t not in _FAMILY_HISTORY_NEGATION]
# Canonicalise via alias map.
canonical: list[str] = []
seen: set[str] = set()
for t in cleaned:
c = _FAMILY_HISTORY_ALIASES.get(t, t)
if c and c not in seen:
seen.add(c)
canonical.append(c)
return canonical
__all__ = [
"save_profile_field",
"retrieve_policies",
"mark_recommendation",
"SLOT_UNION",
"union_snapshot",
]