"""Tool functions for single_brain.py (Path B — single-LLM architecture). The Gemini Flash model exposed in `backend/single_brain.py` calls these three tools to (a) persist captured profile fields, (b) retrieve policy chunks from Chroma, and (c) mark which policies it has recommended on the current turn so follow-ups like "tell me more about #2" can resolve. Each function: * Takes plain JSON-serialisable inputs (str / int / list[str]). * Returns a plain JSON-serialisable dict that gets fed back to the LLM on the next iteration of the function-calling loop. * Never raises — failures are surfaced via {"ok": False, "error": "..."} so the LLM can decide whether to retry or recover. The Gemini function-calling DSL (JSON Schema-flavoured) for these three tools is generated by `single_brain.TOOL_SCHEMAS` from this module's metadata. ═══════════════════════════════════════════════════════════════════════════ SLOT_UNION — SINGLE SOURCE OF TRUTH FOR CAPTURED FIELDS (B6, 2026-05-15) ═══════════════════════════════════════════════════════════════════════════ SLOT_UNION enumerates every captured field that influences EITHER the recommendation pipeline (retrieval query + scorecard) OR the pricing pipeline (premium_calculator.estimate / bulk_estimate). It is the contract between brain_tools (capture surface), single_brain (LLM tool calls), premium_calculator (pricing inputs) and scorecard (match scoring). Slot → consumer matrix: RECOMMENDATION SLOTS (in _REQUIRED_FOR_READY — hard gate) name → profile identity (no pricing influence) age → retrieval query + scorecard + pricing (age band) dependents → retrieval query + scorecard + pricing (family loading) location_tier → retrieval query + scorecard + pricing (location loading) income_band → retrieval query + scorecard primary_goal → retrieval query + scorecard health_conditions → retrieval query + scorecard + pricing (health loading) PRICING-ONLY SLOTS (B5 + B6 additions — SOFT capture, post-recap) budget_band → pricing band match desired_sum_insured_inr → pricing SI override (per-policy estimate) existing_cover_inr → pricing (existing-cover discount loading) FAMILY-DETAIL SLOTS (used by pricing if dependents includes parents) parents_to_insure → triggers parents_loading branch parents_age_max → pricing (parents age loading 1.0× / 1.4× / 1.8×) parents_has_ped → pricing (PED loading inflation for parents) D2 ADDITIONS (2026-05-15 — copay + family medical history) copay_pct → pricing (copay discount 1.0× / 0.95× / 0.88× / 0.80×) family_medical_history → pricing (family-history loading) + retrieval boost KI-275 (2026-05-15 — smoker / tobacco) smoker → pricing (smoker_loading 1.0× / 1.40×, +30-50%) Total: 16 slots. `gender` is tolerated by save_profile_field for forward compat but is NOT on the Profile dataclass today; it does not appear in SLOT_UNION because no consumer reads it. """ from __future__ import annotations import json as _json import logging import time from pathlib import Path from typing import Any, Optional from backend.config import settings from backend.policy_identity import clean_display_policy_name _log = logging.getLogger(__name__) # ─────────────────────────────────────────────────────────────────────────── # KI-278 (2026-05-16) — policy-fact enrichment for the eligibility/ranking # filter. The Chroma chunk only carries citation metadata; the structural # facts the eligibility filter needs (top-up signal, sum-insured options, # mandatory co-pay) live in 40-data/policy_facts/*.json. # # IMPORTANT: this code only READS those JSON files and only reads STABLE # SCHEMA KEYS (policy_type_indemnity_or_fixed / deductible_amount / # co_payment_pct / sum_insured_options). The concurrent scorecard-repair # process rewrites the *values*; the *field names* are the contract and do # not change. We never write to policy_facts/*.json. # # A short in-process cache keeps this off the hot path (the catalog is ~250 # files; we only ever touch the ≤8 retrieved per turn, and memoise stems). # ─────────────────────────────────────────────────────────────────────────── _FACTS_DIR = settings.DATA_DIR / "policy_facts" _DOCTYPE_SUFFIXES = ("__wordings", "__brochure", "__cis", "__prospectus") # Width of the Chroma candidate pool pulled before eligibility + # scorecard-aware ranking — wide enough that a strong policy ranked only # mid-cosine for a generic query still enters contention. The LLM still # only sees the top-k best-fit survivors. _RECALL_POOL = 40 _FACT_KEYS = ( "policy_type_indemnity_or_fixed", # Raw catalog type key — some curated files carry the product type # only under `policy_type` (e.g. "hospital_cash") with no # `policy_type_indemnity_or_fixed`; the fixed-benefit gate needs it. "policy_type", "deductible_amount", "co_payment_pct", "sum_insured_options", # Eligibility-gate keys (read-only): # uin_code → canonical dedup identity (1 UIN = 1 product; # collapses doctype-sibling/rename duplicates). # max_entry_age → hard eligibility for the insured person; the # curated value is authoritative and overrides # a missing/None Chroma chunk field. # maternity_coverage → required-feature gate: an explicit maternity / # newborn_coverage newborn need ranks confirmed plans first. "uin_code", "max_entry_age", "maternity_coverage", "newborn_coverage", ) # Bug #44 (2026-05-19) — DECISION-CRITICAL coverage fields that MUST be # resolved from the SAME canonical curated entry the marketplace # scorecard / #31 profile-summary path uses, so a verbal answer (or a # comparison TABLE the LLM builds from get_policy_facts) can never # contradict the scorecard card for the same policy. # # Root cause: each product has a base 40-data/policy_facts/.json AND # doctype siblings (__wordings/__cis/__brochure/__prospectus). The # scorecard path resolves via main._load_curated_facts() (KI-219/KI-251 # canonical precedence). get_policy_facts previously surfaced facts via # _load_policy_facts() — a DIFFERENT _candidate_stems resolver whose # 7-key _FACT_KEYS doesn't even include PED — so the two paths could # read different files and disagree on PED waiting (a live audit found # the comparison table said 24mo while the #31 card said "0 months"). # # Fix: these specific fields are taken from the canonical curated entry # (`_curated_facts_all()[pid]`, the EXACT dict main._load_curated_facts() # feeds build_scorecard) so both surfaces agree BY CONSTRUCTION. This # mirrors the existing _scorecard_signal → marketplace_grade single- # source pattern (#40 / KI-219). The guard # tests/test_policy_facts_source_consistency.py asserts catalogue-wide # agreement. _DECISION_CRITICAL_FACT_KEYS = ( "pre_existing_disease_waiting_months", "initial_waiting_period_days", "copayment_pct", "room_rent_capping", "claim_settlement_ratio", ) def canonical_decision_facts(policy_id: str) -> dict: """Decision-critical coverage facts for `policy_id`, resolved from the SAME canonical curated entry the marketplace scorecard path uses (main._load_curated_facts via _curated_facts_all). Returns only the keys in _DECISION_CRITICAL_FACT_KEYS that have a non-empty value. Single source of truth (Bug #44): the scorecard / #31 path and get_policy_facts BOTH read these fields from this one canonical entry, so they cannot drift. Read-only; never raises (returns {} on any failure so the tool degrades gracefully).""" pid = (policy_id or "").strip() if not pid: return {} try: all_cur = _curated_facts_all() or {} except Exception: # noqa: BLE001 — curated layer optional return {} entry = all_cur.get(pid) if entry is None: # Curated layer registers every doctype-suffix permutation + # each sibling stem/policy_id pointing at the canonical entry # (main._load_curated_facts Pass-2). Fall back to the canonical # (doctype-stripped) form for callers holding a suffixed id. for suf in _DOCTYPE_SUFFIXES: if pid.endswith(suf): entry = all_cur.get(pid[: -len(suf)]) break if not isinstance(entry, dict): return {} out: dict = {} for k in _DECISION_CRITICAL_FACT_KEYS: v = entry.get(k) # _load_curated_facts already unwraps {value, source_*} to scalar; # accept the wrapped shape too, defensively. if isinstance(v, dict) and "value" in v: v = v.get("value") if v in (None, "", []): continue out[k] = v return out _facts_cache: dict[str, dict] = {} def _unwrap(v: Any) -> Any: """policy_facts values are `{value, source_pdf_path, source_quote}` wrappers OR bare scalars. Return the underlying value either way.""" if isinstance(v, dict) and "value" in v: return v["value"] return v def _candidate_stems(policy_id: str) -> list[str]: """A retrieved chunk's policy_id may be the canonical stem (`insurer__product`) OR a doctype-suffixed stem (`insurer__product__cis`). Try the exact id first, then the canonical form, then every doctype sibling — first existing file wins. """ pid = (policy_id or "").strip() if not pid: return [] stems = [pid] base = pid for suf in _DOCTYPE_SUFFIXES: if base.endswith(suf): base = base[: -len(suf)] break if base != pid: stems.append(base) for suf in _DOCTYPE_SUFFIXES: cand = base + suf if cand not in stems: stems.append(cand) return stems _extraction_cache: dict = {} def _has_extraction(policy_id: str) -> bool: """True iff this policy has an LLM-extracted corpus file — the EXACT renderability rule the marketplace uses (main builds its card set from settings.EXTRACTED_DIR/*.json). A policy with curated facts but no extracted file renders as a BROKEN card: raw policy_id as the title, grade "N/A", "No extraction available for this policy.", "Data not indexed" (/api/bulk-scorecard). Such a policy must NEVER be quality- seeded or cited. Canonical-stem aware (doctype siblings count); cached incl. negatives.""" pid = (policy_id or "").strip() if not pid: return False if pid in _extraction_cache: return _extraction_cache[pid] ok = False try: for stem in _candidate_stems(pid): if (settings.EXTRACTED_DIR / f"{stem}.json").exists(): ok = True break except Exception: # noqa: BLE001 — predicate must never break retrieval ok = False _extraction_cache[pid] = ok return ok def _load_policy_facts(policy_id: str) -> dict: """Return {fact_key: value} for a policy_id, or {} when no facts file exists / is unreadable. Cached per policy_id (incl. negative cache).""" if policy_id in _facts_cache: return _facts_cache[policy_id] out: dict = {} try: for stem in _candidate_stems(policy_id): fp = _FACTS_DIR / f"{stem}.json" if not fp.exists(): continue try: d = _json.loads(fp.read_text()) except Exception: # noqa: BLE001 — corrupt mid-repair → skip continue for k in _FACT_KEYS: if k in d and k not in out: out[k] = _unwrap(d[k]) # Stop once we have the decision-critical signals. KI-279: a # type signal is "have it" if EITHER the canonical key OR the # raw `policy_type` key is present (some files only carry the # latter), so the fixed-benefit gate is never starved. # KI-280: also require the unified-gate signals (uin_code + # max_entry_age) before breaking so the eligibility/dedup rules # are not starved when a sibling doctype file carries them. The # candidate-stem list is short (≤5) so iterating it fully is # cheap; this just stops us breaking after stem #1 when the UIN # / entry-age lives in stem #2. _have_type = ( "policy_type_indemnity_or_fixed" in out or "policy_type" in out ) if ( _have_type and "sum_insured_options" in out and "uin_code" in out and "max_entry_age" in out ): break except Exception as e: # noqa: BLE001 — enrichment must never break retrieval _log.warning("KI-278 _load_policy_facts(%s) failed: %s", policy_id, e) out = {} _facts_cache[policy_id] = out return out _curated_all_cache: dict = {} # single-slot cache for the full curated layer _reviews_cache: dict = {} def _curated_facts_all() -> dict: """The full curated-facts layer — the same source the marketplace scores from (main._load_curated_facts, KI-219/251 canonical precedence; lazy import since main is loaded at request time). Using it here makes a policy's recommendation grade identical to its marketplace grade. The 7-key `_load_policy_facts` is the input for the eligibility/dedup gate, not for grading.""" if "d" not in _curated_all_cache: from backend.main import _load_curated_facts # lazy: avoids cycle _curated_all_cache["d"] = _load_curated_facts() return _curated_all_cache["d"] def _insurer_reviews(slug: str) -> Optional[dict]: """Insurer reviews (40-data/reviews/.json) passed to build_scorecard — drives the claim-experience sub-score; the same source the marketplace uses.""" if not slug: return None if slug in _reviews_cache: return _reviews_cache[slug] ir = None try: rp = settings.DATA_DIR / "reviews" / f"{slug}.json" if rp.exists(): ir = _json.loads(rp.read_text()) except Exception: # noqa: BLE001 — reviews optional ir = None _reviews_cache[slug] = ir return ir def _scorecard_signal(policy_id: str, profile=None) -> dict: """{_grade, _overall_score} for policy_id — the SAME grade its marketplace card shows (#40 SINGLE SOURCE OF TRUTH, 2026-05-18). Delegates to backend.main.marketplace_grade, which derives the grade from the ONE marketplace catalogue computation (backend.main._marketplace_catalogue) using UIN-canonical resolution: a marketing-rename / KI-145 variant id resolves onto its canonical card. There is no longer a parallel doctype-rank/_merge re-implementation here, so the cited-card grade cannot drift from the marketplace card grade — for ALL 148 by construction, asserted by tests/test_scorecard_parity.py. `profile` is accepted for call-site compatibility. The parity-relevant grade LETTER (what _recommendation_fit gates on) is the profile-neutral marketplace grade, identical to the /api/policies/all catalogue view. Read-only; returns {} only when the policy is unknown to the marketplace (ranking still works off co-pay + SI headroom + cosine).""" try: from backend.main import marketplace_grade # lazy: avoids cycle sig = marketplace_grade(policy_id) if sig and sig.get("_grade"): return { "_grade": sig.get("_grade"), "_overall_score": sig.get("_overall_score"), } return {} except Exception: # noqa: BLE001 — scorecard optional; ranking degrades gracefully return {} # ---- accepted fields for save_profile_field -------------------------------- # Mirrors the slot list in sales_brain._REQUIRED_FOR_READY plus the # nice-to-have fields the LLM may capture opportunistically. `gender` # is listed in the spec but is NOT a Profile dataclass field today — # we silently no-op on it rather than rejecting (forward-compat). _ACCEPTED_FIELDS = { "name", "age", "dependents", "location_tier", "income_band", "primary_goal", "health_conditions", "existing_cover_inr", "budget_band", "budget_inr", # #64 — exact ₹/yr; set as a side-effect of budget_band "desired_sum_insured_inr", # SOFT capture (pricing input, post-recap) # Family-detail pricing inputs (B6) — already on the Profile dataclass # via needs_finder.Profile (parents_to_insure / parents_age_max / # parents_has_ped). Listed here so save_profile_field can persist them # when Gemini extracts them post-recap. "parents_to_insure", "parents_age_max", "parents_has_ped", # D2 (2026-05-15) — coupled additions: co-pay tolerance + family medical history "copay_pct", "family_medical_history", # KI-275 (2026-05-15) — smoker / tobacco use, +30-50% premium loading. "smoker", "gender", # tolerated; not persisted unless Profile gains the field } # These are the slots the brain MUST capture before recommending — same # list as sales_brain._REQUIRED_FOR_READY. Kept inline (not imported) # to avoid coupling single_brain to sales_brain. _REQUIRED_FOR_READY = ( "name", "age", "dependents", "location_tier", "income_band", "primary_goal", "health_conditions", ) # ─────────────────────────────────────────────────────────────────────────── # Bug #108 + #110 (2026-05-16) — POST-RECAP pricing & family-history bundle. # # Bug #108: the RULE 2.5 bundle (SI / budget / co-pay / family history / # smoker) is asked in ONE prompt. When the user answers SOME but # not all, the LLM treated the bundle as fully satisfied (it's # SOFT capture) and recommended with the unanswered slot blank — # e.g. asked SI/budget/co-pay/smoker, user gave SI/budget/co-pay, # smoker never re-asked, bot recommended. # Bug #110: family_medical_history (item 5 of the bundle) was the slot most # often silently dropped — the fact-find effectively NEVER asked it. # # FIX: a deterministic ONE-SHOT re-ask gate. After the hard 7-slot gate # passes, on a recommendation retrieve_policies call, if any UNRESOLVED # bundle slot remains we return a directive `pricing_inputs_incomplete` # response (re-ask exactly the unresolved ones, verbatim) — but ONLY once # per session, and ONLY when the user has not explicitly skipped. After the # single re-ask we proceed (SOFT-capture semantics preserved: the user can # still skip; we never hard-loop). # # A bundle slot is RESOLVED when it is either captured on the profile OR the # user explicitly declined/skipped it (session.pricing_bundle_skipped — set # by single_brain when the user says "just show me options" / "skip" / "you # decide"). parents_age_max is conditional: only part of the bundle when the # user is covering parents and it isn't captured yet. _PRICING_BUNDLE_CORE: tuple[str, ...] = ( "desired_sum_insured_inr", "budget_band", "existing_cover_inr", "copay_pct", "family_medical_history", # Bug #110 — must be asked, same skip handling "smoker", ) # Exact verbatim re-ask phrasing per bundle slot (mirrors RULE 2.5 wording so # the re-ask reads identically to the first ask). family_medical_history is # first so a forgotten Bug #110 slot leads the re-ask. _PRICING_BUNDLE_QUESTIONS: dict[str, str] = { "family_medical_history": ( "Any major conditions running in your blood family " "(parents/siblings) — cancer / diabetes / heart disease / " "hypertension? (say 'none' if not)" ), "desired_sum_insured_inr": ( "How much sum insured would you like? (e.g., ₹5L / ₹10L / " "₹25L / ₹1Cr)" ), "budget_band": ( "What's your annual premium budget? (e.g., ₹10-15K/year, or " "₹50K+ for premium covers)" ), "existing_cover_inr": ( "Any existing health cover from work or otherwise? " "(e.g., '5L through employer', or 'no')" ), "copay_pct": ( "Are you OK with a co-pay — sharing 10-30% of every claim — to " "lower the premium? Or do you want zero co-pay?" ), "smoker": "Do you smoke or use tobacco products? (yes / no)", "parents_age_max": ( "Roughly what's the age of the eldest parent you'd cover?" ), } def _profile_has_parents(profile) -> bool: """True when the captured dependents indicate parents are covered (so parents_age_max becomes part of the post-recap bundle).""" dep = getattr(profile, "dependents", None) if isinstance(dep, str) and "parent" in dep.lower(): return True return bool(getattr(profile, "parents_to_insure", None)) def _unresolved_pricing_bundle(profile, session) -> list[str]: """Return the bundle slots that are NEITHER captured on the profile NOR explicitly skipped by the user this session. Empty list ⇒ nothing left to re-ask (recommend may proceed).""" if session is not None and bool( getattr(session, "pricing_bundle_skipped", False) ): # User explicitly said "just show me options / you decide / skip". return [] bundle = list(_PRICING_BUNDLE_CORE) if _profile_has_parents(profile): bundle.append("parents_age_max") # #41 (2026-05-21) — a slot the user has ANSWERED is resolved even if # the answer coerces to an empty value (family_medical_history="none" # → []). profile.asked records answered slots, so an empty-but-asked # slot is NOT re-asked. asked = set(getattr(profile, "asked", None) or []) unresolved: list[str] = [] for slot in bundle: if getattr(profile, slot, None) in (None, "", []) and slot not in asked: unresolved.append(slot) return unresolved # ─────────────────────────────────────────────────────────────────────────── # SLOT_UNION (B6, 2026-05-15) # ─────────────────────────────────────────────────────────────────────────── # Single source of truth for every captured field that drives EITHER the # profile (recommendation match + scorecard) OR the pricing pipeline # (premium_calculator.estimate / bulk_estimate). See module docstring above # for the full slot→consumer matrix. # # Ordering convention (do not re-order without auditing union_snapshot # callers): required slots first, then pricing-only slots, then # family-detail slots. Total = 13. SLOT_UNION: tuple[str, ...] = ( # Recommendation slots (in _REQUIRED_FOR_READY) "name", "age", "dependents", "location_tier", "income_band", "primary_goal", "health_conditions", # Pricing slots (B5 + B6 additions) "budget_band", "budget_inr", # #64 — exact ₹/yr (lossless companion to budget_band) "desired_sum_insured_inr", "existing_cover_inr", # Family-detail slots (used by pricing if applicable) "parents_to_insure", "parents_age_max", "parents_has_ped", # D2 additions (2026-05-15) "copay_pct", "family_medical_history", # KI-275 (2026-05-15) — smoker / tobacco use, +30-50% premium loading. "smoker", ) # Invariant: every SLOT_UNION field must be accepted by save_profile_field # (otherwise the LLM can never capture it). Validated at import time so a # future delete here is loud, not silent. assert all(_s in _ACCEPTED_FIELDS for _s in SLOT_UNION), ( "SLOT_UNION contains a field not in _ACCEPTED_FIELDS: " f"{[s for s in SLOT_UNION if s not in _ACCEPTED_FIELDS]}" ) def union_snapshot(profile) -> dict: """Return a JSON-safe dict of every SLOT_UNION field currently captured on `profile`. Empty / None / [] slots are EXCLUDED so the pricing pipeline can safely treat presence as "captured" (KI-091 null-overwrite rule — never pass None where 0 is meaningful). Used by premium_calculator.estimate / bulk_estimate to read pricing inputs without re-implementing the field-name list on each side. """ snap: dict = {} for fld in SLOT_UNION: try: v = getattr(profile, fld, None) except Exception: # noqa: BLE001 v = None if v in (None, "", []): continue snap[fld] = v return snap def _profile_complete(profile) -> bool: """Return True when every slot in _REQUIRED_FOR_READY is non-empty on the live Profile dataclass.""" for slot in _REQUIRED_FOR_READY: val = getattr(profile, slot, None) if val in (None, "", []): return False return True # ---- save_profile_field ---------------------------------------------------- def save_profile_field(session, field: str, value: Any) -> dict: """Validate + persist a captured profile field on session.profile. Accepted fields: name, age, dependents, location_tier, income_band, primary_goal, health_conditions, existing_cover_inr, budget_band, gender (tolerated, may no-op). Lightweight normalization: - age: int, clamp to [0, 110] - dependents: pass through `_normalize_dependents` when available, else stringify. - existing_cover_inr: parse via `_parse_inr_amount` when available, else int-coerce. - health_conditions: coerce to list[str], lowercase, strip empties. - everything else: string pass-through (Gemini already emits canonical values when prompted correctly). Returns: {"saved": True, "field": ..., "value": ..., "profile_complete": bool} On unknown field: {"saved": False, "error": "unknown_field"} """ if not isinstance(field, str) or not field: return {"saved": False, "error": "missing_field_name"} fld = field.strip().lower() if fld not in _ACCEPTED_FIELDS: return {"saved": False, "error": f"unknown_field:{field}"} profile = session.profile normalized: Any = value try: if fld == "age": normalized = _coerce_age(value) elif fld == "dependents": normalized = _coerce_dependents(value) elif fld == "health_conditions": normalized = _coerce_health_conditions(value) elif fld == "existing_cover_inr": normalized = _coerce_existing_cover(value) elif fld == "desired_sum_insured_inr": normalized = _coerce_desired_sum_insured(value) elif fld in ("parents_to_insure", "parents_has_ped"): normalized = _coerce_bool(value) elif fld == "parents_age_max": normalized = _coerce_age(value) elif fld == "copay_pct": normalized = _coerce_copay_pct(value) elif fld == "family_medical_history": normalized = _coerce_family_medical_history(value) elif fld == "smoker": normalized = _coerce_smoker(value) elif fld == "budget_band": normalized = _coerce_budget_band(value) elif fld == "name": normalized = (str(value).strip() if value is not None else None) or None elif fld == "gender": # Profile dataclass has no gender field today; silently # accept + skip persistence so the LLM doesn't loop trying # to save it. Forward-compat: when Profile gains gender, # this branch just becomes `normalized = str(value).strip()`. return { "saved": False, "field": fld, "value": value, "error": "field_not_on_profile_dataclass", "profile_complete": _profile_complete(profile), } else: # location_tier / income_band / primary_goal — pass through # (budget_band is normalised above via _coerce_budget_band) normalized = (str(value).strip() if value is not None else None) or None except (TypeError, ValueError) as e: return { "saved": False, "field": fld, "value": value, "error": f"normalize_failed:{type(e).__name__}:{e}", } # Drop None/empty so we don't overwrite a previously-captured slot # with a Gemini turn that "didn't extract anything". Universal rule # from KI-091/094 (extractor null overwrite). if normalized in (None, "", []): # #41 (2026-05-21) — an explicit NEGATIVE family-history answer # ("none") legitimately coerces to []. Do NOT silently drop it as # "extracted nothing": record the field on profile.asked so the # post-recap pricing-bundle gate (_unresolved_pricing_bundle) # treats it RESOLVED and never re-asks a slot the user answered. if fld == "family_medical_history" and value is not None and str(value).strip().lower() in ( "none", "no", "nil", "nothing", "n/a", "na", "none.", "no family history", "no family medical history", "no family medical history.", "no family history.", ): try: if fld not in profile.asked: profile.asked.append(fld) except Exception: # noqa: BLE001 pass return { "saved": True, "field": fld, "value": [], "profile_complete": _profile_complete(profile), } return { "saved": False, "field": fld, "value": value, "error": "normalized_empty", "profile_complete": _profile_complete(profile), } if not hasattr(profile, fld): return { "saved": False, "field": fld, "value": value, "error": "field_not_on_profile_dataclass", "profile_complete": _profile_complete(profile), } setattr(profile, fld, normalized) # #64 — when the user states a budget, ALSO preserve the EXACT ₹ amount # losslessly (not just the 4-bucket band) so the slider shows what they # actually said ("₹15,000"), never a band representative ("₹12k"). The # band stays the pricing contract; budget_inr is the display truth. if fld == "budget_band": try: if isinstance(value, bool): _exact = None elif isinstance(value, (int, float)): _exact = int(value) else: from backend.needs_finder import _parse_inr_amount as _pinr import re as _re2 # Strip per-annum qualifiers FIRST — same KI-161 guard # _coerce_budget_band handles, so "₹15,000/yr" / "15000 a # year" / "more than 15000 a year" yield the exact ₹, not # None (the parser otherwise reads "year" as age context). _s = str(value) _cleaned = _re2.sub( r"\b(?:per\s*(?:year|annum)|p\.?\s*a\.?|/\s*(?:yr|year|" r"annum)|a\s*year|annually|yearly|/\s*yr)\b", " ", _s, flags=_re2.IGNORECASE, ) _exact = _pinr(_cleaned) or _pinr(_s) if _exact and _exact > 0 and hasattr(profile, "budget_inr"): profile.budget_inr = int(_exact) if "budget_inr" not in getattr(profile, "asked", []): profile.asked.append("budget_inr") except Exception: # noqa: BLE001 — exact ₹ is best-effort; band still saved pass # Track that the brain has "asked" this field so the rest of the # codebase's helpers (which inspect profile.asked) stay in sync. try: if fld not in getattr(profile, "asked", []): profile.asked.append(fld) except Exception: # noqa: BLE001 — best-effort bookkeeping pass return { "saved": True, "field": fld, "value": normalized, "profile_complete": _profile_complete(profile), } _qseed_cache: dict = {} # (profile_sig) -> [seed chunk dicts] # BUG #30 (B1-c) — per-signature count of TRAILING existing-cover top-up # seeds in `_qseed_cache[sig]`. They sit AFTER the primary window and must # survive the final `[:limit]` slice so a relevant super-top-up is never # truncated out of contention for a user who already holds base cover. _qseed_topup_n: dict = {} def _qseed_slice(rows: list[dict], sig: str, limit: int) -> list[dict]: """Return up to `limit` primary seeds PLUS all existing-cover top-up seeds (which trail the list), so the top-up seeds are never cut.""" n_top = _qseed_topup_n.get(sig, 0) if n_top <= 0: return rows[:limit] primaries = rows[:-n_top] if n_top < len(rows) else [] topups = rows[-n_top:] return primaries[:limit] + topups def _quality_seed_candidates(profile, limit: int = 25) -> list[dict]: """Seed the candidate pool with the catalogue's top policies by the profile-tuned scorecard overall, so a strong policy that isn't cosine-similar to a generic query still enters contention. This only ADDS candidates — filter_pipeline still applies precise eligibility + profile-fit ranking on the union; it never bypasses eligibility or fabricates a recommendation. Cached per profile-signature.""" out: list[dict] = [] try: prof_sig = repr(sorted( (s, getattr(profile, s, None)) for s in ( "age", "income_band", "primary_goal", "existing_cover_inr", "copay_pct", "dependents", "health_conditions", "budget_band", "location_tier", "parents_to_insure", "parents_age_max", ) )) if profile is not None else "none" if prof_sig in _qseed_cache: return _qseed_slice(_qseed_cache[prof_sig], prof_sig, limit) cur = _curated_facts_all() scored: list[tuple[float, str, dict, dict]] = [] seen: set[str] = set() for key, data in cur.items(): if not isinstance(data, dict): continue pid = (data.get("policy_id") or key or "").strip() if not pid or pid in seen: continue seen.add(pid) if not _has_extraction(pid): # Curated-graded but no extracted corpus → its card would # render as N/A / "No extraction available for this policy" # / "Data not indexed". Never seed a non-renderable policy. continue sig = _scorecard_signal(pid, profile=profile) ov = sig.get("_overall_score") if ov is None: continue try: ovf = float(ov) except (TypeError, ValueError): continue scored.append((ovf, pid, data, sig)) scored.sort(key=lambda t: -t[0]) # BUG #30 (B1-c) — when the user already holds ANY base cover, also # union-in the top-N top-up / super-top-up policies even if they fall # outside the profile-neutral top-25 window, so a directly relevant # super-top-up is seeded into contention (filter_pipeline then ranks # the union via _fit_score, which now carries the existing-cover term). # Deterministic: drawn from the already-sorted `scored` list, no RNG. _existing = getattr(profile, "existing_cover_inr", None) \ if profile is not None else None try: _existing_int = int(str(_existing).replace(",", "").strip()) \ if _existing not in (None, "") else 0 except (TypeError, ValueError): _existing_int = 0 window = max(limit, 25) primary_rows = scored[:window] extra: list[tuple[float, str, dict, dict]] = [] if _existing_int > 0: from backend.retrieval_filters import _is_top_up as _rf_is_top_up in_window = {pid for _, pid, _, _ in primary_rows} _TOPUP_SEED_MAX = 3 # bounded extra seeds; keeps pool deterministic for ovf, pid, data, sig in scored[window:]: if pid in in_window: continue probe = { "policy_name": data.get("policy_name", pid), **_load_policy_facts(pid), } if _rf_is_top_up(probe): extra.append((ovf, pid, data, sig)) if len(extra) >= _TOPUP_SEED_MAX: break # `_n_topup_seeds` is preserved on the cached list so the final # slice keeps the existing-cover top-up seeds (they sit AFTER the # primary window and would otherwise be cut by `out[:limit]`). for ovf, pid, data, sig in primary_rows + extra: ch = { "chunk_id": f"{pid}__qseed", "policy_id": pid, "policy_name": clean_display_policy_name( data.get("policy_name", pid) ), "insurer_slug": data.get("insurer_slug") or (pid.split("__", 1)[0] if "__" in pid else ""), "doc_type": "policy", "source_url": data.get("_primary_source_pdf") or "", # No cosine text — filter_pipeline ranks the union by # _fit_score (scorecard grade + profile fit), not cosine, # so a 0.0 cosine score does not bury a seeded A-policy. "chunk_text": "", "score": 0.0, } ch.update(_load_policy_facts(pid)) ch.update(sig) out.append(ch) _qseed_cache[prof_sig] = out _qseed_topup_n[prof_sig] = len(extra) return _qseed_slice(out, prof_sig, limit) except Exception as e: # noqa: BLE001 — seeding must never break retrieval _log.warning("quality-seed failed: %s", e) return out[:limit] # ---- uploaded-doc (quarantine) bypass helpers ------------------------------ def _session_has_quarantine_docs(session_id: str) -> bool: """True iff this session has at least one chunk in the SEPARATE `user_uploads_quarantine` Chroma collection. Strictly session-scoped (where={"session_id": session_id}); cheap metadata-only `.get(limit=1)`. Never raises — a Chroma hiccup just means "treat as no upload" so the normal profile-gate path runs. """ if not session_id: return False try: from rag.ingest import get_quarantine_collection q = get_quarantine_collection().get( where={"session_id": session_id}, limit=1, include=[], ) return bool(q and q.get("ids")) except Exception as e: # noqa: BLE001 — best-effort probe _log.warning( "quarantine presence probe failed (sid=%s): %s: %s", session_id, type(e).__name__, str(e)[:160], ) return False async def _retrieve_uploaded_only( query: str, session_id: str, top_k: int ) -> dict: """Quarantine-scoped retrieval that returns ONLY this session's uploaded-doc chunks, bypassing the recommendation profile-gate. Reuses `rag.retrieve.retrieve(session_id=...)` (whose quarantine boost pass already prepends this session's uploaded chunks), then filters the result down to `doc_type == "user_upload"` so the bypass response can NEVER contain general-corpus policy chunks against an incomplete profile. Returns the same dict shape as `retrieve_policies`. """ try: from rag.retrieve import retrieve as _retrieve chunks = await _retrieve( query=query, top_k=max(int(top_k) if top_k else 8, 3), session_id=session_id, ) except Exception as e: # noqa: BLE001 — graceful empty _log.warning( "uploaded-only retrieve failed (sid=%s): %s: %s", session_id, type(e).__name__, str(e)[:160], ) return {"chunks": [], "count": 0, "error": f"{type(e).__name__}"} uploaded = [] for c in chunks or []: if (getattr(c, "doc_type", "") or "").lower() != "user_upload": continue # Triple-check session ownership before exposing the chunk — # belt + suspenders on top of the where={"session_id"} filter. if (getattr(c, "session_id", None) or session_id) != session_id and \ getattr(c, "session_id", None) not in (None, ""): continue uploaded.append({ "chunk_id": getattr(c, "chunk_id", ""), "policy_id": getattr(c, "policy_id", ""), "policy_name": clean_display_policy_name( getattr(c, "policy_name", "") ), "insurer_slug": getattr(c, "insurer_slug", "") or "user-upload", "doc_type": "user_upload", "source_url": getattr(c, "source_url", ""), "chunk_text": (getattr(c, "text", "") or "")[:1200], "score": float(getattr(c, "score", 0.0) or 0.0), }) if not uploaded: return {"chunks": [], "count": 0} return { "chunks": uploaded, "count": len(uploaded), "query": query, "source": "uploaded_doc_quarantine", "note": ( "These chunks come from the user's OWN uploaded policy PDF " "(session-scoped quarantine). Answer the user's question " "about THIS document directly. Do NOT treat this as a " "general recommendation and do NOT block on profile " "completeness — the user explicitly asked about their " "uploaded file." ), } # ---- retrieve_policies ----------------------------------------------------- _NP_STOP = { "health", "insurance", "insurances", "policy", "policies", "plan", "plans", "the", "and", "of", "for", "cover", "covers", "coverage", "india", "general", "life", "assurance", "co", "ltd", "limited", "company", "scheme", "what", "is", "are", "tell", "me", "about", "detail", "details", "benefit", "benefits", } def _resolve_named_policy(query: str) -> Optional[str]: """#61 — if the question UNAMBIGUOUSLY names a specific catalogue policy, return its policy_id so a factual Q&A can retrieve it without being blocked by the recommendation profile-gate. Conservative: >=2 significant name tokens present, >=60% of the policy_name's significant tokens in the query, and the top match strictly ahead of the runner-up (no ambiguous grab). Returns None otherwise. Best-effort; never raises.""" import re as _re try: q = " " + _re.sub(r"[^a-z0-9 ]", " ", (query or "").lower()) + " " if len(q) < 8: return None from backend.main import _marketplace_catalogue # lazy: avoids cycle scored: list[tuple[int, float, str, str]] = [] for c in _marketplace_catalogue(None): name = (getattr(c, "policy_name", "") or "").lower() toks = { t for t in _re.sub(r"[^a-z0-9 ]", " ", name).split() if len(t) > 2 and t not in _NP_STOP } if len(toks) < 2: continue hit = sum(1 for t in toks if f" {t} " in q) cov = hit / len(toks) if hit >= 2 and cov >= 0.6: pid = getattr(c, "policy_id", None) if pid: scored.append((hit, cov, pid, name)) if not scored: return None # VERSION DISAMBIGUATION (#61) — when the user explicitly states a # version ("...Health Companion V2022" / "V22"), the matching card # that carries that exact version token MUST win, even if a shorter # base-name card scores more generic tokens. Without this the base # "Niva Bupa Health Companion" (0 corpus chunks) was chosen over # "Health Companion V2022" (the card the user named, which HAS # chunks) → retrieval returned nothing → "I couldn't find it". _ver = set(_re.findall(r"\bv?\d{2,4}\b", q)) if _ver: _vmatch = [ s for s in scored if any(v in _re.sub(r"[^a-z0-9 ]", " ", s[3]).split() for v in _ver) ] if len(_vmatch) == 1: return _vmatch[0][2] if _vmatch: _vmatch.sort(key=lambda s: (s[0], s[1]), reverse=True) if len(_vmatch) == 1 or _vmatch[0][0] > _vmatch[1][0]: return _vmatch[0][2] scored.sort(key=lambda s: (s[0], s[1]), reverse=True) if len(scored) == 1 or scored[0][0] > scored[1][0]: return scored[0][2] except Exception: # noqa: BLE001 — name resolution is best-effort pass return None async def retrieve_policies( query: str, top_k: int = 8, policy_filter_ids: Optional[list[str]] = None, profile=None, intent: str = "recommendation", session=None, session_id: Optional[str] = None, ) -> dict: """Call the existing Chroma retriever and return policy chunks. Returns: {"chunks": [{policy_id, policy_name, insurer_slug, chunk_text, doc_type, source_url, score, ...}, ...], "count": N, "query": query, "guard": optional {reason, fallback} if filter pipeline says abort} On failure: {"chunks": [], "count": 0, "error": "..."}. QUARANTINE-RETRIEVAL FIX (2026-05-16) — `session_id` is threaded all the way down to `rag.retrieve.retrieve(...)` so a PDF the caller uploaded via POST /api/upload-policy (indexed into the SEPARATE `user_uploads_quarantine` Chroma collection, tagged with this session's id) becomes retrievable BY THE CHAT BRAIN FOR THAT SESSION ONLY. Before this fix the upload was embedded but the brain never forwarded session_id, so the quarantine boost pass in retrieve.py never fired and an uploaded policy could never surface in the conversation. Resolution order: explicit `session_id` arg > `session.session_id` attribute > None (no quarantine lookup). User A's session_id never leaks into user B's retrieval because the quarantine `where={"session_id": ...}` filter is strictly equality-scoped (see rag/retrieve.py). """ # Resolve the effective session id. The single_brain dispatcher passes # the live SessionState as `session`; older callers may pass session_id # explicitly. Never raise — a missing id just means "no quarantine". eff_session_id = session_id if eff_session_id is None and session is not None: eff_session_id = getattr(session, "session_id", None) if isinstance(eff_session_id, str): eff_session_id = eff_session_id.strip() or None elif eff_session_id is not None: eff_session_id = None if not isinstance(query, str) or not query.strip(): return {"chunks": [], "count": 0, "error": "empty_query"} # NAMED-POLICY Q&A BYPASS (#61, 2026-05-18) — a direct factual question # about a SPECIFIC catalogue policy ("what is the PED waiting period for # HDFC ERGO Optima Restore?") must be answerable on a cold / # incomplete-profile session. The profile-complete gate below blocks # ALL policy_filter_ids=None retrieval until the 7-slot fact-find is # done — so the bot replied "I couldn't find that policy" for policies # that ARE indexed (#26/#28). The gate exists to not RECOMMEND before # fact-find, not to refuse a factual lookup. When the query # unambiguously names a known policy, resolve it and reuse the existing # TRUSTED known-policy path (legitimately gate-bypassing + getting the # #61 canonical-family $in expansion in rag/retrieve.py). # # NOT gated on `intent`: single_brain._execute_tool HARDCODES # intent="recommendation" for EVERY tool call (single_brain.py:1529), # so an intent-conditioned bypass is dead code on the live path — the # proven (2026-05-18) #61 root cause: every named-policy Q&A arrived # with intent="recommendation" → the bypass never fired → the gate # blocked → "I couldn't find that policy". Safety is the RESOLVER's # conservatism, not the intent: a broad "recommend me a plan" / generic # profile request names NO specific policy → _resolve_named_policy # returns None → still fully gated (verified). Only an explicit, # unambiguous name (>=2 significant tokens, >=60% coverage, # version-aware, top strictly ahead of runner-up) triggers it. if not policy_filter_ids: _np = _resolve_named_policy(query) if _np: policy_filter_ids = [_np] # UPLOADED-DOC BYPASS (2026-05-18) — the user is explicitly promised in # the UI ("✓ Indexed … It's now searchable in this chat. Ask me about # it.") that an uploaded policy PDF is immediately queryable. But the # profile-complete gate below blocks ALL retrieval until the 7-slot # fact-find is done (unless policy_filter_ids is set, which the LLM # can't know for a freshly-uploaded doc). An uploaded doc is, by # definition, a known-doc-FOR-THIS-SESSION — semantically identical to # the policy_filter_ids follow-up branch that already legitimately # bypasses the gate. So: when this session has chunks in the SEPARATE # `user_uploads_quarantine` collection, run a quarantine-ONLY retrieval # that bypasses the recommendation profile-gate. Strictly session- # scoped (where={"session_id": eff_session_id}) so user A's upload # never leaks into user B. The full recommendation flow (general # corpus + eligibility + scorecard) still requires the complete # profile — this bypass only surfaces the user's OWN uploaded doc. if ( eff_session_id and not policy_filter_ids and _session_has_quarantine_docs(eff_session_id) ): up = await _retrieve_uploaded_only(query, eff_session_id, top_k) # Only short-circuit if the gate would otherwise block (incomplete # profile) AND we actually found uploaded chunks. If the profile is # already complete we fall through so the upload is folded into the # normal ranked recommendation pool by the quarantine boost pass. _profile_incomplete = profile is not None and any( getattr(profile, slot, None) in (None, "", []) for slot in _REQUIRED_FOR_READY ) if up.get("count", 0) > 0 and _profile_incomplete: return up # Profile-complete gate (skip if caller supplied policy_filter_ids — that # branch is a known-policy follow-up). Defense-in-depth: even if the LLM # ignores RULE 2 of the system prompt, this refuses to return chunks # against an incomplete profile. if profile is not None and not policy_filter_ids: missing = [ slot for slot in _REQUIRED_FOR_READY if getattr(profile, slot, None) in (None, "", []) ] if missing: # KI-Z6-NONE follow-up (2026-05-15): make the response # extremely directive so Gemini doesn't burn an iteration # re-trying retrieve_policies on the same incomplete profile. # Provide an exact_question string the model can literally # relay to the user for the first missing slot. _SLOT_QUESTIONS = { "name": "What's your name?", "age": "How old are you?", "dependents": ( "Who would you like the cover to include — just you, " "or spouse / kids / parents?" ), "location_tier": "Which city do you live in?", "income_band": ( "Roughly what's your annual household income — " "under 10 lakh, 10-25 lakh, or above 25 lakh?" ), "primary_goal": ( "Is this your first health policy, an upgrade, for " "tax planning, or to find a cheaper option?" ), "health_conditions": ( "Do you or your family have any pre-existing health " "conditions like diabetes, BP, or thyroid? If none, " "just say no." ), } first = missing[0] return { "chunks": [], "count": 0, "error": "profile_incomplete", "missing_slots": missing, "action_required": "ask_user_for", "field": first, "exact_question": _SLOT_QUESTIONS.get( first, f"Could you share your {first.replace('_', ' ')}?", ), "instruction": ( f"Profile is incomplete — missing: {', '.join(missing)}. " "Do NOT call retrieve_policies again this turn. Do NOT " "retry save_profile_field for the same field. Emit a " "TEXT reply that asks the user the `exact_question` " "above verbatim." ), } # Bug #108 + #110 (2026-05-16) — POST-RECAP pricing & family-history # bundle re-ask gate. The hard 7-slot gate above is satisfied; before we # return chunks for a RECOMMENDATION, ensure every bundle item (SI / # budget / existing cover / co-pay / FAMILY MEDICAL HISTORY / smoker / # parents-age-if-applicable) has been RESOLVED — captured OR explicitly # skipped. If the user answered some but not all, re-ask ONLY the # unresolved ones — but exactly ONCE per session (one-shot guard) so we # never hard-loop and SOFT-capture semantics survive (the user can still # skip on the re-ask). Follow-ups (policy_filter_ids) bypass this — it's # purely the first recommendation path. if ( profile is not None and not policy_filter_ids and (intent or "").lower() == "recommendation" and session is not None and not bool(getattr(session, "pricing_bundle_reasked", False)) ): unresolved = _unresolved_pricing_bundle(profile, session) if unresolved: # One-shot: mark so the NEXT recommendation retrieve proceeds # even if the user skips on the re-ask (SOFT capture, not a hard # gate — Bug #108 fix must re-ask, not loop forever). try: session.pricing_bundle_reasked = True except Exception: # noqa: BLE001 — bookkeeping must not break pass _qs = [ _PRICING_BUNDLE_QUESTIONS.get( s, f"Could you share your {s.replace('_', ' ')}?" ) for s in unresolved ] _numbered = "\n".join( f"{i}. {q}" for i, q in enumerate(_qs, start=1) ) return { "chunks": [], "count": 0, "error": "pricing_inputs_incomplete", "missing_slots": unresolved, "action_required": "ask_user_for", "field": unresolved[0], "exact_question": ( "Before I pull your recommendations, just a couple more " "(you can skip any):\n" + _numbered ), "instruction": ( "The user answered some but not all of the pricing / " "family-history questions. Do NOT call retrieve_policies " "again this turn and do NOT recommend yet. Emit a TEXT " "reply that asks ONLY the still-missing items in " "`exact_question` verbatim. If the user then provides " "them, save each via save_profile_field; if they skip, " "proceed to retrieve_policies on the next turn." ), } try: from rag.retrieve import retrieve as _retrieve # Decouple the Chroma recall pool from the caller's top_k: pull a # wide pool so eligibility + scorecard-aware ranking choose from a # broad set, then return the top-k best-fit survivors (truncated # after filter_pipeline below). For an explicit known-policy # follow-up (policy_filter_ids) keep the narrow top_k. _recall_pool = ( (int(top_k) if top_k else 8) if policy_filter_ids else _RECALL_POOL ) chunks = await _retrieve( query=query, top_k=_recall_pool, policy_ids=policy_filter_ids or None, session_id=eff_session_id, ) except Exception as e: # noqa: BLE001 — return graceful empty _log.warning( "retrieve_policies failed (q=%r): %s: %s", query[:120], type(e).__name__, str(e)[:200], ) return { "chunks": [], "count": 0, "error": f"{type(e).__name__}:{str(e)[:200]}", } raw: list[dict] = [] for c in chunks or []: pid = getattr(c, "policy_id", "") doc_type = getattr(c, "doc_type", "") chunk_dict = { "chunk_id": getattr(c, "chunk_id", ""), "policy_id": pid, "policy_name": clean_display_policy_name( getattr(c, "policy_name", "") ), "insurer_slug": getattr(c, "insurer_slug", ""), "doc_type": doc_type, "source_url": getattr(c, "source_url", ""), "chunk_text": (getattr(c, "text", "") or "")[:1200], "score": float(getattr(c, "score", 0.0) or 0.0), "min_entry_age": getattr(c, "min_entry_age", None), "max_entry_age": getattr(c, "max_entry_age", None), } # KI-278 — enrich policy chunks with the structural facts the # eligibility/profile-fit filter needs (top-up signal, SI options, # mandatory co-pay, scorecard grade). Skip non-policy chunks # (profile/regulatory/review) — they're never recommendable policies # and the eligibility rules don't apply to them. if pid and (doc_type or "").lower() not in ( "profile", "regulatory", "review" ): try: chunk_dict.update(_load_policy_facts(pid)) chunk_dict.update(_scorecard_signal(pid, profile=profile)) except Exception as e: # noqa: BLE001 — never break retrieval _log.warning("KI-278 enrich(%s) failed: %s", pid, e) raw.append(chunk_dict) # X5 sidecar: apply profile-fit + citation-grounding + dedup. Skip when # caller supplied an explicit policy_filter_ids (we already know which # policies they want). guard_signal = None filtered = raw if not policy_filter_ids: # Union the cosine pool with the catalogue's top profile-graded # policies so a strong policy that isn't cosine-similar to a # generic needs-query still enters contention. filter_pipeline # then applies eligibility + profile-fit ranking on the union. A # cosine chunk wins on a dup (it carries real retrieved text); # seeds only add missing candidates. if profile is not None: _seen_pids = { (c.get("policy_id") or "").strip() for c in raw } for _seed in _quality_seed_candidates(profile, limit=25): _sp = (_seed.get("policy_id") or "").strip() if _sp and _sp not in _seen_pids: _seen_pids.add(_sp) raw.append(_seed) try: from backend.retrieval_filters import filter_pipeline filtered, guard_signal = filter_pipeline( raw, profile=profile, query=query, intent=intent, ) except Exception as e: # noqa: BLE001 — pipeline must never break retrieval _log.warning("retrieval_filters.filter_pipeline failed: %s", e) filtered = raw # filter_pipeline has eligibility-filtered + ranked the wide pool # best-fit-first (scorecard-aware, deduped to ~1 chunk/policy). # Return only the top-N best-fit survivors — enough for the LLM to # pick 2-4 recommendations and for the citation builder to select # from, without the wide tail as context. This is the only # truncation, done after quality ranking, so the LLM and the cards # see the same best-fit set. filtered = filtered[: max((int(top_k) if top_k else 8), 12)] # For a recommendation turn, apply the same fitness floor the # citation builder uses (_recommendation_fit) here, so the LLM only # sees cardable policies — prose and cards stay 1:1 (it cannot name # a policy it won't card). If nothing clears the floor the set is # empty and the LLM says there is no strong match rather than # padding with a sub-floor policy. qa / follow-up intents are # untouched (they cite supporting source chunks regardless of # recommendation grade). if (intent or "").lower() == "recommendation" and filtered: try: from backend.single_brain import _recommendation_fit filtered = [c for c in filtered if _recommendation_fit(c)[0]] except Exception as e: # noqa: BLE001 — never break retrieval _log.warning("rec-fit 1:1 gate failed: %s", e) # X7 — cache slug→insurer lookups on session so a subsequent # mark_recommendation call (same turn) can stamp the right insurer on the # shown_policies event. Each retrieve_policies call MERGES into the cache # rather than overwriting, so an LLM that hits multiple retrieves before # marking still resolves every cited slug. Keep last_retrieved_chunks too # for parity with future tools that need the full chunk objects. if session is not None: try: slug_to_insurer = dict(getattr(session, "slug_to_insurer", {}) or {}) for c in filtered: slug = (c.get("policy_id") or "").strip() insurer = (c.get("insurer_slug") or "").strip() if slug and insurer: slug_to_insurer[slug] = insurer session.slug_to_insurer = slug_to_insurer session.last_retrieved_chunks = list(filtered) except Exception: # noqa: BLE001 — bookkeeping must never fail retrieval pass out = { "chunks": filtered, "count": len(filtered), "query": query, } if guard_signal is not None: out["guard"] = guard_signal return out # ---- mark_recommendation --------------------------------------------------- def get_policy_facts(session, policy_ids: Optional[list[str]] = None) -> dict: """Return AUTHORITATIVE claim / reputation / scorecard / coverage facts for one or more policy_ids — the SAME data the frontend detail-modal shows (insurer reviews `claim_metrics` + scorecard grade + the curated coverage facts). This is the tool the brain calls for ANY follow-up about claim-settlement ratio, claim denials/rejections, complaints, incurred-claim ratio, insurer reputation, or to COMPARE two policies the user already saw. Root cause (2026-05-18): the brain previously had NO way to reach claim/denial/complaint/review data — retrieve_policies returns policy WORDING chunks only — so it falsely answered "I don't have enough information" for claims questions and could not back a verbal comparison. This tool closes that gap; it reuses the existing `_insurer_reviews` / `_scorecard_signal` / `_load_policy_facts` / `_curated_facts_all` loaders so a verbal answer matches the modal. `policy_ids` empty/None → falls back to the active shortlist (`session.last_recommendation_ids`) so "compare the ones you showed" works without the model re-deriving ids. """ ids = [str(p).strip() for p in (policy_ids or []) if str(p).strip()] if not ids: ids = [ str(p).strip() for p in (getattr(session, "last_recommendation_ids", []) or []) if str(p).strip() ] if not ids: return { "ok": False, "error": ( "no_policy_ids — pass policy_ids, or recommend policies " "first so there is an active shortlist" ), } # Resolve policy_id → (policy_name, insurer_slug) from the caches # retrieve_policies stashed this session, then the curated catalogue. slug_map = dict(getattr(session, "slug_to_insurer", {}) or {}) name_by_id: dict[str, str] = {} for c in getattr(session, "last_retrieved_chunks", []) or []: pid = (c.get("policy_id") or "").strip() if not pid: continue name_by_id.setdefault(pid, c.get("policy_name") or pid) if pid not in slug_map and c.get("insurer_slug"): slug_map[pid] = c.get("insurer_slug") try: curated = _curated_facts_all() except Exception: # noqa: BLE001 — curated layer optional curated = {} out: list[dict] = [] for pid in ids: cur = curated.get(pid) or {} slug = (slug_map.get(pid) or cur.get("insurer_slug") or "").strip() pname = clean_display_policy_name( name_by_id.get(pid) or cur.get("policy_name") or pid ) rv = _insurer_reviews(slug) or {} cm = rv.get("claim_metrics") or {} agg = rv.get("aggregate_score") or {} sig = _scorecard_signal(pid) or {} try: facts = _load_policy_facts(pid) or {} except Exception: # noqa: BLE001 — facts optional facts = {} out.append( { "policy_id": pid, "policy_name": pname, "insurer_slug": slug, "insurer_name": rv.get("insurer_name") or (slug.replace("-", " ").title() if slug else ""), "scorecard_grade": sig.get("_grade"), "scorecard_overall_0_100": sig.get("_overall_score"), "claim_settlement_ratio_pct": cm.get( "claim_settlement_ratio_pct" ), "claim_settlement_ratio_year": cm.get( "claim_settlement_ratio_year" ), "three_year_avg_csr_pct": cm.get("three_year_avg_csr_pct"), "complaints_per_10k_policies": cm.get( "complaints_per_10k_policies" ), "complaints_year": cm.get("complaints_year"), "claims_rejected_fy24": cm.get("claims_rejected_fy24"), "incurred_claim_ratio_pct": cm.get("incurred_claim_ratio_pct"), "reputation_headline": agg.get("headline"), "reputation_grade": agg.get("letter_grade"), "claim_data_source_url": ( cm.get("source_irdai_url") or cm.get("source_secondary_url") or cm.get("source_complaints_url") ), # Bug #44 — decision-critical fields (PED waiting, initial # waiting, copay, room-rent cap, CSR) are resolved from the # SAME canonical curated entry the scorecard / #31 path # uses, so a verbal answer / comparison table built from # this tool can never contradict the policy's scorecard # card. They OVERRIDE the divergent _load_policy_facts # 7-key resolver for these keys, agreeing by construction. "key_coverage_facts": { **{k: v for k, v in facts.items() if v not in (None, "", [])}, **canonical_decision_facts(pid), }, "reviews_available": bool(rv), } ) return { "ok": True, "count": len(out), "policies": out, "note": ( "This is the authoritative claim-settlement / complaint / " "denial / scorecard data (IRDAI + scorecard) — answer the " "user's question directly from it; do NOT say you lack this " "information. Cite as " "[Source: claim data (IRDAI), ]." ), } def mark_recommendation( session, policy_ids: list[str], is_final: bool = False, ) -> dict: """Persist the policies just recommended so follow-up turns can resolve references like "tell me about #2". Sets `session.last_recommendation_ids = policy_ids` (the same field the orchestrator already maintains for follow-up routing — KI-224 / KI-228). `is_final` is accepted for forward-compat (when the session grows a `closed` field); today it's logged but not persisted. Returns: {"recorded": True, "policy_ids": [...], "is_final": bool} """ if not isinstance(policy_ids, list): return {"recorded": False, "error": "policy_ids_not_list"} # Coerce + dedupe while preserving order. seen: set[str] = set() cleaned: list[str] = [] for pid in policy_ids: s = str(pid).strip() if s and s not in seen: seen.add(s) cleaned.append(s) # Z2 fix — Issue 2 (hallucinated closure). Vikram T6 saw the LLM emit # mark_recommendation with an empty policy_ids list, the tool silently # returned {"recorded": True, "policy_ids": []}, and the bot then said # "I'm glad we found a good fit" despite ZERO cards shown. Two # preconditions, gated BEFORE any session write so we don't poison # last_recommendation_ids / shown_policies events with junk: # (a) empty (after dedup) → no_policies_supplied # (b) non-empty BUT no retrieval history this session → caller # must run retrieve_policies first (Y2 cache check via # session.last_retrieved_chunks) if not cleaned: return {"recorded": False, "error": "no_policies_supplied"} _retrieval_cache = getattr(session, "last_retrieved_chunks", None) if not _retrieval_cache: return { "recorded": False, "error": "no_retrieval_history — call retrieve_policies first", } try: session.last_recommendation_ids = cleaned except Exception as e: # noqa: BLE001 return { "recorded": False, "error": f"setattr_failed:{type(e).__name__}:{e}", } if is_final and hasattr(session, "closed"): try: session.closed = True # type: ignore[attr-defined] except Exception: # noqa: BLE001 pass # X7 — write a shown_policies event per cited policy so the admin # Recommendation History panel needs single_brain turns logged the same # way the (now-removed) orchestrator did its `_log_shown_policies` # (KI-063): dedupe by slug, resolve insurer via the slug→insurer # cache `retrieve_policies` stashed on the session this turn, and stamp # `turn_idx=session.turn_idx` so the frontend "Conversation turn" column # has a real value instead of "—". # # No profile name → no JSON file to write to (anonymous session). No # insurer resolution for a slug → skip that slug. All errors swallowed # so a logging failure never breaks the tool reply back to Gemini. # ADR-043 (2026-05-27) — record_policy_event used to write the shown # policy onto the named-profile JSON for cross-session "have I shown # this before" tracking. Cross-session persistence is gone; the # in-memory equivalent (avoid re-pitching within the same session) # is handled by session.last_recommendation_ids / shown_policies on # the live Profile dataclass. try: profile = getattr(session, "profile", None) if profile is not None and cleaned: shown = list(getattr(profile, "shown_policies", None) or []) existing_slugs = {(e or {}).get("policy_slug") for e in shown} slug_to_insurer = dict(getattr(session, "slug_to_insurer", {}) or {}) turn_idx = int(getattr(session, "turn_idx", 0) or 0) session_id = getattr(session, "session_id", None) now_iso = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) for slug in cleaned: if not slug or slug in existing_slugs: continue insurer = slug_to_insurer.get(slug) if not insurer: continue shown.append({ "policy_slug": slug, "insurer": insurer, "event_at": now_iso, "session_id": session_id, "reason": "shown_in_recommendation", "turn_idx": turn_idx, }) existing_slugs.add(slug) profile.shown_policies = shown except Exception as e: # noqa: BLE001 — never break the tool reply _log.warning( "mark_recommendation shown-event logging (in-memory) failed: " "%s: %s", type(e).__name__, str(e)[:200], ) return { "recorded": True, "policy_ids": cleaned, "is_final": bool(is_final), } # ---- private normalizers --------------------------------------------------- def _coerce_age(value: Any) -> Optional[int]: """int(value), clamped to [0, 110]. Empty / non-numeric → None.""" if value is None: return None try: if isinstance(value, bool): # bool is an int subclass — block first return None n = int(value) except (TypeError, ValueError): # Try string parse — Gemini sometimes emits "29" as a JSON string try: n = int(str(value).strip()) except (TypeError, ValueError): return None if n < 0: n = 0 if n > 110: n = 110 return n # Ported from sales_brain_normalizer.py (B6 cleanup, 2026-05-15) — the legacy # normalizer module is being deleted; this is the only consumer left. _DEPENDENT_VAGUE_TERMS = ( "family", "everyone", "all of us", "everybody", "whole family", "joint family", ) def _normalize_dependents_inline(value: Any) -> Optional[str]: """Port of sales_brain_normalizer._normalize_dependents. Schema-free variant — the original required a `schema["values"]` lookup for direct enum hits; here we hard-code the canonical bucket set used by the bot (kept in sync with the previous fact-find schema). """ if not isinstance(value, str): return None s = value.strip().lower() if not s: return None # Canonical enum values (formerly schema["values"]). _CANONICAL_VALUES = ( "self", "self+spouse", "self+spouse+kids", "self+spouse+parents", "self+spouse+kids+parents", "self+kids", "self+parents", ) if s in _CANONICAL_VALUES: return s has_spouse = any(k in s for k in ("spouse", "wife", "husband", "partner")) has_kids = ("kid" in s) or ("child" in s) or ("children" in s) or ("son" in s) or ("daughter" in s) has_parents = "parent" in s # Vague terms with no specific signals → cannot coerce if any(v in s for v in _DEPENDENT_VAGUE_TERMS) and not (has_spouse or has_kids or has_parents): return None if has_spouse and has_kids and has_parents: return "self+spouse+kids+parents" if has_spouse and has_parents: return "self+spouse+parents" if has_spouse and has_kids: return "self+spouse+kids" if has_spouse: return "self+spouse" if has_kids and has_parents: # No canonical bucket — fold parents into the wider bundle return "self+spouse+kids+parents" if has_kids: return "self+kids" if has_parents: return "self+parents" # KI-222 — expand the "self" alias set. Live captures showed users # answering "single", "unmarried", "no dependents", "just myself" etc., # which previously fell through to None and got silently dropped — the # bot then re-asked the same slot on the next turn. _SELF_ALIASES = ( "self", "me", "just me", "only me", "myself", "only self", "single", "unmarried", "alone", "bachelor", "no dependents", "just myself", "nobody else", "no one else", "nobody", "myself only", "by myself", "solo", ) if s in _SELF_ALIASES: return "self" # Substring fall-through for the same intents when wrapped in extra prose # (e.g. "i'm single right now", "just myself for now"). if any(alias in s for alias in ( "single", "unmarried", "no dependents", "just myself", "by myself", "nobody else", "no one else", "myself only", "bachelor", "solo", )): return "self" return None def _coerce_dependents(value: Any) -> Optional[str]: """Normalize raw dependents text to a canonical bucket; see `_normalize_dependents_inline` (ported from sales_brain_normalizer).""" if value is None: return None try: return _normalize_dependents_inline(value) except Exception: # noqa: BLE001 — best-effort s = str(value).strip() return s or None def _coerce_existing_cover(value: Any) -> Optional[int]: """Parse INR amounts like "5L" / "5 lakh" / 500000 via the canonical parser. Numeric pass-throughs are clamped to >= 0. """ if value is None: return None if isinstance(value, bool): return None if isinstance(value, (int, float)): n = int(value) return max(0, n) try: from backend.needs_finder import _parse_inr_amount parsed = _parse_inr_amount(str(value)) if parsed is not None: return max(0, int(parsed)) except Exception: # noqa: BLE001 pass # Last-ditch: strip non-digits try: digits = "".join(ch for ch in str(value) if ch.isdigit()) if digits: return max(0, int(digits)) except Exception: # noqa: BLE001 pass return None # Canonical budget bands — the EXACT string contract the frontend # ProfileBuilderPanel round-trips via budgetBandToInr / budgetInrToBand # (frontend/src/app/page.tsx) and that needs_finder._parse_budget_band # emits. Keeping this set inline makes the contract greppable from the # capture surface. _CANONICAL_BUDGET_BANDS: frozenset[str] = frozenset( {"under_15k", "15k_30k", "30k_60k", "60k+"} ) def _coerce_budget_band(value: Any) -> Optional[str]: """Normalise a budget capture to the documented `budget_band` contract. Bug #109 (2026-05-16). The user states a premium budget in chat ("max ₹15,000/yr", "around 30k", "15000"). Gemini calls save_profile_field(field="budget_band", value=...) but often passes the NUMERIC the user said, not a canonical band. The old code let budget_band fall through the generic string pass-through, so profile.budget_band was stored as the raw "15000" — which the frontend ProfileBuilderPanel's budgetBandToInr() switch can't map, so the panel never pre-filled the budget slider even though the bot's summary showed it. Mapping (matches frontend budgetInrToBand + needs_finder bands): • already a canonical band ("15k_30k") → passed through unchanged • free-text / numeric ("max ₹15,000/yr", "30k", 22000, "10-15K") → delegated to needs_finder._parse_budget_band → canonical band • unrecognised → None (KI-091 null-overwrite guard then refuses to clobber a previously-captured band) """ if value is None: return None # Numeric → bucket directly (₹ amount per year). if isinstance(value, bool): return None if isinstance(value, (int, float)): v = int(value) if v < 15_000: return "under_15k" if v < 30_000: return "15k_30k" if v < 60_000: return "30k_60k" return "60k+" s = str(value).strip() if not s: return None # Already canonical — accept verbatim (case/space tolerant). norm = s.lower().replace(" ", "") if norm in _CANONICAL_BUDGET_BANDS: return norm # Strip ANNUAL-budget qualifiers FIRST. needs_finder._parse_inr_amount # treats a bare "yr" / "year" as an AGE context and refuses to read any # number as currency (the KI-161 age guard) — so the canonical live # phrasing "max ₹15,000/yr" / "₹15000 per year" parsed to None and the # band was never captured (Bug #109). These suffixes are unambiguous # PER-ANNUM budget markers here (the field is explicitly the premium # budget), not an age, so we drop them before delegating. import re as _re cleaned = _re.sub( r"\b(?:per\s*(?:year|annum)|p\.?\s*a\.?|/\s*(?:yr|year|annum)|" r"a\s*year|annually|yearly|/\s*yr)\b", " ", s, flags=_re.IGNORECASE, ) # Free-text / amount → canonical band via the shared parser. This # handles "max ₹15,000", "30k", "around 25000", "15-30k", "1 lakh". try: from backend.needs_finder import _parse_budget_band band = _parse_budget_band(cleaned) if band in _CANONICAL_BUDGET_BANDS: return band # Last-resort: if the qualifier strip left only the amount, try the # raw string too (covers phrasings the regex didn't anticipate). if cleaned != s: band = _parse_budget_band(s) if band in _CANONICAL_BUDGET_BANDS: return band except Exception: # noqa: BLE001 — parser optional; fall through to None pass return None def _coerce_bool(value: Any) -> Optional[bool]: """Tri-state bool coercion for parents_to_insure / parents_has_ped. Accepts: True / False / "yes" / "no" / "y" / "n" / "true" / "false" / 1 / 0. Anything else → None (so the KI-091 null-overwrite guard refuses to clobber a previously-captured value). """ if value is None: return None if isinstance(value, bool): return value if isinstance(value, (int, float)): return bool(value) s = str(value).strip().lower() if s in ("true", "yes", "y", "1"): return True if s in ("false", "no", "n", "0"): return False return None def _coerce_smoker(value: Any) -> Optional[bool]: """KI-275 (2026-05-15) — tri-state bool for smoker / tobacco use. Accepts: - True / "yes" / "true" / "smoker" / "smokes" / "tobacco" / 1 → True - False / "no" / "false" / "non-smoker" / "doesn't smoke" / 0 → False - None / "" / unclear → None Returning None lets the KI-091 null-overwrite guard in save_profile_field refuse to clobber a previously-captured value. """ if value is None: return None if isinstance(value, bool): return value if isinstance(value, (int, float)): return bool(value) s = str(value).strip().lower() if not s: return None _YES = { "yes", "y", "true", "1", "smoker", "smokes", "smoke", "i smoke", "tobacco", "tobacco user", "uses tobacco", "i do", "yep", "yeah", "yup", } _NO = { "no", "n", "false", "0", "non-smoker", "nonsmoker", "non smoker", "doesn't smoke", "does not smoke", "dont smoke", "don't smoke", "i don't", "i do not", "nope", "never", "no tobacco", } if s in _YES: return True if s in _NO: return False # Substring fall-through for prose ("I'm a non-smoker", "I smoke daily"). if any(tok in s for tok in ("non-smoker", "nonsmoker", "non smoker", "don't smoke", "doesn't smoke", "do not smoke", "no tobacco")): return False if any(tok in s for tok in ("smoker", "smokes", "tobacco")): return True return None def _coerce_desired_sum_insured(value: Any) -> Optional[int]: """Parse desired sum insured (cover amount) as integer rupees. Accepts: "10L" / "10 lakh" / "1 crore" / "1Cr" / 1000000 / "₹10,00,000" / "five lakh" (rejected — words not numerals). Delegates to `_parse_inr_amount` from needs_finder for the heavy lift, falls back to bare-digit extraction. Clamps to [50_000, 500_000_000] (₹50K floor, ₹50Cr ceiling) — anything outside is implausible for a health-insurance sum insured and likely a parse error. """ if value is None: return None if isinstance(value, bool): return None if isinstance(value, (int, float)): n = int(value) return max(50_000, min(500_000_000, n)) try: from backend.needs_finder import _parse_inr_amount parsed = _parse_inr_amount(str(value)) if parsed is not None: return max(50_000, min(500_000_000, int(parsed))) except Exception: # noqa: BLE001 pass # Last-ditch: strip non-digits (handles "₹10,00,000" if parser missed). try: digits = "".join(ch for ch in str(value) if ch.isdigit()) if digits: n = int(digits) if n >= 50_000: return min(500_000_000, n) except Exception: # noqa: BLE001 pass return None def _coerce_health_conditions(value: Any) -> Optional[list[str]]: """Always return list[str] lowercase, stripped, empties dropped. KI-Z6-NONE (2026-05-15): "none" / "no" / "n/a" — used to be stripped to `[]`, but downstream `save_profile_field` then hits the KI-091 null- overwrite guard (`normalized in (None, "", [])`) and refuses to persist the slot. Result: profile.health_conditions stays empty forever, `_profile_complete` returns False, retrieve_policies returns profile_incomplete, the brain loops, MAX_ITERATIONS exhausts, the bot emits "Sorry — I lost my train of thought" (W1 Turn 3 live blocker). Fix: keep the explicit-negation sentinel `["none"]` so: • the slot is non-empty → _profile_complete=True → retrieve fires • downstream consumers can still detect "no PED" via the literal token `"none"` in the list (callers already lowercase-compare). """ if value is None: return None if isinstance(value, str): # Gemini sometimes emits comma-joined strings instead of a list. items = [t.strip() for t in value.split(",")] elif isinstance(value, (list, tuple)): items = [str(t).strip() for t in value] else: items = [str(value).strip()] cleaned = [t.lower() for t in items if t] # Explicit-negation tokens — collapse to the canonical sentinel # `["none"]` rather than `[]` so the slot is captured, not blanked. _NEGATION = {"none", "no", "n/a", "na", "nil", "nothing", "healthy"} if cleaned and all(t in _NEGATION for t in cleaned): return ["none"] # Mixed input ("diabetes, none") — drop the negation noise, keep real # conditions. real = [t for t in cleaned if t not in _NEGATION] return real # --------------------------------------------------------------------------- # D2 (2026-05-15) — copay_pct + family_medical_history coercers # --------------------------------------------------------------------------- # Word-number map for "twenty", "ten" etc. (RULE 2.5 asks the user in # multiples of 10; Gemini sometimes echoes the user's word verbatim). _COPAY_WORD_TO_INT: dict[str, int] = { "zero": 0, "none": 0, "no": 0, "ten": 10, "fifteen": 15, "twenty": 20, "twenty five": 25, "twenty-five": 25, "thirty": 30, "forty": 40, "fifty": 50, } def _coerce_copay_pct(value: Any) -> Optional[int]: """Parse a co-pay tolerance percent, clamped to [0, 50]. Accepts: - int / float → int + clamp - "20", "20%", " 20 % ", "20 percent" → 20 - "no copay" / "zero" / "none" → 0 - word numbers like "twenty" → 20 - bool → blocked (KI-091 null-overwrite caution: bool is an int subclass) Returns None for unrecognised input so the null-overwrite guard in save_profile_field can refuse to clobber a previously-captured slot. """ if value is None: return None if isinstance(value, bool): return None if isinstance(value, (int, float)): n = int(value) return max(0, min(50, n)) s = str(value).strip().lower() if not s: return None # Explicit zero phrasings. if s in ("no", "none", "nil", "zero", "no copay", "no co-pay", "no co pay"): return 0 # Word-number lookup (exact match). if s in _COPAY_WORD_TO_INT: return _COPAY_WORD_TO_INT[s] # Strip "%" + "percent" + "pct". cleaned = ( s.replace("%", " ") .replace("percent", " ") .replace("pct", " ") .replace("copay", " ") .replace("co-pay", " ") .replace("co pay", " ") ) # Digit run. import re as _re m = _re.search(r"\d+(?:\.\d+)?", cleaned) if m: try: n = int(float(m.group(0))) return max(0, min(50, n)) except ValueError: return None # Word-number fall-through (substring on cleaned text). for word, num in _COPAY_WORD_TO_INT.items(): if word in cleaned.split(): return num return None # Alias map for family medical history — same canonicalisation logic as # health_conditions but kept inline so this slot stays self-contained. _FAMILY_HISTORY_ALIASES: dict[str, str] = { "bp": "hypertension", "high bp": "hypertension", "high-bp": "hypertension", "hi-bp": "hypertension", "high blood pressure": "hypertension", "blood pressure": "hypertension", "sugar": "diabetes", "diabetic": "diabetes", "type 2 diabetes": "diabetes", "type 1 diabetes": "diabetes", "heart attack": "heart", "heart disease": "heart", "cardiac": "heart", "cardiac disease": "heart", "stroke": "heart", "tumor": "cancer", "tumour": "cancer", "carcinoma": "cancer", } _FAMILY_HISTORY_NEGATION = { "none", "no", "n/a", "na", "nil", "nothing", "healthy", "no family history", "no history", "no medical history", } def _coerce_family_medical_history(value: Any) -> Optional[list[str]]: """Return list[str] lowercase canonical conditions running in BLOOD family. Accepts: - list / tuple of strings - comma-joined string ("cancer, diabetes") - "none" / "no family history" → [] Alias map collapses BP/sugar/cardiac/tumor → hypertension/diabetes/heart/ cancer respectively (same family as _coerce_health_conditions). Negation sentinels return `[]` since downstream pricing & retrieval BOTH treat an empty list as the "no family history" branch (different from health_ conditions where the explicit `["none"]` sentinel is needed for the profile-completeness gate). """ if value is None: return None if isinstance(value, str): items = [t.strip() for t in value.split(",")] elif isinstance(value, (list, tuple)): items = [str(t).strip() for t in value] else: items = [str(value).strip()] cleaned = [t.lower() for t in items if t] # Full-string negation collapses to []. if cleaned and all(t in _FAMILY_HISTORY_NEGATION for t in cleaned): return [] # Drop negation noise from mixed input ("cancer, none"). cleaned = [t for t in cleaned if t not in _FAMILY_HISTORY_NEGATION] # Canonicalise via alias map. canonical: list[str] = [] seen: set[str] = set() for t in cleaned: c = _FAMILY_HISTORY_ALIASES.get(t, t) if c and c not in seen: seen.add(c) canonical.append(c) return canonical __all__ = [ "save_profile_field", "retrieve_policies", "mark_recommendation", "SLOT_UNION", "union_snapshot", ]