Spaces:
Sleeping
Sleeping
| """Tool functions for single_brain.py (Path B β single-LLM architecture). | |
| The Gemini Flash model exposed in `backend/single_brain.py` calls these | |
| three tools to (a) persist captured profile fields, (b) retrieve policy | |
| chunks from Chroma, and (c) mark which policies it has recommended on | |
| the current turn so follow-ups like "tell me more about #2" can resolve. | |
| Each function: | |
| * Takes plain JSON-serialisable inputs (str / int / list[str]). | |
| * Returns a plain JSON-serialisable dict that gets fed back to the LLM | |
| on the next iteration of the function-calling loop. | |
| * Never raises β failures are surfaced via {"ok": False, "error": "..."} | |
| so the LLM can decide whether to retry or recover. | |
| The Gemini function-calling DSL (JSON Schema-flavoured) for these three | |
| tools is generated by `single_brain.TOOL_SCHEMAS` from this module's | |
| metadata. | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| SLOT_UNION β SINGLE SOURCE OF TRUTH FOR CAPTURED FIELDS (B6, 2026-05-15) | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| SLOT_UNION enumerates every captured field that influences EITHER the | |
| recommendation pipeline (retrieval query + scorecard) OR the pricing | |
| pipeline (premium_calculator.estimate / bulk_estimate). It is the contract | |
| between brain_tools (capture surface), single_brain (LLM tool calls), | |
| premium_calculator (pricing inputs) and scorecard (match scoring). | |
| Slot β consumer matrix: | |
| RECOMMENDATION SLOTS (in _REQUIRED_FOR_READY β hard gate) | |
| name β profile identity (no pricing influence) | |
| age β retrieval query + scorecard + pricing (age band) | |
| dependents β retrieval query + scorecard + pricing (family loading) | |
| location_tier β retrieval query + scorecard + pricing (location loading) | |
| income_band β retrieval query + scorecard | |
| primary_goal β retrieval query + scorecard | |
| health_conditions β retrieval query + scorecard + pricing (health loading) | |
| PRICING-ONLY SLOTS (B5 + B6 additions β SOFT capture, post-recap) | |
| budget_band β pricing band match | |
| desired_sum_insured_inr β pricing SI override (per-policy estimate) | |
| existing_cover_inr β pricing (existing-cover discount loading) | |
| FAMILY-DETAIL SLOTS (used by pricing if dependents includes parents) | |
| parents_to_insure β triggers parents_loading branch | |
| parents_age_max β pricing (parents age loading 1.0Γ / 1.4Γ / 1.8Γ) | |
| parents_has_ped β pricing (PED loading inflation for parents) | |
| D2 ADDITIONS (2026-05-15 β copay + family medical history) | |
| copay_pct β pricing (copay discount 1.0Γ / 0.95Γ / 0.88Γ / 0.80Γ) | |
| family_medical_history β pricing (family-history loading) + retrieval boost | |
| KI-275 (2026-05-15 β smoker / tobacco) | |
| smoker β pricing (smoker_loading 1.0Γ / 1.40Γ, +30-50%) | |
Total: 17 slots — the 16 listed above plus `budget_inr`, the exact ₹/yr
companion to budget_band. `gender` is tolerated by save_profile_field for
forward compat but is NOT on the Profile dataclass today; it does not
appear in SLOT_UNION because no consumer reads it.
| """ | |
| from __future__ import annotations | |
| import json as _json | |
| import logging | |
| import time | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| from backend.config import settings | |
| from backend.policy_identity import clean_display_policy_name | |
| _log = logging.getLogger(__name__) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # KI-278 (2026-05-16) β policy-fact enrichment for the eligibility/ranking | |
| # filter. The Chroma chunk only carries citation metadata; the structural | |
| # facts the eligibility filter needs (top-up signal, sum-insured options, | |
| # mandatory co-pay) live in 40-data/policy_facts/*.json. | |
| # | |
| # IMPORTANT: this code only READS those JSON files and only reads STABLE | |
| # SCHEMA KEYS (policy_type_indemnity_or_fixed / deductible_amount / | |
| # co_payment_pct / sum_insured_options). The concurrent scorecard-repair | |
| # process rewrites the *values*; the *field names* are the contract and do | |
| # not change. We never write to policy_facts/*.json. | |
| # | |
| # A short in-process cache keeps this off the hot path (the catalog is ~250 | |
| # files; we only ever touch the β€8 retrieved per turn, and memoise stems). | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Directory holding the curated per-policy fact files (one JSON per stem).
# This module only reads from it.
_FACTS_DIR = settings.DATA_DIR / "policy_facts"
# Doctype suffixes a policy_id / file stem may carry after the canonical
# `insurer__product` stem (see _candidate_stems).
_DOCTYPE_SUFFIXES = ("__wordings", "__brochure", "__cis", "__prospectus")
# Width of the Chroma candidate pool pulled before eligibility +
# scorecard-aware ranking — wide enough that a strong policy ranked only
# mid-cosine for a generic query still enters contention. The LLM still
# only sees the top-k best-fit survivors.
_RECALL_POOL = 40
# Allow-list of top-level keys _load_policy_facts copies out of a
# policy_facts JSON file; every other key in the file is ignored.
_FACT_KEYS = (
    "policy_type_indemnity_or_fixed",
    # Raw catalog type key — some curated files carry the product type
    # only under `policy_type` (e.g. "hospital_cash") with no
    # `policy_type_indemnity_or_fixed`; the fixed-benefit gate needs it.
    "policy_type",
    "deductible_amount",
    "co_payment_pct",
    "sum_insured_options",
    # Eligibility-gate keys (read-only):
    #   uin_code            — canonical dedup identity (1 UIN = 1 product;
    #                         collapses doctype-sibling/rename duplicates).
    #   max_entry_age       — hard eligibility for the insured person; the
    #                         curated value is authoritative and overrides
    #                         a missing/None Chroma chunk field.
    #   maternity_coverage /
    #   newborn_coverage    — required-feature gate: an explicit maternity /
    #                         newborn need ranks confirmed plans first.
    "uin_code",
    "max_entry_age",
    "maternity_coverage",
    "newborn_coverage",
)
| # Bug #44 (2026-05-19) β DECISION-CRITICAL coverage fields that MUST be | |
| # resolved from the SAME canonical curated entry the marketplace | |
| # scorecard / #31 profile-summary path uses, so a verbal answer (or a | |
| # comparison TABLE the LLM builds from get_policy_facts) can never | |
| # contradict the scorecard card for the same policy. | |
| # | |
| # Root cause: each product has a base 40-data/policy_facts/<id>.json AND | |
| # doctype siblings (__wordings/__cis/__brochure/__prospectus). The | |
| # scorecard path resolves via main._load_curated_facts() (KI-219/KI-251 | |
# canonical precedence). get_policy_facts previously surfaced facts via
# _load_policy_facts() — a DIFFERENT _candidate_stems resolver whose
# _FACT_KEYS allow-list doesn't even include PED — so the two paths could
# read different files and disagree on PED waiting (a live audit found
# the comparison table said 24mo while the #31 card said "0 months").
| # | |
| # Fix: these specific fields are taken from the canonical curated entry | |
| # (`_curated_facts_all()[pid]`, the EXACT dict main._load_curated_facts() | |
| # feeds build_scorecard) so both surfaces agree BY CONSTRUCTION. This | |
| # mirrors the existing _scorecard_signal β marketplace_grade single- | |
| # source pattern (#40 / KI-219). The guard | |
| # tests/test_policy_facts_source_consistency.py asserts catalogue-wide | |
| # agreement. | |
# Keys canonical_decision_facts() copies out of the canonical curated entry.
# NOTE(review): this tuple spells the co-pay key `copayment_pct` while
# _FACT_KEYS uses `co_payment_pct` — presumably the two sources use
# different schemas; confirm against the curated loader.
_DECISION_CRITICAL_FACT_KEYS = (
    "pre_existing_disease_waiting_months",
    "initial_waiting_period_days",
    "copayment_pct",
    "room_rent_capping",
    "claim_settlement_ratio",
)
def canonical_decision_facts(policy_id: str) -> dict:
    """Return the decision-critical coverage facts for `policy_id`.

    The facts come from the curated layer returned by `_curated_facts_all()`
    (Bug #44: one canonical entry for both the scorecard path and
    get_policy_facts, so the two cannot drift).

    Lookup order: the id exactly as given (whitespace-stripped); if that is
    absent and the id ends with a known doctype suffix, the id with that
    suffix removed.

    Only keys listed in `_DECISION_CRITICAL_FACT_KEYS` whose value is not
    None / "" / [] are returned. `{value, ...}` wrappers are unwrapped.
    Read-only. Returns {} for a blank id, an unknown id, a non-dict entry,
    or when loading the curated layer fails.
    """
    key = (policy_id or "").strip()
    if not key:
        return {}

    try:
        curated = _curated_facts_all() or {}
    except Exception:  # noqa: BLE001 — curated layer optional
        return {}

    record = curated.get(key)
    if record is None:
        # Callers may hold a doctype-suffixed id; retry with the first
        # matching suffix stripped off.
        matched = next(
            (suffix for suffix in _DOCTYPE_SUFFIXES if key.endswith(suffix)),
            None,
        )
        if matched is not None:
            record = curated.get(key[: -len(matched)])
    if not isinstance(record, dict):
        return {}

    facts: dict = {}
    for name in _DECISION_CRITICAL_FACT_KEYS:
        raw = record.get(name)
        # Values are normally scalars already; tolerate the wrapped shape.
        if isinstance(raw, dict) and "value" in raw:
            raw = raw.get("value")
        if raw not in (None, "", []):
            facts[name] = raw
    return facts
# Memo for _load_policy_facts, keyed by the policy_id exactly as passed in.
# An empty dict is stored when nothing was found (negative cache).
_facts_cache: dict[str, dict] = {}
| def _unwrap(v: Any) -> Any: | |
| """policy_facts values are `{value, source_pdf_path, source_quote}` | |
| wrappers OR bare scalars. Return the underlying value either way.""" | |
| if isinstance(v, dict) and "value" in v: | |
| return v["value"] | |
| return v | |
| def _candidate_stems(policy_id: str) -> list[str]: | |
| """A retrieved chunk's policy_id may be the canonical stem | |
| (`insurer__product`) OR a doctype-suffixed stem | |
| (`insurer__product__cis`). Try the exact id first, then the canonical | |
| form, then every doctype sibling β first existing file wins. | |
| """ | |
| pid = (policy_id or "").strip() | |
| if not pid: | |
| return [] | |
| stems = [pid] | |
| base = pid | |
| for suf in _DOCTYPE_SUFFIXES: | |
| if base.endswith(suf): | |
| base = base[: -len(suf)] | |
| break | |
| if base != pid: | |
| stems.append(base) | |
| for suf in _DOCTYPE_SUFFIXES: | |
| cand = base + suf | |
| if cand not in stems: | |
| stems.append(cand) | |
| return stems | |
| _extraction_cache: dict = {} | |
| def _has_extraction(policy_id: str) -> bool: | |
| """True iff this policy has an LLM-extracted corpus file β the EXACT | |
| renderability rule the marketplace uses (main builds its card set from | |
| settings.EXTRACTED_DIR/*.json). A policy with curated facts but no | |
| extracted file renders as a BROKEN card: raw policy_id as the title, | |
| grade "N/A", "No extraction available for this policy.", "Data not | |
| indexed" (/api/bulk-scorecard). Such a policy must NEVER be quality- | |
| seeded or cited. Canonical-stem aware (doctype siblings count); cached | |
| incl. negatives.""" | |
| pid = (policy_id or "").strip() | |
| if not pid: | |
| return False | |
| if pid in _extraction_cache: | |
| return _extraction_cache[pid] | |
| ok = False | |
| try: | |
| for stem in _candidate_stems(pid): | |
| if (settings.EXTRACTED_DIR / f"{stem}.json").exists(): | |
| ok = True | |
| break | |
| except Exception: # noqa: BLE001 β predicate must never break retrieval | |
| ok = False | |
| _extraction_cache[pid] = ok | |
| return ok | |
def _load_policy_facts(policy_id: str) -> dict:
    """Return {fact_key: value} for `policy_id`, or {} when nothing usable exists.

    Walks the stems from `_candidate_stems` in order, reading
    `_FACTS_DIR/<stem>.json` and copying the `_FACT_KEYS` entries it finds
    (first stem to supply a key wins; values pass through `_unwrap`).

    Files that are missing, unreadable, not valid JSON, or whose top level
    is not a JSON object are skipped without discarding facts already
    gathered from earlier stems. Files are decoded as UTF-8 explicitly so
    the result does not depend on the host locale.

    The result is cached per policy_id, including the empty (negative)
    result. Never raises: any unexpected error is logged and yields {}.
    """
    if policy_id in _facts_cache:
        return _facts_cache[policy_id]
    out: dict = {}
    try:
        for stem in _candidate_stems(policy_id):
            fp = _FACTS_DIR / f"{stem}.json"
            if not fp.exists():
                continue
            try:
                d = _json.loads(fp.read_text(encoding="utf-8"))
            except Exception:  # noqa: BLE001 — corrupt mid-repair → skip
                continue
            if not isinstance(d, dict):
                # A list/scalar payload has no fact keys; indexing it would
                # raise and throw away what earlier stems already supplied.
                continue
            for k in _FACT_KEYS:
                if k in d and k not in out:
                    out[k] = _unwrap(d[k])
            # Stop once we have the decision-critical signals. KI-279: a
            # type signal is "have it" if EITHER the canonical key OR the
            # raw `policy_type` key is present (some files only carry the
            # latter), so the fixed-benefit gate is never starved.
            # KI-280: also require the unified-gate signals (uin_code +
            # max_entry_age) before breaking, so we do not stop after stem
            # #1 when the UIN / entry-age lives in a sibling doctype file.
            have_type = (
                "policy_type_indemnity_or_fixed" in out
                or "policy_type" in out
            )
            if (
                have_type
                and "sum_insured_options" in out
                and "uin_code" in out
                and "max_entry_age" in out
            ):
                break
    except Exception as e:  # noqa: BLE001 — enrichment must never break retrieval
        _log.warning("KI-278 _load_policy_facts(%s) failed: %s", policy_id, e)
        out = {}
    _facts_cache[policy_id] = out
    return out
| _curated_all_cache: dict = {} # single-slot cache for the full curated layer | |
| _reviews_cache: dict = {} | |
| def _curated_facts_all() -> dict: | |
| """The full curated-facts layer β the same source the marketplace | |
| scores from (main._load_curated_facts, KI-219/251 canonical | |
| precedence; lazy import since main is loaded at request time). Using | |
| it here makes a policy's recommendation grade identical to its | |
| marketplace grade. The 7-key `_load_policy_facts` is the input for the | |
| eligibility/dedup gate, not for grading.""" | |
| if "d" not in _curated_all_cache: | |
| from backend.main import _load_curated_facts # lazy: avoids cycle | |
| _curated_all_cache["d"] = _load_curated_facts() | |
| return _curated_all_cache["d"] | |
| def _insurer_reviews(slug: str) -> Optional[dict]: | |
| """Insurer reviews (40-data/reviews/<slug>.json) passed to | |
| build_scorecard β drives the claim-experience sub-score; the same | |
| source the marketplace uses.""" | |
| if not slug: | |
| return None | |
| if slug in _reviews_cache: | |
| return _reviews_cache[slug] | |
| ir = None | |
| try: | |
| rp = settings.DATA_DIR / "reviews" / f"{slug}.json" | |
| if rp.exists(): | |
| ir = _json.loads(rp.read_text()) | |
| except Exception: # noqa: BLE001 β reviews optional | |
| ir = None | |
| _reviews_cache[slug] = ir | |
| return ir | |
| def _scorecard_signal(policy_id: str, profile=None) -> dict: | |
| """{_grade, _overall_score} for policy_id β the SAME grade its | |
| marketplace card shows (#40 SINGLE SOURCE OF TRUTH, 2026-05-18). | |
| Delegates to backend.main.marketplace_grade, which derives the grade | |
| from the ONE marketplace catalogue computation | |
| (backend.main._marketplace_catalogue) using UIN-canonical resolution: | |
| a marketing-rename / KI-145 variant id resolves onto its canonical | |
| card. There is no longer a parallel doctype-rank/_merge | |
| re-implementation here, so the cited-card grade cannot drift from the | |
| marketplace card grade β for ALL 148 by construction, asserted by | |
| tests/test_scorecard_parity.py. | |
| `profile` is accepted for call-site compatibility. The parity-relevant | |
| grade LETTER (what _recommendation_fit gates on) is the profile-neutral | |
| marketplace grade, identical to the /api/policies/all catalogue view. | |
| Read-only; returns {} only when the policy is unknown to the | |
| marketplace (ranking still works off co-pay + SI headroom + cosine).""" | |
| try: | |
| from backend.main import marketplace_grade # lazy: avoids cycle | |
| sig = marketplace_grade(policy_id) | |
| if sig and sig.get("_grade"): | |
| return { | |
| "_grade": sig.get("_grade"), | |
| "_overall_score": sig.get("_overall_score"), | |
| } | |
| return {} | |
| except Exception: # noqa: BLE001 β scorecard optional; ranking degrades gracefully | |
| return {} | |
| # ---- accepted fields for save_profile_field -------------------------------- | |
| # Mirrors the slot list in sales_brain._REQUIRED_FOR_READY plus the | |
| # nice-to-have fields the LLM may capture opportunistically. `gender` | |
| # is listed in the spec but is NOT a Profile dataclass field today β | |
| # we silently no-op on it rather than rejecting (forward-compat). | |
# Field names save_profile_field accepts; anything else is rejected with
# an `unknown_field:<name>` error. Compared after strip() + lower().
_ACCEPTED_FIELDS = {
    "name",
    "age",
    "dependents",
    "location_tier",
    "income_band",
    "primary_goal",
    "health_conditions",
    "existing_cover_inr",
    "budget_band",
    "budget_inr",  # #64 — exact ₹/yr; set as a side-effect of budget_band
    "desired_sum_insured_inr",  # SOFT capture (pricing input, post-recap)
    # Family-detail pricing inputs (B6) — already on the Profile dataclass
    # via needs_finder.Profile (parents_to_insure / parents_age_max /
    # parents_has_ped). Listed here so save_profile_field can persist them
    # when Gemini extracts them post-recap.
    "parents_to_insure",
    "parents_age_max",
    "parents_has_ped",
    # D2 (2026-05-15) — coupled additions: co-pay tolerance + family medical history
    "copay_pct",
    "family_medical_history",
    # KI-275 (2026-05-15) — smoker / tobacco use, +30-50% premium loading.
    "smoker",
    "gender",  # tolerated; not persisted unless Profile gains the field
}
| # These are the slots the brain MUST capture before recommending β same | |
| # list as sales_brain._REQUIRED_FOR_READY. Kept inline (not imported) | |
| # to avoid coupling single_brain to sales_brain. | |
# Read by _profile_complete(): the profile is complete only when every one
# of these slots holds something other than None / "" / [].
_REQUIRED_FOR_READY = (
    "name",
    "age",
    "dependents",
    "location_tier",
    "income_band",
    "primary_goal",
    "health_conditions",
)
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Bug #108 + #110 (2026-05-16) β POST-RECAP pricing & family-history bundle. | |
| # | |
| # Bug #108: the RULE 2.5 bundle (SI / budget / co-pay / family history / | |
| # smoker) is asked in ONE prompt. When the user answers SOME but | |
| # not all, the LLM treated the bundle as fully satisfied (it's | |
| # SOFT capture) and recommended with the unanswered slot blank β | |
| # e.g. asked SI/budget/co-pay/smoker, user gave SI/budget/co-pay, | |
| # smoker never re-asked, bot recommended. | |
| # Bug #110: family_medical_history (item 5 of the bundle) was the slot most | |
| # often silently dropped β the fact-find effectively NEVER asked it. | |
| # | |
| # FIX: a deterministic ONE-SHOT re-ask gate. After the hard 7-slot gate | |
| # passes, on a recommendation retrieve_policies call, if any UNRESOLVED | |
| # bundle slot remains we return a directive `pricing_inputs_incomplete` | |
| # response (re-ask exactly the unresolved ones, verbatim) β but ONLY once | |
| # per session, and ONLY when the user has not explicitly skipped. After the | |
| # single re-ask we proceed (SOFT-capture semantics preserved: the user can | |
| # still skip; we never hard-loop). | |
| # | |
| # A bundle slot is RESOLVED when it is either captured on the profile OR the | |
| # user explicitly declined/skipped it (session.pricing_bundle_skipped β set | |
| # by single_brain when the user says "just show me options" / "skip" / "you | |
| # decide"). parents_age_max is conditional: only part of the bundle when the | |
| # user is covering parents and it isn't captured yet. | |
# Unconditional members of the post-recap bundle, in the order
# _unresolved_pricing_bundle reports them. parents_age_max is not listed
# here; it is appended at call time when the profile covers parents.
_PRICING_BUNDLE_CORE: tuple[str, ...] = (
    "desired_sum_insured_inr",
    "budget_band",
    "existing_cover_inr",
    "copay_pct",
    "family_medical_history",  # Bug #110 — must be asked, same skip handling
    "smoker",
)
| # Exact verbatim re-ask phrasing per bundle slot (mirrors RULE 2.5 wording so | |
| # the re-ask reads identically to the first ask). family_medical_history is | |
| # first so a forgotten Bug #110 slot leads the re-ask. | |
# Maps each bundle slot (core slots plus parents_age_max) to the question
# text used when that slot is re-asked.
# NOTE(review): several strings below contain `β` / `βΉ` where an em dash
# and the rupee sign (₹) are presumably intended — this looks like a
# mis-decoded UTF-8 artefact. Confirm against the file on disk before
# shipping; the literals are left untouched here because they are
# user-facing runtime text.
_PRICING_BUNDLE_QUESTIONS: dict[str, str] = {
    "family_medical_history": (
        "Any major conditions running in your blood family "
        "(parents/siblings) β cancer / diabetes / heart disease / "
        "hypertension? (say 'none' if not)"
    ),
    "desired_sum_insured_inr": (
        "How much sum insured would you like? (e.g., βΉ5L / βΉ10L / "
        "βΉ25L / βΉ1Cr)"
    ),
    "budget_band": (
        "What's your annual premium budget? (e.g., βΉ10-15K/year, or "
        "βΉ50K+ for premium covers)"
    ),
    "existing_cover_inr": (
        "Any existing health cover from work or otherwise? "
        "(e.g., '5L through employer', or 'no')"
    ),
    "copay_pct": (
        "Are you OK with a co-pay β sharing 10-30% of every claim β to "
        "lower the premium? Or do you want zero co-pay?"
    ),
    "smoker": "Do you smoke or use tobacco products? (yes / no)",
    # Only relevant when parents are being covered.
    "parents_age_max": (
        "Roughly what's the age of the eldest parent you'd cover?"
    ),
}
| def _profile_has_parents(profile) -> bool: | |
| """True when the captured dependents indicate parents are covered (so | |
| parents_age_max becomes part of the post-recap bundle).""" | |
| dep = getattr(profile, "dependents", None) | |
| if isinstance(dep, str) and "parent" in dep.lower(): | |
| return True | |
| return bool(getattr(profile, "parents_to_insure", None)) | |
| def _unresolved_pricing_bundle(profile, session) -> list[str]: | |
| """Return the bundle slots that are NEITHER captured on the profile NOR | |
| explicitly skipped by the user this session. Empty list β nothing left | |
| to re-ask (recommend may proceed).""" | |
| if session is not None and bool( | |
| getattr(session, "pricing_bundle_skipped", False) | |
| ): | |
| # User explicitly said "just show me options / you decide / skip". | |
| return [] | |
| bundle = list(_PRICING_BUNDLE_CORE) | |
| if _profile_has_parents(profile): | |
| bundle.append("parents_age_max") | |
| # #41 (2026-05-21) β a slot the user has ANSWERED is resolved even if | |
| # the answer coerces to an empty value (family_medical_history="none" | |
| # β []). profile.asked records answered slots, so an empty-but-asked | |
| # slot is NOT re-asked. | |
| asked = set(getattr(profile, "asked", None) or []) | |
| unresolved: list[str] = [] | |
| for slot in bundle: | |
| if getattr(profile, slot, None) in (None, "", []) and slot not in asked: | |
| unresolved.append(slot) | |
| return unresolved | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SLOT_UNION (B6, 2026-05-15) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Single source of truth for every captured field that drives EITHER the | |
| # profile (recommendation match + scorecard) OR the pricing pipeline | |
| # (premium_calculator.estimate / bulk_estimate). See module docstring above | |
| # for the full slotβconsumer matrix. | |
| # | |
# Ordering convention (do not re-order without auditing union_snapshot
# callers): required slots first, then pricing slots, then family-detail
# slots, then the D2 and KI-275 additions. Total = 17.
# Ordered tuple of every captured slot; union_snapshot() iterates it in
# this order when building its dict.
SLOT_UNION: tuple[str, ...] = (
    # Recommendation slots (in _REQUIRED_FOR_READY)
    "name",
    "age",
    "dependents",
    "location_tier",
    "income_band",
    "primary_goal",
    "health_conditions",
    # Pricing slots (B5 + B6 additions)
    "budget_band",
    "budget_inr",  # #64 — exact ₹/yr (lossless companion to budget_band)
    "desired_sum_insured_inr",
    "existing_cover_inr",
    # Family-detail slots (used by pricing if applicable)
    "parents_to_insure",
    "parents_age_max",
    "parents_has_ped",
    # D2 additions (2026-05-15)
    "copay_pct",
    "family_medical_history",
    # KI-275 (2026-05-15) — smoker / tobacco use, +30-50% premium loading.
    "smoker",
)
| # Invariant: every SLOT_UNION field must be accepted by save_profile_field | |
| # (otherwise the LLM can never capture it). Validated at import time so a | |
| # future delete here is loud, not silent. | |
# Enforced with an explicit raise rather than `assert`: assert statements
# are stripped under `python -O`, which would make this import-time
# invariant silently vanish in optimised deployments. The exception type
# and message are unchanged.
_unaccepted_slots = [s for s in SLOT_UNION if s not in _ACCEPTED_FIELDS]
if _unaccepted_slots:
    raise AssertionError(
        "SLOT_UNION contains a field not in _ACCEPTED_FIELDS: "
        f"{_unaccepted_slots}"
    )
del _unaccepted_slots
def union_snapshot(profile) -> dict:
    """Collect the SLOT_UNION fields currently captured on `profile`.

    Slots are visited in SLOT_UNION order. A slot is included only when its
    value is not None / "" / [], so a consumer can treat presence as
    "captured" (KI-091 null-overwrite rule). Values such as 0 or False are
    therefore kept. An attribute whose access raises is treated as not
    captured.

    Per the original note this is used by premium_calculator.estimate /
    bulk_estimate to read pricing inputs without duplicating the field list.
    """
    captured: dict = {}
    for slot in SLOT_UNION:
        try:
            value = getattr(profile, slot, None)
        except Exception:  # noqa: BLE001
            continue
        if value not in (None, "", []):
            captured[slot] = value
    return captured
def _profile_complete(profile) -> bool:
    """Report whether every `_REQUIRED_FOR_READY` slot is filled.

    A slot is filled when its value on `profile` is anything other than
    None / "" / []; a missing attribute counts as unfilled.
    """
    return all(
        getattr(profile, slot, None) not in (None, "", [])
        for slot in _REQUIRED_FOR_READY
    )
| # ---- save_profile_field ---------------------------------------------------- | |
# Replies that mean "no family medical history". They coerce to an empty
# list, so they need explicit handling to count as an answer (#41).
_NEGATIVE_FAMILY_HISTORY_ANSWERS = frozenset({
    "none", "no", "nil", "nothing", "n/a", "na", "none.",
    "no family history", "no family medical history",
    "no family medical history.", "no family history.",
})

# Per-annum qualifiers stripped before parsing an exact rupee amount, so
# "15000 a year" is not misread (same KI-161 guard as _coerce_budget_band).
_PER_ANNUM_QUALIFIER_PATTERN = (
    r"\b(?:per\s*(?:year|annum)|p\.?\s*a\.?|/\s*(?:yr|year|"
    r"annum)|a\s*year|annually|yearly|/\s*yr)\b"
)


def _exact_budget_inr(value: Any) -> Optional[int]:
    """Best-effort exact rupees-per-year figure from a budget answer.

    Numbers are truncated to int; anything else is stringified, stripped
    of per-annum qualifiers and parsed with needs_finder._parse_inr_amount
    (falling back to the raw string). Returns a positive int, or None for
    booleans, non-positive results, unparseable text or any error.
    """
    try:
        if isinstance(value, bool):
            return None
        if isinstance(value, (int, float)):
            exact = int(value)
        else:
            from backend.needs_finder import _parse_inr_amount as _pinr
            import re as _re2

            raw = str(value)
            cleaned = _re2.sub(
                _PER_ANNUM_QUALIFIER_PATTERN, " ", raw, flags=_re2.IGNORECASE
            )
            exact = _pinr(cleaned) or _pinr(raw)
        if exact and exact > 0:
            return int(exact)
    except Exception:  # noqa: BLE001 — exact ₹ is best-effort
        return None
    return None


def save_profile_field(session, field: str, value: Any) -> dict:
    """Validate, normalise and persist one captured field on session.profile.

    `field` is matched case-insensitively (after strip) against
    `_ACCEPTED_FIELDS`.

    Normalisation per field:
      - age, parents_age_max: `_coerce_age`
      - dependents: `_coerce_dependents`
      - health_conditions: `_coerce_health_conditions`
      - existing_cover_inr: `_coerce_existing_cover`
      - desired_sum_insured_inr: `_coerce_desired_sum_insured`
      - parents_to_insure, parents_has_ped: `_coerce_bool`
      - copay_pct: `_coerce_copay_pct`
      - family_medical_history: `_coerce_family_medical_history`
      - smoker: `_coerce_smoker`
      - budget_band: `_coerce_budget_band`; additionally the exact ₹/yr
        amount is stored on `profile.budget_inr` when it can be parsed (#64)
      - budget_inr: parsed to a positive int (the same type the
        budget_band side-effect stores), never a string
      - gender: accepted but not persisted (no such Profile field today)
      - name, location_tier, income_band, primary_goal: stripped string

    A value that normalises to None / "" / [] is NOT written, so an empty
    extraction never overwrites a captured slot (KI-091/094). The one
    exception is an explicit negative family-history answer ("none", "no",
    ...), which is recorded in `profile.asked` and reported as saved with
    value [] so the pricing-bundle gate does not re-ask it (#41).

    Returns:
      success: {"saved": True, "field", "value", "profile_complete"}
      failure: {"saved": False, "error": ...} where error is one of
        "missing_field_name", "unknown_field:<field>",
        "missing_session_profile", "normalize_failed:<Type>:<msg>",
        "normalized_empty", "field_not_on_profile_dataclass".
        All but the first two also carry "field" and "value"; all but the
        first two and normalize_failed also carry "profile_complete".
    """
    if not isinstance(field, str) or not field:
        return {"saved": False, "error": "missing_field_name"}
    fld = field.strip().lower()
    if fld not in _ACCEPTED_FIELDS:
        return {"saved": False, "error": f"unknown_field:{field}"}

    # Tools must not raise: a missing session / profile is reported, not
    # surfaced as an AttributeError.
    profile = getattr(session, "profile", None)
    if profile is None:
        return {
            "saved": False,
            "field": fld,
            "value": value,
            "error": "missing_session_profile",
            "profile_complete": False,
        }

    normalized: Any = value
    try:
        if fld == "age":
            normalized = _coerce_age(value)
        elif fld == "dependents":
            normalized = _coerce_dependents(value)
        elif fld == "health_conditions":
            normalized = _coerce_health_conditions(value)
        elif fld == "existing_cover_inr":
            normalized = _coerce_existing_cover(value)
        elif fld == "desired_sum_insured_inr":
            normalized = _coerce_desired_sum_insured(value)
        elif fld in ("parents_to_insure", "parents_has_ped"):
            normalized = _coerce_bool(value)
        elif fld == "parents_age_max":
            normalized = _coerce_age(value)
        elif fld == "copay_pct":
            normalized = _coerce_copay_pct(value)
        elif fld == "family_medical_history":
            normalized = _coerce_family_medical_history(value)
        elif fld == "smoker":
            normalized = _coerce_smoker(value)
        elif fld == "budget_band":
            normalized = _coerce_budget_band(value)
        elif fld == "budget_inr":
            # Direct saves must store an int, matching what the
            # budget_band side-effect below writes.
            normalized = _exact_budget_inr(value)
        elif fld == "gender":
            # Profile dataclass has no gender field today; accept + skip
            # persistence so the LLM doesn't loop trying to save it.
            return {
                "saved": False,
                "field": fld,
                "value": value,
                "error": "field_not_on_profile_dataclass",
                "profile_complete": _profile_complete(profile),
            }
        else:
            # name / location_tier / income_band / primary_goal — stripped
            # string pass-through; blank becomes None.
            normalized = (str(value).strip() if value is not None else None) or None
    except (TypeError, ValueError) as e:
        return {
            "saved": False,
            "field": fld,
            "value": value,
            "error": f"normalize_failed:{type(e).__name__}:{e}",
        }

    # Drop None/empty so a turn that "didn't extract anything" never
    # overwrites a previously-captured slot (KI-091/094).
    if normalized in (None, "", []):
        # #41 — an explicit NEGATIVE family-history answer legitimately
        # coerces to []. Record it as asked so the post-recap gate treats
        # it as resolved instead of re-asking.
        if (
            fld == "family_medical_history"
            and value is not None
            and str(value).strip().lower() in _NEGATIVE_FAMILY_HISTORY_ANSWERS
        ):
            try:
                if fld not in profile.asked:
                    profile.asked.append(fld)
            except Exception:  # noqa: BLE001 — best-effort bookkeeping
                pass
            return {
                "saved": True,
                "field": fld,
                "value": [],
                "profile_complete": _profile_complete(profile),
            }
        return {
            "saved": False,
            "field": fld,
            "value": value,
            "error": "normalized_empty",
            "profile_complete": _profile_complete(profile),
        }

    if not hasattr(profile, fld):
        return {
            "saved": False,
            "field": fld,
            "value": value,
            "error": "field_not_on_profile_dataclass",
            "profile_complete": _profile_complete(profile),
        }
    setattr(profile, fld, normalized)

    # #64 — when the user states a budget, ALSO keep the exact ₹ amount
    # (not just the band) so the UI can show what they actually said. The
    # band stays the pricing contract; budget_inr is the display truth.
    if fld == "budget_band":
        try:
            exact = _exact_budget_inr(value)
            if exact is not None and hasattr(profile, "budget_inr"):
                profile.budget_inr = exact
                if "budget_inr" not in getattr(profile, "asked", []):
                    profile.asked.append("budget_inr")
        except Exception:  # noqa: BLE001 — exact ₹ is best-effort; band still saved
            pass

    # Track that this field has been "asked" so helpers that inspect
    # profile.asked stay in sync.
    try:
        if fld not in getattr(profile, "asked", []):
            profile.asked.append(fld)
    except Exception:  # noqa: BLE001 — best-effort bookkeeping
        pass

    return {
        "saved": True,
        "field": fld,
        "value": normalized,
        "profile_complete": _profile_complete(profile),
    }
# Quality-seed cache: profile signature -> list of seed chunk dicts. Primary
# seeds come first; any existing-cover top-up seeds trail the list.
_qseed_cache: dict = {}

# BUG #30 (B1-c) — per-signature count of TRAILING existing-cover top-up
# seeds in `_qseed_cache[sig]`. They sit AFTER the primary window and must
# survive the final `[:limit]` slice so a relevant super-top-up is never
# truncated out of contention for a user who already holds base cover.
_qseed_topup_n: dict = {}


def _qseed_slice(rows: list[dict], sig: str, limit: int) -> list[dict]:
    """Return up to `limit` primary seeds plus every trailing top-up seed.

    Args:
        rows: Seed chunk dicts for one profile signature, primaries first and
            the existing-cover top-up seeds last.
        sig: Profile signature used to look up how many trailing rows are
            top-up seeds (`_qseed_topup_n`).
        limit: Maximum number of PRIMARY seeds to keep; top-up seeds are
            never counted against it.

    Returns:
        A new list of shallow-copied seed dicts. Copies are returned because
        `rows` is normally the process-wide cache entry: handing out the
        cached dicts themselves would let any caller that annotates a chunk
        silently alter what every later request with the same signature sees.
    """
    n_top = _qseed_topup_n.get(sig, 0)
    if n_top <= 0:
        picked = rows[:limit]
    else:
        # Index where the trailing top-up seeds begin; 0 when the recorded
        # count covers the whole list (then there are no primaries at all).
        split = max(len(rows) - n_top, 0)
        picked = rows[:split][:limit] + rows[split:]
    return [dict(row) for row in picked]
def _qseed_existing_cover(value) -> int:
    """Parse an existing-cover figure into whole rupees, best-effort.

    Accepts ints, floats and strings with thousands separators
    ("5,00,000", "500000.0"). Returns 0 for None, "", booleans, and anything
    unparseable or non-finite, so a bad value just means "no existing cover".
    """
    if value in (None, "") or isinstance(value, bool):
        return 0
    text = str(value).replace(",", "").strip()
    try:
        return int(text)
    except ValueError:
        pass
    try:
        # Float-like input ("500000.0", 5e5) — int() alone rejects these.
        return int(float(text))
    except (ValueError, OverflowError):
        # OverflowError: int(inf); ValueError: garbage text or int(nan).
        return 0


def _quality_seed_candidates(profile, limit: int = 25) -> list[dict]:
    """Seed the candidate pool with the catalogue's top-scoring policies.

    Policies are ranked by the profile-tuned scorecard overall score so a
    strong policy that is not cosine-similar to a generic query still enters
    contention. This only ADDS candidates: the caller's filter pipeline still
    applies eligibility and profile-fit ranking on the union.

    Args:
        profile: Captured user profile (or None). Only the slots listed in
            the signature below influence the cache key.
        limit: Maximum number of primary seeds to return. Existing-cover
            top-up seeds are returned in addition to this.

    Returns:
        Seed chunk dicts (empty `chunk_text`, `score` 0.0) enriched with the
        policy facts and scorecard signal. On any failure a warning is logged
        and whatever was built so far (possibly nothing) is returned; this
        function never raises.
    """
    _QSEED_CACHE_MAX = 512  # signatures are free-form; keep the cache bounded
    _TOPUP_SEED_MAX = 3  # bounded extra seeds; keeps pool deterministic
    out: list[dict] = []
    try:
        if profile is None:
            prof_sig = "none"
        else:
            prof_sig = repr(sorted(
                (slot, getattr(profile, slot, None))
                for slot in (
                    "age", "income_band", "primary_goal", "existing_cover_inr",
                    "copay_pct", "dependents", "health_conditions",
                    "budget_band", "location_tier", "parents_to_insure",
                    "parents_age_max",
                )
            ))
        if prof_sig in _qseed_cache:
            return _qseed_slice(_qseed_cache[prof_sig], prof_sig, limit)

        scored: list[tuple[float, str, dict, dict]] = []
        seen: set[str] = set()
        for key, data in _curated_facts_all().items():
            if not isinstance(data, dict):
                continue
            pid = (data.get("policy_id") or key or "").strip()
            if not pid or pid in seen:
                continue
            seen.add(pid)
            if not _has_extraction(pid):
                # Curated-graded but no extracted corpus — its card would
                # render as N/A / "No extraction available for this policy"
                # / "Data not indexed". Never seed a non-renderable policy.
                continue
            sig = _scorecard_signal(pid, profile=profile)
            ov = sig.get("_overall_score")
            if ov is None:
                continue
            try:
                ovf = float(ov)
            except (TypeError, ValueError):
                continue
            scored.append((ovf, pid, data, sig))
        scored.sort(key=lambda t: -t[0])

        window = max(limit, 25)
        primary_rows = scored[:window]

        # BUG #30 (B1-c) — when the user already holds ANY base cover, also
        # union-in the top top-up / super-top-up policies that fall outside
        # the primary window, so a directly relevant super-top-up is seeded
        # into contention. Deterministic: drawn from the already-sorted
        # `scored` list, no RNG. `seen` guarantees pids are unique, so rows
        # past the window can never duplicate a primary row.
        extra: list[tuple[float, str, dict, dict]] = []
        facts_by_pid: dict[str, dict] = {}
        if _qseed_existing_cover(getattr(profile, "existing_cover_inr", None)) > 0:
            from backend.retrieval_filters import _is_top_up as _rf_is_top_up
            for row in scored[window:]:
                _, pid, data, _ = row
                facts = _load_policy_facts(pid)
                probe = {"policy_name": data.get("policy_name", pid), **facts}
                if _rf_is_top_up(probe):
                    facts_by_pid[pid] = facts  # reuse below, don't reload
                    extra.append(row)
                    if len(extra) >= _TOPUP_SEED_MAX:
                        break

        # Top-up seeds are appended AFTER the primaries; their count is
        # recorded per signature so the final slice never cuts them off.
        for _, pid, data, sig in primary_rows + extra:
            ch = {
                "chunk_id": f"{pid}__qseed",
                "policy_id": pid,
                "policy_name": clean_display_policy_name(
                    data.get("policy_name", pid)
                ),
                "insurer_slug": data.get("insurer_slug")
                or (pid.split("__", 1)[0] if "__" in pid else ""),
                "doc_type": "policy",
                "source_url": data.get("_primary_source_pdf") or "",
                # No cosine text — the union is ranked by scorecard grade +
                # profile fit, not cosine, so a 0.0 cosine score does not
                # bury a seeded A-policy.
                "chunk_text": "",
                "score": 0.0,
            }
            facts = facts_by_pid.get(pid)
            ch.update(_load_policy_facts(pid) if facts is None else facts)
            ch.update(sig)
            out.append(ch)

        # Evict the oldest signature (dicts keep insertion order) before
        # adding a new one once the bound is reached.
        if len(_qseed_cache) >= _QSEED_CACHE_MAX:
            stale = next(iter(_qseed_cache))
            _qseed_cache.pop(stale, None)
            _qseed_topup_n.pop(stale, None)
        _qseed_cache[prof_sig] = out
        _qseed_topup_n[prof_sig] = len(extra)
        return _qseed_slice(out, prof_sig, limit)
    except Exception as e:  # noqa: BLE001 — seeding must never break retrieval
        _log.warning("quality-seed failed: %s", e)
        return out[:limit]
| # ---- uploaded-doc (quarantine) bypass helpers ------------------------------ | |
def _session_has_quarantine_docs(session_id: str) -> bool:
    """Report whether this session has any chunk in the upload quarantine.

    Issues a metadata-only `.get(limit=1)` against the separate
    `user_uploads_quarantine` collection, filtered strictly by
    `where={"session_id": session_id}`.

    Args:
        session_id: Session to probe. A falsy id short-circuits to False.

    Returns:
        True iff the probe returned at least one id. Any failure is logged
        as a warning and reported as False ("treat as no upload") so the
        caller's normal profile-gate path runs; this function never raises.
    """
    found = False
    if session_id:
        try:
            from rag.ingest import get_quarantine_collection

            collection = get_quarantine_collection()
            hit = collection.get(
                where={"session_id": session_id},
                limit=1,
                include=[],
            )
            found = bool(hit and hit.get("ids"))
        except Exception as e:  # noqa: BLE001 — best-effort probe
            _log.warning(
                "quarantine presence probe failed (sid=%s): %s: %s",
                session_id, type(e).__name__, str(e)[:160],
            )
    return found
def _upload_chunk_owned(chunk, session_id: str) -> bool:
    """True unless *chunk* carries a non-empty session_id of another session.

    A chunk that exposes no session_id (missing, None or "") is accepted:
    the retrieval-side `where={"session_id": ...}` filter is the primary
    guard and this check is only belt-and-braces on top of it.
    """
    owner = getattr(chunk, "session_id", None)
    return not owner or owner == session_id


async def _retrieve_uploaded_only(
    query: str, session_id: str, top_k: int
) -> dict:
    """Retrieve ONLY this session's uploaded-document chunks.

    Reuses `rag.retrieve.retrieve(session_id=...)` and then filters the
    result down to `doc_type == "user_upload"` chunks owned by this session,
    so the response can never contain general-corpus policy chunks. This is
    the path that lets a user ask about their own uploaded PDF before the
    recommendation profile is complete.

    Args:
        query: The user's question.
        session_id: Session whose uploads may be returned.
        top_k: Requested pool size; falsy means 8, and at least 3 are asked
            for.

    Returns:
        Same dict shape as `retrieve_policies`: `{"chunks", "count"}` plus
        `query`, `source` and an LLM-facing `note` when chunks were found.
        On retrieval failure: `{"chunks": [], "count": 0, "error": <type>}`.
    """
    try:
        from rag.retrieve import retrieve as _retrieve
        chunks = await _retrieve(
            query=query,
            top_k=max(int(top_k) if top_k else 8, 3),
            session_id=session_id,
        )
    except Exception as e:  # noqa: BLE001 — graceful empty
        _log.warning(
            "uploaded-only retrieve failed (sid=%s): %s: %s",
            session_id, type(e).__name__, str(e)[:160],
        )
        return {"chunks": [], "count": 0, "error": type(e).__name__}

    uploaded = []
    for c in chunks or []:
        if (getattr(c, "doc_type", "") or "").lower() != "user_upload":
            continue
        if not _upload_chunk_owned(c, session_id):
            continue
        uploaded.append({
            "chunk_id": getattr(c, "chunk_id", ""),
            "policy_id": getattr(c, "policy_id", ""),
            "policy_name": clean_display_policy_name(
                getattr(c, "policy_name", "")
            ),
            "insurer_slug": getattr(c, "insurer_slug", "") or "user-upload",
            "doc_type": "user_upload",
            "source_url": getattr(c, "source_url", ""),
            # Cap the text handed back to the LLM at 1200 characters.
            "chunk_text": (getattr(c, "text", "") or "")[:1200],
            "score": float(getattr(c, "score", 0.0) or 0.0),
        })

    if not uploaded:
        return {"chunks": [], "count": 0}
    return {
        "chunks": uploaded,
        "count": len(uploaded),
        "query": query,
        "source": "uploaded_doc_quarantine",
        "note": (
            "These chunks come from the user's OWN uploaded policy PDF "
            "(session-scoped quarantine). Answer the user's question "
            "about THIS document directly. Do NOT treat this as a "
            "general recommendation and do NOT block on profile "
            "completeness — the user explicitly asked about their "
            "uploaded file."
        ),
    }
| # ---- retrieve_policies ----------------------------------------------------- | |
# Generic words that carry no identifying signal in a policy name; they are
# ignored when matching a user's question against catalogue policy names.
_NP_STOP = {
    "health", "insurance", "insurances", "policy", "policies", "plan",
    "plans", "the", "and", "of", "for", "cover", "covers", "coverage",
    "india", "general", "life", "assurance", "co", "ltd", "limited",
    "company", "scheme", "what", "is", "are", "tell", "me", "about",
    "detail", "details", "benefit", "benefits",
}


def _np_norm(text: str) -> str:
    """Lower-case *text* and turn every character outside [a-z0-9 ] into a space."""
    import re
    return re.sub(r"[^a-z0-9 ]", " ", (text or "").lower())


def _pick_named_policy(scored, versions) -> Optional[str]:
    """Choose the unambiguous winner among candidate matches, else None.

    Args:
        scored: Rows of `(hit, coverage, policy_id, name_tokens)` where
            `name_tokens` is the set of ALL normalised tokens of the
            policy name (stop-words and short tokens included).
        versions: Version-like tokens found in the query ("v2022", "22").

    Returns:
        The policy_id of the winner. A group wins when it holds a single
        row, or when its best row (ordered by hit count, then coverage) has
        strictly MORE hits than the runner-up. Rows whose name carries one
        of `versions` are tried first, so an explicitly named version beats
        a shorter base-name card; if they tie among themselves the decision
        falls back to all rows.
    """
    def _lead(rows):
        if not rows:
            return None
        ordered = sorted(rows, key=lambda r: (r[0], r[1]), reverse=True)
        if len(ordered) == 1 or ordered[0][0] > ordered[1][0]:
            return ordered[0][2]
        return None

    if versions:
        winner = _lead([r for r in scored if versions & r[3]])
        if winner:
            return winner
    return _lead(list(scored))


def _resolve_named_policy(query: str) -> Optional[str]:
    """#61 — resolve a question that UNAMBIGUOUSLY names a catalogue policy.

    Returns that policy's id so a factual Q&A can retrieve it without being
    blocked by the recommendation profile-gate. Deliberately conservative: a
    candidate needs >= 2 significant name tokens present in the query and
    >= 60% of its significant tokens covered, and the winner must be
    strictly ahead of the runner-up on hit count (see `_pick_named_policy`).

    Args:
        query: The user's question.

    Returns:
        The matching policy_id, or None when nothing qualifies, the match is
        ambiguous, or resolution fails. Best-effort; never raises — failures
        are logged so a broken catalogue lookup is not mistaken for "no such
        policy".
    """
    import re as _re
    try:
        q = " " + _np_norm(query) + " "
        if len(q) < 8:
            return None
        from backend.main import _marketplace_catalogue  # lazy: avoids cycle

        # `q` holds only [a-z0-9 ], so whole-word containment is exactly
        # membership in its token set.
        q_tokens = set(q.split())
        scored: list[tuple[int, float, str, frozenset]] = []
        for card in _marketplace_catalogue(None):
            name_tokens = _np_norm(getattr(card, "policy_name", "") or "").split()
            significant = {
                t for t in name_tokens if len(t) > 2 and t not in _NP_STOP
            }
            if len(significant) < 2:
                continue
            hit = len(significant & q_tokens)
            cov = hit / len(significant)
            if hit >= 2 and cov >= 0.6:
                pid = getattr(card, "policy_id", None)
                if pid:
                    scored.append((hit, cov, pid, frozenset(name_tokens)))
        if not scored:
            return None
        # VERSION DISAMBIGUATION (#61) — when the user explicitly states a
        # version ("...Health Companion V2022"), the card carrying that exact
        # version token must win even if a shorter base-name card matches as
        # many generic tokens.
        versions = set(_re.findall(r"\bv?\d{2,4}\b", q))
        return _pick_named_policy(scored, versions)
    except Exception as e:  # noqa: BLE001 — name resolution is best-effort
        _log.warning(
            "named-policy resolution failed: %s: %s",
            type(e).__name__, str(e)[:160],
        )
    return None
def _rp_top_k(top_k, default: int = 8) -> int:
    """Coerce a caller/LLM-supplied top_k to int; falsy or garbage -> default."""
    if not top_k:
        return default
    try:
        return int(top_k)
    except (TypeError, ValueError, OverflowError):
        return default


def _rp_missing_slots(profile, slots) -> list[str]:
    """Return the names in *slots* whose value on *profile* is None, "" or []."""
    return [
        slot for slot in slots
        if getattr(profile, slot, None) in (None, "", [])
    ]


async def retrieve_policies(
    query: str,
    top_k: int = 8,
    policy_filter_ids: Optional[list[str]] = None,
    profile=None,
    intent: str = "recommendation",
    session=None,
    session_id: Optional[str] = None,
) -> dict:
    """Call the existing Chroma retriever and return policy chunks.

    Args:
        query: The user's question / needs query. Empty -> "empty_query".
        top_k: Requested number of chunks; falsy or unparseable means 8.
        policy_filter_ids: Restrict retrieval to these policies. When set
            (or when the query unambiguously names a catalogue policy) the
            profile gates and the filter pipeline are bypassed.
        profile: Captured user profile, or None to skip the profile gates.
        intent: Turn intent; the pricing re-ask gate and the fitness floor
            apply only to "recommendation".
        session: Live session state; used for the session id, the one-shot
            pricing re-ask flag and the slug/chunk caches.
        session_id: Explicit session id; takes precedence over
            `session.session_id`.

    Returns:
        {"chunks": [{policy_id, policy_name, insurer_slug, chunk_text,
                     doc_type, source_url, score, ...}, ...],
         "count": N,
         "query": query,
         "guard": optional signal from the filter pipeline}
        When a gate blocks: {"chunks": [], "count": 0, "error":
        "profile_incomplete" | "pricing_inputs_incomplete", ...} with an
        `exact_question` the model should relay.
        On failure: {"chunks": [], "count": 0, "error": "..."}.

    QUARANTINE-RETRIEVAL FIX (2026-05-16) — `session_id` is threaded down to
    `rag.retrieve.retrieve(...)` so a PDF the caller uploaded (indexed into
    the SEPARATE `user_uploads_quarantine` collection, tagged with this
    session's id) is retrievable by the chat brain for that session only.
    Resolution order: explicit `session_id` arg > `session.session_id`
    attribute > None (no quarantine lookup).
    """
    # Resolve the effective session id. Never raise — a missing or
    # non-string id just means "no quarantine".
    eff_session_id = session_id
    if eff_session_id is None and session is not None:
        eff_session_id = getattr(session, "session_id", None)
    if isinstance(eff_session_id, str):
        eff_session_id = eff_session_id.strip() or None
    elif eff_session_id is not None:
        eff_session_id = None

    if not isinstance(query, str) or not query.strip():
        return {"chunks": [], "count": 0, "error": "empty_query"}

    # Coerce once, safely: top_k arrives from an LLM tool call and a
    # malformed value must not raise out of this function.
    k = _rp_top_k(top_k)

    # NAMED-POLICY Q&A BYPASS (#61, 2026-05-18) — a direct factual question
    # about a SPECIFIC catalogue policy must be answerable on a cold /
    # incomplete-profile session. The profile gate exists to not RECOMMEND
    # before fact-find, not to refuse a factual lookup. When the query
    # unambiguously names a known policy, resolve it and reuse the trusted
    # known-policy path.
    #
    # NOT gated on `intent`: per the original note, the dispatcher passes
    # intent="recommendation" for every tool call, so an intent-conditioned
    # bypass would be dead code. Safety comes from the resolver's
    # conservatism: a generic "recommend me a plan" names no specific policy,
    # resolves to None, and stays fully gated.
    if not policy_filter_ids:
        _np = _resolve_named_policy(query)
        if _np:
            policy_filter_ids = [_np]

    # Required slots still missing. Empty when there is no profile or this
    # is a known-policy lookup — exactly the cases the gate does not apply.
    missing = (
        _rp_missing_slots(profile, _REQUIRED_FOR_READY)
        if profile is not None and not policy_filter_ids
        else []
    )

    # UPLOADED-DOC BYPASS (2026-05-18) — the UI promises that an uploaded
    # policy PDF is immediately queryable, but the gate below would block it
    # until fact-find is done. So when the gate WOULD block and this session
    # has quarantine chunks, answer from the user's own upload only. The
    # probe and retrieval are skipped when the profile is complete: in that
    # case the upload is folded into the normal ranked pool by the
    # session-scoped retrieval further down, and a result here would be
    # discarded anyway.
    if missing and eff_session_id and _session_has_quarantine_docs(eff_session_id):
        up = await _retrieve_uploaded_only(query, eff_session_id, k)
        if up.get("count", 0) > 0:
            return up

    # Profile-complete gate. Defense-in-depth: even if the LLM ignores the
    # system prompt, this refuses to return chunks against an incomplete
    # profile.
    if missing:
        # KI-Z6-NONE follow-up (2026-05-15): make the response extremely
        # directive so the model doesn't burn an iteration re-trying on the
        # same incomplete profile. `exact_question` is a string it can
        # relay verbatim for the first missing slot.
        _SLOT_QUESTIONS = {
            "name": "What's your name?",
            "age": "How old are you?",
            "dependents": (
                "Who would you like the cover to include — just you, "
                "or spouse / kids / parents?"
            ),
            "location_tier": "Which city do you live in?",
            "income_band": (
                "Roughly what's your annual household income — "
                "under 10 lakh, 10-25 lakh, or above 25 lakh?"
            ),
            "primary_goal": (
                "Is this your first health policy, an upgrade, for "
                "tax planning, or to find a cheaper option?"
            ),
            "health_conditions": (
                "Do you or your family have any pre-existing health "
                "conditions like diabetes, BP, or thyroid? If none, "
                "just say no."
            ),
        }
        first = missing[0]
        return {
            "chunks": [],
            "count": 0,
            "error": "profile_incomplete",
            "missing_slots": missing,
            "action_required": "ask_user_for",
            "field": first,
            "exact_question": _SLOT_QUESTIONS.get(
                first,
                f"Could you share your {first.replace('_', ' ')}?",
            ),
            "instruction": (
                f"Profile is incomplete — missing: {', '.join(missing)}. "
                "Do NOT call retrieve_policies again this turn. Do NOT "
                "retry save_profile_field for the same field. Emit a "
                "TEXT reply that asks the user the `exact_question` "
                "above verbatim."
            ),
        }

    # Bug #108 + #110 (2026-05-16) — POST-RECAP pricing & family-history
    # bundle re-ask gate. Before returning chunks for a RECOMMENDATION,
    # ensure every bundle item has been resolved (captured OR explicitly
    # skipped). Unresolved items are re-asked exactly ONCE per session
    # (one-shot flag) so this never hard-loops and the user can still skip.
    # Known-policy follow-ups bypass this.
    if (
        profile is not None
        and not policy_filter_ids
        and (intent or "").lower() == "recommendation"
        and session is not None
        and not bool(getattr(session, "pricing_bundle_reasked", False))
    ):
        unresolved = _unresolved_pricing_bundle(profile, session)
        if unresolved:
            # Mark first, so the NEXT recommendation retrieve proceeds even
            # if the user skips on the re-ask.
            try:
                session.pricing_bundle_reasked = True
            except Exception:  # noqa: BLE001 — bookkeeping must not break
                pass
            _qs = [
                _PRICING_BUNDLE_QUESTIONS.get(
                    s, f"Could you share your {s.replace('_', ' ')}?"
                )
                for s in unresolved
            ]
            _numbered = "\n".join(
                f"{i}. {q}" for i, q in enumerate(_qs, start=1)
            )
            return {
                "chunks": [],
                "count": 0,
                "error": "pricing_inputs_incomplete",
                "missing_slots": unresolved,
                "action_required": "ask_user_for",
                "field": unresolved[0],
                "exact_question": (
                    "Before I pull your recommendations, just a couple more "
                    "(you can skip any):\n" + _numbered
                ),
                "instruction": (
                    "The user answered some but not all of the pricing / "
                    "family-history questions. Do NOT call retrieve_policies "
                    "again this turn and do NOT recommend yet. Emit a TEXT "
                    "reply that asks ONLY the still-missing items in "
                    "`exact_question` verbatim. If the user then provides "
                    "them, save each via save_profile_field; if they skip, "
                    "proceed to retrieve_policies on the next turn."
                ),
            }

    try:
        from rag.retrieve import retrieve as _retrieve
        # Decouple the recall pool from the caller's top_k: pull a wide pool
        # so eligibility + scorecard-aware ranking choose from a broad set,
        # then truncate after ranking. A known-policy lookup keeps the
        # narrow top_k.
        _recall_pool = k if policy_filter_ids else _RECALL_POOL
        chunks = await _retrieve(
            query=query,
            top_k=_recall_pool,
            policy_ids=policy_filter_ids or None,
            session_id=eff_session_id,
        )
    except Exception as e:  # noqa: BLE001 — return graceful empty
        _log.warning(
            "retrieve_policies failed (q=%r): %s: %s",
            query[:120], type(e).__name__, str(e)[:200],
        )
        return {
            "chunks": [],
            "count": 0,
            "error": f"{type(e).__name__}:{str(e)[:200]}",
        }

    raw: list[dict] = []
    for c in chunks or []:
        pid = getattr(c, "policy_id", "")
        doc_type = getattr(c, "doc_type", "")
        chunk_dict = {
            "chunk_id": getattr(c, "chunk_id", ""),
            "policy_id": pid,
            "policy_name": clean_display_policy_name(
                getattr(c, "policy_name", "")
            ),
            "insurer_slug": getattr(c, "insurer_slug", ""),
            "doc_type": doc_type,
            "source_url": getattr(c, "source_url", ""),
            "chunk_text": (getattr(c, "text", "") or "")[:1200],
            "score": float(getattr(c, "score", 0.0) or 0.0),
            "min_entry_age": getattr(c, "min_entry_age", None),
            "max_entry_age": getattr(c, "max_entry_age", None),
        }
        # KI-278 — enrich policy chunks with the structural facts and
        # scorecard signal the eligibility/profile-fit filter needs. Skip
        # non-policy chunks (profile/regulatory/review): they are never
        # recommendable policies.
        if pid and (doc_type or "").lower() not in (
            "profile", "regulatory", "review"
        ):
            try:
                chunk_dict.update(_load_policy_facts(pid))
                chunk_dict.update(_scorecard_signal(pid, profile=profile))
            except Exception as e:  # noqa: BLE001 — never break retrieval
                _log.warning("KI-278 enrich(%s) failed: %s", pid, e)
        raw.append(chunk_dict)

    # X5 sidecar: profile-fit + citation-grounding + dedup. Skipped for a
    # known-policy lookup (we already know which policies are wanted).
    guard_signal = None
    filtered = raw
    if not policy_filter_ids:
        # Union the cosine pool with the catalogue's top profile-graded
        # policies so a strong policy that isn't cosine-similar to a generic
        # needs-query still enters contention. A cosine chunk wins on a dup
        # (it carries real retrieved text); seeds only add missing ones.
        if profile is not None:
            _seen_pids = {
                (c.get("policy_id") or "").strip() for c in raw
            }
            for _seed in _quality_seed_candidates(profile, limit=25):
                _sp = (_seed.get("policy_id") or "").strip()
                if _sp and _sp not in _seen_pids:
                    _seen_pids.add(_sp)
                    raw.append(_seed)
        try:
            from backend.retrieval_filters import filter_pipeline
            filtered, guard_signal = filter_pipeline(
                raw, profile=profile, query=query, intent=intent,
            )
        except Exception as e:  # noqa: BLE001 — pipeline must never break retrieval
            _log.warning("retrieval_filters.filter_pipeline failed: %s", e)
            filtered = raw
        # The only truncation, done AFTER quality ranking: keep the top
        # best-fit survivors (never fewer than 12 slots) so the LLM and the
        # cards see the same set without the wide tail as context.
        filtered = filtered[: max(k, 12)]
        # For a recommendation turn, apply the same fitness floor the
        # citation builder uses, so the LLM only sees cardable policies. If
        # nothing clears the floor the set is empty and the LLM reports no
        # strong match rather than padding with a sub-floor policy.
        if (intent or "").lower() == "recommendation" and filtered:
            try:
                from backend.single_brain import _recommendation_fit
                filtered = [c for c in filtered if _recommendation_fit(c)[0]]
            except Exception as e:  # noqa: BLE001 — never break retrieval
                _log.warning("rec-fit 1:1 gate failed: %s", e)

    # X7 — cache slug -> insurer lookups on the session so a subsequent
    # mark_recommendation call can stamp the right insurer. Each call MERGES
    # into the cache rather than overwriting, so several retrieves before a
    # mark still resolve every cited slug. The chunks themselves are kept
    # too, for tools that need the full objects.
    if session is not None:
        try:
            slug_to_insurer = dict(getattr(session, "slug_to_insurer", {}) or {})
            for c in filtered:
                slug = (c.get("policy_id") or "").strip()
                insurer = (c.get("insurer_slug") or "").strip()
                if slug and insurer:
                    slug_to_insurer[slug] = insurer
            session.slug_to_insurer = slug_to_insurer
            session.last_retrieved_chunks = list(filtered)
        except Exception:  # noqa: BLE001 — bookkeeping must never fail retrieval
            pass

    out = {
        "chunks": filtered,
        "count": len(filtered),
        "query": query,
    }
    if guard_signal is not None:
        out["guard"] = guard_signal
    return out
| # ---- mark_recommendation --------------------------------------------------- | |
def _gpf_clean_ids(policy_ids) -> list[str]:
    """Normalise a policy-id argument into a de-duplicated list of ids.

    A bare string is treated as ONE id (not iterated character by
    character), None entries and blanks are dropped, surrounding whitespace
    is stripped, and first-seen order is kept. Non-iterable input yields [].
    """
    if not policy_ids:
        return []
    if isinstance(policy_ids, str):
        policy_ids = [policy_ids]
    ordered: dict[str, None] = {}
    try:
        for p in policy_ids:
            if p is None:
                continue
            text = str(p).strip()
            if text:
                ordered.setdefault(text, None)
    except TypeError:
        return []
    return list(ordered)


def _gpf_safe_dict(loader, *args) -> dict:
    """Call `loader(*args)`; return its dict result, or {} on failure / non-dict."""
    try:
        result = loader(*args)
    except Exception as e:  # noqa: BLE001 — every fact source is optional
        _log.warning(
            "get_policy_facts: %s failed: %s: %s",
            getattr(loader, "__name__", "loader"), type(e).__name__,
            str(e)[:160],
        )
        return {}
    return result if isinstance(result, dict) else {}


def get_policy_facts(session, policy_ids: Optional[list[str]] = None) -> dict:
    """Return claim / reputation / scorecard / coverage facts for policies.

    This is the tool for follow-ups about claim-settlement ratio, claim
    rejections, complaints, incurred-claim ratio, insurer reputation, or for
    comparing policies the user already saw. It reuses the insurer-review,
    scorecard, policy-fact and curated-catalogue loaders so a verbal answer
    is built from the same sources as the policy cards.

    Args:
        session: Live session state. Read for `last_recommendation_ids`
            (fallback shortlist), `slug_to_insurer` and
            `last_retrieved_chunks` (id -> name / insurer resolution).
        policy_ids: Policies to describe. Empty/None falls back to the
            active shortlist so "compare the ones you showed" works without
            the model re-deriving ids.

    Returns:
        {"ok": True, "count": N, "policies": [...], "note": ...} with one
        entry per distinct id, or {"ok": False, "error": "..."} when no ids
        are available. A failing fact source degrades that policy's fields
        to empty values instead of raising.
    """
    ids = _gpf_clean_ids(policy_ids) or _gpf_clean_ids(
        getattr(session, "last_recommendation_ids", None)
    )
    if not ids:
        return {
            "ok": False,
            "error": (
                "no_policy_ids — pass policy_ids, or recommend policies "
                "first so there is an active shortlist"
            ),
        }

    # Resolve policy_id -> (policy_name, insurer_slug) from the caches the
    # retrieval tool stashed on this session, then the curated catalogue.
    slug_map = dict(getattr(session, "slug_to_insurer", {}) or {})
    name_by_id: dict[str, str] = {}
    for c in getattr(session, "last_retrieved_chunks", []) or []:
        if not isinstance(c, dict):
            continue
        pid = (c.get("policy_id") or "").strip()
        if not pid:
            continue
        name_by_id.setdefault(pid, c.get("policy_name") or pid)
        if pid not in slug_map and c.get("insurer_slug"):
            slug_map[pid] = c.get("insurer_slug")

    curated = _gpf_safe_dict(_curated_facts_all)

    out: list[dict] = []
    for pid in ids:
        # The curated map can hold non-dict entries; treat those as absent.
        cur = curated.get(pid)
        if not isinstance(cur, dict):
            cur = {}
        slug = str(slug_map.get(pid) or cur.get("insurer_slug") or "").strip()
        pname = clean_display_policy_name(
            name_by_id.get(pid) or cur.get("policy_name") or pid
        )
        rv = _gpf_safe_dict(_insurer_reviews, slug)
        cm = rv.get("claim_metrics")
        if not isinstance(cm, dict):
            cm = {}
        agg = rv.get("aggregate_score")
        if not isinstance(agg, dict):
            agg = {}
        sig = _gpf_safe_dict(_scorecard_signal, pid)
        facts = _gpf_safe_dict(_load_policy_facts, pid)
        out.append(
            {
                "policy_id": pid,
                "policy_name": pname,
                "insurer_slug": slug,
                "insurer_name": rv.get("insurer_name")
                or (slug.replace("-", " ").title() if slug else ""),
                "scorecard_grade": sig.get("_grade"),
                "scorecard_overall_0_100": sig.get("_overall_score"),
                "claim_settlement_ratio_pct": cm.get(
                    "claim_settlement_ratio_pct"
                ),
                "claim_settlement_ratio_year": cm.get(
                    "claim_settlement_ratio_year"
                ),
                "three_year_avg_csr_pct": cm.get("three_year_avg_csr_pct"),
                "complaints_per_10k_policies": cm.get(
                    "complaints_per_10k_policies"
                ),
                "complaints_year": cm.get("complaints_year"),
                "claims_rejected_fy24": cm.get("claims_rejected_fy24"),
                "incurred_claim_ratio_pct": cm.get("incurred_claim_ratio_pct"),
                "reputation_headline": agg.get("headline"),
                "reputation_grade": agg.get("letter_grade"),
                "claim_data_source_url": (
                    cm.get("source_irdai_url")
                    or cm.get("source_secondary_url")
                    or cm.get("source_complaints_url")
                ),
                # Bug #44 — the canonical decision facts are merged LAST so
                # they override the general policy-facts values for any key
                # both provide; empty general facts are dropped first.
                "key_coverage_facts": {
                    **{k: v for k, v in facts.items() if v not in (None, "", [])},
                    **_gpf_safe_dict(canonical_decision_facts, pid),
                },
                "reviews_available": bool(rv),
            }
        )
    return {
        "ok": True,
        "count": len(out),
        "policies": out,
        "note": (
            "This is the authoritative claim-settlement / complaint / "
            "denial / scorecard data (IRDAI + scorecard) — answer the "
            "user's question directly from it; do NOT say you lack this "
            "information. Cite as "
            "[Source: <insurer> claim data (IRDAI), <claim_data_source_url>]."
        ),
    }
def mark_recommendation(
    session,
    policy_ids: list[str],
    is_final: bool = False,
) -> dict:
    """Persist the policies just recommended so follow-up turns can resolve
    references like "tell me about #2".

    Sets ``session.last_recommendation_ids`` to the cleaned id list (the same
    field the orchestrator maintained for follow-up routing — KI-224 /
    KI-228) and, when ``session.profile`` is present, appends one in-memory
    ``shown_policies`` event per newly shown policy.

    Args:
        session: Duck-typed session object. Read via ``getattr``:
            ``last_retrieved_chunks``, ``slug_to_insurer``, ``profile``,
            ``turn_idx`` and ``session_id``.
        policy_ids: List (or tuple) of policy ids. Entries are stringified,
            stripped and de-duplicated in order; ``None`` and blank entries
            are dropped.
        is_final: When true AND the session already exposes a ``closed``
            attribute, that attribute is set to ``True`` (best-effort).
            The flag is always echoed back in the reply.

    Returns:
        ``{"recorded": True, "policy_ids": [...], "is_final": bool}`` on
        success, otherwise ``{"recorded": False, "error": "<code>"}`` where
        the code is one of ``policy_ids_not_list``, ``no_policies_supplied``,
        ``no_retrieval_history — ...`` or ``setattr_failed:<type>:<msg>``.
        Errors are reported through the reply rather than raised.
    """
    # A bare string must stay rejected: iterating it would yield characters.
    if not isinstance(policy_ids, (list, tuple)):
        return {"recorded": False, "error": "policy_ids_not_list"}

    # Coerce + dedupe while preserving order. ``None`` is skipped explicitly
    # because str(None) would otherwise be recorded as the id "None".
    seen: set[str] = set()
    cleaned: list[str] = []
    for pid in policy_ids:
        if pid is None:
            continue
        s = str(pid).strip()
        if s and s not in seen:
            seen.add(s)
            cleaned.append(s)

    # Z2 fix — Issue 2 (hallucinated closure). Vikram T6 saw the LLM emit
    # mark_recommendation with an empty policy_ids list, the tool silently
    # returned {"recorded": True, "policy_ids": []}, and the bot then said
    # "I'm glad we found a good fit" despite ZERO cards shown. Two
    # preconditions, gated BEFORE any session write so we don't poison
    # last_recommendation_ids / shown_policies events with junk:
    #   (a) empty (after dedup) → no_policies_supplied
    #   (b) non-empty BUT no retrieval history this session → caller
    #       must run retrieve_policies first (Y2 cache check via
    #       session.last_retrieved_chunks)
    if not cleaned:
        return {"recorded": False, "error": "no_policies_supplied"}
    if not getattr(session, "last_retrieved_chunks", None):
        return {
            "recorded": False,
            "error": "no_retrieval_history — call retrieve_policies first",
        }

    try:
        session.last_recommendation_ids = cleaned
    except Exception as e:  # noqa: BLE001
        return {
            "recorded": False,
            "error": f"setattr_failed:{type(e).__name__}:{e}",
        }

    if is_final and hasattr(session, "closed"):
        try:
            session.closed = True  # type: ignore[attr-defined]
        except Exception as e:  # noqa: BLE001 — closing is best-effort
            _log.debug(
                "mark_recommendation could not set session.closed: %s: %s",
                type(e).__name__, str(e)[:200],
            )

    # X7 — record a shown_policies event per cited policy, mirroring what the
    # (now-removed) orchestrator's `_log_shown_policies` did (KI-063): dedupe
    # by slug, resolve the insurer via the slug→insurer cache that
    # `retrieve_policies` stashed on the session, and stamp
    # `turn_idx=session.turn_idx` so the frontend "Conversation turn" column
    # has a real value.
    #
    # No profile on the session → nothing is recorded. No insurer resolution
    # for a slug → that slug is skipped. All errors are swallowed (logged) so
    # a logging failure never breaks the tool reply back to Gemini.
    #
    # ADR-043 (2026-05-27) — record_policy_event used to write the shown
    # policy onto the named-profile JSON for cross-session tracking.
    # Cross-session persistence is gone; the in-memory equivalent (avoid
    # re-pitching within the same session) is session.last_recommendation_ids
    # plus shown_policies on the live profile object.
    try:
        profile = getattr(session, "profile", None)
        if profile is not None:
            shown = list(getattr(profile, "shown_policies", None) or [])
            existing_slugs = {(e or {}).get("policy_slug") for e in shown}
            slug_to_insurer = dict(getattr(session, "slug_to_insurer", {}) or {})
            turn_idx = int(getattr(session, "turn_idx", 0) or 0)
            session_id = getattr(session, "session_id", None)
            now_iso = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
            for slug in cleaned:
                if slug in existing_slugs:
                    continue
                insurer = slug_to_insurer.get(slug)
                if not insurer:
                    continue
                shown.append({
                    "policy_slug": slug,
                    "insurer": insurer,
                    "event_at": now_iso,
                    "session_id": session_id,
                    "reason": "shown_in_recommendation",
                    "turn_idx": turn_idx,
                })
                existing_slugs.add(slug)
            profile.shown_policies = shown
    except Exception as e:  # noqa: BLE001 — never break the tool reply
        _log.warning(
            "mark_recommendation shown-event logging (in-memory) failed: "
            "%s: %s",
            type(e).__name__, str(e)[:200],
        )

    return {
        "recorded": True,
        "policy_ids": cleaned,
        "is_final": bool(is_final),
    }
| # ---- private normalizers --------------------------------------------------- | |
| def _coerce_age(value: Any) -> Optional[int]: | |
| """int(value), clamped to [0, 110]. Empty / non-numeric β None.""" | |
| if value is None: | |
| return None | |
| try: | |
| if isinstance(value, bool): # bool is an int subclass β block first | |
| return None | |
| n = int(value) | |
| except (TypeError, ValueError): | |
| # Try string parse β Gemini sometimes emits "29" as a JSON string | |
| try: | |
| n = int(str(value).strip()) | |
| except (TypeError, ValueError): | |
| return None | |
| if n < 0: | |
| n = 0 | |
| if n > 110: | |
| n = 110 | |
| return n | |
| # Ported from sales_brain_normalizer.py (B6 cleanup, 2026-05-15) β the legacy | |
| # normalizer module is being deleted; this is the only consumer left. | |
| _DEPENDENT_VAGUE_TERMS = ( | |
| "family", "everyone", "all of us", "everybody", | |
| "whole family", "joint family", | |
| ) | |
| def _normalize_dependents_inline(value: Any) -> Optional[str]: | |
| """Port of sales_brain_normalizer._normalize_dependents. | |
| Schema-free variant β the original required a `schema["values"]` lookup | |
| for direct enum hits; here we hard-code the canonical bucket set used | |
| by the bot (kept in sync with the previous fact-find schema). | |
| """ | |
| if not isinstance(value, str): | |
| return None | |
| s = value.strip().lower() | |
| if not s: | |
| return None | |
| # Canonical enum values (formerly schema["values"]). | |
| _CANONICAL_VALUES = ( | |
| "self", | |
| "self+spouse", | |
| "self+spouse+kids", | |
| "self+spouse+parents", | |
| "self+spouse+kids+parents", | |
| "self+kids", | |
| "self+parents", | |
| ) | |
| if s in _CANONICAL_VALUES: | |
| return s | |
| has_spouse = any(k in s for k in ("spouse", "wife", "husband", "partner")) | |
| has_kids = ("kid" in s) or ("child" in s) or ("children" in s) or ("son" in s) or ("daughter" in s) | |
| has_parents = "parent" in s | |
| # Vague terms with no specific signals β cannot coerce | |
| if any(v in s for v in _DEPENDENT_VAGUE_TERMS) and not (has_spouse or has_kids or has_parents): | |
| return None | |
| if has_spouse and has_kids and has_parents: | |
| return "self+spouse+kids+parents" | |
| if has_spouse and has_parents: | |
| return "self+spouse+parents" | |
| if has_spouse and has_kids: | |
| return "self+spouse+kids" | |
| if has_spouse: | |
| return "self+spouse" | |
| if has_kids and has_parents: | |
| # No canonical bucket β fold parents into the wider bundle | |
| return "self+spouse+kids+parents" | |
| if has_kids: | |
| return "self+kids" | |
| if has_parents: | |
| return "self+parents" | |
| # KI-222 β expand the "self" alias set. Live captures showed users | |
| # answering "single", "unmarried", "no dependents", "just myself" etc., | |
| # which previously fell through to None and got silently dropped β the | |
| # bot then re-asked the same slot on the next turn. | |
| _SELF_ALIASES = ( | |
| "self", "me", "just me", "only me", "myself", "only self", | |
| "single", "unmarried", "alone", "bachelor", "no dependents", | |
| "just myself", "nobody else", "no one else", "nobody", | |
| "myself only", "by myself", "solo", | |
| ) | |
| if s in _SELF_ALIASES: | |
| return "self" | |
| # Substring fall-through for the same intents when wrapped in extra prose | |
| # (e.g. "i'm single right now", "just myself for now"). | |
| if any(alias in s for alias in ( | |
| "single", "unmarried", "no dependents", "just myself", | |
| "by myself", "nobody else", "no one else", "myself only", | |
| "bachelor", "solo", | |
| )): | |
| return "self" | |
| return None | |
| def _coerce_dependents(value: Any) -> Optional[str]: | |
| """Normalize raw dependents text to a canonical bucket; see | |
| `_normalize_dependents_inline` (ported from sales_brain_normalizer).""" | |
| if value is None: | |
| return None | |
| try: | |
| return _normalize_dependents_inline(value) | |
| except Exception: # noqa: BLE001 β best-effort | |
| s = str(value).strip() | |
| return s or None | |
| def _coerce_existing_cover(value: Any) -> Optional[int]: | |
| """Parse INR amounts like "5L" / "5 lakh" / 500000 via the canonical | |
| parser. Numeric pass-throughs are clamped to >= 0. | |
| """ | |
| if value is None: | |
| return None | |
| if isinstance(value, bool): | |
| return None | |
| if isinstance(value, (int, float)): | |
| n = int(value) | |
| return max(0, n) | |
| try: | |
| from backend.needs_finder import _parse_inr_amount | |
| parsed = _parse_inr_amount(str(value)) | |
| if parsed is not None: | |
| return max(0, int(parsed)) | |
| except Exception: # noqa: BLE001 | |
| pass | |
| # Last-ditch: strip non-digits | |
| try: | |
| digits = "".join(ch for ch in str(value) if ch.isdigit()) | |
| if digits: | |
| return max(0, int(digits)) | |
| except Exception: # noqa: BLE001 | |
| pass | |
| return None | |
| # Canonical budget bands β the EXACT string contract the frontend | |
| # ProfileBuilderPanel round-trips via budgetBandToInr / budgetInrToBand | |
| # (frontend/src/app/page.tsx) and that needs_finder._parse_budget_band | |
| # emits. Keeping this set inline makes the contract greppable from the | |
| # capture surface. | |
| _CANONICAL_BUDGET_BANDS: frozenset[str] = frozenset( | |
| {"under_15k", "15k_30k", "30k_60k", "60k+"} | |
| ) | |
| def _coerce_budget_band(value: Any) -> Optional[str]: | |
| """Normalise a budget capture to the documented `budget_band` contract. | |
| Bug #109 (2026-05-16). The user states a premium budget in chat | |
| ("max βΉ15,000/yr", "around 30k", "15000"). Gemini calls | |
| save_profile_field(field="budget_band", value=...) but often passes the | |
| NUMERIC the user said, not a canonical band. The old code let | |
| budget_band fall through the generic string pass-through, so | |
| profile.budget_band was stored as the raw "15000" β which the frontend | |
| ProfileBuilderPanel's budgetBandToInr() switch can't map, so the panel | |
| never pre-filled the budget slider even though the bot's summary showed | |
| it. | |
| Mapping (matches frontend budgetInrToBand + needs_finder bands): | |
| β’ already a canonical band ("15k_30k") β passed through unchanged | |
| β’ free-text / numeric ("max βΉ15,000/yr", "30k", 22000, "10-15K") | |
| β delegated to needs_finder._parse_budget_band β canonical band | |
| β’ unrecognised β None (KI-091 null-overwrite guard then refuses to | |
| clobber a previously-captured band) | |
| """ | |
| if value is None: | |
| return None | |
| # Numeric β bucket directly (βΉ amount per year). | |
| if isinstance(value, bool): | |
| return None | |
| if isinstance(value, (int, float)): | |
| v = int(value) | |
| if v < 15_000: | |
| return "under_15k" | |
| if v < 30_000: | |
| return "15k_30k" | |
| if v < 60_000: | |
| return "30k_60k" | |
| return "60k+" | |
| s = str(value).strip() | |
| if not s: | |
| return None | |
| # Already canonical β accept verbatim (case/space tolerant). | |
| norm = s.lower().replace(" ", "") | |
| if norm in _CANONICAL_BUDGET_BANDS: | |
| return norm | |
| # Strip ANNUAL-budget qualifiers FIRST. needs_finder._parse_inr_amount | |
| # treats a bare "yr" / "year" as an AGE context and refuses to read any | |
| # number as currency (the KI-161 age guard) β so the canonical live | |
| # phrasing "max βΉ15,000/yr" / "βΉ15000 per year" parsed to None and the | |
| # band was never captured (Bug #109). These suffixes are unambiguous | |
| # PER-ANNUM budget markers here (the field is explicitly the premium | |
| # budget), not an age, so we drop them before delegating. | |
| import re as _re | |
| cleaned = _re.sub( | |
| r"\b(?:per\s*(?:year|annum)|p\.?\s*a\.?|/\s*(?:yr|year|annum)|" | |
| r"a\s*year|annually|yearly|/\s*yr)\b", | |
| " ", | |
| s, | |
| flags=_re.IGNORECASE, | |
| ) | |
| # Free-text / amount β canonical band via the shared parser. This | |
| # handles "max βΉ15,000", "30k", "around 25000", "15-30k", "1 lakh". | |
| try: | |
| from backend.needs_finder import _parse_budget_band | |
| band = _parse_budget_band(cleaned) | |
| if band in _CANONICAL_BUDGET_BANDS: | |
| return band | |
| # Last-resort: if the qualifier strip left only the amount, try the | |
| # raw string too (covers phrasings the regex didn't anticipate). | |
| if cleaned != s: | |
| band = _parse_budget_band(s) | |
| if band in _CANONICAL_BUDGET_BANDS: | |
| return band | |
| except Exception: # noqa: BLE001 β parser optional; fall through to None | |
| pass | |
| return None | |
| def _coerce_bool(value: Any) -> Optional[bool]: | |
| """Tri-state bool coercion for parents_to_insure / parents_has_ped. | |
| Accepts: True / False / "yes" / "no" / "y" / "n" / "true" / "false" | |
| / 1 / 0. Anything else β None (so the KI-091 null-overwrite guard | |
| refuses to clobber a previously-captured value). | |
| """ | |
| if value is None: | |
| return None | |
| if isinstance(value, bool): | |
| return value | |
| if isinstance(value, (int, float)): | |
| return bool(value) | |
| s = str(value).strip().lower() | |
| if s in ("true", "yes", "y", "1"): | |
| return True | |
| if s in ("false", "no", "n", "0"): | |
| return False | |
| return None | |
| def _coerce_smoker(value: Any) -> Optional[bool]: | |
| """KI-275 (2026-05-15) β tri-state bool for smoker / tobacco use. | |
| Accepts: | |
| - True / "yes" / "true" / "smoker" / "smokes" / "tobacco" / 1 β True | |
| - False / "no" / "false" / "non-smoker" / "doesn't smoke" / 0 β False | |
| - None / "" / unclear β None | |
| Returning None lets the KI-091 null-overwrite guard in | |
| save_profile_field refuse to clobber a previously-captured value. | |
| """ | |
| if value is None: | |
| return None | |
| if isinstance(value, bool): | |
| return value | |
| if isinstance(value, (int, float)): | |
| return bool(value) | |
| s = str(value).strip().lower() | |
| if not s: | |
| return None | |
| _YES = { | |
| "yes", "y", "true", "1", | |
| "smoker", "smokes", "smoke", "i smoke", | |
| "tobacco", "tobacco user", "uses tobacco", | |
| "i do", "yep", "yeah", "yup", | |
| } | |
| _NO = { | |
| "no", "n", "false", "0", | |
| "non-smoker", "nonsmoker", "non smoker", | |
| "doesn't smoke", "does not smoke", "dont smoke", "don't smoke", | |
| "i don't", "i do not", "nope", "never", "no tobacco", | |
| } | |
| if s in _YES: | |
| return True | |
| if s in _NO: | |
| return False | |
| # Substring fall-through for prose ("I'm a non-smoker", "I smoke daily"). | |
| if any(tok in s for tok in ("non-smoker", "nonsmoker", "non smoker", "don't smoke", | |
| "doesn't smoke", "do not smoke", "no tobacco")): | |
| return False | |
| if any(tok in s for tok in ("smoker", "smokes", "tobacco")): | |
| return True | |
| return None | |
| def _coerce_desired_sum_insured(value: Any) -> Optional[int]: | |
| """Parse desired sum insured (cover amount) as integer rupees. | |
| Accepts: "10L" / "10 lakh" / "1 crore" / "1Cr" / 1000000 / | |
| "βΉ10,00,000" / "five lakh" (rejected β words not numerals). | |
| Delegates to `_parse_inr_amount` from needs_finder for the heavy lift, | |
| falls back to bare-digit extraction. Clamps to [50_000, 500_000_000] | |
| (βΉ50K floor, βΉ50Cr ceiling) β anything outside is implausible for a | |
| health-insurance sum insured and likely a parse error. | |
| """ | |
| if value is None: | |
| return None | |
| if isinstance(value, bool): | |
| return None | |
| if isinstance(value, (int, float)): | |
| n = int(value) | |
| return max(50_000, min(500_000_000, n)) | |
| try: | |
| from backend.needs_finder import _parse_inr_amount | |
| parsed = _parse_inr_amount(str(value)) | |
| if parsed is not None: | |
| return max(50_000, min(500_000_000, int(parsed))) | |
| except Exception: # noqa: BLE001 | |
| pass | |
| # Last-ditch: strip non-digits (handles "βΉ10,00,000" if parser missed). | |
| try: | |
| digits = "".join(ch for ch in str(value) if ch.isdigit()) | |
| if digits: | |
| n = int(digits) | |
| if n >= 50_000: | |
| return min(500_000_000, n) | |
| except Exception: # noqa: BLE001 | |
| pass | |
| return None | |
| def _coerce_health_conditions(value: Any) -> Optional[list[str]]: | |
| """Always return list[str] lowercase, stripped, empties dropped. | |
| KI-Z6-NONE (2026-05-15): "none" / "no" / "n/a" β used to be stripped to | |
| `[]`, but downstream `save_profile_field` then hits the KI-091 null- | |
| overwrite guard (`normalized in (None, "", [])`) and refuses to persist | |
| the slot. Result: profile.health_conditions stays empty forever, | |
| `_profile_complete` returns False, retrieve_policies returns | |
| profile_incomplete, the brain loops, MAX_ITERATIONS exhausts, the bot | |
| emits "Sorry β I lost my train of thought" (W1 Turn 3 live blocker). | |
| Fix: keep the explicit-negation sentinel `["none"]` so: | |
| β’ the slot is non-empty β _profile_complete=True β retrieve fires | |
| β’ downstream consumers can still detect "no PED" via the literal | |
| token `"none"` in the list (callers already lowercase-compare). | |
| """ | |
| if value is None: | |
| return None | |
| if isinstance(value, str): | |
| # Gemini sometimes emits comma-joined strings instead of a list. | |
| items = [t.strip() for t in value.split(",")] | |
| elif isinstance(value, (list, tuple)): | |
| items = [str(t).strip() for t in value] | |
| else: | |
| items = [str(value).strip()] | |
| cleaned = [t.lower() for t in items if t] | |
| # Explicit-negation tokens β collapse to the canonical sentinel | |
| # `["none"]` rather than `[]` so the slot is captured, not blanked. | |
| _NEGATION = {"none", "no", "n/a", "na", "nil", "nothing", "healthy"} | |
| if cleaned and all(t in _NEGATION for t in cleaned): | |
| return ["none"] | |
| # Mixed input ("diabetes, none") β drop the negation noise, keep real | |
| # conditions. | |
| real = [t for t in cleaned if t not in _NEGATION] | |
| return real | |
| # --------------------------------------------------------------------------- | |
| # D2 (2026-05-15) β copay_pct + family_medical_history coercers | |
| # --------------------------------------------------------------------------- | |
| # Word-number map for "twenty", "ten" etc. (RULE 2.5 asks the user in | |
| # multiples of 10; Gemini sometimes echoes the user's word verbatim). | |
| _COPAY_WORD_TO_INT: dict[str, int] = { | |
| "zero": 0, "none": 0, "no": 0, | |
| "ten": 10, "fifteen": 15, "twenty": 20, | |
| "twenty five": 25, "twenty-five": 25, | |
| "thirty": 30, "forty": 40, "fifty": 50, | |
| } | |
| def _coerce_copay_pct(value: Any) -> Optional[int]: | |
| """Parse a co-pay tolerance percent, clamped to [0, 50]. | |
| Accepts: | |
| - int / float β int + clamp | |
| - "20", "20%", " 20 % ", "20 percent" β 20 | |
| - "no copay" / "zero" / "none" β 0 | |
| - word numbers like "twenty" β 20 | |
| - bool β blocked (KI-091 null-overwrite caution: bool is an int subclass) | |
| Returns None for unrecognised input so the null-overwrite guard in | |
| save_profile_field can refuse to clobber a previously-captured slot. | |
| """ | |
| if value is None: | |
| return None | |
| if isinstance(value, bool): | |
| return None | |
| if isinstance(value, (int, float)): | |
| n = int(value) | |
| return max(0, min(50, n)) | |
| s = str(value).strip().lower() | |
| if not s: | |
| return None | |
| # Explicit zero phrasings. | |
| if s in ("no", "none", "nil", "zero", "no copay", "no co-pay", "no co pay"): | |
| return 0 | |
| # Word-number lookup (exact match). | |
| if s in _COPAY_WORD_TO_INT: | |
| return _COPAY_WORD_TO_INT[s] | |
| # Strip "%" + "percent" + "pct". | |
| cleaned = ( | |
| s.replace("%", " ") | |
| .replace("percent", " ") | |
| .replace("pct", " ") | |
| .replace("copay", " ") | |
| .replace("co-pay", " ") | |
| .replace("co pay", " ") | |
| ) | |
| # Digit run. | |
| import re as _re | |
| m = _re.search(r"\d+(?:\.\d+)?", cleaned) | |
| if m: | |
| try: | |
| n = int(float(m.group(0))) | |
| return max(0, min(50, n)) | |
| except ValueError: | |
| return None | |
| # Word-number fall-through (substring on cleaned text). | |
| for word, num in _COPAY_WORD_TO_INT.items(): | |
| if word in cleaned.split(): | |
| return num | |
| return None | |
| # Alias map for family medical history β same canonicalisation logic as | |
| # health_conditions but kept inline so this slot stays self-contained. | |
| _FAMILY_HISTORY_ALIASES: dict[str, str] = { | |
| "bp": "hypertension", | |
| "high bp": "hypertension", | |
| "high-bp": "hypertension", | |
| "hi-bp": "hypertension", | |
| "high blood pressure": "hypertension", | |
| "blood pressure": "hypertension", | |
| "sugar": "diabetes", | |
| "diabetic": "diabetes", | |
| "type 2 diabetes": "diabetes", | |
| "type 1 diabetes": "diabetes", | |
| "heart attack": "heart", | |
| "heart disease": "heart", | |
| "cardiac": "heart", | |
| "cardiac disease": "heart", | |
| "stroke": "heart", | |
| "tumor": "cancer", | |
| "tumour": "cancer", | |
| "carcinoma": "cancer", | |
| } | |
| _FAMILY_HISTORY_NEGATION = { | |
| "none", "no", "n/a", "na", "nil", "nothing", "healthy", | |
| "no family history", "no history", "no medical history", | |
| } | |
| def _coerce_family_medical_history(value: Any) -> Optional[list[str]]: | |
| """Return list[str] lowercase canonical conditions running in BLOOD family. | |
| Accepts: | |
| - list / tuple of strings | |
| - comma-joined string ("cancer, diabetes") | |
| - "none" / "no family history" β [] | |
| Alias map collapses BP/sugar/cardiac/tumor β hypertension/diabetes/heart/ | |
| cancer respectively (same family as _coerce_health_conditions). Negation | |
| sentinels return `[]` since downstream pricing & retrieval BOTH treat an | |
| empty list as the "no family history" branch (different from health_ | |
| conditions where the explicit `["none"]` sentinel is needed for the | |
| profile-completeness gate). | |
| """ | |
| if value is None: | |
| return None | |
| if isinstance(value, str): | |
| items = [t.strip() for t in value.split(",")] | |
| elif isinstance(value, (list, tuple)): | |
| items = [str(t).strip() for t in value] | |
| else: | |
| items = [str(value).strip()] | |
| cleaned = [t.lower() for t in items if t] | |
| # Full-string negation collapses to []. | |
| if cleaned and all(t in _FAMILY_HISTORY_NEGATION for t in cleaned): | |
| return [] | |
| # Drop negation noise from mixed input ("cancer, none"). | |
| cleaned = [t for t in cleaned if t not in _FAMILY_HISTORY_NEGATION] | |
| # Canonicalise via alias map. | |
| canonical: list[str] = [] | |
| seen: set[str] = set() | |
| for t in cleaned: | |
| c = _FAMILY_HISTORY_ALIASES.get(t, t) | |
| if c and c not in seen: | |
| seen.add(c) | |
| canonical.append(c) | |
| return canonical | |
# Public API of this module: the names exported by a star-import. The
# underscore-prefixed coercers above are deliberately not listed.
__all__ = [
    "save_profile_field",
    "retrieve_policies",
    "mark_recommendation",
    "SLOT_UNION",
    "union_snapshot",
]