Spaces:
Sleeping
Sleeping
| """FastAPI app β the backend API for the Insurance Sales Portfolio Expert. | |
| Run locally: | |
| uvicorn backend.main:app --reload --host 0.0.0.0 --port 8000 | |
| Interactive docs at http://localhost:8000/docs | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import base64 | |
| import json | |
| import logging | |
| import re | |
| import time | |
| import uuid | |
| from pathlib import Path | |
| from typing import Optional | |
| from fastapi import FastAPI, File, Form, Header, HTTPException, Request, UploadFile | |
| from fastapi.exceptions import RequestValidationError | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel, Field, field_validator | |
| from backend.config import settings | |
| from backend import nim_fallback | |
| from backend import brain_tools # KI-271 β SLOT_UNION-driven profile_dict in 3 endpoints | |
| from backend import sum_insured as _si # SI rationalisation (D1/D3) β source-quote corroboration | |
| from backend.providers.sarvam_stt import SarvamSTT | |
| from backend.providers.sarvam_tts import SarvamTTS | |
| # Single-LLM brain toggle. Off by default; flip via env var. When on, the | |
| # /api/chat hot path runs single_brain.handle_turn and falls back to | |
| # nim_fallback.handle_turn_fallback on any SingleBrainError (so users | |
| # always get a reply). When off, /api/chat routes directly through | |
| # nim_fallback.handle_turn_fallback. | |
| import os as _os # local alias to avoid stomping any later `import os` | |
| USE_SINGLE_BRAIN = _os.environ.get("USE_SINGLE_BRAIN", "false").lower() in ( | |
| "1", "true", "yes", "on", | |
| ) | |
| # Safety net for RULE 7. If Gemini does not call mark_recommendation when | |
| # the user clearly commits to a policy ("I'll go with that one", "let's do | |
| # #2", "buy this"), the post-turn detector below auto-calls | |
| # mark_recommendation against session.last_recommendation_ids[:1] so the | |
| # closure event is recorded for analytics. | |
| # Word-boundary anchored; case-insensitive at match-time. | |
| _CLOSER_KEYWORD_RE = re.compile( | |
| r"\b(go with|i'?ll take|i will take|let'?s do|let me get|sign me up|" | |
| r"purchase|buy this|i want to purchase|i'?ll go with|i want to buy)\b", | |
| re.IGNORECASE, | |
| ) | |
| # Singleton provider instances (initialized on first call) | |
| _stt: Optional[SarvamSTT] = None | |
| _tts: Optional[SarvamTTS] = None | |
| def get_stt() -> SarvamSTT: | |
| global _stt | |
| if _stt is None: | |
| _stt = SarvamSTT() | |
| return _stt | |
| def get_tts() -> SarvamTTS: | |
| global _tts | |
| if _tts is None: | |
| _tts = SarvamTTS() | |
| return _tts | |
| # ---------- log helpers ---------- | |
| LOG_DIR = settings.CORPUS_DIR.parent.parent / "logs" | |
| LOG_DIR.mkdir(exist_ok=True) | |
| TURNS_LOG = LOG_DIR / "turns.jsonl" | |
| def log_turn(event: dict) -> None: | |
| event["ts"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) | |
| with open(TURNS_LOG, "a") as f: | |
| f.write(json.dumps(event) + "\n") | |
| # ---------- API schemas ---------- | |
| class HealthResponse(BaseModel): | |
| status: str | |
| providers_ok: dict[str, bool] | |
| missing_keys: list[str] | |
| class TranscribeResponse(BaseModel): | |
| text: str | |
| language_code: Optional[str] = None | |
| confidence: Optional[float] = None | |
| latency_ms: int | |
| # KI-242 β When Sarvam STT fails, the endpoint returns HTTP 200 with | |
| # `text=""` plus these two fields set so the frontend can render a | |
| # friendly message instead of parsing raw httpx error strings. | |
| # error_code is a closed enum: rate_limit | service_unavailable | | |
| # network | auth | unknown. Absent on the success path. | |
| error_code: Optional[str] = None | |
| user_message: Optional[str] = None | |
| class CitationOut(BaseModel): | |
| policy_id: str | |
| policy_name: str | |
| insurer_slug: str | |
| page_start: int | |
| page_end: int | |
| source_url: str | |
| score: float | |
| class ChatRequest(BaseModel): | |
| user_text: str = Field(..., description="The user's question or utterance") | |
| session_id: Optional[str] = Field(None, description="Stable per-session ID for logging") | |
| chat_history: list[dict] = Field(default_factory=list, description="[{role, content}, ...]") | |
| profile: dict = Field(default_factory=dict, description="User profile (age, dependents, etc.)") | |
| policy_filter_ids: Optional[list[str]] = Field(None, description="Restrict retrieval to these policies") | |
| return_audio: bool = Field(False, description="If true, also return TTS audio (base64 WAV)") | |
| tts_language_code: str = Field("en-IN", description="Language for TTS playback") | |
| view_context: Optional[dict] = Field( | |
| None, | |
| description=( | |
| "Frontend-supplied snapshot of what the user is looking at right now: " | |
| "{active_view, active_policy_id, filters}. Injected into the system prompt " | |
| "so the bot can ground 'this policy' / 'these filters' references." | |
| ), | |
| ) | |
| class ChatResponse(BaseModel): | |
| reply_text: str | |
| citations: list[CitationOut] | |
| brain_used: str | |
| intent: str | |
| language: str | |
| latency_ms: int | |
| session_id: str | |
| audio_base64: Optional[str] = None | |
| audio_mime: Optional[str] = None # X8 β "audio/wav" | "audio/mp4" | "audio/webm" | |
| # TTS voice-output failures are surfaced to the client (text reply is | |
| # unaffected): the frontend renders a small inline "voice unavailable" | |
| # notice under the bot bubble. tts_error_code is a closed enum mirroring | |
| # the STT path's contract so the client never parses raw httpx text: | |
| # rate_limit β Sarvam 429 / insufficient_quota / no credits | |
| # service_unavailable β Sarvam 5xx / 503 | |
| # auth β Sarvam 401/403 / missing SARVAM_API_KEY | |
| # network β connect/read timeout, DNS, conn reset | |
| # unknown β anything else | |
| tts_error_code: Optional[str] = None | |
| tts_user_message: Optional[str] = None | |
| faithfulness_passed: bool = True | |
| faithfulness_reasons: list[str] = Field(default_factory=list) | |
| blocked: bool = False | |
| profile_updates: dict = Field( | |
| default_factory=dict, | |
| description=( | |
| "Any profile fields auto-extracted from the user's free-form message " | |
| "this turn (age, dependents, health_conditions, etc.). Frontend can " | |
| "flash an acknowledgment + refresh the completeness panel." | |
| ), | |
| ) | |
| # Whether the 7 required profile slots are captured. Surfaced in the | |
| # primary chat response so the UI can flip to 100% in the same render | |
| # cycle without a second roundtrip to /api/profile/completeness. | |
| # Computed via brain_tools._REQUIRED_FOR_READY (same slot list used by | |
| # retrieve_policies' profile-complete gate) so client + server never | |
| # disagree. | |
| profile_complete: bool = Field( | |
| False, | |
| description=( | |
| "True when every required profile slot (name, age, dependents, " | |
| "location_tier, income_band, primary_goal, health_conditions) is " | |
| "non-empty on the live session.profile at end-of-turn." | |
| ), | |
| ) | |
| # KI-Z7 (2026-05-15) β Feature B. True when single_brain.handle_turn's | |
| # turn-1 name heuristic matched a stored profile and hydrated the | |
| # session. Frontend renders a "Welcome back, <name>!" banner with the | |
| # last predicted-premium band when this flips True on the first turn. | |
| returning_user_recalled: bool = Field( | |
| False, | |
| description=( | |
| "True iff a stored named-profile was matched + hydrated on the " | |
| "current turn (typically only on turn 1)." | |
| ), | |
| ) | |
| class TTSRequest(BaseModel): | |
| text: str | |
| language_code: str = "en-IN" | |
| speaker: Optional[str] = None | |
| class PolicyEntry(BaseModel): | |
| name: str | |
| source_url: str = "" # PDF URL, verified at download time | |
| class InsurerCoverage(BaseModel): | |
| slug: str | |
| name: str | |
| home_url: str # insurer's main website (manually curated, verified) | |
| policy_count: int | |
| sample_policies: list[PolicyEntry] | |
| # KI-141 (2026-05-15) β backward-compatible default empty. Per-product | |
| # alias list isn't actually surfaced on the coverage card today, but the | |
| # field is mirrored from MarketplacePolicy so callers that union the two | |
| # endpoints see a consistent schema. Total aliases collapsed into this | |
| # insurer's parents β useful for QA + future UI surfacing. | |
| alias_count: int = 0 | |
| class CoverageResponse(BaseModel): | |
| total_chunks: int | |
| total_policies: int | |
| # KI-130 (2026-05-15) β totals reflect REAL insurers and their products | |
| # ONLY. The 'regulatory' slug (18 IRDAI/NHA documents) is excluded from | |
| # the marketplace surface entirely; those documents are still retrieved | |
| # and cited inside chat answers, they just don't belong in a "policy | |
| # marketplace" UI. | |
| total_insurers: int | |
| insurers: list[InsurerCoverage] | |
| class UploadResponse(BaseModel): | |
| policy_id: str | |
| policy_name: str | |
| chunks_added: int | |
| pages_indexed: int | |
| elapsed_ms: int | |
| # #47 (2026-05-21) β UIN net-new dedup. When the uploaded PDF's IRDAI | |
| # UIN already belongs to a catalogue policy, the upload is NOT indexed | |
| # as a new card; these fields point the caller at the existing policy. | |
| already_in_catalogue: bool = False | |
| existing_policy_id: Optional[str] = None | |
| existing_policy_name: Optional[str] = None | |
| # --------------------------------------------------------------------------- | |
| # #47 (2026-05-21) β UIN net-new dedup for user uploads. Before a freshly | |
| # uploaded PDF is indexed as a brand-new marketplace card, check whether its | |
| # IRDAI UIN already belongs to a catalogued policy; if so it is NOT net-new | |
| # and the caller is pointed at the existing card. All imports are lazy β the | |
| # upload route imports `re` locally, so `re` is not module-level here. | |
| _UIN_PATTERN = r"\b[A-Z]{5,9}\d{5}V\d{6}\b" | |
| _catalogue_uin_cache = None # type: Optional[dict] | |
| def _catalogue_uin_index() -> dict: | |
| """Map every catalogue policy's IRDAI UIN -> (policy_id, policy_name). | |
| Built once from 40-data/policy_facts/*.json, then cached.""" | |
| global _catalogue_uin_cache | |
| if _catalogue_uin_cache is not None: | |
| return _catalogue_uin_cache | |
| import json as _json | |
| import pathlib as _pl | |
| import re as _re | |
| def _find_uin(o): | |
| if isinstance(o, dict): | |
| if "uin_code" in o: | |
| v = o["uin_code"] | |
| return v.get("value") if isinstance(v, dict) else v | |
| for x in o.values(): | |
| r = _find_uin(x) | |
| if r: | |
| return r | |
| elif isinstance(o, list): | |
| for x in o: | |
| r = _find_uin(x) | |
| if r: | |
| return r | |
| return None | |
| idx: dict = {} | |
| pf_dir = _pl.Path(__file__).resolve().parent.parent / "40-data" / "policy_facts" | |
| for fp in sorted(pf_dir.glob("*.json")): | |
| try: | |
| uin = _find_uin(_json.loads(fp.read_text())) | |
| except Exception: | |
| continue | |
| if not uin or not isinstance(uin, str): | |
| continue | |
| uin = uin.strip().upper() | |
| # Only index modern-format IRDAI UINs β those are the only ones the | |
| # uploaded-text matcher (_UIN_PATTERN) can ever extract. Legacy | |
| # registration codes (e.g. "IRDAI/HLT/CTTK/...") are unmatchable, | |
| # so indexing them would be dead weight. | |
| if not _re.fullmatch(r"[A-Z]{5,9}\d{5}V\d{6}", uin): | |
| continue | |
| stem = fp.stem | |
| for suf in ("__wordings", "__cis", "__brochure", "__prospectus"): | |
| if stem.endswith(suf): | |
| stem = stem[: -len(suf)] | |
| nm = stem.split("__")[-1].replace("-", " ").title() | |
| idx.setdefault(uin, (stem, nm)) | |
| _catalogue_uin_cache = idx | |
| return idx | |
| def _match_catalogue_uin(text: str): | |
| """Return (policy_id, policy_name) if `text` carries the IRDAI UIN of an | |
| already-catalogued policy; else None.""" | |
| import re as _re | |
| idx = _catalogue_uin_index() | |
| # Case-insensitive extraction β a UIN may appear in any case in the | |
| # uploaded text / after PDF extraction; normalise to upper for lookup. | |
| for u in {m.upper() for m in _re.findall(_UIN_PATTERN, text or "", _re.IGNORECASE)}: | |
| if u in idx: | |
| return idx[u] | |
| return None | |
| # --------------------------------------------------------------------------- | |
| # Quarantine TTL auto-purge (2026-05-16) | |
| # | |
| # User-uploaded PDFs land in the SEPARATE `user_uploads_quarantine` Chroma | |
| # collection, scoped per session_id. They are intentionally ephemeral β NOT | |
| # durable corpus. Two risks if they linger forever: | |
| # 1. The quarantine HNSW index grows unbounded across thousands of one-off | |
| # uploads (a soft version of the 2026-05-14 link_lists.bin bloat). | |
| # 2. A user's private policy document stays queryable long after their | |
| # session is over. | |
| # | |
| # Mechanism (mirrors the existing in-memory ledgers in security.py / | |
| # session_state.py β process-local, resets on restart, v2 β Redis): | |
| # - `_quarantine_last_seen`: {session_id: epoch_seconds} updated on every | |
| # successful upload via `_quarantine_touch`. | |
| # - A periodic asyncio task (`_quarantine_purge_loop`) sweeps every | |
| # settings.QUARANTINE_PURGE_INTERVAL_SEC and deletes all quarantine | |
| # chunks whose session_id has had no upload for | |
| # settings.QUARANTINE_TTL_SECONDS (default 24h). | |
| # Deletion is `where={"session_id": sid}` β strictly scoped, can never touch | |
| # the curated `policies` collection (different collection entirely). | |
| # --------------------------------------------------------------------------- | |
| _quarantine_last_seen: dict[str, float] = {} | |
| _quarantine_lock = asyncio.Lock() | |
| def _quarantine_touch(session_id: str, policy_id: str = "") -> None: | |
| """Record that `session_id` just wrote to the quarantine collection. | |
| Synchronous + best-effort: bookkeeping must never break an upload. | |
| """ | |
| try: | |
| if session_id: | |
| _quarantine_last_seen[session_id] = time.time() | |
| except Exception: # noqa: BLE001 β bookkeeping never breaks the upload | |
| pass | |
| def _purge_expired_quarantine(now: Optional[float] = None) -> int: | |
| """Delete quarantine chunks for every session idle longer than the TTL. | |
| Returns the number of sessions purged. Pure/synchronous so it can be | |
| unit-tested directly and run via asyncio.to_thread (Chroma client is | |
| blocking). Never raises β a Chroma hiccup must not crash the loop. | |
| """ | |
| now = now if now is not None else time.time() | |
| ttl = settings.QUARANTINE_TTL_SECONDS | |
| expired = [ | |
| sid for sid, ts in list(_quarantine_last_seen.items()) | |
| if now - ts >= ttl | |
| ] | |
| if not expired: | |
| return 0 | |
| purged = 0 | |
| try: | |
| from rag.ingest import get_quarantine_collection | |
| coll = get_quarantine_collection() | |
| except Exception as e: # noqa: BLE001 | |
| logging.warning( | |
| "quarantine TTL: could not open quarantine collection (%s: %s)", | |
| type(e).__name__, e, | |
| ) | |
| return 0 | |
| for sid in expired: | |
| try: | |
| coll.delete(where={"session_id": sid}) | |
| _quarantine_last_seen.pop(sid, None) | |
| purged += 1 | |
| logging.info( | |
| "quarantine TTL: purged session %s (idle > %ds)", | |
| sid[:12], ttl, | |
| ) | |
| except Exception as e: # noqa: BLE001 β one bad delete must not abort the sweep | |
| logging.warning( | |
| "quarantine TTL: delete(where session=%s) failed (%s: %s)", | |
| sid[:12], type(e).__name__, e, | |
| ) | |
| return purged | |
| async def _quarantine_purge_loop() -> None: | |
| """Periodic background sweep β registered at startup. Mirrors the | |
| llm_health.background_probe_loop pattern (sleep β work β repeat, | |
| swallow all errors so the loop never dies).""" | |
| interval = max(60, settings.QUARANTINE_PURGE_INTERVAL_SEC) | |
| while True: | |
| try: | |
| await asyncio.sleep(interval) | |
| await asyncio.to_thread(_purge_expired_quarantine) | |
| except asyncio.CancelledError: | |
| raise | |
| except Exception as e: # noqa: BLE001 β loop must survive any error | |
| logging.warning( | |
| "quarantine TTL purge loop iteration failed (%s: %s)", | |
| type(e).__name__, e, | |
| ) | |
| # Single source of truth for "is this profile ready to recommend against". | |
| # brain_tools._profile_complete uses the same _REQUIRED_FOR_READY tuple; we | |
| # _FEATURE_B_SLOT_LIST + _every_filled_slot_was_set_this_turn were the | |
| # heuristic that distinguished "first-time capture on turn 1" from | |
| # "stored profile recalled on turn 1" for the returning-user banner. | |
| # Removed in ADR-043 (2026-05-27) β no cross-session recall, so there is | |
| # no banner to flip. | |
| def _compute_profile_complete(session_id: str) -> bool: | |
| """Read the live session profile and return True iff every required slot | |
| is populated. Tolerant of every failure mode (no session yet, session | |
| state import explodes, profile missing attrs) β returns False on any | |
| error so the frontend NEVER sees a stale `true` from a partial profile. | |
| """ | |
| try: | |
| from backend.session_state import get_session | |
| from backend.brain_tools import _profile_complete | |
| sess = get_session(session_id) | |
| return bool(_profile_complete(sess.profile)) | |
| except Exception: # noqa: BLE001 β never block a chat reply for this | |
| return False | |
| # ---------- app ---------- | |
| app = FastAPI( | |
| title="Insurance Sales Portfolio Expert API", | |
| description="Backend for the Sarvam AI take-home assignment.", | |
| version="0.1.0", | |
| ) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], # tighten for production deploy | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Bug B (2026-05-15) β /api/chat raw 422 leak. Live smoke saw the frontend | |
| # render `{"detail":[{"type":"missing","loc":["body","user_text"]...}]}` as | |
| # the bot reply because a malformed POST (missing user_text) hit FastAPI's | |
| # default RequestValidationError handler β that body bypasses our | |
| # ChatResponse envelope and the frontend has no shape-mapping for it. | |
| # We intercept the chat endpoint specifically and return a clean | |
| # ChatResponse-shaped JSON so frontend parsing never errors out. Other | |
| # endpoints keep FastAPI's default 422 behaviour (which their callers | |
| # already handle). | |
| async def _validation_exception_handler(request: Request, exc: RequestValidationError): | |
| if request.url.path == "/api/chat": | |
| logging.warning( | |
| "chat endpoint received malformed body β returning graceful " | |
| "ChatResponse-shaped 200 instead of raw 422. errors=%r", | |
| exc.errors()[:3], | |
| ) | |
| return JSONResponse( | |
| status_code=200, | |
| content={ | |
| "reply_text": ( | |
| "Sorry, something went wrong β try again." | |
| ), | |
| "citations": [], | |
| "brain_used": "error_fallback", | |
| "intent": "qa", | |
| "language": "en", | |
| "latency_ms": 0, | |
| "session_id": "", | |
| "audio_base64": None, | |
| "audio_mime": None, | |
| "faithfulness_passed": True, | |
| "faithfulness_reasons": [], | |
| "blocked": False, | |
| "profile_updates": {}, | |
| "profile_complete": False, | |
| }, | |
| ) | |
| # Default behaviour for every other endpoint. | |
| return JSONResponse( | |
| status_code=422, | |
| content={"detail": exc.errors()}, | |
| ) | |
| # ---------- Admin panel + LLM health background loop ---------- | |
| # Mount the password-gated admin endpoints. Unauthorized callers get | |
| # 401 Unauthorized. Access is gated by a strong password only (no IP | |
| # allowlist, which would lock the operator out when switching networks | |
| # without adding real security). | |
| from backend import admin as _admin_router_module | |
| app.include_router(_admin_router_module.router) | |
| async def _startup_load_admin_overrides(): | |
| """Re-apply any persisted chain reorderings from the previous process.""" | |
| import asyncio | |
| from pathlib import Path | |
| override_path = settings.DATA_DIR / "admin_overrides.json" | |
| if override_path.exists(): | |
| try: | |
| overrides = json.loads(override_path.read_text()) | |
| from backend.providers import nvidia_nim_llm as nim | |
| name_map = {"brain": "BRAIN_CHAIN", "fast_brain": "FAST_BRAIN_CHAIN", "judge": "JUDGE_CHAIN"} | |
| for role, attr in name_map.items(): | |
| if role in overrides and isinstance(overrides[role], list): | |
| setattr(nim, attr, list(overrides[role])) | |
| except Exception: | |
| pass # bad override file shouldn't crash boot β fall back to defaults | |
| async def _startup_llm_health_probe(): | |
| """Launch the background probe loop β pings every NIM model every 5 min, | |
| auto-marks 'down' models, NimChainLLM uses filter_chain() to skip them.""" | |
| import asyncio | |
| from backend import llm_health | |
| asyncio.create_task(llm_health.background_probe_loop()) | |
| async def _startup_quarantine_ttl_purge(): | |
| """Launch the periodic quarantine TTL sweep (2026-05-16). | |
| Evicts user-uploaded PDF chunks whose session has been idle longer than | |
| settings.QUARANTINE_TTL_SECONDS so the quarantine index can't grow | |
| unbounded and stale private docs don't linger. Mirrors the | |
| _startup_llm_health_probe fire-and-forget create_task pattern. | |
| """ | |
| asyncio.create_task(_quarantine_purge_loop()) | |
| async def _startup_upload_extraction_backfill(): | |
| """ADR-044 (2026-05-27) β on every container boot, run LLM-assisted | |
| extraction on any persisted upload that doesn't yet have a | |
| `rag/extracted/<policy_id>.json` file. This upgrades old uploads | |
| (persisted before the LLM-extraction pipeline was wired, OR before | |
| a fix to the pipeline was deployed) to the same data depth a fresh | |
| upload now gets. Idempotent: extracts that already exist are skipped. | |
| Fire-and-forget so it doesn't delay app readiness. | |
| """ | |
| from backend import uploaded_docs as _udocs | |
| asyncio.create_task(_udocs.backfill_extractions(force=False)) | |
| async def _startup_single_brain_warmup(): | |
| """Pre-warm the Gemini single-brain connection so the FIRST /api/chat turn | |
| doesn't eat 4-5s of cold-start latency (TLS + auth + cache init). | |
| Wrapped in try/except β warmup is an optimization, not a boot | |
| requirement. A failed warmup must NEVER crash the server. | |
| """ | |
| try: | |
| from backend import single_brain | |
| latency = await single_brain.warmup() | |
| if latency is not None: | |
| logging.info( | |
| "single_brain warmup completed at boot (%.2fs)", latency, | |
| ) | |
| except Exception as e: # noqa: BLE001 | |
| logging.warning( | |
| "single_brain warmup raised at top level (%s: %s) β boot continues", | |
| type(e).__name__, e, | |
| ) | |
| async def _startup_reingest_uploaded_docs(): | |
| """#52 β re-materialise persisted uploaded-policy docs after a restart. | |
| On the HF Space, rag/vectors is the EPHEMERAL container FS (KI-119): | |
| every rebuild pulls a fresh Chroma snapshot, so an uploaded doc's | |
| chunks indexed last boot are GONE. The PDF + curated-facts JSON + chunk | |
| payload were persisted to the /data disk (settings.UPLOADED_DOCS_DIR), | |
| so here we re-embed those chunks back into the fresh `policies` | |
| collection. The cards themselves reappear automatically because | |
| _load_curated_facts merges the persisted JSON records. | |
| Wrapped so a re-ingest hiccup never crashes boot β but it logs LOUDLY | |
| (no silent failure): an uploaded card with no retrievable chunks is a | |
| real degradation operators must see. | |
| """ | |
| try: | |
| from backend import uploaded_docs as _udocs | |
| summary = await _udocs.reingest_persisted_into_policies() | |
| if summary.get("docs") or summary.get("skipped"): | |
| logging.info( | |
| "#52 startup re-ingest: %d uploaded docs / %d chunks " | |
| "re-indexed into `policies` (%d skipped)", | |
| summary.get("docs", 0), summary.get("chunks", 0), | |
| summary.get("skipped", 0), | |
| ) | |
| # Bust the #40 grade cache so the restored cards grade on first hit. | |
| try: | |
| with _MG_LOCK: | |
| _MG_CACHE["sig"] = None | |
| _MG_CACHE["index"] = None | |
| except Exception: # noqa: BLE001 | |
| pass | |
| except Exception as e: # noqa: BLE001 β re-ingest failure must not block boot | |
| logging.warning( | |
| "#52 startup re-ingest FAILED (%s: %s) β uploaded-doc cards " | |
| "will show but their chunks are NOT retrievable until next " | |
| "successful re-ingest", | |
| type(e).__name__, e, | |
| ) | |
| async def _startup_purge_dangling_profile_chunks(): | |
| """KI-117 β boot-time self-heal of dangling `doc_type='profile'` chunks. | |
| Background: KI-102's earliest deploy wrote a `profile_anonymous` chunk | |
| WITHOUT a `session_id` metadata field. That legacy row poisoned every | |
| subsequent retrieval whose `where` clause referenced session_id, because | |
| Chroma raises when a filtered row is missing the filtered key. KI-112 | |
| added input guards so no new bad rows can be written, and the local DB | |
| was cleaned manually. But the HF Space carries its OWN copy of the | |
| Chroma DB and still contains the dangling row. | |
| This handler scans the collection for any `doc_type='profile'` chunks | |
| whose metadata lacks a non-empty `session_id` and deletes them. Runs | |
| idempotently β if there are no bad rows, it's a no-op. After HF rebuilds | |
| with this code, the boot task self-heals HF's DB on first request. | |
| Wrapped in try/except so a Chroma hiccup never crashes boot. | |
| """ | |
| def _do_purge() -> None: | |
| from rag.retrieve import get_collection | |
| coll = get_collection() | |
| try: | |
| res = coll.get( | |
| where={"doc_type": "profile"}, | |
| limit=10000, | |
| include=["metadatas"], | |
| ) | |
| except Exception as e: | |
| logging.warning( | |
| "KI-117: profile-chunk scan failed (%s: %s) β skipping cleanup", | |
| type(e).__name__, e, | |
| ) | |
| return | |
| ids = res.get("ids") or [] | |
| metas = res.get("metadatas") or [] | |
| bad_ids: list[str] = [] | |
| for cid, meta in zip(ids, metas): | |
| # KI-118 (2026-05-15) β profile chunks are now keyed by name_slug; | |
| # accept EITHER a non-empty name_slug (new) OR a non-empty | |
| # session_id (legacy KI-102 row) as proof-of-ownership. A profile | |
| # chunk with neither key is the dangling-row corruption case and | |
| # must be purged. | |
| slug = (meta or {}).get("name_slug") | |
| sid = (meta or {}).get("session_id") | |
| slug_ok = isinstance(slug, str) and slug.strip() | |
| sid_ok = isinstance(sid, str) and sid.strip() | |
| if not (slug_ok or sid_ok): | |
| bad_ids.append(cid) | |
| if bad_ids: | |
| try: | |
| coll.delete(ids=bad_ids) | |
| logging.info( | |
| "KI-117: purged %d dangling profile chunks at boot (ids=%s)", | |
| len(bad_ids), | |
| bad_ids[:10] + (["..."] if len(bad_ids) > 10 else []), | |
| ) | |
| except Exception as e: | |
| logging.warning( | |
| "KI-117: delete(ids=...) failed (%s: %s) β bad rows remain", | |
| type(e).__name__, e, | |
| ) | |
| return | |
| else: | |
| logging.info("KI-117: no dangling profile chunks found (DB clean)") | |
| try: | |
| total = coll.count() | |
| logging.info("KI-117: total chunks after cleanup: %d", total) | |
| except Exception as e: | |
| logging.warning( | |
| "KI-117: post-cleanup count() failed (%s: %s)", | |
| type(e).__name__, e, | |
| ) | |
| try: | |
| await asyncio.to_thread(_do_purge) | |
| except Exception as e: | |
| # Belt + suspenders β boot must never crash. | |
| logging.warning( | |
| "KI-117: boot cleanup raised at top level (%s: %s) β continuing boot", | |
| type(e).__name__, e, | |
| ) | |
| async def _startup_purge_dangling_profile_chunks_handler(): | |
| """KI-117 β register the boot-time cleanup as a FastAPI startup hook.""" | |
| await _startup_purge_dangling_profile_chunks() | |
| async def health(): | |
| missing = settings.validate() | |
| # Post-D-019 the stack is Sarvam (voice + Indic) + NVIDIA NIM (brain + | |
| # judge). GROQ + OpenRouter were retired; don't reference them here or | |
| # this endpoint AttributeError's on every call. | |
| providers_ok = { | |
| "sarvam": bool(settings.SARVAM_API_KEY), | |
| "nvidia_nim": bool(settings.NVIDIA_NIM_API_KEY), | |
| } | |
| return HealthResponse( | |
| status="ok" if not missing else "degraded", | |
| providers_ok=providers_ok, | |
| missing_keys=missing, | |
| ) | |
| # KI-096 β public deploy-verification endpoint. No auth (deliberate) so any | |
| # caller can confirm which commit the HF Space is actually serving without | |
| # needing the admin password. Cached at module import so we don't spawn | |
| # `git` per request. | |
| def _compute_build_sha() -> str: | |
| import os | |
| import subprocess | |
| env_sha = os.environ.get("BUILD_SHA") or os.environ.get("HF_SPACE_GIT_REV") | |
| if env_sha: | |
| return env_sha[:12] | |
| try: | |
| out = subprocess.check_output( | |
| ["git", "rev-parse", "HEAD"], | |
| cwd=Path(__file__).resolve().parent.parent, | |
| stderr=subprocess.DEVNULL, | |
| timeout=2, | |
| ).decode().strip() | |
| return out[:12] if out else "unknown" | |
| except Exception: | |
| return "unknown" | |
| _BUILD_SHA = _compute_build_sha() | |
| _BUILD_STARTED_AT = time.time() | |
| async def version(): | |
| """Public deploy-verification endpoint β no auth required. | |
| Returns the git SHA the running app was built from + the process start | |
| timestamp. Used by deploy probes (and humans) to confirm which commit | |
| HF Space is actually serving. The admin /api/admin/* endpoints are | |
| password-gated (KI-097) and return 401; this endpoint is the auth-free | |
| escape hatch for deploy verification. | |
| """ | |
| return { | |
| "sha": _BUILD_SHA, | |
| "started_at": _BUILD_STARTED_AT, | |
| "uptime_s": round(time.time() - _BUILD_STARTED_AT, 1), | |
| } | |
| async def transcribe( | |
| file: UploadFile = File(...), | |
| language_code: Optional[str] = Form(None), | |
| ): | |
| """Speech-to-text. Accepts an audio file upload (WAV/MP3/etc.). | |
| KI-242 β Sarvam errors are classified into a closed `error_code` enum | |
| and the endpoint always returns HTTP 200 with a friendly `user_message` | |
| on failure. The frontend never parses raw httpx text. 429 (rate limit) | |
| is retried ONCE with a 2 s backoff before being surfaced as | |
| `error_code: "rate_limit"`. | |
| """ | |
| import httpx as _httpx | |
| from backend.providers.sarvam_stt import ( | |
| classify_stt_exception, | |
| STT_ERROR_USER_MESSAGES, | |
| STT_ERROR_RATE_LIMIT, | |
| ) | |
| t0 = time.time() | |
| audio_bytes = await file.read() | |
| ext = (file.filename or "audio.wav").rsplit(".", 1)[-1].lower() | |
| audio_format = ( | |
| ext if ext in ("wav", "mp3", "flac", "ogg", "m4a", "webm", "opus", "mp4") | |
| else "wav" | |
| ) | |
| async def _try_once(): | |
| return await get_stt().transcribe( | |
| audio_bytes=audio_bytes, | |
| audio_format=audio_format, | |
| language_code=language_code, | |
| ) | |
| last_exc: Optional[BaseException] = None | |
| try: | |
| result = await _try_once() | |
| except Exception as e: # noqa: BLE001 β classifier narrows | |
| last_exc = e | |
| # 429-only single retry with 2s backoff, mirroring KI-242. Only retry | |
| # on a positively identified rate-limit; other failures surface fast. | |
| is_rate_limited = ( | |
| isinstance(e, _httpx.HTTPStatusError) | |
| and e.response is not None | |
| and e.response.status_code == 429 | |
| ) | |
| if is_rate_limited: | |
| await asyncio.sleep(2.0) | |
| try: | |
| result = await _try_once() | |
| last_exc = None | |
| except Exception as e2: # noqa: BLE001 | |
| last_exc = e2 | |
| latency = int((time.time() - t0) * 1000) | |
| if last_exc is not None: | |
| code = classify_stt_exception(last_exc) | |
| # Log the underlying error server-side so we keep diagnostics, but | |
| # never leak the raw httpx string to the user-facing response. | |
| logging.warning( | |
| "STT failed: error_code=%s exc=%s: %s", | |
| code, | |
| type(last_exc).__name__, | |
| last_exc, | |
| ) | |
| # Force rate_limit code when the retry-arm exhausted on 429 too. | |
| if ( | |
| isinstance(last_exc, _httpx.HTTPStatusError) | |
| and last_exc.response is not None | |
| and last_exc.response.status_code == 429 | |
| ): | |
| code = STT_ERROR_RATE_LIMIT | |
| return TranscribeResponse( | |
| text="", | |
| language_code=language_code, | |
| confidence=0.0, | |
| latency_ms=latency, | |
| error_code=code, | |
| user_message=STT_ERROR_USER_MESSAGES.get( | |
| code, STT_ERROR_USER_MESSAGES["unknown"] | |
| ), | |
| ) | |
| return TranscribeResponse( | |
| text=result.text, | |
| language_code=result.language_code, | |
| confidence=result.confidence, | |
| latency_ms=latency, | |
| ) | |
| async def chat(req: ChatRequest, request: Request): | |
| session_id = req.session_id or str(uuid.uuid4()) | |
| # X8 (2026-05-15) β frontend sends X-Preferred-Codec so the browser can | |
| # decode the TTS payload natively (WebM/Opus on Chrome+Firefox, MP4/AAC on | |
| # Safari, WAV as universal fallback). Default to WAV if the header is | |
| # missing or invalid. | |
| _allowed_codecs = {"audio/wav", "audio/mp4", "audio/webm"} | |
| # KI-278 (2026-05-16) β the frontend sends a full MediaSource codec | |
| # string, e.g. "audio/webm; codecs=opus". The previous exact-match | |
| # against _allowed_codecs NEVER matched that (the "; codecs=opus" | |
| # suffix), so webm/opus-capable browsers were ALWAYS silently | |
| # downgraded to wav. Strip the codec parameter + whitespace before | |
| # the membership test so the negotiated container is honoured. | |
| preferred_codec = ( | |
| request.headers.get("X-Preferred-Codec", "audio/wav") or "audio/wav" | |
| ).split(";")[0].strip().lower() | |
| if preferred_codec not in _allowed_codecs: | |
| preferred_codec = "audio/wav" | |
| t_chat0 = time.time() | |
| # Never let an inner TimeoutError / unhandled exception bubble out of | |
| # handle_turn as a 500. The whole call is wrapped in an outer 45s | |
| # budget so even a pathological hang inside handle_turn surfaces as a | |
| # graceful reply, not a connection-reset to the user. 45s is generous | |
| # but tighter than HF Space's gateway timeout, so the user always gets | |
| # a response. | |
| try: | |
| if USE_SINGLE_BRAIN: | |
| # One Gemini call per turn with native function-calling. | |
| # Falls back to nim_fallback on SingleBrainError so a missing | |
| # GOOGLE_API_KEY / model outage never breaks the chat. | |
| from backend import single_brain | |
| from backend.session_state import get_session | |
| _sb_session = get_session(session_id) | |
| # Once a session has had ANY successful single_brain turn, it | |
| # must stay on single_brain for the rest of its lifetime. | |
| # Switching brains mid-stream would discard everything | |
| # single_brain captured in last_recommendation_ids / | |
| # last_retrieved_chunks / slug_to_insurer. Sticky check below. | |
| _sb_was_sticky = getattr(_sb_session, "single_brain_sticky", False) | |
| try: | |
| turn = await asyncio.wait_for( | |
| single_brain.handle_turn( | |
| session=_sb_session, | |
| user_text=req.user_text, | |
| chat_history=req.chat_history, | |
| # KI-330 (2026-05-27) β pipe view_context through so | |
| # single_brain can build an ACTIVE POLICY DIVE-IN | |
| # block when the frontend tells us the user is | |
| # focused on a specific policy. Field was declared | |
| # on ChatRequest since launch but never consumed. | |
| view_context=req.view_context or None, | |
| ), | |
| timeout=45.0, | |
| ) | |
| # First successful single_brain turn stamps the flag so | |
| # every subsequent turn on this session is locked in. | |
| try: | |
| _sb_session.single_brain_sticky = True | |
| except Exception: # noqa: BLE001 | |
| pass | |
| except single_brain.SingleBrainError as _sb_err: | |
| if _sb_was_sticky: | |
| # Session already had a clean single_brain turn. Do NOT | |
| # cross-fade to the fallback brain (loses turn state + | |
| # frontend sees the brain hop). Emit a graceful retry | |
| # prompt instead. | |
| logging.warning( | |
| "single_brain failed on STICKY session (session=%s); " | |
| "emitting graceful retry, NOT falling back: %s", | |
| session_id, _sb_err, | |
| ) | |
| turn = single_brain.TurnResult( | |
| reply_text=( | |
| # 2026-05-27 β honest copy. Previous text falsely | |
| # blamed comprehension ("could you say that | |
| # again?") when the actual cause was an upstream | |
| # Gemini transient (HTTP 503 / timeout / etc.) | |
| # that survived the internal retry schedule. | |
| # Tells the user exactly what to do (resend the | |
| # same message) and locates blame correctly. | |
| "My model service had a brief blip on that turn " | |
| "β please send the same message again, it should " | |
| "go through now." | |
| ), | |
| citations=[], | |
| retrieved_chunk_ids=[], | |
| brain_used="single_brain::sticky_graceful_retry", | |
| intent="qa", | |
| language="en", | |
| latency_ms=int((time.time() - t_chat0) * 1000), | |
| raw_reply=f"SingleBrainError: {_sb_err}", | |
| faithfulness_passed=True, | |
| faithfulness_reasons=[], | |
| blocked=False, | |
| profile_updates={}, | |
| ) | |
| else: | |
| logging.warning( | |
| "single_brain failed, falling back to nim_fallback " | |
| "(session=%s): %s", | |
| session_id, _sb_err, | |
| ) | |
| turn = await asyncio.wait_for( | |
| nim_fallback.handle_turn_fallback( | |
| session=_sb_session, | |
| user_text=req.user_text, | |
| chat_history=req.chat_history, | |
| ), | |
| timeout=20.0, | |
| ) | |
| else: | |
| # When USE_SINGLE_BRAIN is off, route directly through the | |
| # minimal NIM fallback so the bot still serves a reply. | |
| from backend.session_state import get_session as _get_session | |
| turn = await asyncio.wait_for( | |
| nim_fallback.handle_turn_fallback( | |
| session=_get_session(session_id), | |
| user_text=req.user_text, | |
| chat_history=req.chat_history, | |
| ), | |
| timeout=20.0, | |
| ) | |
| except asyncio.TimeoutError: | |
| logging.warning( | |
| "handle_turn outer TimeoutError; returning graceful reply (session=%s)", | |
| session_id, | |
| ) | |
| log_turn({ | |
| "session_id": session_id, | |
| "user_text": req.user_text, | |
| "error": "asyncio.TimeoutError (outer 45s budget or inner wait_for)", | |
| "graceful": True, | |
| }) | |
| return ChatResponse( | |
| reply_text=( | |
| "That took longer than expected β let me try a smaller answer. " | |
| "Could you ask me again, maybe more specifically?" | |
| ), | |
| citations=[], | |
| brain_used="timeout_fallback", | |
| intent="qa", | |
| language="en", | |
| latency_ms=int((time.time() - t_chat0) * 1000), | |
| session_id=session_id, | |
| audio_base64=None, | |
| faithfulness_passed=True, | |
| faithfulness_reasons=[], | |
| blocked=False, | |
| profile_updates={}, | |
| profile_complete=_compute_profile_complete(session_id), | |
| ) | |
| except Exception as e: | |
| logging.exception( | |
| "handle_turn unhandled exception (session=%s)", session_id | |
| ) | |
| log_turn({ | |
| "session_id": session_id, | |
| "user_text": req.user_text, | |
| "error": f"{type(e).__name__}: {e}", | |
| "graceful": True, | |
| }) | |
| return ChatResponse( | |
| reply_text=( | |
| "Hmm, something went wrong on my end. Could you try once more?" | |
| ), | |
| citations=[], | |
| brain_used="error_fallback", | |
| intent="qa", | |
| language="en", | |
| latency_ms=int((time.time() - t_chat0) * 1000), | |
| session_id=session_id, | |
| audio_base64=None, | |
| faithfulness_passed=True, | |
| faithfulness_reasons=[], | |
| blocked=False, | |
| profile_updates={}, | |
| profile_complete=_compute_profile_complete(session_id), | |
| ) | |
| # Server-side closer-keyword safety net for RULE 7. | |
| # If the user clearly committed to a policy this turn but Gemini did | |
| # NOT call mark_recommendation (single_brain stamps "mark_recommendation" | |
| # into turn.brain_used when the tool fires β see single_brain.py:1052), | |
| # auto-call mark_recommendation against session.last_recommendation_ids[:1] | |
| # so the closure event is recorded for analytics regardless of whether | |
| # the LLM remembered to pull the tool. Best-effort; never blocks the | |
| # reply if anything goes wrong. | |
| try: | |
| if ( | |
| USE_SINGLE_BRAIN | |
| and turn is not None | |
| and getattr(turn, "reply_text", None) | |
| and _CLOSER_KEYWORD_RE.search(req.user_text or "") | |
| and "mark_recommendation" not in (turn.brain_used or "") | |
| ): | |
| from backend.session_state import get_session as _get_session | |
| from backend import brain_tools as _brain_tools | |
| _closer_session = _get_session(session_id) | |
| _last_recs = list( | |
| getattr(_closer_session, "last_recommendation_ids", []) or [] | |
| ) | |
| if _last_recs: | |
| _result = _brain_tools.mark_recommendation( | |
| session=_closer_session, | |
| policy_ids=_last_recs[:1], | |
| is_final=True, | |
| ) | |
| logging.info( | |
| "U1-T9 closer auto-mark (session=%s) user_text=%r " | |
| "policy_ids=%s result=%s", | |
| session_id, req.user_text, _last_recs[:1], _result, | |
| ) | |
| except Exception as _closer_err: # noqa: BLE001 | |
| # Safety-net must never break the reply. | |
| logging.warning( | |
| "U1-T9 closer auto-mark failed (session=%s): %s: %s", | |
| session_id, type(_closer_err).__name__, _closer_err, | |
| ) | |
| # KI-254 β auto-mark_recommendation when single_brain emits a | |
| # recommendation turn (retrieve_policies fired + citations non-empty) | |
| # but Gemini skipped calling mark_recommendation. This populates | |
| # session.last_recommendation_ids so the NEXT turn's ordinal follow-up | |
| # ("tell me about #2", "the second one", "first option") can resolve. | |
| # Without this, RULE 3 ("call mark_recommendation alongside retrieve") | |
| # depends on Gemini remembering; smoke-3-personas showed it forgets | |
| # on recommendation turns ~70% of the time, breaking T4 ordinal routing. | |
| # Safety net mirrors the U1-T9 closer pattern: best-effort, never blocks. | |
| try: | |
| if ( | |
| USE_SINGLE_BRAIN | |
| and turn is not None | |
| and getattr(turn, "citations", None) | |
| and "retrieve_policies" in (turn.brain_used or "") | |
| and "mark_recommendation" not in (turn.brain_used or "") | |
| ): | |
| from backend.session_state import get_session as _get_session_r | |
| from backend import brain_tools as _brain_tools_r | |
| _rec_session = _get_session_r(session_id) | |
| _cited_ids: list[str] = [] | |
| _seen: set[str] = set() | |
| for _c in (turn.citations or []): | |
| pid = _c.get("policy_id") if isinstance(_c, dict) else getattr(_c, "policy_id", None) | |
| pid = (pid or "").strip() | |
| if pid and pid not in _seen: | |
| _seen.add(pid) | |
| _cited_ids.append(pid) | |
| if _cited_ids: | |
| # KI-278 β turn.citations is now the prose-aligned | |
| # recommendation set (single source of truth), so it equals | |
| # exactly the cards the user sees. Back-fill the FULL set so | |
| # ordinal follow-ups ("the 4th one") resolve against the same | |
| # list. The old [:4] cap existed only because citations used | |
| # to be the raw recall dump; it would now wrongly truncate a | |
| # legitimate 5-policy shortlist. | |
| _result_r = _brain_tools_r.mark_recommendation( | |
| session=_rec_session, | |
| policy_ids=_cited_ids, | |
| is_final=False, | |
| ) | |
| logging.info( | |
| "KI-254 auto-mark on rec turn (session=%s) " | |
| "policy_ids=%s result=%s", | |
| session_id, _cited_ids, _result_r, | |
| ) | |
| except Exception as _rec_err: # noqa: BLE001 | |
| logging.warning( | |
| "KI-254 auto-mark on rec turn failed (session=%s): %s: %s", | |
| session_id, type(_rec_err).__name__, _rec_err, | |
| ) | |
| # F3 β confirmation auto-extract safety net. | |
| # Symptom: Gemini emits a recap bullet list ("**Primary Goal:** first | |
| # family policy ...") from conversation context but skips one or more | |
| # save_profile_field calls. User says "yes this is correct"; the | |
| # _profile_complete gate refuses retrieval and the bot embarrassingly | |
| # re-asks. This block parses the bot's prior recap turn, maps slot | |
| # labels -> _REQUIRED_FOR_READY field names, and backfills any slot | |
| # that's STILL missing on the live profile. Best-effort; never blocks. | |
| # Mirror of KI-253 closer regex + KI-254 auto-mark pattern. | |
| try: | |
| _CONFIRM_RE = re.compile( | |
| r"\b(yes|correct|that'?s right|all correct|looks good)\b", | |
| re.IGNORECASE, | |
| ) | |
| _RECAP_BULLET_RE = re.compile(r"^\s*\*\s*\*\*[^:]+:\*\*", re.MULTILINE) | |
| _RECAP_LINE_RE = re.compile( | |
| r"^\s*\*\s*\*\*([^:]+):\*\*\s*(.+?)\s*$", | |
| re.MULTILINE, | |
| ) | |
| # Slot label aliases -> canonical _REQUIRED_FOR_READY field names. | |
| # Keys are lowercased + whitespace-collapsed. | |
| _SLOT_ALIASES = { | |
| "name": "name", | |
| "full name": "name", | |
| "age": "age", | |
| "dependents": "dependents", | |
| "family": "dependents", | |
| "family members": "dependents", | |
| "location": "location_tier", | |
| "city": "location_tier", | |
| "location tier": "location_tier", | |
| "city tier": "location_tier", | |
| "income": "income_band", | |
| "income band": "income_band", | |
| "annual income": "income_band", | |
| "primary goal": "primary_goal", | |
| "goal": "primary_goal", | |
| "objective": "primary_goal", | |
| "health conditions": "health_conditions", | |
| "health": "health_conditions", | |
| "medical conditions": "health_conditions", | |
| "pre-existing conditions": "health_conditions", | |
| "medical history": "health_conditions", | |
| } | |
| if ( | |
| USE_SINGLE_BRAIN | |
| and turn is not None | |
| and _CONFIRM_RE.search(req.user_text or "") | |
| and not getattr(turn, "profile_complete", False) | |
| ): | |
| # Locate the most recent bot/assistant message in chat_history. | |
| _prior_bot_text = "" | |
| for _msg in reversed(req.chat_history or []): | |
| if not isinstance(_msg, dict): | |
| continue | |
| _role = (_msg.get("role") or "").lower() | |
| if _role in ("assistant", "bot", "model"): | |
| _prior_bot_text = _msg.get("content") or "" | |
| break | |
| if _prior_bot_text and _RECAP_BULLET_RE.search(_prior_bot_text): | |
| from backend.session_state import get_session as _get_session_f3 | |
| from backend import brain_tools as _brain_tools_f3 | |
| _f3_session = _get_session_f3(session_id) | |
| _profile_f3 = _f3_session.profile | |
| _parsed: dict[str, str] = {} | |
| for _label, _value in _RECAP_LINE_RE.findall(_prior_bot_text): | |
| _key = " ".join(_label.strip().lower().split()) | |
| _slot = _SLOT_ALIASES.get(_key) | |
| if not _slot: | |
| continue | |
| _val = (_value or "").strip().rstrip(".") | |
| if not _val: | |
| continue | |
| # Don't override slots that already have values. | |
| _existing = getattr(_profile_f3, _slot, None) | |
| if _existing not in (None, "", []): | |
| continue | |
| # First mapping wins (in case a label appears twice). | |
| _parsed.setdefault(_slot, _val) | |
| _backfilled: list[str] = [] | |
| for _slot in _brain_tools_f3._REQUIRED_FOR_READY: | |
| if _slot not in _parsed: | |
| continue | |
| _existing = getattr(_profile_f3, _slot, None) | |
| if _existing not in (None, "", []): | |
| continue | |
| try: | |
| _r = _brain_tools_f3.save_profile_field( | |
| session=_f3_session, | |
| field=_slot, | |
| value=_parsed[_slot], | |
| ) | |
| if isinstance(_r, dict) and _r.get("saved"): | |
| _backfilled.append(_slot) | |
| except Exception: # noqa: BLE001 β best-effort | |
| continue | |
| if _backfilled: | |
| logging.info( | |
| "F3 confirmation auto-extract (session=%s): backfilled %s", | |
| session_id, _backfilled, | |
| ) | |
| except Exception as _f3_err: # noqa: BLE001 | |
| # Safety-net must never break the reply. | |
| logging.warning( | |
| "F3 confirmation auto-extract failed (session=%s): %s: %s", | |
| session_id, type(_f3_err).__name__, _f3_err, | |
| ) | |
| audio_b64 = None | |
| audio_mime: Optional[str] = None | |
| # When TTS fails we propagate a STRUCTURED, user-facing notice instead | |
| # of silently dropping the audio. The text reply is still returned in | |
| # full; the frontend renders a small inline "voice unavailable" line | |
| # under the bot bubble so the user understands why there's no voice | |
| # (e.g. Sarvam 429 / no credits). | |
| tts_error_code: Optional[str] = None | |
| tts_user_message: Optional[str] = None | |
| if req.return_audio and turn.reply_text: | |
| try: | |
| from backend.voice_format import tts_preprocess | |
| # Send a CLEANED version of the reply to TTS β strip markdown, | |
| # citations, expand acronyms, truncate. The text in the chat | |
| # bubble remains the full structured reply. | |
| spoken = tts_preprocess( | |
| turn.reply_text, | |
| language="indic" if req.tts_language_code.startswith("hi") else "en", | |
| max_words=55, | |
| ) | |
| # X8 β honor X-Preferred-Codec; on transcoding failure the provider | |
| # falls back to raw WAV and reports audio_mime="audio/wav". | |
| audio, audio_mime = await get_tts().synthesize_with_mime( | |
| spoken, | |
| language_code=req.tts_language_code, | |
| preferred_codec=preferred_codec, | |
| ) | |
| audio_b64 = base64.b64encode(audio).decode("utf-8") | |
| except Exception as e: | |
| # Don't fail the whole turn if TTS hiccups β but make the failure | |
| # LOUD: classify it once at the boundary (closed enum, same | |
| # contract as STT) so the client can render a friendly notice | |
| # instead of showing a voice-less reply with no explanation. | |
| from backend.providers.sarvam_tts import ( | |
| classify_tts_exception, | |
| TTS_ERROR_USER_MESSAGES, | |
| TTS_ERROR_UNKNOWN, | |
| ) | |
| tts_error_code = classify_tts_exception(e) | |
| tts_user_message = TTS_ERROR_USER_MESSAGES.get( | |
| tts_error_code, TTS_ERROR_USER_MESSAGES[TTS_ERROR_UNKNOWN] | |
| ) | |
| audio_b64 = None | |
| audio_mime = None | |
| # Server-side diagnostics keep the raw error; the client never | |
| # sees the raw httpx string. | |
| logging.warning( | |
| "TTS failed (session=%s): error_code=%s exc=%s: %s", | |
| session_id, tts_error_code, type(e).__name__, e, | |
| ) | |
| log_turn({ | |
| "session_id": session_id, | |
| "tts_error": f"{type(e).__name__}: {e}", | |
| "tts_error_code": tts_error_code, | |
| }) | |
| try: | |
| log_turn({ | |
| "session_id": session_id, | |
| "user_text": req.user_text, | |
| "reply_text": turn.reply_text, | |
| "brain_used": turn.brain_used, | |
| "intent": turn.intent, | |
| "language": turn.language, | |
| "latency_ms": turn.latency_ms, | |
| "retrieved_chunk_ids": turn.retrieved_chunk_ids, | |
| "citation_count": len(turn.citations), | |
| "faithfulness_passed": turn.faithfulness_passed, | |
| "faithfulness_reasons": turn.faithfulness_reasons, | |
| "blocked": turn.blocked, | |
| }) | |
| except Exception: # noqa: BLE001 β log IO must never block a reply | |
| pass | |
| # Cross-session profile persistence + returning-user detection removed | |
| # in ADR-043 (2026-05-27). Sessions are in-memory only. | |
| _returning_user_recalled = False | |
| # Bug B defense β CitationOut requires page_start/page_end as ints, but | |
| # single_brain.TurnResult.citations dicts don't carry those fields (its | |
| # citation shape is {chunk_id, policy_id, policy_name, insurer_slug, | |
| # doc_type, source_url, score}). Without this normalisation the | |
| # Pydantic constructor below would raise ValidationError, the | |
| # exception would escape /api/chat, and FastAPI would return a raw | |
| # 500 (or its default JSON error envelope) that the frontend can't | |
| # parse as a ChatResponse. We patch every citation dict to satisfy | |
| # CitationOut's required fields and wrap the whole response build in | |
| # an explicit try/except so a malformed citation can never silently | |
| # bypass our envelope. | |
| try: | |
| from backend.policy_identity import clean_display_policy_name | |
| safe_citations: list[CitationOut] = [] | |
| for c in turn.citations or []: | |
| if not isinstance(c, dict): | |
| continue | |
| try: | |
| safe_citations.append( | |
| CitationOut( | |
| policy_id=str(c.get("policy_id", "") or ""), | |
| policy_name=clean_display_policy_name( | |
| str(c.get("policy_name", "") or "") | |
| ), | |
| insurer_slug=str(c.get("insurer_slug", "") or ""), | |
| page_start=int(c.get("page_start", 0) or 0), | |
| page_end=int(c.get("page_end", 0) or 0), | |
| source_url=str(c.get("source_url", "") or ""), | |
| score=float(c.get("score", 0.0) or 0.0), | |
| ) | |
| ) | |
| except Exception as _cite_err: # noqa: BLE001 | |
| logging.warning( | |
| "drop malformed citation (session=%s): %s β payload=%r", | |
| session_id, _cite_err, c, | |
| ) | |
| return ChatResponse( | |
| reply_text=turn.reply_text, | |
| citations=safe_citations, | |
| brain_used=turn.brain_used, | |
| intent=turn.intent, | |
| language=turn.language, | |
| latency_ms=turn.latency_ms, | |
| session_id=session_id, | |
| audio_base64=audio_b64, | |
| audio_mime=audio_mime, | |
| tts_error_code=tts_error_code, | |
| tts_user_message=tts_user_message, | |
| faithfulness_passed=turn.faithfulness_passed, | |
| faithfulness_reasons=turn.faithfulness_reasons, | |
| blocked=turn.blocked, | |
| profile_updates=turn.profile_updates, | |
| profile_complete=_compute_profile_complete(session_id), | |
| returning_user_recalled=_returning_user_recalled, | |
| ) | |
| except Exception as _resp_err: # noqa: BLE001 | |
| # Anything else (TypeError/AttributeError/ValidationError) on the | |
| # response-build path β return the standard error_fallback shape | |
| # so the frontend always parses cleanly. Bug B catch-all. | |
| logging.exception( | |
| "chat response-build failed (session=%s): %s", | |
| session_id, _resp_err, | |
| ) | |
| return ChatResponse( | |
| reply_text=( | |
| "Sorry, something went wrong β try again" | |
| ), | |
| citations=[], | |
| brain_used="error_fallback", | |
| intent="qa", | |
| language="en", | |
| latency_ms=int((time.time() - t_chat0) * 1000), | |
| session_id=session_id, | |
| audio_base64=None, | |
| audio_mime=None, | |
| faithfulness_passed=True, | |
| faithfulness_reasons=[], | |
| blocked=False, | |
| profile_updates={}, | |
| profile_complete=_compute_profile_complete(session_id), | |
| ) | |
| async def coverage(): | |
| """What policies/insurers are indexed in the corpus. | |
| Drives the UI's "what's covered" panel β sets user expectations + reduces | |
| over-refusals from off-corpus queries. | |
| """ | |
| try: | |
| from rag.retrieve import get_collection | |
| coll = get_collection() | |
| total = coll.count() | |
| except Exception: | |
| total = 0 | |
| # Insurer metadata β names + home URLs are curated + verified | |
| # (see eval/verified_urls.json + tools/verify_urls.py). | |
| # KI-132 (2026-05-15) β expanded to all 19 real insurers so /api/coverage | |
| # also returns proper display names (was 10 of 19). | |
| insurer_meta = { | |
| "acko": ("Acko Health Insurance", "https://www.acko.com/health-insurance/"), | |
| "aditya-birla": ("Aditya Birla Health Insurance", "https://www.adityabirlacapital.com/healthinsurance"), | |
| "bajaj-allianz": ("Bajaj Allianz General Insurance", "https://www.bajajallianz.com/"), | |
| "care-health": ("Care Health Insurance", "https://www.careinsurance.com/"), | |
| "cholamandalam": ("Cholamandalam MS General Insurance", "https://www.cholainsurance.com/"), | |
| "go-digit": ("Go Digit General Insurance", "https://www.godigit.com/"), | |
| "hdfc-ergo": ("HDFC ERGO General Insurance", "https://www.hdfcergo.com/"), | |
| "icici-lombard": ("ICICI Lombard General Insurance", "https://www.icicilombard.com/"), | |
| "iffco-tokio": ("IFFCO Tokio General Insurance", "https://www.iffcotokio.co.in/"), | |
| "manipalcigna": ("ManipalCigna Health Insurance", "https://www.manipalcigna.com/"), | |
| "national-insurance": ("National Insurance Company", "https://nationalinsurance.nic.co.in/"), | |
| "new-india": ("New India Assurance", "https://www.newindia.co.in/"), | |
| "niva-bupa": ("Niva Bupa Health Insurance", "https://www.nivabupa.com/"), | |
| "indusind-general": ("IndusInd General Insurance (formerly Reliance General)", "https://www.indusind.com/general-insurance/"), | |
| "oriental-insurance": ("Oriental Insurance Company", "https://orientalinsurance.org.in/"), | |
| "reliance-general": ("Reliance General Insurance", "https://www.reliancegeneral.co.in/"), | |
| "royal-sundaram": ("Royal Sundaram General Insurance", "https://www.royalsundaram.in/"), | |
| "sbi-general": ("SBI General Insurance", "https://www.sbigeneral.in/"), | |
| "star-health": ("Star Health & Allied Insurance", "https://www.starhealth.in/"), | |
| "tata-aig": ("Tata AIG General Insurance", "https://www.tataaig.com/"), | |
| "user-upload": ("Your uploaded policies", ""), | |
| } | |
| # policy -> source_url (verified at download time) | |
| policy_urls: dict[tuple[str, str], str] = {} | |
| by_insurer: dict[str, dict] = {} | |
| # KI-135 (2026-05-15) β count policies the SAME way /api/policies/all | |
| # does (extracted/*.json + curated-facts pass-2) so the marketplace badge | |
| # ALWAYS matches the marketplace card count. Previously this loop read | |
| # Chroma metadata, which under-counted by ~20 because ~15 curated-facts | |
| # policies (Activ One, Optima Secure, Reassure 2/3, Health Guard Gold, | |
| # etc.) are legitimate distinct products that have no Chroma chunks yet, | |
| # plus ~5 display-name mismatches collapsed two policies into one. After | |
| # this refactor: badge = cards = 158 / 19. | |
| # KI-129 + KI-130 invariants still hold (profile + regulatory excluded). | |
| import json as _json | |
| from backend.policy_identity import clean_display_policy_name | |
| _DOCTYPE_RANK_COV = {"wordings": 0, "prospectus": 1, "cis": 2, "brochure": 3} | |
| _doctype_of_cov = lambda stem: stem.rsplit("__", 1)[1] if "__" in stem else "" | |
| _product_key_of_cov = lambda pid: pid.rsplit("__", 1)[0] if "__" in pid else pid | |
| curated_facts = _load_curated_facts() | |
| sorted_files = sorted( | |
| settings.EXTRACTED_DIR.glob("*.json"), | |
| key=lambda fp: (_DOCTYPE_RANK_COV.get(_doctype_of_cov(fp.stem), 99), fp.stem), | |
| ) | |
| seen_product_keys: set[str] = set() | |
| seen_policy_ids: set[str] = set() | |
| # KI-141 (2026-05-15) β pre-compute the alias mapping (curated marketing | |
| # renames whose source PDF maps to an extracted parent). These curated | |
| # entries collapse onto the parent card; they DO NOT count separately. | |
| # Same algorithm as /api/policies/all so the totals stay in sync. | |
| # | |
| # KI-142 (2026-05-15, REFACTORED) β UIN-primary invariant: 1 unique UIN | |
| # = 1 unique marketplace card. Mirrors the /api/policies/all algorithm | |
| # so the coverage policy_count stays in lockstep with the marketplace | |
| # card count. See the long-form comment block in that endpoint for the | |
| # full algorithm rationale. | |
| extracted_stems_cov = {fp.stem for fp in sorted_files} | |
| # Phase A β extracted parents claim their UINs first. We also retain the | |
| # parsed extracted JSON so Phase B can run the KI-145 material-diff check | |
| # without re-reading from disk. | |
| uin_to_parent_cov: dict[str, str] = {} | |
| extracted_uin_cov: dict[str, str] = {} | |
| extracted_data_cov: dict[str, dict] = {} | |
| for fp in sorted_files: | |
| try: | |
| _d = _json.loads(fp.read_text()) | |
| except Exception: | |
| continue | |
| extracted_data_cov[fp.stem] = _d | |
| _u = _d.get("uin_code") | |
| if isinstance(_u, dict): | |
| _u = _u.get("value") | |
| _u = (_u or "").strip() if isinstance(_u, str) else "" | |
| if _u: | |
| extracted_uin_cov[fp.stem] = _u | |
| uin_to_parent_cov.setdefault(_u, fp.stem) | |
| direct_parent_cov: dict[str, str] = {} | |
| curated_canonical_ids_cov: list[str] = [] | |
| # KI-145 β curated entries that failed the material-diffs gate (same UIN | |
| # or source-PDF as a pass-1 card but >= 2 decision-critical fields | |
| # disagree). These must emit as standalone pass-2 cards so the coverage | |
| # policy_count stays in lockstep with /api/policies/all. | |
| ki145_variant_curated_ids_cov: set[str] = set() | |
| # Phase B β walk curated entries deterministically (sorted by policy_id). | |
| for curated_pid, cdata in sorted(curated_facts.items()): | |
| if curated_pid != cdata.get("policy_id", curated_pid): | |
| continue | |
| if any(curated_pid.endswith(f"__{dt}") | |
| for dt in ("wordings", "brochure", "cis", "prospectus")): | |
| continue | |
| curated_canonical_ids_cov.append(curated_pid) | |
| curated_uin = cdata.get("uin_code") | |
| if isinstance(curated_uin, dict): | |
| curated_uin = curated_uin.get("value") | |
| curated_uin = (curated_uin or "").strip() if isinstance(curated_uin, str) else "" | |
| parent_id: str | None = None | |
| if curated_uin and curated_uin in uin_to_parent_cov \ | |
| and uin_to_parent_cov[curated_uin] != curated_pid: | |
| # KI-145 (2026-05-15) β same UIN β same product. Compare | |
| # decision-critical fields against the candidate extracted | |
| # parent; if 2+ disagree (non-null on both sides) this is a | |
| # VARIANT and stays as its own card. Pure RENAME (< 2 diffs) | |
| # falls through to the alias-merge as before. | |
| candidate = uin_to_parent_cov[curated_uin] | |
| # Candidate may be extracted OR curated β fall back to curated | |
| # facts when no extracted JSON exists, so the diff has real data. | |
| cand_data = extracted_data_cov.get(candidate) or curated_facts.get(candidate, {}) | |
| if _ki145_material_diffs(cdata, cand_data) < 2: | |
| parent_id = candidate | |
| else: | |
| ki145_variant_curated_ids_cov.add(curated_pid) | |
| elif curated_uin: | |
| # New UIN β claim it. KI-145 spec: UIN unmatched against any | |
| # extracted parent = standalone. Flag so pass-2 emits even if | |
| # policy_id is a prefix of a seen extracted id. | |
| uin_to_parent_cov[curated_uin] = curated_pid | |
| ki145_variant_curated_ids_cov.add(curated_pid) | |
| if parent_id is None and not curated_uin: | |
| # KI-142 (preserved): source-PDF fallback only when curated entry | |
| # has NO UIN. When UIN is present but unmatched, KI-145 spec | |
| # mandates standalone β PDF coincidence cannot override. | |
| fb_parent = _source_pdf_to_policy_id(cdata.get("_primary_source_pdf")) | |
| if fb_parent and fb_parent in extracted_stems_cov and fb_parent != curated_pid: | |
| ext_data = extracted_data_cov.get(fb_parent, {}) | |
| if _ki145_material_diffs(cdata, ext_data) < 2: | |
| parent_id = fb_parent | |
| else: | |
| ki145_variant_curated_ids_cov.add(curated_pid) | |
| if parent_id: | |
| direct_parent_cov[curated_pid] = parent_id | |
| # Phase C β chain-compress (see /api/policies/all for rationale). | |
| aliased_curated_ids_cov: set[str] = set() | |
| parent_pkey_alias_count: dict[str, int] = {} | |
| def _terminal_parent_cov(start: str) -> str | None: | |
| seen_chain: set[str] = set() | |
| cur = start | |
| while True: | |
| nxt = direct_parent_cov.get(cur) | |
| if not nxt: | |
| return cur if cur != start else None | |
| if nxt in seen_chain or nxt == start: | |
| return None | |
| seen_chain.add(nxt) | |
| cur = nxt | |
| for curated_pid in curated_canonical_ids_cov: | |
| if curated_pid not in direct_parent_cov: | |
| continue | |
| terminal = _terminal_parent_cov(curated_pid) | |
| if not terminal: | |
| continue | |
| if terminal in extracted_stems_cov: | |
| terminal_pkey = _product_key_of_cov(terminal) | |
| else: | |
| terminal_pkey = terminal | |
| aliased_curated_ids_cov.add(curated_pid) | |
| parent_pkey_alias_count[terminal_pkey] = parent_pkey_alias_count.get(terminal_pkey, 0) + 1 | |
| # by_insurer entries: | |
| # products: set of product_keys (matches /api/policies/all card count) | |
| # names: ordered dict of policy_NAME -> first product_key (for sample display) | |
| # aliases: KI-141 β count of curated marketing-rename entries merged | |
| # into this insurer's parent cards (for the alias_count field). | |
| # KI-135 (2026-05-15) β track product_keys (not names) for counting so the | |
| # ~1 within-insurer policy_name collision (e.g. new-india Floater listed | |
| # as both extracted + curated_facts) doesn't collapse the count below the | |
| # marketplace card count. Both representations are still distinct products. | |
| # Pass 1: extracted JSONs (KI-133 dedup by product_key β wordings wins) | |
| for fp in sorted_files: | |
| try: | |
| data = _json.loads(fp.read_text()) | |
| except Exception: | |
| continue | |
| pid = data.get("policy_id", fp.stem) | |
| seen_policy_ids.add(pid) | |
| slug = data.get("insurer_slug", "") | |
| if slug == "regulatory": | |
| continue | |
| pkey = _product_key_of_cov(pid) | |
| if pkey in seen_product_keys: | |
| continue | |
| seen_product_keys.add(pkey) | |
| name = clean_display_policy_name(data.get("policy_name", "") or pid) | |
| url = data.get("source_pdf_url", "") | |
| if slug not in by_insurer: | |
| by_insurer[slug] = {"products": set(), "names": [], "chunks": 0, "aliases": 0} | |
| by_insurer[slug]["products"].add(pkey) | |
| # KI-141 β accumulate alias count from the pre-pass | |
| by_insurer[slug]["aliases"] += parent_pkey_alias_count.get(pkey, 0) | |
| if name not in by_insurer[slug]["names"]: | |
| by_insurer[slug]["names"].append(name) | |
| by_insurer[slug]["chunks"] += 1 | |
| if url and (slug, name) not in policy_urls: | |
| policy_urls[(slug, name)] = url | |
| # Pass 2: curated-facts policies that have no extracted counterpart | |
| for curated_pid, data in curated_facts.items(): | |
| if curated_pid != data.get("policy_id", curated_pid): | |
| continue # permutation alias | |
| if curated_pid in seen_policy_ids: | |
| continue | |
| # KI-145 β bypass the startswith dedup for genuine variants (same | |
| # UIN/source-PDF as a pass-1 card but materially different fields). | |
| # Otherwise variant cards would be silently dropped here. | |
| if curated_pid not in ki145_variant_curated_ids_cov \ | |
| and any(eid.startswith(curated_pid + "__") for eid in seen_policy_ids): | |
| continue | |
| # KI-141 β skip curated entries that have already been collapsed into | |
| # a pass-1 parent's alias list. | |
| if curated_pid in aliased_curated_ids_cov: | |
| continue | |
| seen_policy_ids.add(curated_pid) | |
| slug = data.get("insurer_slug", "") | |
| if slug == "regulatory": | |
| continue | |
| # Curated entries don't have a __doctype suffix, so use the full | |
| # policy_id as the product_key. | |
| pkey = curated_pid | |
| # KI-145 β variants share product_key with their pass-1 sibling | |
| # (different doctype-stripped stems are identical). Allow them past | |
| # this dedup so coverage policy_count = marketplace card count. | |
| if pkey in seen_product_keys and curated_pid not in ki145_variant_curated_ids_cov: | |
| continue | |
| seen_product_keys.add(pkey) | |
| name = clean_display_policy_name( | |
| data.get("policy_name", "") or curated_pid | |
| ) | |
| url = data.get("source_pdf_url", "") | |
| if slug not in by_insurer: | |
| by_insurer[slug] = {"products": set(), "names": [], "chunks": 0, "aliases": 0} | |
| # KI-145 β variants share pkey with a pass-1 sibling, so adding the | |
| # bare pkey to the set would be a no-op (set semantics). Tag variant | |
| # pkeys with a suffix in the counting set so the per-insurer count | |
| # increments by 1, matching the marketplace card count. | |
| if curated_pid in ki145_variant_curated_ids_cov: | |
| by_insurer[slug]["products"].add(f"{pkey}__ki145variant") | |
| else: | |
| by_insurer[slug]["products"].add(pkey) | |
| # KI-142 β accumulate alias count for curated parents (curated entries | |
| # that themselves became the claimant of a new UIN, with later curated | |
| # siblings aliasing onto them). | |
| by_insurer[slug]["aliases"] += parent_pkey_alias_count.get(pkey, 0) | |
| if name not in by_insurer[slug]["names"]: | |
| by_insurer[slug]["names"].append(name) | |
| by_insurer[slug]["chunks"] += 1 | |
| if url and (slug, name) not in policy_urls: | |
| policy_urls[(slug, name)] = url | |
| # #80 β SINGLE SOURCE OF TRUTH. Derive the catalogue counts from the SAME | |
| # de-duplicated marketplace the cards render from, so the header count can | |
| # never drift from what the user actually sees (1 product = 1 card | |
| # everywhere). The old parallel product_key tally double-counted the | |
| # doctype-sibling permutations the marketplace collapses. | |
| from collections import Counter as _Counter | |
| _mp = await policies_all() | |
| _pc = _Counter(p.insurer_slug for p in _mp.policies) | |
| insurers_out = [] | |
| for slug, info in sorted(by_insurer.items()): | |
| # KI-130 β regulatory is not an insurer; never a marketplace card. | |
| if slug == "regulatory": | |
| continue | |
| product_count = _pc.get(slug, 0) | |
| if product_count == 0: | |
| continue | |
| sample_names = sorted(info["names"])[:8] | |
| name, home_url = insurer_meta.get(slug, (slug, "")) | |
| sample_entries = [ | |
| PolicyEntry(name=p, source_url=policy_urls.get((slug, p), "")) | |
| for p in sample_names | |
| ] | |
| insurers_out.append( | |
| InsurerCoverage( | |
| slug=slug, | |
| name=name, | |
| home_url=home_url, | |
| policy_count=product_count, | |
| sample_policies=sample_entries, | |
| alias_count=info.get("aliases", 0), | |
| ) | |
| ) | |
| return CoverageResponse( | |
| total_chunks=total, | |
| total_policies=_mp.total, | |
| total_insurers=_mp.insurers_indexed, | |
| insurers=insurers_out, | |
| ) | |
| async def upload_policy( | |
| request: Request, | |
| file: UploadFile = File(...), | |
| session_id: Optional[str] = Form(None), | |
| ): | |
| """Accept a user-uploaded PDF policy doc, chunk + embed it, add to the | |
| quarantine collection (NOT the shared `policies` corpus). | |
| Each upload is tagged with the caller's session_id so retrieval can scope | |
| quarantine queries to the uploader only. If no session_id is supplied, | |
| falls back to "anonymous" for backwards compatibility. | |
| """ | |
| import re | |
| import tempfile | |
| import time as _time | |
| from pathlib import Path as _PathLib | |
| t0 = _time.time() | |
| contents = await file.read() | |
| if not contents.startswith(b"%PDF"): | |
| raise HTTPException(400, "File does not look like a PDF (magic bytes wrong).") | |
| if len(contents) > 25 * 1024 * 1024: | |
| raise HTTPException(413, "PDF too large (>25 MB). Use a smaller file.") | |
| sid = session_id or "anonymous" | |
| # Slugify filename for policy_id | |
| raw = file.filename or "user_upload.pdf" | |
| stem = _PathLib(raw).stem | |
| slug = re.sub(r"[^a-zA-Z0-9]+", "-", stem.lower()).strip("-")[:80] or "user-upload" | |
| policy_id = f"user-upload__{sid[:12]}__{slug}" | |
| policy_name = stem.replace("_", " ").replace("-", " ").title() | |
| # Save to disk so ingest can read with pdfplumber | |
| user_dir = settings.CORPUS_DIR / "user-upload" | |
| user_dir.mkdir(parents=True, exist_ok=True) | |
| out_path = user_dir / f"{slug}.pdf" | |
| out_path.write_bytes(contents) | |
| # Orphan-file guard (2026-05-16) β the PDF is written to disk BEFORE the | |
| # 8 security gates run (pdfplumber needs a path). On ANY non-success exit | |
| # β security reject, empty-text reject, embed failure, bloat trip, the | |
| # broad 500 catch β the file must NOT be left lying in rag/corpus/ | |
| # user-upload/. `indexed_ok` is flipped True only after a successful | |
| # quarantine collection.add(); the finally block deletes the file unless | |
| # it was actually indexed (or short-circuited via the dedupe accept | |
| # cache, where the bytes are already represented by cached chunks). | |
| indexed_ok = False | |
| # Ingest just this one file | |
| try: | |
| from rag.ingest import ( | |
| _abort_if_hnsw_bloated, | |
| chunk_pages, | |
| get_quarantine_collection, | |
| read_pdf_pages, | |
| ) | |
| from backend.providers.local_embeddings import LocalEmbeddings as _Emb | |
| from backend.security import check_upload, rate_limiter | |
| pages = read_pdf_pages(out_path) | |
| # Run 8-gate security check (dedupe + mechanics + encrypted + content + | |
| # page ceiling + injection + per-session + per-IP rate limit + LLM judge) | |
| full_text = "\n".join(t for _, t in pages) | |
| # #47 β UIN net-new dedup: if the uploaded PDF's IRDAI UIN already | |
| # belongs to a catalogue policy it is NOT net-new β return the | |
| # existing card instead of indexing a duplicate. `indexed_ok` stays | |
| # False so the finally block deletes the freshly-written temp file. | |
| _uin_hit = _match_catalogue_uin(full_text) | |
| if _uin_hit: | |
| return UploadResponse( | |
| policy_id=_uin_hit[0], | |
| policy_name=_uin_hit[1], | |
| chunks_added=0, | |
| pages_indexed=len(pages), | |
| elapsed_ms=int((_time.time() - t0) * 1000), | |
| already_in_catalogue=True, | |
| existing_policy_id=_uin_hit[0], | |
| existing_policy_name=_uin_hit[1], | |
| ) | |
| client_ip = (request.client.host if request and request.client else "") or request.headers.get("X-Forwarded-For", "").split(",")[0].strip() | |
| verdict = await check_upload( | |
| content=contents, | |
| extracted_text=full_text, | |
| page_count=len(pages), | |
| session_id=sid, | |
| ip=client_ip, | |
| ) | |
| if not verdict.accepted: | |
| # File cleanup handled uniformly in the finally block (orphan | |
| # guard) β no explicit unlink needed here. | |
| raise HTTPException( | |
| 400, | |
| f"Upload rejected by security gates: {', '.join(verdict.reasons[:3])}", | |
| ) | |
| # If the dedupe gate found this exact (hash, session) already indexed, | |
| # skip chunking + embedding entirely and return the cached chunk count. | |
| # The bytes are already represented by the previously-indexed chunks, | |
| # so this freshly-written duplicate file is redundant β let the | |
| # finally block delete it (indexed_ok stays False). | |
| if verdict.cached_chunks is not None: | |
| return UploadResponse( | |
| policy_id=policy_id, | |
| policy_name=policy_name, | |
| chunks_added=verdict.cached_chunks, | |
| pages_indexed=len(pages), | |
| elapsed_ms=int((_time.time() - t0) * 1000), | |
| ) | |
| # Successful pass β record IP-level upload for rate-limit ledger | |
| from backend.security import record_ip_upload, record_accept | |
| import hashlib as _hashlib | |
| record_ip_upload(client_ip) | |
| chunks = list(chunk_pages(pages)) | |
| if not chunks: | |
| raise HTTPException(400, "Could not extract any text from the PDF (scanned image-only?).") | |
| # Quarantine HNSW bloat guard (2026-05-16) β fail fast BEFORE we | |
| # spend an embed call if a prior ingest/upload already bloated the | |
| # on-disk index. The guard scans ALL link_lists.bin under | |
| # VECTORS_DIR, so it covers both `policies` and the | |
| # `user_uploads_quarantine` collection. Raises RuntimeError on | |
| # trip; the broad except below converts it to a clean HTTP 500 | |
| # rather than letting the index grow into a disk-fill incident. | |
| _abort_if_hnsw_bloated() | |
| embedder = _Emb() | |
| texts = [c["text"] for c in chunks] | |
| vectors = await embedder.embed(texts, input_type="document") | |
| ids = [f"{policy_id}::chunk{c['chunk_idx']}" for c in chunks] | |
| metadatas = [ | |
| { | |
| "policy_id": policy_id, | |
| "insurer_slug": "user-upload", | |
| "policy_name": policy_name, | |
| "doc_type": "user_upload", | |
| "source_url": "", | |
| "page_start": c["page_start"], | |
| "page_end": c["page_end"], | |
| "chunk_idx": c["chunk_idx"], | |
| "local_path": str(out_path), | |
| "session_id": sid, | |
| } | |
| for c in chunks | |
| ] | |
| collection = get_quarantine_collection() | |
| # Remove any existing chunks under this policy_id (re-upload case) | |
| try: | |
| collection.delete(where={"policy_id": policy_id}) | |
| except Exception: | |
| pass | |
| collection.add(ids=ids, documents=texts, embeddings=vectors, metadatas=metadatas) | |
| # Index write succeeded β the on-disk file is now legitimately | |
| # referenced by chunk metadata.local_path; the finally block must | |
| # NOT delete it. | |
| indexed_ok = True | |
| # Post-add bloat guard β catch a bloat THIS upload caused (e.g. a | |
| # ChromaDB version / batch-size pathology). Mirrors ingest.py's | |
| # _abort_if_hnsw_bloated() after collection.add(). | |
| _abort_if_hnsw_bloated() | |
| # TTL bookkeeping β remember when this session last touched the | |
| # quarantine collection so the periodic purge task can evict its | |
| # chunks after the configured idle window (default 24h). | |
| _quarantine_touch(sid, policy_id) | |
| # Update rate-limit ledger after successful index | |
| rate_limiter.record_upload(sid, len(chunks)) | |
| # Cache this content hash β chunk count so an identical re-upload in | |
| # the same session short-circuits via gate_hash_dedupe. | |
| try: | |
| sha = _hashlib.sha256(contents).hexdigest() | |
| record_accept(sha, sid, len(chunks)) | |
| except Exception: | |
| pass | |
| # ---- #52: PERSIST + add to THE (global) marketplace ---------------- | |
| # The session-scoped quarantine add above is the immediate, private | |
| # path. #52 additionally requires the uploaded doc to become a REAL, | |
| # GRADED, PERSISTENT marketplace card that survives an HF Space | |
| # restart. So we: | |
| # (1) persist the raw PDF + a curated-facts-shaped JSON record + | |
| # the chunk payload under the PERSISTENT UPLOADED_DOCS_DIR, | |
| # (2) add the SAME chunks to the GLOBAL `policies` Chroma | |
| # collection (doc_type='user_upload') so they're retrievable | |
| # for everyone β per spec the doc is added to THE marketplace, | |
| # so global visibility is intentional; only the uploaded | |
| # document itself is exposed, never any session profile, | |
| # (3) invalidate the #40 marketplace-grade cache so the new card | |
| # grades immediately (the curated record flows through the | |
| # EXISTING _marketplace_catalogue Pass-2 + build_scorecard). | |
| # ANY failure here MUST surface (no silent failure): a 200 that | |
| # didn't persist would violate the #52 contract. | |
| from backend import uploaded_docs as _udocs | |
| _record = _udocs.persist_upload( | |
| policy_id=policy_id, | |
| policy_name=policy_name, | |
| pdf_bytes=contents, | |
| full_text=full_text, | |
| chunks=chunks, | |
| session_id=sid, | |
| ) | |
| # Global-collection ingest (idempotent β keyed by policy_id). | |
| from rag.ingest import get_chroma_collection as _get_pol_coll | |
| _pol = _get_pol_coll() | |
| _g_ids = [f"{policy_id}::chunk{c['chunk_idx']}" for c in chunks] | |
| # Use whatever insurer_slug build_record resolved (detected from | |
| # PDF text via detect_insurer_slug, or UPLOAD_INSURER_SLUG on no | |
| # match) so chunk metadata + scorecard reviews lookup agree. | |
| _resolved_insurer_slug = _record.get("insurer_slug", _udocs.UPLOAD_INSURER_SLUG) | |
| _g_meta = [ | |
| { | |
| "policy_id": policy_id, | |
| "insurer_slug": _resolved_insurer_slug, | |
| "policy_name": policy_name, | |
| "doc_type": _udocs.UPLOAD_DOC_TYPE, | |
| "source_url": "", | |
| "page_start": c["page_start"], | |
| "page_end": c["page_end"], | |
| "chunk_idx": c["chunk_idx"], | |
| # GLOBAL by design β NO session_id on these chunks. | |
| } | |
| for c in chunks | |
| ] | |
| try: | |
| _pol.delete(where={"policy_id": policy_id}) | |
| except Exception: # noqa: BLE001 β nothing to delete on first upload | |
| pass | |
| _pol.add(ids=_g_ids, documents=texts, embeddings=vectors, metadatas=_g_meta) | |
| _abort_if_hnsw_bloated() | |
| # Bust the #40 grade cache + the corpus-pdf index so the new card | |
| # appears immediately with a real grade. | |
| try: | |
| global _CORPUS_PDF_IDX | |
| _CORPUS_PDF_IDX = None | |
| with _MG_LOCK: | |
| _MG_CACHE["sig"] = None | |
| _MG_CACHE["index"] = None | |
| except Exception: # noqa: BLE001 β cache bust is best-effort | |
| pass | |
| # ββ Fire LLM-assisted extraction in background (ADR-044) βββββββββ | |
| # Same extractor as the catalogued 148. Runs ~30-60s; the upload | |
| # HTTP response returns now and the frontend polls | |
| # /api/upload/extraction-status/{policy_id} (see below) to know | |
| # when the card-bearing chat message should be pushed. | |
| # Fail-silent: a failed LLM pass leaves the heuristic record | |
| # intact, so the card still has SOMETHING to show β never blocks | |
| # the user. NEVER blocks this request. | |
| try: | |
| from pathlib import Path as _PathLib2 | |
| _persisted_pdf = _udocs.uploaded_docs_dir() / policy_id / "source.pdf" | |
| _detected_insurer_name = _record.get( | |
| "insurer_name", | |
| _udocs.detected_insurer_name(_resolved_insurer_slug) | |
| if _resolved_insurer_slug != _udocs.UPLOAD_INSURER_SLUG | |
| else _udocs.UPLOAD_INSURER_NAME, | |
| ) | |
| # Pre-stamp "pending" so a frontend poll that arrives BEFORE | |
| # extract_one_for_upload's first await still sees a known | |
| # state instead of HTTP 404. | |
| await _udocs._set_extraction_status( | |
| policy_id, | |
| status="pending", | |
| policy_name=policy_name, | |
| insurer_slug=_resolved_insurer_slug, | |
| started_at=None, | |
| completed_at=None, | |
| completeness_pct=None, | |
| overall_grade=None, | |
| error=None, | |
| ) | |
| asyncio.create_task( | |
| _udocs.extract_one_for_upload( | |
| policy_id=policy_id, | |
| pdf_path=_persisted_pdf, | |
| policy_name=policy_name, | |
| insurer_slug=_resolved_insurer_slug, | |
| insurer_name=_detected_insurer_name, | |
| ) | |
| ) | |
| except Exception: # noqa: BLE001 β extraction is async + optional | |
| pass | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException(500, f"Indexing failed: {type(e).__name__}: {e}") | |
| finally: | |
| # Orphan-file guard β delete the on-disk PDF unless it was actually | |
| # indexed into the quarantine collection. Covers EVERY non-success | |
| # exit (security reject, empty-text, dedupe short-circuit, embed | |
| # failure, bloat trip, 500 catch). Best-effort: a cleanup failure | |
| # must never mask the real response/exception. | |
| if not indexed_ok: | |
| try: | |
| out_path.unlink(missing_ok=True) | |
| except Exception: # noqa: BLE001 | |
| pass | |
| return UploadResponse( | |
| policy_id=policy_id, | |
| policy_name=policy_name, | |
| chunks_added=len(chunks), | |
| pages_indexed=len(pages), | |
| elapsed_ms=int((_time.time() - t0) * 1000), | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # GET /api/upload/extraction-status/{policy_id} β frontend poll target | |
| # (ADR-044, 2026-05-27). | |
| # | |
| # After the upload endpoint returns, the chat flow needs to know when | |
| # the background LLM extraction completes so it can push the card-bearing | |
| # assistant message into chat with the FULL data (not the heuristic | |
| # stub). This endpoint exposes _UPLOAD_EXTRACTION_STATUS so the | |
| # frontend can poll every ~3s for up to ~120s. | |
| # --------------------------------------------------------------------------- | |
| class ExtractionStatusResponse(BaseModel): | |
| policy_id: str | |
| status: str # "pending" | "running" | "complete" | "failed" | "unknown" | |
| policy_name: Optional[str] = None | |
| insurer_slug: Optional[str] = None | |
| started_at: Optional[str] = None | |
| completed_at: Optional[str] = None | |
| completeness_pct: Optional[float] = None | |
| overall_grade: Optional[str] = None | |
| error: Optional[str] = None | |
| # 2026-05-27 β provenance: which LLM landed the extraction | |
| # ('gemini-2.5-flash#1' / 'gemini-2.5-flash#2' / 'gemini-2.5-flash#3' / | |
| # 'nim-fallback' / None on heuristic-only fallback). | |
| llm_used: Optional[str] = None | |
| llm_response_chars: Optional[int] = None | |
| async def admin_reextract_uploads( | |
| request: Request, | |
| x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"), | |
| force: bool = False, | |
| ): | |
| """Run LLM-assisted extraction on every persisted upload that doesn't | |
| yet have a `rag/extracted/<id>.json` (or `force=true` to re-extract all). | |
| Admin-gated; fires synchronously so the response carries the summary. | |
| Use when an upload was persisted before the LLM-extraction pipeline was | |
| wired and needs to be upgraded without re-uploading. | |
| """ | |
| from backend.admin import _check_admin | |
| _check_admin(request, x_admin_password) | |
| from backend import uploaded_docs as _udocs | |
| summary = await _udocs.backfill_extractions(force=force) | |
| return summary | |
| async def upload_extraction_status(policy_id: str): | |
| """Return the live status of a per-upload LLM-assisted extraction. | |
| Returns `status="unknown"` for an unrecognised policy_id (e.g. the | |
| frontend polled a stale id or a policy that was uploaded on a prior | |
| container) so the client can stop polling without ambiguity. | |
| """ | |
| from backend import uploaded_docs as _udocs | |
| state = _udocs.get_extraction_status(policy_id) | |
| if not state: | |
| return ExtractionStatusResponse(policy_id=policy_id, status="unknown") | |
| return ExtractionStatusResponse( | |
| policy_id=policy_id, | |
| status=state.get("status", "unknown"), | |
| policy_name=state.get("policy_name"), | |
| insurer_slug=state.get("insurer_slug"), | |
| started_at=state.get("started_at"), | |
| completed_at=state.get("completed_at"), | |
| completeness_pct=state.get("completeness_pct"), | |
| overall_grade=state.get("overall_grade"), | |
| error=state.get("error"), | |
| llm_used=state.get("llm_used"), | |
| llm_response_chars=state.get("llm_response_chars"), | |
| ) | |
| class ScorecardSubScore(BaseModel): | |
| name: str | |
| score: int | |
| summary: str | |
| signals: list[str] | |
| class ProfileSummaryModel(BaseModel): | |
| """Deterministic, profile-aware {strengths, caveat} (see | |
| backend.scorecard.build_profile_summary). Rendered at the TOP of every | |
| scorecard surface; the frontend falls back to one_liner when strengths | |
| is empty / insufficient. Optional with a default so every existing | |
| construction path (compare, insufficient-data) stays valid.""" | |
| strengths: list[str] = Field(default_factory=list) | |
| caveat: Optional[str] = None | |
| class ProfileCompletenessResponse(BaseModel): | |
| completeness: float # 0.0 - 1.0 | |
| completeness_pct: int # 0 - 100 | |
| fields_collected: list[str] | |
| fields_missing: list[str] | |
| is_personalized: bool # True if completeness >= threshold | |
| gate_threshold: float = 0.6 | |
| next_question_hint: Optional[str] = None | |
| profile: dict = Field(default_factory=dict) # current profile state for UI to render | |
| session_id: Optional[str] = None | |
| class ProfileUpdateRequest(BaseModel): | |
| session_id: str | |
| name: Optional[str] = None # KI-077 | |
| age: Optional[int] = None | |
| dependents: Optional[str] = None | |
| income_band: Optional[str] = None | |
| existing_cover_inr: Optional[int] = None | |
| primary_goal: Optional[str] = None | |
| location_tier: Optional[str] = None | |
| parents_to_insure: Optional[bool] = None | |
| parents_age_max: Optional[int] = None | |
| parents_has_ped: Optional[bool] = None | |
| health_conditions: Optional[list[str]] = None | |
| budget_band: Optional[str] = None | |
| budget_inr: Optional[int] = None # #64 β exact βΉ/yr from the slider | |
| # Collected by the profile-builder UI; also present on the Profile | |
| # dataclass + chat-path save_profile_field. Whitelisted here so | |
| # POST /api/profile accepts them. | |
| desired_sum_insured_inr: Optional[int] = None | |
| copay_pct: Optional[int] = None | |
| family_medical_history: Optional[list[str]] = None | |
| smoker: Optional[bool] = None | |
| class SessionResetRequest(BaseModel): | |
| session_id: str | |
| drop_profile: bool = False # True = nuke session entirely; False = clear chat only | |
| confirm: bool = False # KI-095 β must be True when drop_profile=True; guards accidental wipes | |
| class SessionResetResponse(BaseModel): | |
| ok: bool | |
| session_id: Optional[str] = None # new session_id when drop_profile=True | |
| cleared_state: bool | |
| class SessionClearRequest(BaseModel): | |
| session_id: str | |
| class SessionClearResponse(BaseModel): | |
| cleared: bool | |
| new_session_id: str | |
| async def session_clear(req: SessionClearRequest): | |
| """Clean Clear-chat semantic. Wipes the in-memory session state for the | |
| supplied session_id and ALWAYS returns a freshly minted UUID the | |
| frontend must adopt as its new session_id going forward. | |
| Post-ADR-043 (2026-05-27) there is nothing to preserve across sessions | |
| β there is no on-disk profile to "leave intact". A clear is a complete | |
| forget. | |
| Body : {session_id: str} | |
| Reply: {cleared: bool, new_session_id: str} | |
| """ | |
| from backend.session_state import clear_session | |
| cleared = clear_session(req.session_id) if req.session_id else False | |
| return SessionClearResponse( | |
| cleared=cleared, | |
| new_session_id=uuid.uuid4().hex[:12], | |
| ) | |
| async def session_reset(req: SessionResetRequest): | |
| """KI-020 β User-facing chat clear / fresh-start toggle. | |
| Two modes: | |
| - drop_profile=False: caller (frontend) wipes its own message history; the | |
| server-side profile is preserved so the next message resumes with what | |
| the bot already knows. Light-touch "clear visible chat". | |
| - drop_profile=True: server-side session state (profile + awaiting_question | |
| + free_form_session flag + on-disk JSON) is deleted entirely. The response | |
| returns a fresh session_id the frontend should adopt as its new id. | |
| """ | |
| from backend.session_state import reset_session | |
| cleared = False | |
| new_sid: Optional[str] = None | |
| if req.drop_profile: | |
| # KI-095 β require explicit confirm=True so a misclick or replayed | |
| # request cannot wipe a populated session by accident. | |
| if not req.confirm: | |
| raise HTTPException(status_code=400, detail="confirm=true required to drop session") | |
| cleared = reset_session(req.session_id) | |
| new_sid = uuid.uuid4().hex[:12] | |
| return SessionResetResponse(ok=True, session_id=new_sid, cleared_state=cleared) | |
| async def profile_update(req: ProfileUpdateRequest): | |
| """Write user-provided profile fields into session_state. Returns the new | |
| completeness so the frontend can immediately reveal personalized scores. | |
| ALSO ingests the profile as a chunk into Chroma (doc_type='profile', | |
| policy_id='profile_<session_id>') so the brain sees user context | |
| alongside policy + regulatory chunks at retrieval time. This is the | |
| "profile RAG" architecture β every recommendation grounds in (policy | |
| text + IRDAI mandate + user's own situation) jointly. | |
| """ | |
| from backend.scorecard import profile_completeness as _completeness | |
| from backend.session_state import get_session | |
| sess = get_session(req.session_id) | |
| # Update only fields the client explicitly sent (non-None) β keeps partial | |
| # save flows clean | |
| for field_name in ( | |
| "name", # KI-077 β accept name updates from the profile-builder UI | |
| "age", "dependents", "income_band", "existing_cover_inr", "primary_goal", | |
| "location_tier", "parents_to_insure", "parents_age_max", "parents_has_ped", | |
| "health_conditions", "budget_band", "budget_inr", | |
| "desired_sum_insured_inr", "copay_pct", "family_medical_history", "smoker", | |
| ): | |
| v = getattr(req, field_name, None) | |
| if v in (None, "", []): | |
| # KI-095 β never clobber a filled field with empty input from the client | |
| continue | |
| setattr(sess.profile, field_name, v) | |
| # KI-196 (ADR-041) β mark the slot as explicitly answered so the | |
| # completeness scorer recognises it. Without this, builder-form | |
| # captures land on the profile but the badge still reads 0% because | |
| # profile_completeness_view now gates on `Profile.asked`. | |
| if field_name not in sess.profile.asked: | |
| sess.profile.asked.append(field_name) | |
| # ADR-043 (2026-05-27) β cross-session persistence + profile_rag | |
| # upsert removed. The captured fields live only in the in-memory | |
| # SessionState for this session's lifetime (1 h idle TTL). | |
| p = sess.profile | |
| # KI-271 β SLOT_UNION-driven profile_dict (15 fields) so copay_pct + | |
| # family_medical_history + desired_sum_insured_inr propagate to the | |
| # save endpoint's response + RAG chunk. | |
| profile_dict = { | |
| slot: getattr(p, slot, None) for slot in brain_tools.SLOT_UNION | |
| } | |
| # KI-196 (ADR-041) β same answered-only gate as profile_completeness_view. | |
| answered = set(getattr(p, "asked", []) or []) | |
| completeness_input = { | |
| k: (v if k in answered else None) for k, v in profile_dict.items() | |
| } | |
| c = _completeness(completeness_input) | |
| collected = [k for k, v in profile_dict.items() if k in answered and v not in (None, "", [], False)] | |
| missing = [k for k, v in profile_dict.items() if k not in answered or v in (None, "", [])] | |
| # profile_rag upsert removed in ADR-043 (2026-05-27). Captured fields | |
| # remain in the in-memory SessionState only. | |
| return ProfileCompletenessResponse( | |
| completeness=c, | |
| completeness_pct=int(c * 100), | |
| fields_collected=collected, | |
| fields_missing=missing, | |
| is_personalized=c >= 0.6, | |
| profile=profile_dict, | |
| session_id=req.session_id, | |
| ) | |
| async def profile_completeness_view(session_id: Optional[str] = None): | |
| """Returns how much we know about the user. Frontend uses this to gate the | |
| personalized scorecard render β until completeness >= 0.6 we show the | |
| insurer-level metrics only, NOT the per-user grade. | |
| """ | |
| from backend.scorecard import profile_completeness as _completeness | |
| from backend.session_state import get_session | |
| from backend.needs_finder import next_question | |
| if not session_id: | |
| return ProfileCompletenessResponse( | |
| completeness=0.0, completeness_pct=0, | |
| fields_collected=[], fields_missing=[], | |
| is_personalized=False, | |
| next_question_hint="Start the chat and tell me about your situation", | |
| ) | |
| sess = get_session(session_id) | |
| p = sess.profile | |
| # KI-271 β profile_dict now built from brain_tools.SLOT_UNION (15 fields) | |
| # so every captured slot (including B5 desired_sum_insured_inr, D2 copay_pct, | |
| # D2 family_medical_history) propagates through to /api/profile/completeness, | |
| # /api/profile/predicted-premium-band, /api/profile/recall-by-name. Prior | |
| # 12-key hand-roll caused E3 to discover the band endpoint ignoring copay | |
| # + family entirely. | |
| profile_dict = { | |
| slot: getattr(p, slot, None) for slot in brain_tools.SLOT_UNION | |
| } | |
| # KI-196 (ADR-041) β Profile completeness gates on Profile.asked. | |
| answered = set(getattr(p, "asked", []) or []) | |
| completeness_input = { | |
| k: (v if k in answered else None) for k, v in profile_dict.items() | |
| } | |
| c = _completeness(completeness_input) | |
| collected = [k for k, v in profile_dict.items() if k in answered and v not in (None, "", [], False)] | |
| missing = [k for k, v in profile_dict.items() if k not in answered or v in (None, "", [])] | |
| hint = None | |
| try: | |
| # next_question returns the field name (str) of the next missing | |
| # slot. The frontend uses it as a slot-hint; the actual phrasing | |
| # is generated by the single-brain LLM. | |
| hint = next_question(p) | |
| except Exception: | |
| pass | |
| return ProfileCompletenessResponse( | |
| completeness=c, | |
| completeness_pct=int(c * 100), | |
| fields_collected=collected, | |
| fields_missing=missing, | |
| is_personalized=c >= 0.6, | |
| next_question_hint=hint, | |
| profile=profile_dict, | |
| session_id=session_id, | |
| ) | |
| class ScorecardResponse(BaseModel): | |
| policy_id: str | |
| policy_name: str | |
| insurer_slug: str | |
| overall_score: int | |
| grade: str | |
| one_liner: str | |
| sub_scores: list[ScorecardSubScore] | |
| data_completeness_pct: float | |
| methodology_link: str | |
| # True β this policy had too little structured data to grade honestly. | |
| # The response is still a valid HTTP-200 ScorecardResponse (grade "β", | |
| # overall_score 0, empty sub_scores, an honest one_liner) so the frontend | |
| # renders a truthful "not enough data yet" state instead of the generic | |
| # Retry fallback or a fabricated grade. Optional w/ default so the | |
| # existing /api/policies/compare construction (no flag) stays valid. | |
| insufficient_data: bool = False | |
| # Deterministic, profile-aware {strengths, caveat} computed on the same | |
| # pass as the grade. Optional w/ default so every construction path | |
| # (compare with no summary, insufficient-data) stays valid. | |
| profile_summary: Optional[ProfileSummaryModel] = None | |
| class CompareEntry(BaseModel): | |
| policy_id: str | |
| policy_name: str | |
| insurer_slug: str | |
| fields: dict | |
| scorecard: Optional[ScorecardResponse] = None | |
| class CompareResponse(BaseModel): | |
| policies: list[CompareEntry] | |
| field_order: list[str] | |
| class MarketplacePolicy(BaseModel): | |
| policy_id: str | |
| policy_name: str | |
| insurer_slug: str | |
| insurer_name: str | |
| insurer_home_url: str | |
| source_pdf_url: Optional[str] = None | |
| grade: str | |
| overall_score: int | |
| one_liner: str | |
| data_completeness_pct: float | |
| # Deterministic, profile-aware {strengths, caveat}. Populated from the | |
| # SAME build_scorecard pass that produced `grade`. None when the catalogue | |
| # was built profile-neutrally and no facts qualified. | |
| profile_summary: Optional[ProfileSummaryModel] = None | |
| # Headline filterable fields | |
| min_entry_age: Optional[int] = None | |
| max_entry_age: Optional[int] = None | |
| sum_insured_options: list[int] = Field(default_factory=list) | |
| # #81 β cover presented as a RANGE (min β max), never a discrete ladder | |
| # or a single deterministic number. | |
| # SI RATIONALISATION (D1/D3) β sum_insured_options / _min / _max are now | |
| # the SOURCE-QUOTE-CORROBORATED set only (backend/sum_insured.py). Values | |
| # the field's own source_quote does not genuinely state are dropped, so | |
| # the marketplace never shows an SI the policy document doesn't back. | |
| # β’ sum_insured_is_band β True only when the corroborated set is a | |
| # genuine continuous band (range language + wide minβmax). The | |
| # frontend renders "βΉX β βΉY"; otherwise it lists the discrete tiers. | |
| # β’ sum_insured_tiers β the corroborated discrete plan amounts | |
| # (== sum_insured_options; kept as an explicit, named field so the | |
| # display contract is unambiguous on the frontend). | |
| sum_insured_min: Optional[int] = None | |
| sum_insured_max: Optional[int] = None | |
| sum_insured_is_band: bool = False | |
| sum_insured_tiers: list[int] = Field(default_factory=list) | |
| pre_existing_disease_waiting_months: Optional[int] = None | |
| initial_waiting_period_days: Optional[int] = None | |
| maternity_waiting_months: Optional[int] = None | |
| copayment_pct: Optional[float] = None | |
| network_hospital_count: Optional[int] = None | |
| no_claim_bonus_pct: Optional[int] = None | |
| ayush_coverage: Optional[bool] = None | |
| maternity_coverage: Optional[bool] = None | |
| cashless_treatment_supported: Optional[bool] = None | |
| room_rent_capping: Optional[str] = None | |
| # #86 β sourced insurer-level network: the official list URL + the | |
| # official stated count (when the insurer publishes one). Replaces the | |
| # web-backfilled per-policy network_hospital_count for display. | |
| network_list_url: Optional[str] = None | |
| network_count_official: Optional[int] = None | |
| network_list_is_pdf: bool = False | |
| # #73/#76 β the curated re-extraction legitimately writes non-numeric | |
| # honest values (e.g. max_entry_age "No maximum age (Lifelong)", a | |
| # fractional no_claim_bonus_pct). Previously these raised ValidationError | |
| # and the ENTIRE policy was dropped from the marketplace. Coerce at the | |
| # model so any construction path self-heals: keep a parseable number, | |
| # else degrade that ONE field to None β never drop the policy. | |
| def _coerce_optional_int(cls, v): | |
| if v is None or isinstance(v, bool): | |
| return None | |
| if isinstance(v, int): | |
| return v | |
| try: | |
| return int(round(float(str(v).replace(",", "").strip()))) | |
| except (ValueError, TypeError): | |
| return None | |
| def _coerce_optional_float(cls, v): | |
| if v is None or isinstance(v, bool): | |
| return None | |
| try: | |
| return float(str(v).replace("%", "").replace(",", "").strip()) | |
| except (ValueError, TypeError): | |
| return None | |
| def _coerce_int_list(cls, v): | |
| if not isinstance(v, list): | |
| return [] | |
| out = [] | |
| for x in v: | |
| try: | |
| out.append(int(round(float(str(x).replace(",", "").strip())))) | |
| except (ValueError, TypeError): | |
| continue | |
| return out | |
| def _coerce_optional_bool(cls, v): | |
| if v is None or isinstance(v, bool): | |
| return v | |
| s = str(v).strip().lower() | |
| if s in ("true", "yes", "covered", "y", "1"): | |
| return True | |
| if s in ("false", "no", "not covered", "excluded", "n", "0"): | |
| return False | |
| return None | |
| # KI-141 (2026-05-15) β marketing-rename aliases that share the same | |
| # source PDF (e.g. "Activ One" and "Activ Health" both point to the | |
| # activ-health-individual__wordings.pdf parent). Default empty list so | |
| # the field is backward-compatible. Frontend renders these as small | |
| # "Also known as: X, Y" sub-labels under the parent card title. | |
| aliases: list[str] = Field(default_factory=list) | |
| class MarketplaceResponse(BaseModel): | |
| policies: list[MarketplacePolicy] | |
| total: int | |
| insurers_indexed: int | |
| async def scorecard_methodology(): | |
| """Transparency endpoint β returns the 6-criterion blueprint with weights, | |
| consumer rationale, fields driving each sub-score, and regulatory anchors. | |
| Frontend renders this inside PolicyDetailModal so the user can see exactly | |
| how the headline number is computed and which of the HealthPolicy schema | |
| fields feed into which criterion. Both counts below are DERIVED (single | |
| source of truth) β never hardcode them on the frontend; consume these. | |
| """ | |
| from backend.scorecard import ( | |
| METHODOLOGY_BLUEPRINT, WEIGHTS, SCORED_FIELDS, grade_for, | |
| ) | |
| from rag.schema import HealthPolicy | |
| # grade_thresholds DERIVED from grade_for() β the single source of truth | |
| # for the frozen cutoffs (2026-05-16). Never restate the numbers here: | |
| # the old hardcoded "β₯85/70β84/β¦" had drifted out of sync with the | |
| # recalibrated Aβ₯76/Bβ₯69/Cβ₯61/Dβ₯54/F<54 scoring, so the disclosed bands | |
| # did not match how grades were actually assigned. Introspecting grade_for | |
| # makes a future recalibration propagate automatically. | |
| _band_lo: dict[str, int] = {} | |
| _band_desc: dict[str, str] = {} | |
| for _s in range(0, 101): | |
| _g, _d = grade_for(_s) | |
| if _g not in _band_lo: | |
| _band_lo[_g] = _s | |
| _band_desc[_g] = _d | |
| _f_cut = min(v for k, v in _band_lo.items() if k != "F") | |
| _grade_thresholds = { | |
| g: (f"<{_f_cut} β {_band_desc[g]}" if g == "F" | |
| else f"β₯{_band_lo[g]} β {_band_desc[g]}") | |
| for g in ("A", "B", "C", "D", "F") | |
| } | |
| return { | |
| "weights": WEIGHTS, | |
| "scored_fields_count": len(SCORED_FIELDS), | |
| "total_schema_fields": len(HealthPolicy.model_fields), | |
| "criteria": METHODOLOGY_BLUEPRINT, | |
| "grade_thresholds": _grade_thresholds, | |
| "scoring_approach": ( | |
| "Rules-based (deterministic), no LLM-in-the-loop. Each criterion produces a " | |
| "0β100 sub-score from concrete schema fields; the overall score is the weighted " | |
| "average. Weights adapt to user profile when age/parents/budget are known." | |
| ), | |
| } | |
| def _build_corpus_url_index() -> dict[str, str]: | |
| """Parse 40-data/corpus_urls.md and return {policy_id: source_url}. Used to | |
| backfill source_pdf_url when the LLM extraction didn't capture it.""" | |
| import re as _re | |
| out: dict[str, str] = {} | |
| md_path = settings.DATA_DIR / "corpus_urls.md" | |
| if not md_path.exists(): | |
| return out | |
| for line in md_path.read_text().splitlines(): | |
| if not line.startswith("|") or "insurer_slug" in line or "---" in line: | |
| continue | |
| parts = [p.strip() for p in line.strip("|").split("|")] | |
| if len(parts) < 5: | |
| continue | |
| insurer_slug = parts[0] | |
| policy_name = parts[2] | |
| doc_type = parts[3] | |
| m = _re.search(r"https?://\S+", parts[4]) | |
| if not (insurer_slug and m): | |
| continue | |
| url = m.group(0) | |
| # Primary key β match rag.ingest.policy_id_for: <insurer>__<filename-stem> | |
| # where filename-stem is the URL's PDF filename without extension. | |
| url_stem = url.rsplit("/", 1)[-1].rsplit("?", 1)[0].rsplit(".", 1)[0] | |
| url_slug = _re.sub(r"[^a-z0-9]+", "-", url_stem.lower()).strip("-") | |
| out[f"{insurer_slug}__{url_slug}"] = url | |
| # Secondary key β derived from policy_name + doc_type (some extracted | |
| # JSONs use a name-based slug when the original URL filename differs) | |
| if policy_name and doc_type: | |
| name_slug = _re.sub(r"[^a-z0-9]+", "-", policy_name.lower()).strip("-") | |
| out.setdefault(f"{insurer_slug}__{name_slug}__{doc_type.lower()}", url) | |
| out.setdefault(f"{insurer_slug}__{name_slug}", url) | |
| return out | |
| def _load_curated_facts() -> dict[str, dict]: | |
| """Load the 40-data/policy_facts/*.json curated layer. Each file has a | |
| `{field: {value, source_pdf_path, source_quote}}` shape. We unwrap to a | |
| flat `{field: value}` dict for the marketplace endpoint, preserving the | |
| provenance in a `_facts_provenance` field for transparency. | |
| KI-141 (2026-05-15) β also computes `_primary_source_pdf`, the most-common | |
| `source_pdf_path` across this curated entry's fields. Used by both | |
| /api/policies/all and /api/coverage to alias-merge marketing-rename | |
| curated entries into their extracted-JSON parent card. | |
| KI-219 (2026-05-15) β CANONICAL PRECEDENCE. When the curated dir has BOTH | |
| a canonical (`<insurer>__<product>.json`) and one or more doctype-suffixed | |
| siblings (`<insurer>__<product>__wordings.json`, `__brochure.json`, | |
| `__cis.json`, `__prospectus.json`) for the same product, the canonical | |
| file's content is the source of truth. The suffixed-sibling keys point | |
| AT the canonical entry so any downstream lookup by either form resolves | |
| to the richer canonical data. Previously the order of `glob('*.json')` | |
| + `setdefault` made the loser non-deterministic; the more complete | |
| canonical entry (e.g. `hdfc-ergo__optima-secure.json` says "No room rent | |
| cap") was getting shadowed by the suffixed sibling (`...__wordings.json` | |
| says "Room rent capped at 1%"), collapsing scorecards to 72/100. | |
| """ | |
| import json as _json | |
| from collections import Counter | |
| facts: dict[str, dict] = {} | |
| facts_dir = settings.DATA_DIR / "policy_facts" | |
| if not facts_dir.exists(): | |
| return facts | |
| _DOCTYPE_SUFFIXES = ("__wordings", "__brochure", "__cis", "__prospectus") | |
| def _flatten(d: dict, fallback_id: str) -> dict: | |
| policy_id = d.get("policy_id") or fallback_id | |
| flat: dict = {} | |
| provenance: dict = {} | |
| all_source_pdfs: list[str] = [] | |
| for k, v in d.items(): | |
| if k.startswith("_") or k in ("policy_id", "policy_name", "insurer_slug"): | |
| flat[k] = v | |
| continue | |
| if isinstance(v, dict) and "value" in v: | |
| flat[k] = v["value"] | |
| if v.get("source_pdf_path") or v.get("source_quote") or v.get("source_url"): | |
| provenance[k] = { | |
| "source_pdf_path": v.get("source_pdf_path"), | |
| "source_quote": v.get("source_quote"), | |
| "source_url": v.get("source_url"), | |
| } | |
| if v.get("source_pdf_path"): | |
| all_source_pdfs.append(v["source_pdf_path"]) | |
| else: | |
| flat[k] = v | |
| flat.setdefault("policy_id", policy_id) | |
| flat["_facts_provenance"] = provenance | |
| flat["_primary_source_pdf"] = ( | |
| Counter(all_source_pdfs).most_common(1)[0][0] | |
| if all_source_pdfs else None | |
| ) | |
| return flat | |
| # Pass 1 β load every curated JSON, indexed by its on-disk stem, AND | |
| # group siblings by their canonical product_key (stem with any trailing | |
| # __doctype suffix stripped). | |
| by_stem: dict[str, dict] = {} | |
| siblings: dict[str, list[tuple[str, bool]]] = {} # product_key β [(stem, is_canonical), ...] | |
| for f in sorted(facts_dir.glob("*.json")): | |
| try: | |
| d = _json.loads(f.read_text()) | |
| except Exception: | |
| continue | |
| stem = f.stem | |
| flat = _flatten(d, stem) | |
| by_stem[stem] = flat | |
| # Determine canonical-ness by FILE STEM (not by policy_id field). | |
| # A stem ending in one of the four doctype tokens is a non-canonical | |
| # sibling; everything else is canonical. | |
| is_canonical = not any(stem.endswith(suf) for suf in _DOCTYPE_SUFFIXES) | |
| if is_canonical: | |
| product_key = stem | |
| else: | |
| # Strip the matching suffix to find the canonical sibling. | |
| for suf in _DOCTYPE_SUFFIXES: | |
| if stem.endswith(suf): | |
| product_key = stem[: -len(suf)] | |
| break | |
| else: | |
| product_key = stem | |
| siblings.setdefault(product_key, []).append((stem, is_canonical)) | |
| # Pass 2 β for each product_key, pick the canonical entry's flat dict if | |
| # present; otherwise fall back to the first suffixed sibling (sorted to | |
| # be deterministic). Then make every sibling key (canonical stem + each | |
| # __doctype variant + each sibling's own stem + each sibling's | |
| # policy_id) point at the chosen flat dict so the source-of-truth wins | |
| # regardless of which key the caller looked up by. | |
| # | |
| # KI-251 (2026-05-16) β FIELD-LEVEL canonical precedence. The original | |
| # KI-219 logic chose ONE entry (canonical) wholesale. That silently | |
| # dropped real curated data whenever the canonical file had a field | |
| # extracted as null (`{"value": null, ... "source_quote": "not stated | |
| # in <pdf>"}`) while a doctype sibling had the genuine value. Concrete | |
| # incident: `icici-lombard__health-elite-plus.json` (canonical, every | |
| # field null) shadowed `icici-lombard__health-elite-plus__wordings.json` | |
| # whose `sum_insured_options` is a real list β so the marketplace card | |
| # rendered "COVER UP TO β" despite the value existing in the curated | |
| # layer. Affected 8 products on sum_insured_options + 3 on entry-age. | |
| # | |
| # Fix: keep canonical precedence for every field the canonical populates | |
| # (KI-219 preserved exactly β `hdfc-ergo__optima-secure` "No room rent | |
| # cap" still wins over its sibling's "1%"), but for any field the | |
| # canonical leaves null/empty, backfill from the highest-ranked sibling | |
| # that has a genuine value. Doctype rank: wordings > prospectus > cis > | |
| # brochure (most authoritative source first). This ONLY surfaces data | |
| # that already exists verbatim in 40-data/policy_facts β nothing is | |
| # fabricated; the per-field provenance pointer is backfilled too so the | |
| # UI still shows the correct source quote for the borrowed field. | |
| _SIB_FILL_RANK = {"__wordings": 0, "__prospectus": 1, "__cis": 2, "__brochure": 3} | |
| def _is_empty(val) -> bool: | |
| return val is None or val == "" or val == [] or val == {} | |
| # Fields that are pure metadata / structural and must NOT be borrowed | |
| # across siblings (they describe the chosen entry itself, not a fact). | |
| _NON_FACT_KEYS = { | |
| "policy_id", "policy_name", "insurer_slug", | |
| "_facts_provenance", "_primary_source_pdf", | |
| } | |
| for product_key, entries in siblings.items(): | |
| canonical_entries = [s for s, c in entries if c] | |
| if canonical_entries: | |
| chosen_stem = canonical_entries[0] | |
| else: | |
| chosen_stem = sorted(s for s, _ in entries)[0] | |
| chosen = dict(by_stem[chosen_stem]) | |
| # Deterministic sibling order for field-level backfill: by doctype | |
| # authority rank, then stem (stable tiebreak). The chosen stem is | |
| # excluded β it is already the base. | |
| def _rank(stem: str) -> tuple: | |
| for suf, r in _SIB_FILL_RANK.items(): | |
| if stem.endswith(suf): | |
| return (r, stem) | |
| return (99, stem) | |
| fill_order = sorted( | |
| (s for s, _ in entries if s != chosen_stem), | |
| key=_rank, | |
| ) | |
| if fill_order: | |
| chosen_prov = dict(chosen.get("_facts_provenance") or {}) | |
| for sib_stem in fill_order: | |
| sib = by_stem[sib_stem] | |
| sib_prov = sib.get("_facts_provenance") or {} | |
| for k, v in sib.items(): | |
| if k in _NON_FACT_KEYS: | |
| continue | |
| # Canonical/base value wins whenever it is populated. | |
| if not _is_empty(chosen.get(k)): | |
| continue | |
| if _is_empty(v): | |
| continue | |
| chosen[k] = v | |
| # Carry the borrowed field's provenance so the UI still | |
| # shows the correct verbatim source quote for it. | |
| if k in sib_prov: | |
| chosen_prov[k] = sib_prov[k] | |
| chosen["_facts_provenance"] = chosen_prov | |
| # Register the canonical product_key. | |
| facts[product_key] = chosen | |
| # Register every doctype-suffix permutation pointing at the chosen | |
| # flat (back-compat with code that looks up by the suffixed name). | |
| for suf in _DOCTYPE_SUFFIXES: | |
| facts.setdefault(f"{product_key}{suf}", chosen) | |
| # Register every sibling's actual on-disk stem AND policy_id field | |
| # so callers that already hold a stem-like ID still resolve to the | |
| # canonical content. | |
| for sib_stem, _is_can in entries: | |
| facts[sib_stem] = chosen | |
| sib_pid = by_stem[sib_stem].get("policy_id") | |
| if isinstance(sib_pid, str) and sib_pid: | |
| facts[sib_pid] = chosen | |
| # #52 β merge PERSISTED user-uploaded docs into the curated layer so each | |
| # surfaces as a marketplace card via the EXISTING _marketplace_catalogue | |
| # Pass-2 + build_scorecard path (NO grading re-implementation). Records | |
| # are already in the curated `{field:{value,source_*}}` shape; run them | |
| # through the same _flatten so per-field provenance is preserved. They | |
| # have unique `user-upload__*` policy_ids so they can never collide with | |
| # a real curated product key. A failure here must NOT break the curated | |
| # layer for the 200+ real policies β log + continue. | |
| try: | |
| from backend import uploaded_docs as _udocs | |
| for _pid, _rec in _udocs.load_persisted_records().items(): | |
| if not isinstance(_rec, dict): | |
| continue | |
| facts[_pid] = _flatten(_rec, _pid) | |
| except Exception as e: # noqa: BLE001 β uploaded layer is additive | |
| logging.warning( | |
| "uploaded-docs curated merge failed (%s: %s) β " | |
| "marketplace falls back to corpus-only cards", | |
| type(e).__name__, e, | |
| ) | |
| return facts | |
| def _source_pdf_to_policy_id(pdf_path: str | None) -> str | None: | |
| """KI-141 β map a curated `source_pdf_path` like | |
| 'rag/corpus/aditya-birla/activ-health-individual__wordings.pdf' to the | |
| extracted-JSON policy_id 'aditya-birla__activ-health-individual__wordings'. | |
| Returns None if the input is empty/None. | |
| """ | |
| if not pdf_path: | |
| return None | |
| s = pdf_path | |
| if s.startswith("rag/corpus/"): | |
| s = s[len("rag/corpus/"):] | |
| if s.endswith(".pdf"): | |
| s = s[: -len(".pdf")] | |
| return s.replace("/", "__") | |
| _INSURER_NET: dict | None = None | |
| def _insurer_network(slug: str) -> dict: | |
| """#86 β official insurer-level network source (40-data/insurer_network | |
| .json): the official list URL + the official stated count where the | |
| insurer publishes one. Sourced, not web-backfilled. Cached.""" | |
| global _INSURER_NET | |
| if _INSURER_NET is None: | |
| p = settings.DATA_DIR / "insurer_network.json" | |
| try: | |
| _INSURER_NET = ( | |
| json.loads(p.read_text()).get("insurers", {}) if p.exists() else {} | |
| ) | |
| except Exception: | |
| _INSURER_NET = {} | |
| return _INSURER_NET.get(slug, {}) or {} | |
| def _recover_scorecard_facts(sc) -> dict: | |
| """#48 β port of the frontend parseScorecardFacts. The detail-modal | |
| snapshot recovers facts (co-pay, PED wait, network, cashless, β¦) from | |
| the scorecard's signal strings when the flat policy field is null. The | |
| marketplace CARD only had the flat fields, so it showed "β" where the | |
| modal showed a real value. Recover the SAME facts server-side and | |
| backfill `data` so card == modal everywhere, with no extra client call.""" | |
| import re as _re | |
| f: dict = {} | |
| sub = getattr(sc, "sub_scores", None) or [] | |
| for s in sub: | |
| for raw in getattr(s, "signals", None) or []: | |
| sig = str(raw).strip() | |
| low = sig.lower() | |
| m = _re.search(r"(\d+(?:\.\d+)?)%\s*copay", sig, _re.I) | |
| if m: | |
| f["copayment_pct"] = float(m.group(1)) | |
| elif _re.search(r"0% copayment", sig, _re.I): | |
| f["copayment_pct"] = 0 | |
| m = _re.search(r"(\d+)\s*mo\s*PED\s*waiting", sig, _re.I) | |
| if m: | |
| f["pre_existing_disease_waiting_months"] = int(m.group(1)) | |
| m = _re.search(r"([\d,]+)\+?\s*network hospitals", sig, _re.I) | |
| if m: | |
| f["network_hospital_count"] = int(m.group(1).replace(",", "")) | |
| if _re.search(r"cashless supported", low, _re.I): | |
| f["cashless_treatment_supported"] = True | |
| elif _re.search(r"no cashless", low, _re.I): | |
| f["cashless_treatment_supported"] = False | |
| if _re.search(r"ayush covered", low, _re.I): | |
| f["ayush_coverage"] = True | |
| elif _re.search(r"no ayush", low, _re.I): | |
| f["ayush_coverage"] = False | |
| if _re.search(r"maternity covered", low, _re.I): | |
| f["maternity_coverage"] = True | |
| if _re.search(r"no room rent cap", low, _re.I): | |
| f["_room_no_cap"] = True | |
| else: | |
| rr = _re.search(r"room rent capped:\s*(.+)$", sig, _re.I) | |
| if rr: | |
| f["_room_cap_text"] = rr.group(1).strip() | |
| m = _re.search(r"entry up to\s*(\d+)", sig, _re.I) | |
| if m: | |
| f["max_entry_age"] = int(m.group(1)) | |
| return f | |
| _CORPUS_PDF_IDX: dict[str, str] | None = None | |
| def _corpus_pdf_index() -> dict[str, str]: | |
| """Every policy in the catalogue exists ONLY because its source PDF was | |
| downloaded into rag/corpus to build the vectors + policy_facts. This maps | |
| each policy_id (full id, file stem, AND the #80 dedup-stripped id) to the | |
| absolute corpus PDF that physically exists on disk β so the marketplace | |
| can always link the real document even when no public origin URL was ever | |
| recorded. Wordings/policy docs win over CIS/brochure/prospectus. Built | |
| once and cached for the process lifetime.""" | |
| global _CORPUS_PDF_IDX | |
| if _CORPUS_PDF_IDX is not None: | |
| return _CORPUS_PDF_IDX | |
| from collections import Counter | |
| prio = {"wordings": 0, "policy": 1, "cis": 2, "prospectus": 3, "brochure": 4} | |
| def _rank(path: str) -> int: | |
| low = path.lower() | |
| for k, v in prio.items(): | |
| if k in low: | |
| return v | |
| return 9 | |
| idx: dict[str, str] = {} | |
| best: dict[str, int] = {} | |
| root = settings.CORPUS_DIR.parent.parent | |
| corpus_root = str(settings.CORPUS_DIR.resolve()) | |
| facts_dir = settings.DATA_DIR / "policy_facts" | |
| if facts_dir.exists(): | |
| for fp in sorted(facts_dir.glob("*.json")): | |
| try: | |
| d = json.loads(fp.read_text()) | |
| except Exception: | |
| continue | |
| pid = d.get("policy_id") or fp.stem | |
| paths = [ | |
| v.get("source_pdf_path") | |
| for v in d.values() | |
| if isinstance(v, dict) and v.get("source_pdf_path") | |
| ] | |
| if not paths: | |
| continue | |
| cand = Counter(paths).most_common(1)[0][0] | |
| ap = (root / cand).resolve() | |
| try: | |
| ok = ap.is_file() and str(ap).startswith(corpus_root) | |
| except Exception: | |
| ok = False | |
| if not ok: | |
| continue | |
| rank = _rank(cand) | |
| keys = {pid, fp.stem} | |
| for suff in ("__wordings", "__brochure", "__cis", "__prospectus", "__policy"): | |
| if pid.endswith(suff): | |
| keys.add(pid[: -len(suff)]) | |
| break | |
| for k in keys: | |
| if k not in idx or rank < best.get(k, 9): | |
| idx[k] = str(ap) | |
| best[k] = rank | |
| # #52 β persisted uploaded docs keep their real PDF in the persistent | |
| # UPLOADED_DOCS_DIR (NOT rag/corpus). Map their policy_id β that file so | |
| # the marketplace card's /api/policy-pdf link resolves to the exact | |
| # document the user uploaded and that the card was graded from. | |
| try: | |
| for d in sorted(settings.UPLOADED_DOCS_DIR.glob("*/source.pdf")): | |
| meta_p = d.parent / "meta.json" | |
| try: | |
| pid = json.loads(meta_p.read_text()).get("policy_id") or d.parent.name | |
| except Exception: # noqa: BLE001 | |
| pid = d.parent.name | |
| idx[pid] = str(d.resolve()) | |
| except Exception: # noqa: BLE001 β uploaded-pdf index is additive | |
| pass | |
| _CORPUS_PDF_IDX = idx | |
| return idx | |
| def _is_credible_pdf_url(u: str | None) -> bool: | |
| """#87 β a recorded source_pdf_url is only trustworthy as the policy-PDF | |
| link if it unambiguously points at a document, not an insurer homepage | |
| or a generic section page (e.g. https://www.sbigeneral.in, | |
| https://nationalinsurance.nic.co.in/en/health-insurance). When it isn't, | |
| we prefer the local corpus PDF we definitively have for every policy.""" | |
| if not u: | |
| return False | |
| from urllib.parse import urlparse | |
| try: | |
| path = (urlparse(u).path or "").lower() | |
| except Exception: | |
| return False | |
| if ".pdf" in path: | |
| return True | |
| return any( | |
| m in path | |
| for m in ("/documents/", "/dam/", "/download", "/sites/default/files/") | |
| ) | |
| def policy_pdf(policy_id: str): | |
| """Serve the local corpus PDF for a policy β the exact document the | |
| catalogue, vectors and facts were all built from. Guarantees every one | |
| of the 148 cards has a working real-PDF link even when no public origin | |
| URL was ever captured. Path is constrained to rag/corpus.""" | |
| idx = _corpus_pdf_index() | |
| ap = idx.get(policy_id) or idx.get(policy_id.replace("/", "__")) | |
| if not ap: | |
| raise HTTPException(status_code=404, detail="No source PDF for this policy") | |
| p = Path(ap).resolve() | |
| # #52 β also allow the persistent uploaded-docs store (the uploaded PDF | |
| # lives there, not in rag/corpus). Both roots are server-controlled | |
| # directories; the index only ever maps to files inside one of them, so | |
| # this stays a strict allowlist (no traversal surface). | |
| _allowed_roots = ( | |
| str(settings.CORPUS_DIR.resolve()), | |
| str(settings.UPLOADED_DOCS_DIR.resolve()), | |
| ) | |
| if not (p.is_file() and any(str(p).startswith(r) for r in _allowed_roots)): | |
| raise HTTPException(status_code=404, detail="Source PDF not found") | |
| return FileResponse( | |
| str(p), | |
| media_type="application/pdf", | |
| headers={"Content-Disposition": f'inline; filename="{p.name}"'}, | |
| ) | |
| def _merge_curated(extracted: dict, curated: dict | None) -> dict: | |
| """Curated facts override LLM extraction for every field they populate. | |
| LLM extraction fills the long tail. Provenance pointers survive in the | |
| merged dict so the UI can show source quotes per field.""" | |
| if not curated: | |
| return extracted | |
| merged = dict(extracted) | |
| for k, v in curated.items(): | |
| if v is not None and v != "" and v != []: | |
| merged[k] = v | |
| return merged | |
| def _si_source_quote(data: dict) -> str: | |
| """The verbatim source_quote backing this policy's sum_insured_options. | |
| Every real SI value in the catalogue comes from the curated | |
| 40-data/policy_facts layer (extracted SI is null across the board); the | |
| flatten step in _load_curated_facts() stores its provenance at | |
| data["_facts_provenance"]["sum_insured_options"]["source_quote"], which | |
| _merge_curated() carries through onto the merged dict. We also accept the | |
| wrapped `{value, source_quote}` shape defensively in case a future | |
| extraction path leaves the field unflattened. | |
| """ | |
| prov = (data.get("_facts_provenance") or {}).get("sum_insured_options") | |
| if isinstance(prov, dict) and prov.get("source_quote"): | |
| return str(prov["source_quote"]) | |
| raw = data.get("sum_insured_options") | |
| if isinstance(raw, dict) and raw.get("source_quote"): | |
| return str(raw["source_quote"]) | |
| return "" | |
| def _rationalise_si(data: dict, si_values: list[int]) -> "_si.SumInsuredView": | |
| """Apply the deterministic D3 source-quote corroboration filter + D1 | |
| band-vs-tier classification to this policy's SI list. Returns a | |
| SumInsuredView the marketplace serializer maps onto sum_insured_*. | |
| """ | |
| return _si.rationalise(si_values, _si_source_quote(data)) | |
| def _policy_corroborated_si(policy_id: str | None) -> "_si.SumInsuredView": | |
| """The corroborated SI view for a single policy_id (D2/D3). Resolves the | |
| same merged extracted+curated `data` the marketplace serializer sees, | |
| then runs the source-quote corroboration filter. `kind == "none"` β the | |
| policy publishes NO corroborated Sum Insured (drives the D2 disclosure). | |
| """ | |
| if not policy_id: | |
| return _si.SumInsuredView(kind="none", tiers=[], min_inr=None, max_inr=None) | |
| import json as _json | |
| try: | |
| curated = _load_curated_facts() | |
| except Exception: | |
| curated = {} | |
| data: dict = {} | |
| ep = settings.EXTRACTED_DIR / f"{policy_id}.json" | |
| if ep.exists(): | |
| try: | |
| data = _json.loads(ep.read_text()) | |
| except Exception: | |
| data = {} | |
| cur = curated.get((data.get("policy_id") if data else None) or policy_id) \ | |
| or curated.get(policy_id) | |
| data = _merge_curated(data, cur) if (data or cur) else {} | |
| si = data.get("sum_insured_options") or [] | |
| if isinstance(si, list): | |
| si = [int(x) for x in si | |
| if isinstance(x, (int, float)) or (isinstance(x, str) and x.isdigit())] | |
| else: | |
| si = [] | |
| return _rationalise_si(data, si) | |
| # Decision-critical fields that distinguish a RENAME (curated entry folds | |
| # onto extracted parent) from a VARIANT (same UIN but materially different | |
| # product β must stay as its own card). Same UIN β same product: | |
| # regulators file one "wordings" PDF that covers multiple marketed variants | |
| # (e.g. ProHealth Prime vs ProHealth Protect both filed under | |
| # MCIHLIP24011V072324; copay/PED/maternity/NCB differ). | |
| _KI145_DIFF_FIELDS: tuple[str, ...] = ( | |
| "copayment_pct", | |
| "pre_existing_disease_waiting_months", | |
| "maternity_coverage", | |
| "maternity_waiting_months", | |
| "room_rent_capping", | |
| "restoration_benefit", | |
| "no_claim_bonus_pct", | |
| "post_hospitalization_days", | |
| ) | |
| def _ki145_extract_value(raw, field: str): | |
| """Unwrap the value from either scalar OR nested `{value, ...}` shapes. | |
| For two fields the extracted-side shape is `{covered, ...}` instead of | |
| `{value, ...}`: maternity_coverage and restoration_benefit. We project | |
| those onto the boolean `covered` so a curated bool/str compares cleanly | |
| against the extracted dict's truthiness. | |
| """ | |
| if raw is None: | |
| return None | |
| if isinstance(raw, dict): | |
| if "value" in raw: | |
| return raw.get("value") | |
| if field in ("maternity_coverage", "restoration_benefit") and "covered" in raw: | |
| return raw.get("covered") | |
| # Unknown dict shape β treat as opaque non-null marker so a real | |
| # diff isn't accidentally suppressed. | |
| return raw | |
| return raw | |
| def _ki145_normalize(field: str, val): | |
| """Coerce field value into a comparable form (numbers as floats, bools as | |
| bools, strings stripped lower-case). Returns None on null/empty/"" so it | |
| is consistently skipped in the diff count.""" | |
| if val is None: | |
| return None | |
| # Numeric fields | |
| if field in ( | |
| "copayment_pct", | |
| "pre_existing_disease_waiting_months", | |
| "maternity_waiting_months", | |
| "no_claim_bonus_pct", | |
| "post_hospitalization_days", | |
| ): | |
| try: | |
| return float(val) | |
| except (TypeError, ValueError): | |
| return None | |
| # Boolean fields | |
| if field == "maternity_coverage": | |
| if isinstance(val, bool): | |
| return val | |
| if isinstance(val, (int, float)): | |
| return bool(val) | |
| if isinstance(val, str): | |
| s = val.strip().lower() | |
| if s in ("true", "yes", "covered"): | |
| return True | |
| if s in ("false", "no", "not covered", "excluded"): | |
| return False | |
| return None | |
| return None | |
| # Restoration may arrive as bool (extracted .covered), str (curated prose) | |
| # or dict (already unwrapped above). Treat presence/absence as the signal: | |
| # a free-text limit phrase = True, explicit False/None = False. | |
| if field == "restoration_benefit": | |
| if isinstance(val, bool): | |
| return val | |
| if isinstance(val, str): | |
| s = val.strip().lower() | |
| if not s: | |
| return None | |
| if s in ("false", "no", "none", "n/a", "not available"): | |
| return False | |
| return True | |
| return None | |
| # String fields (room_rent_capping) | |
| if isinstance(val, str): | |
| s = val.strip().lower() | |
| return s or None | |
| return val | |
| def _ki145_material_diffs(curated: dict, extracted: dict) -> int: | |
| """Count fields where BOTH sides have non-null values that disagree. | |
| Null on either side = SKIP (extraction incompleteness, not a real diff). | |
| >= 2 diffs β VARIANT (keep separate). < 2 β RENAME (alias-merge).""" | |
| diffs = 0 | |
| for f in _KI145_DIFF_FIELDS: | |
| cur_v = _ki145_normalize(f, _ki145_extract_value(curated.get(f), f)) | |
| ext_v = _ki145_normalize(f, _ki145_extract_value(extracted.get(f), f)) | |
| if cur_v is None or ext_v is None: | |
| continue # extraction incompleteness, not a real diff | |
| if cur_v != ext_v: | |
| diffs += 1 | |
| return diffs | |
| def _profile_summary_model(sc) -> Optional[ProfileSummaryModel]: | |
| """Adapt the scorecard's ProfileSummary dataclass β the API model. | |
| None-safe (older Scorecard objects / defensive paths may not carry one). | |
| """ | |
| ps = getattr(sc, "profile_summary", None) | |
| if ps is None: | |
| return None | |
| return ProfileSummaryModel( | |
| strengths=list(getattr(ps, "strengths", []) or []), | |
| caveat=getattr(ps, "caveat", None), | |
| ) | |
| def _marketplace_catalogue(user_profile_dict, _collect_scorecards=None): | |
| """SINGLE SOURCE OF TRUTH for the marketplace card set (#40). | |
| The recommendation path (brain_tools._scorecard_signal via | |
| marketplace_grade) and /api/policies/all BOTH derive a policy's | |
| grade from THIS one computation, so the cited-card grade can never | |
| diverge from the marketplace card grade β including marketing-rename | |
| alias / KI-145 variant cards. Body is the former inline endpoint | |
| logic, moved verbatim; fully synchronous. | |
| Task #31 (single-source-of-truth, option (a)): when `_collect_scorecards` | |
| is a dict, it is populated `{surviving_card.policy_id: Scorecard}` with | |
| the EXACT `Scorecard` object (full 6 sub_scores + profile_summary + | |
| grade) computed here for each card that survives the final dedup. The | |
| single /api/policies/{id}/scorecard endpoint serves that object verbatim | |
| so its profile_summary / grade / sub_scores are byte-identical to the | |
| /api/policies/all card for the same canonical id BY CONSTRUCTION β both | |
| flow through this one `build_scorecard` pass on the same chosen sibling's | |
| `(data, insurer_reviews, profile)`. (The endpoint's old doctype-rank | |
| sibling-reconstruction picked a DIFFERENT sibling than this catalogue's | |
| completeness-based `_best` dedup, emitting a different strength set.) | |
| """ | |
| import json as _json | |
| # Task #31 β {id(MarketplacePolicy): Scorecard}. Keyed by the card | |
| # OBJECT's identity (NOT its policy_id string β several pre-dedup `out` | |
| # entries can share the same curated-canonical policy_id, so a string | |
| # key would let a lower-completeness sibling's Scorecard clobber the | |
| # survivor's). After `_best` picks the surviving object we map each | |
| # survivor's policy_id / canonical `_ident` to ITS OWN Scorecard. | |
| _sc_by_obj: dict = {} if _collect_scorecards is not None else None | |
| from backend.scorecard import build_scorecard | |
| from backend.policy_identity import clean_display_policy_name | |
| corpus_url_index = _build_corpus_url_index() | |
| curated_facts = _load_curated_facts() | |
| # KI-132 (2026-05-15) β expanded insurer_meta to cover all 19 real | |
| # insurers. Previously only 10 had curated display-names + home URLs, | |
| # so 9 insurers rendered as raw slugs in the marketplace dropdown | |
| # (acko, cholamandalam, go-digit, iffco-tokio, national-insurance, | |
| # oriental-insurance, reliance-general, royal-sundaram, sbi-general). | |
| insurer_meta = { | |
| "acko": ("Acko Health Insurance", "https://www.acko.com/health-insurance/"), | |
| "aditya-birla": ("Aditya Birla Health Insurance", "https://www.adityabirlacapital.com/healthinsurance"), | |
| "bajaj-allianz": ("Bajaj Allianz General Insurance", "https://www.bajajallianz.com/"), | |
| "care-health": ("Care Health Insurance", "https://www.careinsurance.com/"), | |
| "cholamandalam": ("Cholamandalam MS General Insurance", "https://www.cholainsurance.com/"), | |
| "go-digit": ("Go Digit General Insurance", "https://www.godigit.com/"), | |
| "hdfc-ergo": ("HDFC ERGO General Insurance", "https://www.hdfcergo.com/"), | |
| "icici-lombard": ("ICICI Lombard General Insurance", "https://www.icicilombard.com/"), | |
| "iffco-tokio": ("IFFCO Tokio General Insurance", "https://www.iffcotokio.co.in/"), | |
| "manipalcigna": ("ManipalCigna Health Insurance", "https://www.manipalcigna.com/"), | |
| "national-insurance": ("National Insurance Company", "https://nationalinsurance.nic.co.in/"), | |
| "new-india": ("New India Assurance", "https://www.newindia.co.in/"), | |
| "niva-bupa": ("Niva Bupa Health Insurance", "https://www.nivabupa.com/"), | |
| "indusind-general": ("IndusInd General Insurance (formerly Reliance General)", "https://www.indusind.com/general-insurance/"), | |
| "oriental-insurance": ("Oriental Insurance Company", "https://orientalinsurance.org.in/"), | |
| "reliance-general": ("Reliance General Insurance", "https://www.reliancegeneral.co.in/"), | |
| "royal-sundaram": ("Royal Sundaram General Insurance", "https://www.royalsundaram.in/"), | |
| "sbi-general": ("SBI General Insurance", "https://www.sbigeneral.in/"), | |
| "star-health": ("Star Health & Allied Insurance", "https://www.starhealth.in/"), | |
| "tata-aig": ("Tata AIG General Insurance", "https://www.tataaig.com/"), | |
| } | |
| def _coerce_bool(v): | |
| if isinstance(v, dict) and "covered" in v: return v.get("covered") | |
| if isinstance(v, bool): return v | |
| return None | |
| # Build a unified policy set: every extracted JSON + every curated facts | |
| # JSON that doesn't have an extracted counterpart yet. This way, even | |
| # policies whose LLM extraction failed still surface in the marketplace | |
| # with their human-curated data. | |
| # | |
| # KI-133 (2026-05-15) β dedup marketplace cards to ONE per policy product | |
| # (not one per PDF document). Previously wordings + brochure + cis of | |
| # the same product created three separate cards, ballooning the | |
| # marketplace from 138 products to ~209 cards and confusing users who | |
| # see "138 POLICIES" in the badge but 209 cards below. | |
| # | |
| # Algorithm: sort source filenames so the canonical "wordings" variant | |
| # is encountered first for each product, then dedup by stripped | |
| # policy_id (everything before the trailing __<doctype>). | |
| # Doctype preference: wordings > prospectus > cis > brochure > anything | |
| # else (alphabetical fallback). | |
| _DOCTYPE_RANK = {"wordings": 0, "prospectus": 1, "cis": 2, "brochure": 3} | |
| def _doctype_of(stem: str) -> str: | |
| return stem.rsplit("__", 1)[1] if "__" in stem else "" | |
| def _product_key_of(policy_id: str) -> str: | |
| # Strip trailing __<doctype> so wordings/brochure/cis of the same | |
| # product collapse to a single key. | |
| return policy_id.rsplit("__", 1)[0] if "__" in policy_id else policy_id | |
| sorted_files = sorted( | |
| settings.EXTRACTED_DIR.glob("*.json"), | |
| key=lambda fp: (_DOCTYPE_RANK.get(_doctype_of(fp.stem), 99), fp.stem), | |
| ) | |
| # KI-141 (2026-05-15) β alias-dedup pre-pass. Curated "marketing rename" | |
| # entries that re-describe the SAME IRDAI-filed product collapse onto a | |
| # single marketplace card; the marketing names surface as `aliases`. | |
| # | |
| # KI-142 (2026-05-15, REFACTORED) β UIN-primary invariant: 1 unique UIN | |
| # = 1 unique marketplace card. The PDF-based gate is now a fallback for | |
| # entries that lack a UIN. | |
| # | |
| # Algorithm (two phases so PDF-backed extracted entries always claim | |
| # their UIN before any curated rename does): | |
| # Phase A: walk extracted/*.json (sorted by doctype rank, then stem). | |
| # Each extracted parent claims its uin_code into `uin_to_parent`. | |
| # Phase B: walk curated_facts (sorted by policy_id for determinism). | |
| # For each canonical curated entry (skip lookup-permutation aliases | |
| # and entries that ARE __wordings/__brochure/__cis themselves): | |
| # 1. Read curated UIN (scalar OR nested .value form). | |
| # 2. If UIN non-empty AND already in `uin_to_parent` (claimant != | |
| # self) β alias of that parent. | |
| # 3. Else if UIN non-empty β claim it (so subsequent curated | |
| # siblings with the same UIN alias onto THIS entry in pass 2). | |
| # 4. Else (UIN empty) OR (UIN had no prior claimant) β fall back | |
| # to the source_pdf gate: if `_primary_source_pdf` maps to an | |
| # extracted parent stem, alias under that parent. | |
| # 5. Otherwise the curated entry stays as a standalone card. | |
| # | |
| # Multi-variant wordings PDFs with a single filed UIN (e.g. | |
| # manipalcigna prohealth-insurance-all-variants.pdf β the PDF text | |
| # confirms only ONE UIN `MCIHLIP24011V072324` is filed for that | |
| # product) correctly collapse all sub-product curated entries onto one | |
| # card. Distinct-UIN siblings under a shared PDF would surface as | |
| # separate cards because their UINs claim independent parents. | |
| extracted_stems = {fp.stem for fp in sorted_files} | |
| # Phase A β extracted parents claim their UINs first. We also retain the | |
| # parsed extracted JSON so Phase B can run the KI-145 material-diff check | |
| # without re-reading from disk. | |
| uin_to_parent: dict[str, str] = {} | |
| extracted_uin: dict[str, str] = {} # kept for downstream introspection | |
| extracted_data: dict[str, dict] = {} | |
| for fp in sorted_files: | |
| try: | |
| _d = _json.loads(fp.read_text()) | |
| except Exception: | |
| continue | |
| extracted_data[fp.stem] = _d | |
| _u = _d.get("uin_code") | |
| if isinstance(_u, dict): | |
| _u = _u.get("value") | |
| _u = (_u or "").strip() if isinstance(_u, str) else "" | |
| if _u: | |
| extracted_uin[fp.stem] = _u | |
| uin_to_parent.setdefault(_u, fp.stem) | |
| # Direct-parent map for each curated entry (built in Phase B), then | |
| # chain-compressed in Phase C so transitive aliases (e.g. activ-one β | |
| # activ-health β activ-health-individual__wordings) flatten onto the | |
| # ultimate extracted parent. | |
| direct_parent: dict[str, str] = {} | |
| curated_canonical_ids: list[str] = [] | |
| # KI-145 β curated entries whose UIN matched a candidate parent but | |
| # failed the material-diffs gate (>= 2 decision-critical fields disagree | |
| # with the parent's extracted JSON). These are genuine variants that | |
| # must emit as standalone cards in pass 2 even when their policy_id is | |
| # a prefix of a seen extracted policy_id (the old startswith-skip would | |
| # otherwise drop them silently). | |
| ki145_variant_curated_ids: set[str] = set() | |
| # Phase B β walk curated entries deterministically (sorted by policy_id). | |
| for curated_policy_id, cdata in sorted(curated_facts.items()): | |
| # Skip the __wordings/__brochure/__cis lookup-permutation aliases | |
| # that _load_curated_facts adds for hit-rate (canonical policy_id is | |
| # stored in the JSON's "policy_id" field). | |
| if curated_policy_id != cdata.get("policy_id", curated_policy_id): | |
| continue | |
| # Skip curated entries that ARE their own __wordings/__brochure/__cis | |
| # (doctype-permutation curated files, not marketing renames; pass-2 | |
| # dedup handles them via the seen_policy_ids prefix check). | |
| if any(curated_policy_id.endswith(f"__{dt}") | |
| for dt in ("wordings", "brochure", "cis", "prospectus")): | |
| continue | |
| curated_canonical_ids.append(curated_policy_id) | |
| # Read curated UIN (scalar OR nested {value, source_pdf_path, ...}). | |
| curated_uin = cdata.get("uin_code") | |
| if isinstance(curated_uin, dict): | |
| curated_uin = curated_uin.get("value") | |
| curated_uin = (curated_uin or "").strip() if isinstance(curated_uin, str) else "" | |
| parent_id: str | None = None | |
| if curated_uin and curated_uin in uin_to_parent \ | |
| and uin_to_parent[curated_uin] != curated_policy_id: | |
| # KI-145 (2026-05-15) β UIN-primary path with smart variant | |
| # detection. Same UIN does NOT guarantee same product: a single | |
| # regulator-filed PDF often covers multiple marketed variants | |
| # (e.g. ProHealth Prime vs ProHealth Protect both filed under | |
| # MCIHLIP24011V072324; activ-assure-diamond curated vs extracted | |
| # disagree on PED/NCB). Compare 8 decision-critical fields; if | |
| # 2+ disagree on non-null values, treat as a VARIANT and keep | |
| # this curated entry as its own card. < 2 = pure rename β merge. | |
| candidate = uin_to_parent[curated_uin] | |
| # Candidate may be an extracted stem OR a previously-claimed | |
| # curated entry. Look up extracted JSON first; fall back to the | |
| # candidate's curated facts so the diff has real data to compare. | |
| cand_data = extracted_data.get(candidate) or curated_facts.get(candidate, {}) | |
| if _ki145_material_diffs(cdata, cand_data) < 2: | |
| parent_id = candidate | |
| else: | |
| ki145_variant_curated_ids.add(curated_policy_id) | |
| elif curated_uin: | |
| # New UIN β this curated entry becomes the claimant so any | |
| # later curated sibling with the same UIN aliases onto it. Per | |
| # KI-145 spec ("if UIN doesn't match any extracted parent β | |
| # treat as standalone"), also flag this entry so pass-2 emits | |
| # it even when its policy_id is a prefix of a seen extracted id. | |
| uin_to_parent[curated_uin] = curated_policy_id | |
| ki145_variant_curated_ids.add(curated_policy_id) | |
| if parent_id is None and not curated_uin: | |
| # KI-142 (preserved): source-PDF fallback only fires for curated | |
| # entries with NO UIN. When UIN is present but unmatched, the | |
| # KI-145 spec mandates standalone β source-PDF coincidence MUST | |
| # NOT override the UIN-mismatch signal. | |
| fb_parent = _source_pdf_to_policy_id(cdata.get("_primary_source_pdf")) | |
| if fb_parent and fb_parent in extracted_stems and fb_parent != curated_policy_id: | |
| ext_data = extracted_data.get(fb_parent, {}) | |
| if _ki145_material_diffs(cdata, ext_data) < 2: | |
| parent_id = fb_parent | |
| else: | |
| ki145_variant_curated_ids.add(curated_policy_id) | |
| if parent_id: | |
| direct_parent[curated_policy_id] = parent_id | |
| # Phase C β chain-compress direct_parent so every curated alias points | |
| # at its terminal parent (an extracted stem, or a curated parent that | |
| # itself has no parent). Detect cycles defensively. After compression | |
| # we emit one alias entry per curated descendant onto the terminal | |
| # parent's product_key. | |
| parent_pkey_aliases: dict[str, list[str]] = {} | |
| aliased_curated_ids: set[str] = set() | |
| def _terminal_parent(start: str) -> str | None: | |
| """Walk direct_parent until we hit an extracted stem or a curated id | |
| with no further parent. Returns None on cycle (defensive).""" | |
| seen_chain: set[str] = set() | |
| cur = start | |
| while True: | |
| nxt = direct_parent.get(cur) | |
| if not nxt: | |
| return cur if cur != start else None | |
| if nxt in seen_chain or nxt == start: | |
| return None # cycle β drop the alias attempt | |
| seen_chain.add(nxt) | |
| cur = nxt | |
| for curated_policy_id in curated_canonical_ids: | |
| if curated_policy_id not in direct_parent: | |
| continue | |
| terminal = _terminal_parent(curated_policy_id) | |
| if not terminal: | |
| continue | |
| # Alias-target product_key: extracted stems use _product_key_of() | |
| # (strips __doctype). Curated terminals use the policy_id directly. | |
| if terminal in extracted_stems: | |
| terminal_pkey = _product_key_of(terminal) | |
| else: | |
| terminal_pkey = terminal | |
| alias_name = clean_display_policy_name( | |
| curated_facts.get(curated_policy_id, {}).get("policy_name") | |
| or curated_policy_id | |
| ) | |
| parent_pkey_aliases.setdefault(terminal_pkey, []).append(alias_name) | |
| aliased_curated_ids.add(curated_policy_id) | |
| seen_product_keys: set[str] = set() | |
| seen_policy_ids: set[str] = set() | |
| out = [] | |
| # Pass 1: existing extracted policies (merged with curated overrides) | |
| for fp in sorted_files: | |
| try: | |
| data = _json.loads(fp.read_text()) | |
| except Exception: | |
| continue | |
| policy_id_local = data.get("policy_id", fp.stem) | |
| curated_for_this = curated_facts.get(policy_id_local) or curated_facts.get(fp.stem) | |
| data = _merge_curated(data, curated_for_this) | |
| seen_policy_ids.add(policy_id_local) | |
| slug = data.get("insurer_slug", "") | |
| # Regulatory is not an insurer; drop entirely from the marketplace. | |
| # IRDAI/NHA docs are still retrieved and cited in chat answers, they | |
| # just don't appear as marketplace cards. | |
| if slug == "regulatory": | |
| continue | |
| # Dedup by product (insurer__product), so the wordings PDF wins and | |
| # the brochure/cis variants don't generate duplicate cards. Pass-1 | |
| # sort order guarantees wordings comes first. | |
| product_key = _product_key_of(policy_id_local) | |
| if product_key in seen_product_keys: | |
| continue | |
| seen_product_keys.add(product_key) | |
| name, home = insurer_meta.get(slug, (slug, "")) | |
| # Get insurer reviews if available for the scorecard | |
| ir = None | |
| if slug: | |
| rp = settings.DATA_DIR / "reviews" / f"{slug}.json" | |
| if rp.exists(): | |
| try: ir = _json.loads(rp.read_text()) | |
| except Exception: pass | |
| sc = build_scorecard(data, insurer_reviews=ir, profile=user_profile_dict) | |
| # #48 β recover facts from the scorecard so the flat marketplace | |
| # fields (hence the CARD) match the detail-modal snapshot. Only | |
| # fill nulls; never overwrite a real extracted value. | |
| _rf = _recover_scorecard_facts(sc) | |
| for _dk in ( | |
| "pre_existing_disease_waiting_months", "copayment_pct", | |
| "network_hospital_count", "cashless_treatment_supported", | |
| "ayush_coverage", "maternity_coverage", "max_entry_age", | |
| ): | |
| if data.get(_dk) is None and _rf.get(_dk) is not None: | |
| data[_dk] = _rf[_dk] | |
| if not data.get("room_rent_capping"): | |
| if _rf.get("_room_no_cap"): | |
| data["room_rent_capping"] = "No room rent cap" | |
| elif _rf.get("_room_cap_text"): | |
| data["room_rent_capping"] = _rf["_room_cap_text"] | |
| si = data.get("sum_insured_options") or [] | |
| if isinstance(si, list): | |
| si = [int(x) for x in si if isinstance(x, (int, float)) or (isinstance(x, str) and x.isdigit())] | |
| else: | |
| si = [] | |
| # D3 β drop every SI value the field's own source_quote does not | |
| # genuinely state, then D1-classify the corroborated set as a | |
| # continuous band or discrete tiers. sum_insured_options/_min/_max | |
| # are now the CORROBORATED set (no fabrication), so the slider | |
| # filter + range display stay honest by construction. | |
| _siv = _rationalise_si(data, si) | |
| si = _siv.tiers | |
| try: | |
| policy_id = data.get("policy_id", fp.stem) | |
| # Backfill source_pdf_url from corpus_urls.md when extraction didn't | |
| # populate it. Try exact policy_id match first, then key permutations. | |
| # #87 β prefer a CREDIBLE public document URL; otherwise use the | |
| # local corpus PDF we definitively have for every policy (served | |
| # via /api/policy-pdf). A homepage/section URL is never trusted | |
| # over the real document. Never an empty link. | |
| _pidx = _corpus_pdf_index() | |
| _cand = ( | |
| data.get("source_pdf_url") | |
| or corpus_url_index.get(policy_id) | |
| or corpus_url_index.get(fp.stem) | |
| or "" | |
| ) | |
| _local = ( | |
| f"/api/policy-pdf/{policy_id}" | |
| if (_pidx.get(policy_id) or _pidx.get(fp.stem)) | |
| else "" | |
| ) | |
| source_pdf_url = ( | |
| _cand if _is_credible_pdf_url(_cand) else (_local or _cand) | |
| ) | |
| _mp = MarketplacePolicy( | |
| policy_id=policy_id, | |
| policy_name=clean_display_policy_name( | |
| data.get("policy_name", fp.stem) | |
| ), | |
| insurer_slug=slug, | |
| insurer_name=name, | |
| insurer_home_url=home, | |
| source_pdf_url=source_pdf_url, | |
| grade=sc.grade, | |
| overall_score=sc.overall_score, | |
| one_liner=sc.one_liner, | |
| data_completeness_pct=sc.data_completeness_pct, | |
| profile_summary=_profile_summary_model(sc), | |
| min_entry_age=data.get("min_entry_age"), | |
| max_entry_age=data.get("max_entry_age"), | |
| sum_insured_options=si, | |
| sum_insured_min=_siv.min_inr, | |
| sum_insured_max=_siv.max_inr, | |
| sum_insured_is_band=_siv.is_band, | |
| sum_insured_tiers=si, | |
| pre_existing_disease_waiting_months=data.get("pre_existing_disease_waiting_months"), | |
| initial_waiting_period_days=data.get("initial_waiting_period_days"), | |
| maternity_waiting_months=data.get("maternity_waiting_months"), | |
| copayment_pct=data.get("copayment_pct") if isinstance(data.get("copayment_pct"), (int, float)) else None, | |
| network_hospital_count=data.get("network_hospital_count"), | |
| no_claim_bonus_pct=data.get("no_claim_bonus_pct"), | |
| ayush_coverage=_coerce_bool(data.get("ayush_coverage")), | |
| maternity_coverage=_coerce_bool(data.get("maternity_coverage")), | |
| cashless_treatment_supported=_coerce_bool(data.get("cashless_treatment_supported")), | |
| room_rent_capping=data.get("room_rent_capping") if isinstance(data.get("room_rent_capping"), str) else None, | |
| network_list_url=_insurer_network(slug).get("network_list_url"), | |
| network_count_official=_insurer_network(slug).get("stated_count"), | |
| network_list_is_pdf=bool(_insurer_network(slug).get("is_pdf")), | |
| # KI-141 β merge marketing-rename curated entries onto this | |
| # parent card. Sorted for deterministic output. | |
| aliases=sorted(parent_pkey_aliases.get(product_key, [])), | |
| ) | |
| out.append(_mp) | |
| if _sc_by_obj is not None: | |
| # Task #31 β bind THIS card object to the exact Scorecard | |
| # built above on the catalogue's chosen sibling | |
| # `data`/`ir`/profile. Object-keyed so it survives the | |
| # post-dedup mapping unambiguously. | |
| _sc_by_obj[id(_mp)] = sc | |
| except Exception as e: | |
| # One malformed extraction should not kill the whole feed | |
| print(f"[marketplace] skipping {fp.name}: {type(e).__name__}: {str(e)[:120]}") | |
| continue | |
| # Pass 2: curated policies that don't yet have an LLM extraction. | |
| # These come straight from 40-data/policy_facts/*.json β fully human-curated | |
| # with verbatim source quotes per field. | |
| for curated_policy_id, data in curated_facts.items(): | |
| # Skip permutation keys (we set __wordings / __brochure / __cis aliases | |
| # in _load_curated_facts to maximise the lookup hit-rate in pass 1) | |
| if curated_policy_id != data.get("policy_id", curated_policy_id): | |
| continue | |
| if curated_policy_id in seen_policy_ids: | |
| continue | |
| # Also skip if any extracted ID matches with a suffix β UNLESS this | |
| # curated entry was classified as a KI-145 variant (same UIN/source-PDF | |
| # as a pass-1 card but materially different decision-critical fields). | |
| # Variants MUST surface as their own marketplace card; the legacy | |
| # startswith dedup would otherwise drop them silently. | |
| if curated_policy_id not in ki145_variant_curated_ids \ | |
| and any(eid.startswith(curated_policy_id + "__") for eid in seen_policy_ids): | |
| continue | |
| # KI-141 β skip curated entries that have already been collapsed onto | |
| # a pass-1 parent card via the aliases mechanism (e.g. Activ One β | |
| # Activ Health Individual Wordings). | |
| if curated_policy_id in aliased_curated_ids: | |
| continue | |
| seen_policy_ids.add(curated_policy_id) | |
| slug = data.get("insurer_slug", "") | |
| # KI-208 (2026-05-15) β defensive symmetry with pass-1 (line 1842): any | |
| # curated_facts entry with insurer_slug=='regulatory' must NOT surface | |
| # as a marketplace card. Today no curated regulatory docs exist, but | |
| # adding the filter here closes a future-leak vector if an operator | |
| # accidentally curates an IRDAI/NHA fact-sheet under 40-data/policy_facts. | |
| if slug == "regulatory": | |
| continue | |
| name, home = insurer_meta.get(slug, (slug, "")) | |
| # Insurer reviews for scorecard | |
| ir = None | |
| if slug: | |
| rp = settings.DATA_DIR / "reviews" / f"{slug}.json" | |
| if rp.exists(): | |
| try: | |
| ir = _json.loads(rp.read_text()) | |
| except Exception: | |
| pass | |
| sc = build_scorecard(data, insurer_reviews=ir, profile=user_profile_dict) | |
| # #48 β recover facts from the scorecard so the flat marketplace | |
| # fields (hence the CARD) match the detail-modal snapshot. Only | |
| # fill nulls; never overwrite a real extracted value. | |
| _rf = _recover_scorecard_facts(sc) | |
| for _dk in ( | |
| "pre_existing_disease_waiting_months", "copayment_pct", | |
| "network_hospital_count", "cashless_treatment_supported", | |
| "ayush_coverage", "maternity_coverage", "max_entry_age", | |
| ): | |
| if data.get(_dk) is None and _rf.get(_dk) is not None: | |
| data[_dk] = _rf[_dk] | |
| if not data.get("room_rent_capping"): | |
| if _rf.get("_room_no_cap"): | |
| data["room_rent_capping"] = "No room rent cap" | |
| elif _rf.get("_room_cap_text"): | |
| data["room_rent_capping"] = _rf["_room_cap_text"] | |
| si = data.get("sum_insured_options") or [] | |
| if isinstance(si, list): | |
| si = [int(x) for x in si if isinstance(x, (int, float)) or (isinstance(x, str) and x.isdigit())] | |
| else: | |
| si = [] | |
| # D3/D1 β same source-quote corroboration + band/tier classification | |
| # as pass 1 (curated-only products take this branch). | |
| _siv = _rationalise_si(data, si) | |
| si = _siv.tiers | |
| try: | |
| # #87 β credible doc URL preferred, else the guaranteed-real | |
| # local corpus PDF; a homepage/section URL is never trusted. | |
| _pidx = _corpus_pdf_index() | |
| _cand = ( | |
| data.get("source_pdf_url") | |
| or corpus_url_index.get(curated_policy_id) | |
| or corpus_url_index.get(f"{curated_policy_id}__wordings") | |
| or "" | |
| ) | |
| _local = ( | |
| f"/api/policy-pdf/{curated_policy_id}" | |
| if _pidx.get(curated_policy_id) | |
| else "" | |
| ) | |
| source_pdf_url = ( | |
| _cand if _is_credible_pdf_url(_cand) else (_local or _cand) | |
| ) | |
| _mp = MarketplacePolicy( | |
| policy_id=curated_policy_id, | |
| policy_name=clean_display_policy_name( | |
| data.get("policy_name", curated_policy_id) | |
| ), | |
| insurer_slug=slug, | |
| insurer_name=name, | |
| insurer_home_url=home, | |
| source_pdf_url=source_pdf_url, | |
| grade=sc.grade, | |
| overall_score=sc.overall_score, | |
| one_liner=sc.one_liner, | |
| data_completeness_pct=sc.data_completeness_pct, | |
| profile_summary=_profile_summary_model(sc), | |
| min_entry_age=data.get("min_entry_age"), | |
| max_entry_age=data.get("max_entry_age"), | |
| sum_insured_options=si, | |
| sum_insured_min=_siv.min_inr, | |
| sum_insured_max=_siv.max_inr, | |
| sum_insured_is_band=_siv.is_band, | |
| sum_insured_tiers=si, | |
| pre_existing_disease_waiting_months=data.get("pre_existing_disease_waiting_months"), | |
| initial_waiting_period_days=data.get("initial_waiting_period_days"), | |
| maternity_waiting_months=data.get("maternity_waiting_months"), | |
| copayment_pct=data.get("copayment_pct") if isinstance(data.get("copayment_pct"), (int, float)) else None, | |
| network_hospital_count=data.get("network_hospital_count"), | |
| no_claim_bonus_pct=data.get("no_claim_bonus_pct"), | |
| ayush_coverage=_coerce_bool(data.get("ayush_coverage")), | |
| maternity_coverage=_coerce_bool(data.get("maternity_coverage")), | |
| cashless_treatment_supported=_coerce_bool(data.get("cashless_treatment_supported")), | |
| room_rent_capping=data.get("room_rent_capping") if isinstance(data.get("room_rent_capping"), str) else None, | |
| network_list_url=_insurer_network(slug).get("network_list_url"), | |
| network_count_official=_insurer_network(slug).get("stated_count"), | |
| network_list_is_pdf=bool(_insurer_network(slug).get("is_pdf")), | |
| # KI-142 β curated entries can ALSO be UIN-claimants when no | |
| # extracted parent owns their UIN. In that case their later | |
| # curated siblings alias onto them and surface here. | |
| aliases=sorted(parent_pkey_aliases.get(curated_policy_id, [])), | |
| ) | |
| out.append(_mp) | |
| if _sc_by_obj is not None: | |
| # Task #31 β exact Scorecard for this curated-only card, | |
| # bound to the card object (see Pass-1 rationale). | |
| _sc_by_obj[id(_mp)] = sc | |
| except Exception as e: | |
| print(f"[marketplace] skipping curated {curated_policy_id}: {type(e).__name__}: {str(e)[:120]}") | |
| continue | |
| # #80 β final safety dedup. The UIN/PDF gate above can still leak the | |
| # SAME logical product as both `insurer__product` and a doctype sibling | |
| # (`insurer__product__wordings|brochure|cis|prospectus|policy`). Collapse | |
| # to ONE card per product identity (richer entry wins; aliases merged) so | |
| # the marketplace never shows a plan twice β 1 product = 1 card. | |
| _DOCT = ("wordings", "brochure", "cis", "prospectus", "policy") | |
| def _ident(pid: str) -> str: | |
| for dt in _DOCT: | |
| if pid.endswith(f"__{dt}"): | |
| return pid[: -(len(dt) + 2)] | |
| return pid | |
| _best: dict[str, MarketplacePolicy] = {} | |
| for p in out: | |
| k = _ident(p.policy_id) | |
| prev = _best.get(k) | |
| if prev is None: | |
| _best[k] = p | |
| continue | |
| s = (p.data_completeness_pct, len(p.sum_insured_options), | |
| 1 if p.policy_id == k else 0) | |
| ps = (prev.data_completeness_pct, len(prev.sum_insured_options), | |
| 1 if prev.policy_id == k else 0) | |
| if s > ps: | |
| p.aliases = sorted(set(p.aliases) | set(prev.aliases)) | |
| _best[k] = p | |
| else: | |
| prev.aliases = sorted(set(prev.aliases) | set(p.aliases)) | |
| deduped = list(_best.values()) | |
| if _collect_scorecards is not None: | |
| # Task #31 β publish ONLY the post-dedup survivors' Scorecards, | |
| # each survivor mapped (by OBJECT identity) to ITS OWN Scorecard, | |
| # keyed by the survivor's policy_id AND its canonical | |
| # `_ident(policy_id)` so /api/policies/{id}/scorecard resolves a | |
| # doctype-suffixed / curated-canonical id onto the SAME card the | |
| # catalogue serves β the exact `Scorecard` object built here on the | |
| # catalogue's chosen sibling. Parity holds by construction (one | |
| # build_scorecard pass feeds both surfaces). Exact policy_id wins | |
| # over the canonical-ident fallback (setdefault) so a precise id is | |
| # never shadowed by a sibling sharing its _ident. | |
| for p in deduped: | |
| sc = _sc_by_obj.get(id(p)) | |
| if sc is None: | |
| continue | |
| _collect_scorecards[p.policy_id] = sc | |
| _collect_scorecards.setdefault(_ident(p.policy_id), sc) | |
| return deduped | |
| async def policies_all(session_id: Optional[str] = None): | |
| """The marketplace data feed β every extracted policy + scorecard + filterable fields. | |
| When session_id is provided AND the session has a profile populated to | |
| β₯0.6 completeness, every policy is scored against THAT profile (dynamic | |
| per-user grade). Otherwise we score with the generic baseline weights. | |
| """ | |
| import json as _json | |
| from backend.scorecard import build_scorecard, profile_completeness as _completeness | |
| from backend.session_state import get_session as _get_sess | |
| # Pull user profile if we have one. KI-271 β drive the profile dict off | |
| # brain_tools.SLOT_UNION (via union_snapshot) so every captured slot β | |
| # including copay_pct, desired_sum_insured_inr, family_medical_history, | |
| # health_conditions, age β flows into build_scorecard's profile-aware | |
| # {strengths, caveat} generator (task #31). union_snapshot already drops | |
| # empty/None/[] slots, so presence == captured. | |
| user_profile_dict: Optional[dict] = None | |
| if session_id: | |
| sess = _get_sess(session_id) | |
| p = sess.profile | |
| profile_dict = brain_tools.union_snapshot(p) | |
| # parents_* are NOT in SLOT_UNION's snapshot if False/None but the | |
| # weight-tuner reads parents_to_insure/parents_age_max/parents_has_ped | |
| # explicitly β carry them through (None-safe) without overwriting a | |
| # snapshot value. | |
| for _pf in ("parents_to_insure", "parents_age_max", "parents_has_ped"): | |
| if _pf not in profile_dict: | |
| _v = getattr(p, _pf, None) | |
| if _v is not None: | |
| profile_dict[_pf] = _v | |
| if _completeness(profile_dict) >= 0.6: | |
| user_profile_dict = profile_dict | |
| deduped = _marketplace_catalogue(user_profile_dict) | |
| return MarketplaceResponse( | |
| policies=deduped, | |
| total=len(deduped), | |
| insurers_indexed=len({p.insurer_slug for p in deduped}), | |
| ) | |
| async def compare_policies(policy_ids: list[str] = None): | |
| """Side-by-side comparison of 2-4 policies with their scorecards + field diffs.""" | |
| import json as _json | |
| from backend.scorecard import build_scorecard | |
| from backend.policy_identity import clean_display_policy_name | |
| if not policy_ids: | |
| from fastapi import Query | |
| raise HTTPException(400, "Provide policy_ids as repeated query params") | |
| if len(policy_ids) < 2 or len(policy_ids) > 4: | |
| raise HTTPException(400, "compare requires 2 to 4 policy_ids") | |
| entries = [] | |
| # KI: apply the SAME curated-override as /api/policies/all so COMPARE ALL | |
| # reflects the corrected/verbatim 40-data/policy_facts, not stale extract. | |
| _curated = _load_curated_facts() | |
| for pid in policy_ids: | |
| ep = settings.EXTRACTED_DIR / f"{pid}.json" | |
| data: Optional[dict] = None | |
| if ep.exists(): | |
| try: | |
| data = _json.loads(ep.read_text()) | |
| except Exception: | |
| data = None | |
| if data is not None: | |
| data = _merge_curated( | |
| data, | |
| _curated.get(data.get("policy_id", pid)) or _curated.get(pid), | |
| ) | |
| if data is None: | |
| # #75 (2026-05-18) β curated-only catalogued products (e.g. | |
| # star-health__star-comprehensive, UIN SHAHLIP26044V092526) have | |
| # NO rag/extracted/<pid>.json. The marketplace, single | |
| # /api/scorecard, and bulk scorecard endpoints all resolve these | |
| # from the curated layer; compare_policies alone still 404'd, | |
| # breaking "Compare all" for those policies. Mirror the same | |
| # curated fallback (curated dict also carries doctype-suffixed | |
| # alias keys) instead of raising. | |
| data = ( | |
| _curated.get(pid) | |
| or _curated.get(f"{pid}__wordings") | |
| or _curated.get(f"{pid}__cis") | |
| or _curated.get(f"{pid}__brochure") | |
| or _curated.get(f"{pid}__prospectus") | |
| ) | |
| if not data: | |
| raise HTTPException(404, f"No data for {pid}") | |
| # Insurer reviews for scorecard | |
| slug = data.get("insurer_slug") | |
| ir = None | |
| if slug: | |
| rp = settings.DATA_DIR / "reviews" / f"{slug}.json" | |
| if rp.exists(): | |
| try: ir = _json.loads(rp.read_text()) | |
| except Exception: pass | |
| sc = build_scorecard(data, insurer_reviews=ir) | |
| entries.append(CompareEntry( | |
| policy_id=pid, | |
| policy_name=clean_display_policy_name( | |
| data.get("policy_name", pid) | |
| ), | |
| insurer_slug=slug or "?", | |
| fields=data, | |
| scorecard=ScorecardResponse( | |
| policy_id=sc.policy_id, policy_name=sc.policy_name, insurer_slug=sc.insurer_slug, | |
| overall_score=sc.overall_score, grade=sc.grade, one_liner=sc.one_liner, | |
| sub_scores=[ScorecardSubScore(**s.__dict__) for s in sc.sub_scores], | |
| data_completeness_pct=sc.data_completeness_pct, | |
| methodology_link=sc.methodology_link, | |
| profile_summary=_profile_summary_model(sc), | |
| ), | |
| )) | |
| # Comparison-critical fields, in order | |
| field_order = [ | |
| "policy_type", "uin_code", | |
| "min_entry_age", "max_entry_age", | |
| "sum_insured_options", | |
| "initial_waiting_period_days", "pre_existing_disease_waiting_months", | |
| "maternity_waiting_months", | |
| "pre_hospitalization_days", "post_hospitalization_days", | |
| "day_care_treatments_count", "ayush_coverage", "maternity_coverage", | |
| "newborn_coverage", "organ_donor_expenses", | |
| "no_claim_bonus_pct", "restoration_benefit", | |
| "room_rent_capping", "copayment_pct", "deductible_amount", | |
| "network_hospital_count", "cashless_treatment_supported", | |
| "claim_settlement_ratio", "tat_cashless_authorization_hours", | |
| ] | |
| return CompareResponse(policies=entries, field_order=field_order) | |
| async def policy_scorecard( | |
| policy_id: str, | |
| age: Optional[int] = None, | |
| parents_to_insure: Optional[bool] = None, | |
| budget_band: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| ): | |
| """Compute the 6-sub-score A-F scorecard for an extracted policy. | |
| Now also pulls insurer-level reviews (IRDAI claim ratio + complaints) into | |
| the Claim Experience sub-score. See 70-docs/scorecard-methodology.md. | |
| Β§4c (task #31) β when `session_id` is supplied AND that session's profile | |
| is populated to β₯0.6 completeness, the policy is scored against THAT full | |
| profile (resolved the SAME way /api/policies/all does, via | |
| brain_tools.union_snapshot) so this endpoint's grade + profile_summary | |
| are byte-identical to the marketplace card for the same canonical id. The | |
| standalone `age` / `parents_to_insure` / `budget_band` query params remain | |
| a back-compat fallback when no session profile is available. | |
| """ | |
| import json as _json | |
| from backend.scorecard import build_scorecard | |
| from backend.scorecard import profile_completeness as _completeness | |
| from backend.session_state import get_session as _get_sess | |
| # ROOT-CAUSE FIX (scorecard 404 for catalogued curated-only products): | |
| # /api/policies/all catalogues a card for every extracted JSON AND every | |
| # curated-facts product (40-data/policy_facts/<insurer>__<product>.json). | |
| # Curated-only products (e.g. Tata AIG MediCare Lite β policy_id | |
| # `tata-aig__medicare-lite`) have NO `rag/extracted/<policy_id>.json` β | |
| # only doctype-suffixed extractions like `...__cis.json` β so the old | |
| # `extracted_path.exists() β 404` made the scorecard hard-fail for ~77 of | |
| # 170 catalogued policies, surfacing as the frontend's generic Retry | |
| # fallback. The marketplace builds those cards' grades straight from the | |
| # curated dict (policies_all Pass-2 `build_scorecard(data, ...)`); the | |
| # scorecard endpoint must resolve the SAME way. A catalogued policy_id | |
| # therefore resolves from extracted-with-curated-override OR, when no | |
| # extracted file exists, from the curated layer alone β never a 404 for a | |
| # catalogued product, never a fabricated grade. | |
| _curated = _load_curated_facts() | |
| extracted_path = settings.EXTRACTED_DIR / f"{policy_id}.json" | |
| if extracted_path.exists(): | |
| try: | |
| policy = _json.loads(extracted_path.read_text()) | |
| except Exception as e: | |
| raise HTTPException(500, f"Could not load extracted policy: {e}") | |
| # KI: same curated-override as /api/policies/all so the standalone | |
| # scorecard reflects the corrected/verbatim 40-data/policy_facts. | |
| policy = _merge_curated( | |
| policy, | |
| _curated.get(policy.get("policy_id", policy_id)) or _curated.get(policy_id), | |
| ) | |
| else: | |
| # No bare `<policy_id>.json` extraction. Task #31 PARITY FIX: the | |
| # marketplace card for a doctype-suffixed extracted-only product | |
| # (e.g. star-health__star-cardiac-care, whose only extraction is | |
| # `...__wordings.json`) is built by /api/policies/all Pass-1 from | |
| # that doctype-suffixed EXTRACTED file (preferred over curated via | |
| # _DOCTYPE_RANK). The standalone endpoint previously skipped straight | |
| # to the curated layer, so its grade + profile_summary diverged from | |
| # the card for the SAME canonical id. Mirror the catalogue's doctype | |
| # preference (wordings > prospectus > cis > brochure) on the | |
| # EXTRACTED layer first, with the same curated-override, before | |
| # falling back to a curated-only product. | |
| policy = None | |
| for _dt in ("wordings", "prospectus", "cis", "brochure"): | |
| _ep = settings.EXTRACTED_DIR / f"{policy_id}__{_dt}.json" | |
| if _ep.exists(): | |
| try: | |
| policy = _json.loads(_ep.read_text()) | |
| except Exception: | |
| policy = None | |
| continue | |
| policy = _merge_curated( | |
| policy, | |
| _curated.get(policy.get("policy_id", policy_id)) | |
| or _curated.get(f"{policy_id}__{_dt}") | |
| or _curated.get(policy_id), | |
| ) | |
| break | |
| if policy is None: | |
| # No extraction in ANY doctype β fall back to the human-curated | |
| # facts layer (mirrors /api/policies/all Pass 2). The curated | |
| # dict also carries doctype-suffixed alias keys, so try the | |
| # canonical id and the raw lookup keys. | |
| policy = _curated.get(policy_id) or _curated.get(f"{policy_id}__cis") \ | |
| or _curated.get(f"{policy_id}__wordings") \ | |
| or _curated.get(f"{policy_id}__brochure") \ | |
| or _curated.get(f"{policy_id}__prospectus") | |
| if not policy: | |
| # Genuinely not in EITHER layer β this id is not a catalogued | |
| # product at all (bad/typo id). 404 is the correct, honest | |
| # response here β it is NOT a catalogued policy. | |
| raise HTTPException(404, f"No data for policy_id={policy_id}") | |
| policy = dict(policy) | |
| policy.setdefault("policy_id", policy_id) | |
| # Load insurer reviews if present so the Claim Experience sub-score | |
| # uses authoritative IRDAI data, not just the (mostly-null) per-policy fields. | |
| insurer_reviews = None | |
| slug = policy.get("insurer_slug") | |
| if slug: | |
| rp = settings.DATA_DIR / "reviews" / f"{slug}.json" | |
| if rp.exists(): | |
| try: | |
| insurer_reviews = _json.loads(rp.read_text()) | |
| except Exception: | |
| pass | |
| # Β§4c β resolve the session profile the SAME way /api/policies/all does | |
| # (brain_tools.union_snapshot full dict + parents_* carry-through) so this | |
| # endpoint's grade + profile_summary match the marketplace card for the | |
| # same canonical id by construction. Only when β₯0.6 complete. | |
| # | |
| # `catalogue_profile` is EXACTLY what /api/policies/all would pass to | |
| # _marketplace_catalogue for this session (the β₯0.6 SLOT_UNION snapshot, | |
| # else None) β used below for the catalogue-card parity override. The | |
| # back-compat query-param path is a separate, profile-NEUTRAL-vs-catalogue | |
| # fallback (the catalogue is never built from loose query params). | |
| catalogue_profile: Optional[dict] = None | |
| profile: dict = {} | |
| if session_id: | |
| try: | |
| _p = _get_sess(session_id).profile | |
| _pd = brain_tools.union_snapshot(_p) | |
| for _pf in ("parents_to_insure", "parents_age_max", "parents_has_ped"): | |
| if _pf not in _pd: | |
| _v = getattr(_p, _pf, None) | |
| if _v is not None: | |
| _pd[_pf] = _v | |
| if _completeness(_pd) >= 0.6: | |
| profile = _pd | |
| catalogue_profile = _pd | |
| except Exception: # noqa: BLE001 β bad/expired session β back-compat path | |
| profile = {} | |
| catalogue_profile = None | |
| if not profile: | |
| # Back-compat: standalone query params when no usable session profile. | |
| if age is not None: profile["age"] = age | |
| if parents_to_insure is not None: profile["parents_to_insure"] = parents_to_insure | |
| if budget_band is not None: profile["budget_band"] = budget_band | |
| # TASK #31 β SINGLE SOURCE OF TRUTH (option (a)). When this id IS a | |
| # marketplace card, serve the EXACT `Scorecard` object the | |
| # /api/policies/all catalogue built for that canonical card under THIS | |
| # session's profile β full sub_scores + profile_summary + grade + | |
| # data_completeness + one_liner, all from the catalogue's ONE | |
| # build_scorecard pass on the catalogue's chosen sibling | |
| # `(data, insurer_reviews, profile)`. Parity is byte-identical BY | |
| # CONSTRUCTION: the same object feeds both surfaces, so the endpoint can | |
| # no longer pick a different doctype sibling than the catalogue's | |
| # completeness-based `_best` dedup (the prior bug β the old endpoint | |
| # reconstructed `policy` via its own doctype-rank loop and emitted a | |
| # different strength set / caveat for the same canonical id). | |
| # | |
| # `_catalogue_scorecard` returns None ONLY when the id is not a | |
| # catalogued product at all β then we fall through to the locally-built | |
| # scorecard so the curated-only / back-compat query-param / never-404 | |
| # behaviour the resolution block above guarantees is fully preserved. | |
| cat_sc = None | |
| try: | |
| cat_sc = _catalogue_scorecard(policy_id, catalogue_profile) | |
| except Exception: # noqa: BLE001 β never let the SSOT resolver 500 a card | |
| cat_sc = None | |
| sc = cat_sc if cat_sc is not None else build_scorecard( | |
| policy, insurer_reviews=insurer_reviews, profile=profile or None | |
| ) | |
| return ScorecardResponse( | |
| policy_id=sc.policy_id, | |
| policy_name=sc.policy_name, | |
| insurer_slug=sc.insurer_slug, | |
| overall_score=sc.overall_score, | |
| grade=sc.grade, | |
| one_liner=sc.one_liner, | |
| sub_scores=[ScorecardSubScore(**s.__dict__) for s in sc.sub_scores], | |
| data_completeness_pct=sc.data_completeness_pct, | |
| methodology_link=sc.methodology_link, | |
| insufficient_data=sc.insufficient_data, | |
| profile_summary=_profile_summary_model(sc), | |
| ) | |
| # ---------------------------------------------------------------------------- | |
| # Bulk scorecard endpoint β powers the PolicyCompareModal scorecard widget. | |
| # ---------------------------------------------------------------------------- | |
| # Why bulk: the compare modal renders 2-4 scorecards in parallel and each is | |
| # profile-tuned. Doing N sequential GETs from the client wastes the per-policy | |
| # JSON I/O cost (we re-load every reviews file even for the same insurer) and | |
| # fans out N renders. One POST with the full profile + id list lets us: | |
| # - load each reviews file once per slug (memoized in the loop) | |
| # - return missing policies as N/A so the client renders a clean placeholder | |
| # - share one profile dict β no copy-paste of every field in N query strings | |
| class BulkScorecardRequest(BaseModel): | |
| policy_ids: list[str] | |
| profile: Optional[dict] = None | |
| class BulkScorecardEntry(BaseModel): | |
| policy_id: str | |
| policy_name: str | |
| insurer_slug: str | |
| overall_grade: str # "A" / "B+" / etc β letter only for missing | |
| overall_score: int # 0-100 | |
| sub_scores: dict[str, int] # {coverage_breadth: 82, cost_predictability: 64, ...} | |
| profile_rationale: list[str] # bullets explaining WHY this score for this user | |
| data_completeness_pct: float | |
| one_liner: str = "" | |
| # raw signals per sub-score so the widget can pop-out a tooltip with detail | |
| signals: dict[str, list[str]] = Field(default_factory=dict) | |
| # Deterministic, profile-aware {strengths, caveat} β the structured | |
| # replacement for the generic one_liner the widget now renders at top. | |
| profile_summary: Optional[ProfileSummaryModel] = None | |
| class BulkScorecardResponse(BaseModel): | |
| per_policy: dict[str, BulkScorecardEntry] | |
| def _slugify_subscore(name: str) -> str: | |
| """'Coverage Breadth' -> 'coverage_breadth' (stable key for the widget).""" | |
| return name.lower().replace("-", "_").replace("&", "and").replace(" ", "_").replace("__", "_") | |
| def _profile_rationale_for(policy: dict, profile: Optional[dict], sub_scores) -> list[str]: | |
| """Turn raw signals + profile facts into 2-5 plain-English bullets. | |
| Each bullet is shaped as 'Strong fit:' or 'Weak fit:' so the buyer can scan | |
| pros and cons at a glance. We anchor each bullet to a concrete profile | |
| attribute (you mentioned X) so the user trusts the personalization is real. | |
| """ | |
| if not profile: | |
| return [] | |
| bullets: list[str] = [] | |
| conditions = profile.get("health_conditions") or [] | |
| cond_str = " ".join(str(c).lower() for c in conditions) if isinstance(conditions, list) else "" | |
| age = profile.get("age") if isinstance(profile.get("age"), int) else None | |
| deps = (profile.get("dependents") or "").lower() | |
| loc = profile.get("location_tier") | |
| goal = (profile.get("primary_goal") or "").lower() | |
| existing = profile.get("existing_cover_inr") | |
| # Pre-existing disease handling | |
| if cond_str and any(c in cond_str for c in ("diab", "bp", "hyper", "thyroid", "heart")): | |
| ped = policy.get("pre_existing_disease_waiting_months") | |
| try: | |
| ped_n = int(ped) if ped is not None else None | |
| except (TypeError, ValueError): | |
| ped_n = None | |
| if ped_n is not None: | |
| if ped_n <= 24: | |
| bullets.append(f"Strong fit: PED waiting is only {ped_n} months β short for your {cond_str.strip()}.") | |
| elif ped_n >= 48: | |
| bullets.append(f"Weak fit: {ped_n}-month PED waiting is long for your {cond_str.strip()} β alternatives offer 24-36 months.") | |
| else: | |
| bullets.append(f"Fair fit: {ped_n}-month PED waiting is standard for your {cond_str.strip()}.") | |
| # Senior + claim reliability | |
| if age and age >= 60: | |
| nh = policy.get("network_hospital_count") | |
| try: | |
| nh_n = int(nh) if nh is not None else None | |
| except (TypeError, ValueError): | |
| nh_n = None | |
| if nh_n is not None and nh_n >= 7000: | |
| bullets.append(f"Strong fit: {nh_n:,}+ cashless hospitals matters at age {age} when access speed counts.") | |
| elif nh_n is not None and nh_n < 3000: | |
| bullets.append(f"Weak fit: only {nh_n} cashless hospitals β thin network for age {age}.") | |
| # max_renewal_age removed: lifelong renewability is the IRDAI norm for | |
| # every health-indemnity policy (universal β not a differentiator, and | |
| # the old `>= 99` check fired on the fabricated 999 sentinel). | |
| # Family + room-rent / maternity | |
| if any(k in deps for k in ("spouse", "wife", "husband", "partner", "kid", "child", "family")): | |
| rrc = policy.get("room_rent_capping") | |
| rrc_text = rrc if isinstance(rrc, str) else (rrc.get("limit_text") if isinstance(rrc, dict) else None) | |
| if rrc_text and "no cap" in rrc_text.lower(): | |
| bullets.append("Strong fit: no room-rent cap β works for any hospital your family chooses.") | |
| elif rrc_text and ("1%" in rrc_text or "%" in rrc_text): | |
| metro_qual = " in a metro" if loc == "metro" else "" | |
| bullets.append(f"Weak fit: room rent capped ({rrc_text[:40].strip()}) may be tight for hospitals{metro_qual}.") | |
| if any(k in deps for k in ("spouse", "wife", "husband", "partner")): | |
| mc = policy.get("maternity_coverage") | |
| covered = mc.get("covered") if isinstance(mc, dict) else mc | |
| if covered is True: | |
| mw = policy.get("maternity_waiting_months") | |
| bullets.append( | |
| f"Strong fit: maternity covered with {mw}-month wait β relevant to your spouse." | |
| if mw else | |
| "Strong fit: maternity covered β relevant to your spouse." | |
| ) | |
| elif covered is False: | |
| bullets.append("Weak fit: no maternity coverage β you'd need a separate rider.") | |
| # First-time buyer β simplicity / premium predictability | |
| if existing == 0: | |
| copay = policy.get("copayment_pct") | |
| try: | |
| copay_n = float(copay) if copay is not None else None | |
| except (TypeError, ValueError): | |
| copay_n = None | |
| if copay_n is not None and copay_n == 0: | |
| bullets.append("Strong fit: zero co-pay β simpler to budget for as a first-time buyer.") | |
| elif copay_n is not None and copay_n >= 20: | |
| bullets.append(f"Weak fit: {copay_n:.0f}% co-pay adds a surprise out-of-pocket β hard to plan as a first-time buyer.") | |
| # Tax-saving goal anchor | |
| if "tax" in goal: | |
| bullets.append("Note: premium qualifies for Section 80D deduction β aligned with your tax-saving goal.") | |
| # If we still have <2 bullets, fall back to top sub-score deltas vs neutral | |
| if len(bullets) < 2: | |
| ranked = sorted(sub_scores, key=lambda s: s.score, reverse=True) | |
| if ranked: | |
| top = ranked[0] | |
| bullets.append(f"Strongest area: {top.name} ({top.score}/100) β {top.summary.lower()}.") | |
| if len(ranked) > 1: | |
| bot = ranked[-1] | |
| if bot.score < 60: | |
| bullets.append(f"Watch out: {bot.name} ({bot.score}/100) β {bot.summary.lower()}.") | |
| return bullets[:5] | |
| def _letter_grade_with_plus(score: int) -> str: | |
| """Convert 0-100 to A / A- / B+ / B / B- / C+ / C / C- / D / F. | |
| The base grade_for() returns flat letters (A/B/C/D/F). For the compare | |
| widget the buyer wants finer distinction between e.g. an 84 (top of B) and | |
| a 71 (bottom of B). Thresholds: | |
| 90+ A, 85-89 A-, 80-84 B+, 75-79 B, 70-74 B-, | |
| 65-69 C+, 60-64 C, 55-59 C-, 40-54 D, <40 F. | |
| """ | |
| if score >= 90: return "A" | |
| if score >= 85: return "A-" | |
| if score >= 80: return "B+" | |
| if score >= 75: return "B" | |
| if score >= 70: return "B-" | |
| if score >= 65: return "C+" | |
| if score >= 60: return "C" | |
| if score >= 55: return "C-" | |
| if score >= 40: return "D" | |
| return "F" | |
| async def scorecard_bulk(req: BulkScorecardRequest): | |
| """Compute profile-tuned scorecards for N policies in one round-trip. | |
| Body: { policy_ids: [...], profile: {...} } | |
| Returns: { per_policy: { <policy_id>: { overall_grade, overall_score, | |
| sub_scores, profile_rationale, | |
| data_completeness_pct } } } | |
| Missing policy_ids get overall_grade="N/A" + rationale=["Data not indexed"]. | |
| """ | |
| import json as _json | |
| from backend.scorecard import build_scorecard | |
| if not req.policy_ids: | |
| raise HTTPException(400, "policy_ids must be a non-empty list") | |
| if len(req.policy_ids) > 8: | |
| raise HTTPException(400, "bulk scorecard caps at 8 policies per call") | |
| profile = req.profile or None | |
| insurer_cache: dict[str, Optional[dict]] = {} | |
| out: dict[str, BulkScorecardEntry] = {} | |
| # KI: same curated-override as /api/policies/all so the bulk scorecard | |
| # badges reflect the corrected/verbatim 40-data/policy_facts. | |
| _curated = _load_curated_facts() | |
| for pid in req.policy_ids: | |
| extracted_path = settings.EXTRACTED_DIR / f"{pid}.json" | |
| policy = None | |
| if extracted_path.exists(): | |
| try: | |
| policy = _json.loads(extracted_path.read_text()) | |
| except Exception as e: | |
| out[pid] = BulkScorecardEntry( | |
| policy_id=pid, policy_name=pid, insurer_slug="?", | |
| overall_grade="N/A", overall_score=0, sub_scores={}, | |
| profile_rationale=[f"Data unreadable: {e}"], | |
| data_completeness_pct=0.0, | |
| one_liner="Extraction file is corrupted.", | |
| signals={}, | |
| ) | |
| continue | |
| policy = _merge_curated( | |
| policy, _curated.get(policy.get("policy_id", pid)) or _curated.get(pid) | |
| ) | |
| else: | |
| # ROOT-CAUSE FIX #60 (2026-05-18): curated-only catalogued | |
| # products (e.g. star-health__star-comprehensive, UIN | |
| # SHAHLIP26044V092526) have NO rag/extracted/<pid>.json β only | |
| # doctype-suffixed extractions. The marketplace + the single | |
| # /api/scorecard endpoint already resolve these from the curated | |
| # layer (policies_all Pass-2 / lines ~3617). The BULK endpoint | |
| # did not, so it emitted the N/A "No extraction available" / | |
| # "Data not indexed" sentinel β the broken-card defect the user | |
| # saw for Star Comprehensive. Mirror that curated fallback here; | |
| # the curated dict also carries doctype-suffixed alias keys. | |
| policy = ( | |
| _curated.get(pid) | |
| or _curated.get(f"{pid}__wordings") | |
| or _curated.get(f"{pid}__cis") | |
| or _curated.get(f"{pid}__brochure") | |
| or _curated.get(f"{pid}__prospectus") | |
| ) | |
| if not policy: | |
| # Genuinely absent from BOTH layers β not a catalogued product. | |
| out[pid] = BulkScorecardEntry( | |
| policy_id=pid, | |
| policy_name=pid, | |
| insurer_slug="?", | |
| overall_grade="N/A", | |
| overall_score=0, | |
| sub_scores={}, | |
| profile_rationale=["Data not indexed"], | |
| data_completeness_pct=0.0, | |
| one_liner="No extraction available for this policy.", | |
| signals={}, | |
| ) | |
| continue | |
| slug = policy.get("insurer_slug") or "?" | |
| if slug not in insurer_cache: | |
| insurer_cache[slug] = None | |
| rp = settings.DATA_DIR / "reviews" / f"{slug}.json" | |
| if rp.exists(): | |
| try: | |
| insurer_cache[slug] = _json.loads(rp.read_text()) | |
| except Exception: | |
| insurer_cache[slug] = None | |
| sc = build_scorecard(policy, insurer_reviews=insurer_cache[slug], profile=profile) | |
| sub_map = {_slugify_subscore(s.name): s.score for s in sc.sub_scores} | |
| signal_map = {_slugify_subscore(s.name): s.signals for s in sc.sub_scores} | |
| psm = _profile_summary_model(sc) | |
| # Bridge the legacy profile_rationale list off the deterministic | |
| # profile_summary so the old field stays populated AND consistent | |
| # with the new structured data (strengths + [caveat]). Only fall | |
| # back to the heuristic _profile_rationale_for when the deterministic | |
| # summary produced too little to be useful (insufficient-data / | |
| # profile-neutral with <3 facts) so no surface goes blank. | |
| if psm and psm.strengths: | |
| rationale = list(psm.strengths) | |
| if psm.caveat: | |
| rationale.append(psm.caveat) | |
| else: | |
| rationale = _profile_rationale_for(policy, profile, sc.sub_scores) | |
| out[pid] = BulkScorecardEntry( | |
| policy_id=sc.policy_id or pid, | |
| policy_name=sc.policy_name or pid, | |
| insurer_slug=sc.insurer_slug or slug, | |
| overall_grade=_letter_grade_with_plus(sc.overall_score), | |
| overall_score=sc.overall_score, | |
| sub_scores=sub_map, | |
| profile_rationale=rationale, | |
| data_completeness_pct=sc.data_completeness_pct, | |
| one_liner=sc.one_liner, | |
| signals=signal_map, | |
| profile_summary=psm, | |
| ) | |
| return BulkScorecardResponse(per_policy=out) | |
| class ReviewsResponse(BaseModel): | |
| insurer_slug: str | |
| insurer_name: str | |
| # #76 β these structured sub-objects are NOT present in every review | |
| # file (e.g. acko.json has none); requiring them 500'd the endpoint and | |
| # blanked the whole reputation panel even though real data existed. | |
| # Default-empty so the endpoint always returns 200 with whatever real | |
| # data the file does have (InsurerReviewsBlock already renders each | |
| # sub-object conditionally). | |
| aggregate_score: dict = Field(default_factory=dict) | |
| claim_metrics: dict = Field(default_factory=dict) | |
| aggregator_ratings: dict = Field(default_factory=dict) | |
| reddit_sentiment: dict = Field(default_factory=dict) | |
| youtube_coverage: dict = Field(default_factory=dict) | |
| in_news: list = Field(default_factory=list) | |
| last_updated: str = "" | |
| async def get_reviews(insurer_slug: str): | |
| """Aggregated reviews + claim metrics for an insurer. | |
| Data sourced from IRDAI annual report + PolicyBazaar/InsuranceDekho + | |
| Reddit r/IndianFinance + YouTube finance creators (Ditto et al) + | |
| news mentions. Per-insurer JSON at 40-data/reviews/<slug>.json β see | |
| 40-data/reviews/INDEX.md for leaderboard. | |
| """ | |
| import json | |
| p = settings.DATA_DIR / "reviews" / f"{insurer_slug}.json" | |
| if not p.exists(): | |
| raise HTTPException(404, f"No reviews for insurer={insurer_slug}") | |
| try: | |
| d = json.loads(p.read_text()) | |
| return ReviewsResponse(**d) | |
| except Exception as e: | |
| raise HTTPException(500, f"Failed to load reviews: {e}") | |
| class PremiumEstimateRequest(BaseModel): | |
| age: int = Field(..., ge=0, le=120) | |
| sum_insured_inr: int = Field(..., ge=100000, le=100000000) | |
| city_tier: str = Field("metro", pattern="^(metro|tier1|tier2)$") | |
| smoker: bool = False | |
| # family_size: 0 is the slider "self-only" sentinel (treated identical to 1) | |
| family_size: int = Field(1, ge=0, le=8) | |
| policy_id: Optional[str] = None | |
| # Pre-existing condition flag β controls PED premium load. Allowed values | |
| # mirror the FALLBACK_PED keys in backend/premium_calculator.py | |
| pre_existing_conditions: str = Field( | |
| "none", | |
| pattern="^(none|diabetes_or_hypertension|heart_disease|multiple)$", | |
| ) | |
| # Voluntary co-payment % β reduces premium ~7% per 10pp of co-pay | |
| copayment_pct: float = Field(0.0, ge=0, le=40) | |
| # Family medical history tokens (cancer / diabetes / heart_disease / β¦). | |
| # Applies the same family_history_loading (1.0Γβ1.10Γ) the header band and | |
| # bulk path use, so the per-policy panel reflects family history too (#52). | |
| family_medical_history: Optional[list[str]] = None | |
| # B2 widget parity (KI-bugfix, 2026-05-15) β optional slider overrides so | |
| # PolicyPremiumWidget (compare modal) can use the same curated-anchored | |
| # estimate() pipeline as PremiumCalculatorPanel instead of the divergent | |
| # bulk_estimate() flat-base path. Applied as straight multipliers on top | |
| # of the estimate() result using premium_calculator.BULK_TENURE_MULT / | |
| # BULK_DEDUCTIBLE_DISCOUNT constants β leaves estimate() math untouched. | |
| tenure_years: Optional[int] = Field(None, ge=1, le=3) | |
| deductible_inr: Optional[int] = Field(None, ge=0, le=200_000) | |
| class PremiumEstimateResponse(BaseModel): | |
| policy_id: str | |
| point_estimate_inr: int | |
| low_inr: int | |
| high_inr: int | |
| methodology: str | |
| sources: list[str] | |
| is_illustrative: bool = True | |
| disclaimer: str = ( | |
| "Illustrative range only β actual premium depends on underwriting + " | |
| "medical history + risk factors. Confirm with the insurer before purchase." | |
| ) | |
| # Echo back the effective tenure / deductible so the widget can render a | |
| # consistent breakdown line without re-deriving them. Optional for legacy | |
| # callers (PremiumCalculatorPanel ignores both). | |
| tenure_years: Optional[int] = None | |
| deductible_inr: Optional[int] = None | |
| # BUG #29 β whether THIS policy genuinely offers a user-selectable | |
| # voluntary deductible (curated deductible_amount > 0 AND not a | |
| # top-up). Only ~2 of 148 do. The widget hides the deductible selector | |
| # entirely when False; allowed_deductibles is the exact pill set. | |
| supports_voluntary_deductible: bool = False | |
| allowed_deductibles: list[int] = [0] | |
| # True when the underlying estimate() anchored to a curated quote sample. | |
| # PolicyPremiumWidget uses this (instead of bulk_estimate's `assumed` flag) | |
| # to decide whether to show its "Estimate" badge. | |
| base_sample_used: bool = False | |
| # D2 (2026-05-16) β non-null ONLY when the policy publishes no | |
| # corroborated Sum Insured, so this estimate was priced against a | |
| # fallback cover. The frontend renders it verbatim under the estimate: | |
| # "Estimate shown for βΉX cover β this policy's sum insured isn't published." | |
| sum_insured_disclosure: Optional[str] = None | |
| async def premium_estimate(req: PremiumEstimateRequest): | |
| """Illustrative premium calculator β rules-based estimate from curated public data.""" | |
| from backend.premium_calculator import ( | |
| estimate as _estimate, | |
| BULK_TENURE_MULT, | |
| BULK_DEDUCTIBLE_DISCOUNT, | |
| ) | |
| e = _estimate( | |
| age=req.age, | |
| sum_insured_inr=req.sum_insured_inr, | |
| city_tier=req.city_tier, | |
| smoker=req.smoker, | |
| family_size=req.family_size, | |
| policy_id=req.policy_id, | |
| pre_existing_conditions=req.pre_existing_conditions, | |
| copayment_pct=req.copayment_pct, | |
| family_medical_history=req.family_medical_history, | |
| ) | |
| # Snap incoming tenure / deductible to the nearest supported bucket so the | |
| # widget can pass raw slider values without precomputing. | |
| point = e.point_estimate_inr | |
| low = e.low_inr | |
| high = e.high_inr | |
| effective_tenure: Optional[int] = None | |
| effective_ded: Optional[int] = None | |
| if req.tenure_years is not None: | |
| effective_tenure = req.tenure_years if req.tenure_years in BULK_TENURE_MULT else 1 | |
| tenure_mult = BULK_TENURE_MULT.get(effective_tenure, 1.0) | |
| point = int(round(point * tenure_mult)) | |
| low = int(round(low * tenure_mult)) | |
| high = int(round(high * tenure_mult)) | |
| # BUG #29 β resolve whether this policy genuinely supports a voluntary | |
| # deductible. Only ~2 of 148 do; for every other policy a caller-supplied | |
| # deductible must NOT discount the premium. | |
| from backend.premium_calculator import policy_deductible_support | |
| _supports, _allowed = policy_deductible_support(req.policy_id) | |
| if req.deductible_inr is not None: | |
| if not _supports or req.deductible_inr not in _allowed: | |
| # Unsupported policy (or a value outside this policy's allowed | |
| # set) β no phantom discount, honest echo. | |
| effective_ded = 0 | |
| elif req.deductible_inr in BULK_DEDUCTIBLE_DISCOUNT: | |
| effective_ded = req.deductible_inr | |
| else: | |
| effective_ded = min( | |
| BULK_DEDUCTIBLE_DISCOUNT.keys(), | |
| key=lambda d: abs(d - req.deductible_inr), | |
| ) | |
| ded_mult = BULK_DEDUCTIBLE_DISCOUNT.get(effective_ded, 1.0) | |
| point = int(round(point * ded_mult)) | |
| low = int(round(low * ded_mult)) | |
| high = int(round(high * ded_mult)) | |
| # D2 β when this policy publishes NO corroborated Sum Insured, the | |
| # estimate was necessarily priced against a fallback cover (the SI the | |
| # caller sent, which the per-policy estimator seeds from | |
| # desired_sum_insured_inr ?? βΉ10 L). Surface the verbatim disclosure so | |
| # the user knows the SI is assumed, not the policy's own. | |
| si_disclosure: Optional[str] = None | |
| if req.policy_id: | |
| try: | |
| _siv = _policy_corroborated_si(req.policy_id) | |
| if _siv.kind == "none": | |
| from backend.premium_calculator import unpublished_si_disclosure | |
| si_disclosure = unpublished_si_disclosure(req.sum_insured_inr) | |
| except Exception: | |
| si_disclosure = None | |
| return PremiumEstimateResponse( | |
| policy_id=e.policy_id, | |
| point_estimate_inr=point, | |
| low_inr=low, | |
| high_inr=high, | |
| methodology=e.methodology, | |
| sources=e.sources or [], | |
| tenure_years=effective_tenure, | |
| deductible_inr=effective_ded, | |
| supports_voluntary_deductible=_supports, | |
| allowed_deductibles=_allowed, | |
| base_sample_used=e.base_sample_used is not None, | |
| sum_insured_disclosure=si_disclosure, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # /api/premium/bulk β multi-policy slider-driven premium calculator | |
| # Powers the PolicyPremiumWidget inside PolicyCompareModal. | |
| # --------------------------------------------------------------------------- | |
| class PremiumBulkProfile(BaseModel): | |
| age: Optional[int] = Field(None, ge=0, le=120) | |
| dependents: Optional[str] = None | |
| location_tier: Optional[str] = None | |
| family_size: Optional[int] = Field(None, ge=0, le=10) | |
| smoker: Optional[bool] = False | |
| pre_existing_conditions: Optional[str] = "none" | |
| class PremiumBulkOverride(BaseModel): | |
| sum_insured_inr: Optional[int] = Field(None, ge=100_000, le=100_000_000) | |
| tenure_years: Optional[int] = Field(None, ge=1, le=3) | |
| deductible_inr: Optional[int] = Field(None, ge=0, le=200_000) | |
| class PremiumBulkRequest(BaseModel): | |
| policy_ids: list[str] = Field(..., min_length=1, max_length=20) | |
| profile: PremiumBulkProfile = Field(default_factory=PremiumBulkProfile) | |
| overrides: Optional[dict[str, PremiumBulkOverride]] = None | |
| class PremiumBulkRow(BaseModel): | |
| policy_id: str | |
| premium_inr_annual: int | |
| breakdown: dict | |
| sum_insured_inr: int | |
| tenure_years: int | |
| deductible_inr: int | |
| assumed: bool | |
| notes: list[str] = [] | |
| class PremiumBulkResponse(BaseModel): | |
| per_policy: dict[str, PremiumBulkRow] | |
| profile_used: PremiumBulkProfile | |
| disclaimer: str = ( | |
| "Illustrative estimates only β actual premiums depend on underwriting, " | |
| "medical history, and quote-time risk factors. Confirm with the insurer." | |
| ) | |
| async def premium_bulk(req: PremiumBulkRequest): | |
| """Bulk slider-driven premium estimator for the PolicyCompareModal widget.""" | |
| from backend.premium_calculator import bulk_estimate as _bulk | |
| overrides = { | |
| pid: (ov.model_dump(exclude_none=True) if ov else {}) | |
| for pid, ov in (req.overrides or {}).items() | |
| } | |
| rows = _bulk( | |
| policy_ids=req.policy_ids, | |
| profile=req.profile.model_dump(exclude_none=True), | |
| overrides=overrides, | |
| ) | |
| return PremiumBulkResponse( | |
| per_policy={ | |
| pid: PremiumBulkRow( | |
| policy_id=r.policy_id, | |
| premium_inr_annual=r.premium_inr_annual, | |
| breakdown=r.breakdown, | |
| sum_insured_inr=r.sum_insured_inr, | |
| tenure_years=r.tenure_years, | |
| deductible_inr=r.deductible_inr, | |
| assumed=r.assumed, | |
| notes=r.notes, | |
| ) | |
| for pid, r in rows.items() | |
| }, | |
| profile_used=req.profile, | |
| ) | |
| async def tts(req: TTSRequest): | |
| """Standalone TTS endpoint β returns base64 WAV.""" | |
| try: | |
| audio = await get_tts().synthesize( | |
| text=req.text, | |
| language_code=req.language_code, | |
| speaker=req.speaker, | |
| ) | |
| except Exception as e: | |
| raise HTTPException(500, f"TTS failed: {type(e).__name__}: {e}") | |
| return JSONResponse({"audio_base64": base64.b64encode(audio).decode("utf-8")}) | |
| async def api_root(): | |
| return { | |
| "service": "Insurance Sales Portfolio Expert API", | |
| "version": "0.1.0", | |
| "docs": "/docs", | |
| "health": "/api/health", | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Profile-level predicted-premium BAND β feeds the chat-UI chip that sits | |
| # next to the "X% DONE" profile-completeness pill. Updates reactively as the | |
| # profile fills in (frontend refetches whenever completeness_pct changes). | |
| # --------------------------------------------------------------------------- | |
| class PredictedPremiumBandResponse(BaseModel): | |
| min_inr: int | |
| median_inr: int | |
| max_inr: int | |
| sample_size: int | |
| assumed: bool | |
| # #63 β the SI the band was priced at. estimate_premium_band's KI-278 | |
| # contract already returns this; the model dropped it, so the pill | |
| # couldn't tell the user the band is the TYPICAL cohort range at this | |
| # cover (vs the per-plan LIVE PREMIUM, which is one specific plan and | |
| # may sit outside the typical band β expected, not a contradiction). | |
| sum_insured_used: int = 0 | |
| async def predicted_premium_band(session_id: Optional[str] = None): | |
| """Return the user's estimated premium band aggregated across a | |
| representative basket of marketplace policies. Mirrors the slot-shape | |
| used by /api/profile/completeness so the chip and the bar share triggers. | |
| """ | |
| from backend.premium_calculator import estimate_premium_band | |
| from backend.session_state import get_session | |
| if not session_id: | |
| return PredictedPremiumBandResponse( | |
| min_inr=0, median_inr=0, max_inr=0, sample_size=0, assumed=True, | |
| sum_insured_used=0, | |
| ) | |
| sess = get_session(session_id) | |
| p = sess.profile | |
| # KI-271 β band endpoint now drives off SLOT_UNION so copay_pct + | |
| # family_medical_history (D2/KI-269) actually shift the band. Prior | |
| # 12-key hand-roll silently omitted both β E3 smoke caught identical | |
| # bands with/without copay+family input. | |
| profile_dict = { | |
| slot: getattr(p, slot, None) for slot in brain_tools.SLOT_UNION | |
| } | |
| # Same answered-only gate as profile_completeness_view (KI-196 / ADR-041) β | |
| # only feed slots the user has actually answered, not pre-populated | |
| # defaults. Keeps the band stable until the user has actually said | |
| # something meaningful. | |
| answered = set(getattr(p, "asked", []) or []) | |
| filtered_profile = { | |
| k: (v if k in answered else None) for k, v in profile_dict.items() | |
| } | |
| band = estimate_premium_band(filtered_profile) | |
| return PredictedPremiumBandResponse(**band) | |
| # /api/profile/recall-by-name was REMOVED in ADR-043 (2026-05-27). | |
| # Cross-session profile recall is gone β sessions are in-memory only, so | |
| # there is nothing to "recall" off a bare name. The frontend api.ts caller | |
| # that wrapped this endpoint has also been removed. Old clients still | |
| # pinging the path get a 404, which is the correct degraded behaviour. | |
| # ---- Static frontend (served alongside /api on the same port for HF Spaces) ---- | |
| # The Next.js frontend is statically exported during the Docker build to | |
| # /app/frontend/out. In local dev, this directory may not exist β we still | |
| # want the backend to start cleanly. | |
| import os | |
| from pathlib import Path as _Path | |
| _FRONTEND_DIR = _Path(__file__).resolve().parent.parent / "frontend" / "out" | |
| if _FRONTEND_DIR.exists(): | |
| # Serve the built site as the catch-all. /api/* routes registered above | |
| # take precedence because they are matched first. | |
| app.mount("/", StaticFiles(directory=str(_FRONTEND_DIR), html=True), name="static") | |
| else: | |
| async def root(): | |
| return { | |
| "service": "Insurance Sales Portfolio Expert API", | |
| "version": "0.1.0", | |
| "frontend": "not built β run `cd frontend && npm run build`", | |
| "docs": "/docs", | |
| "health": "/api/health", | |
| } | |
| # ---- #40 SSOT grade resolver ------------------------------------------------ | |
| # marketplace_grade(policy_id) returns the SAME (grade, overall) the | |
| # marketplace card for that policy's canonical identity shows. The | |
| # recommendation path calls this instead of re-deriving a scorecard, so | |
| # rec-card grade == marketplace grade for ALL 148 by construction. | |
| import threading as _mg_threading | |
| _MG_LOCK = _mg_threading.Lock() | |
| _MG_CACHE: dict = {"sig": None, "index": None} | |
| def _mg_data_signature() -> tuple: | |
| """Cheap fingerprint that changes when any grading input changes | |
| (so an uploaded-PDF card or a curated edit invalidates the cache).""" | |
| sig = [] | |
| for d in (settings.EXTRACTED_DIR, settings.DATA_DIR / "policy_facts", | |
| settings.DATA_DIR / "reviews"): | |
| try: | |
| for fp in sorted(d.glob("*.json")): | |
| st = fp.stat() | |
| sig.append((fp.name, int(st.st_mtime), st.st_size)) | |
| except Exception: # noqa: BLE001 β missing dir β empty contribution | |
| continue | |
| # #52 β PERSISTED uploaded-doc records are ALSO grading inputs | |
| # (_load_curated_facts merges them). Walk the persistent UPLOADED_DOCS_DIR | |
| # so a brand-new upload β or a restart that re-materialised the dir β | |
| # invalidates the #40 grade cache and the new card grades immediately. | |
| try: | |
| for fp in sorted(settings.UPLOADED_DOCS_DIR.glob("*/record.json")): | |
| st = fp.stat() | |
| sig.append((str(fp.relative_to(settings.UPLOADED_DOCS_DIR)), | |
| int(st.st_mtime), st.st_size)) | |
| except Exception: # noqa: BLE001 β missing dir β empty contribution | |
| pass | |
| return tuple(sig) | |
| def _mg_norm_uin(raw) -> str: | |
| try: | |
| from backend.policy_identity import normalize_uin | |
| return normalize_uin(raw) | |
| except Exception: # noqa: BLE001 | |
| return "" | |
| def _mg_build_index() -> dict: | |
| """{lookup_key -> (grade, overall_score)} for every marketplace card, | |
| keyed by policy_id, product_key, and normalised UIN so a variant / | |
| alias id resolves to its canonical card's grade.""" | |
| cards = _marketplace_catalogue(None) | |
| cur = _load_curated_facts() | |
| idx: dict = {} | |
| def _put(k, val): | |
| if k: | |
| idx.setdefault(k, val) | |
| for c in cards: | |
| val = (c.grade, c.overall_score) | |
| pid = c.policy_id or "" | |
| _put(f"id:{pid}", val) | |
| try: | |
| from backend.policy_identity import product_key as _pk | |
| _put(f"pk:{_pk(pid)}", val) | |
| except Exception: # noqa: BLE001 | |
| pass | |
| # UIN of the card's underlying data (curated wins, like the catalogue) | |
| cdata = cur.get(pid) or {} | |
| uin = _mg_norm_uin(cdata.get("uin_code") or cdata.get("uin")) | |
| if not uin: | |
| try: | |
| ep = settings.EXTRACTED_DIR / f"{pid}.json" | |
| if ep.exists(): | |
| import json as _j | |
| uin = _mg_norm_uin(_j.loads(ep.read_text()).get("uin_code")) | |
| except Exception: # noqa: BLE001 | |
| uin = "" | |
| _put(f"uin:{uin}" if uin else "", val) | |
| return idx | |
| def _mg_index() -> dict: | |
| sig = _mg_data_signature() | |
| with _MG_LOCK: | |
| if _MG_CACHE["sig"] != sig or _MG_CACHE["index"] is None: | |
| _MG_CACHE["index"] = _mg_build_index() | |
| _MG_CACHE["sig"] = sig | |
| return _MG_CACHE["index"] | |
| # Task #31 β profile-keyed {policy_id -> MarketplacePolicy} index so the | |
| # single /api/policies/{id}/scorecard endpoint can serve the EXACT card the | |
| # /api/policies/all catalogue produced for that id. This is the only way to | |
| # guarantee byte-identical profile_summary (and grade / overall_score) for | |
| # every card id β including doctype-suffixed stems the catalogue's pre- | |
| # existing #133/#145 dedup picks as the canonical card-id while computing | |
| # the scorecard from a different-doctype sibling. We do NOT re-architect | |
| # that dedup (out of scope, protected by test_full_id_universe_parity); | |
| # instead the endpoint defers to the catalogue, the single source of truth. | |
| _CAT_CARD_LOCK = _mg_threading.Lock() | |
| _CAT_CARD_CACHE: dict = {} # profile_key -> (sig, card_idx, sc_idx) | |
| def _profile_cache_key(profile: Optional[dict]) -> str: | |
| if not profile: | |
| return "β " | |
| import json as _j | |
| return _j.dumps( | |
| {k: profile[k] for k in sorted(profile)}, | |
| sort_keys=True, default=str, | |
| ) | |
| def _catalogue_indices(profile: Optional[dict]) -> tuple[dict, dict]: | |
| """`({policy_id -> MarketplacePolicy}, {policy_id|_ident -> Scorecard})` | |
| for `profile`, cached on the data signature + a stable profile key so | |
| repeated single-scorecard calls in one render don't rebuild the catalogue | |
| per request. | |
| The Scorecard index is the Task #31 single-source-of-truth: it holds the | |
| EXACT `Scorecard` object `_marketplace_catalogue` built for each surviving | |
| card (full sub_scores + profile_summary + grade), keyed by the card's | |
| policy_id AND its canonical `_ident`. `/api/policies/{id}/scorecard` | |
| serves it verbatim, so its scorecard is byte-identical to the | |
| /api/policies/all card for the same canonical id by construction.""" | |
| sig = _mg_data_signature() | |
| pkey = _profile_cache_key(profile) | |
| with _CAT_CARD_LOCK: | |
| entry = _CAT_CARD_CACHE.get(pkey) | |
| if entry is None or entry[0] != sig: | |
| sc_idx: dict = {} | |
| cards = _marketplace_catalogue(profile, _collect_scorecards=sc_idx) | |
| card_idx = {c.policy_id: c for c in cards} | |
| _CAT_CARD_CACHE[pkey] = (sig, card_idx, sc_idx) | |
| # Bound the cache so distinct profiles don't grow it unbounded. | |
| if len(_CAT_CARD_CACHE) > 16: | |
| for k in list(_CAT_CARD_CACHE.keys())[:-16]: | |
| _CAT_CARD_CACHE.pop(k, None) | |
| entry = _CAT_CARD_CACHE[pkey] | |
| return entry[1], entry[2] | |
| def _catalogue_card_index(profile: Optional[dict]) -> dict: | |
| """{policy_id -> MarketplacePolicy} for `profile` (back-compat shim).""" | |
| return _catalogue_indices(profile)[0] | |
| # Doctype suffixes used to canonicalise a requested policy_id onto the | |
| # catalogue's surviving card id (mirrors _marketplace_catalogue._ident). | |
| _SCORECARD_DOCT = ("wordings", "brochure", "cis", "prospectus", "policy") | |
| def _canonical_ident(pid: str) -> str: | |
| for dt in _SCORECARD_DOCT: | |
| if pid.endswith(f"__{dt}"): | |
| return pid[: -(len(dt) + 2)] | |
| return pid | |
| def _catalogue_scorecard(policy_id: str, profile: Optional[dict]): | |
| """The EXACT `Scorecard` the /api/policies/all catalogue produced for | |
| `policy_id`'s canonical card under `profile`, or None when `policy_id` | |
| is not a catalogued product. | |
| Resolution order (single source of truth β same dedup the catalogue | |
| uses): exact policy_id -> canonical `_ident(policy_id)` -> the | |
| canonical id of any catalogue card whose `aliases` contains this id's | |
| display name. Returns None (NOT a 404) so the caller keeps its existing | |
| curated-only / back-compat / never-404 behaviour for non-card ids.""" | |
| if not policy_id: | |
| return None | |
| card_idx, sc_idx = _catalogue_indices(profile) | |
| pid = policy_id.strip() | |
| sc = sc_idx.get(pid) or sc_idx.get(_canonical_ident(pid)) | |
| if sc is not None: | |
| return sc | |
| # Alias path: a marketing-rename id maps onto its canonical card. | |
| try: | |
| from backend.policy_identity import clean_display_policy_name | |
| want = clean_display_policy_name(pid) | |
| except Exception: # noqa: BLE001 | |
| want = pid | |
| for c in card_idx.values(): | |
| if pid in (c.aliases or []) or want in (c.aliases or []): | |
| cand = sc_idx.get(c.policy_id) or sc_idx.get( | |
| _canonical_ident(c.policy_id) | |
| ) | |
| if cand is not None: | |
| return cand | |
| return None | |
| def marketplace_grade(policy_id: str) -> dict: | |
| """{"_grade", "_overall_score"} for policy_id, identical to its | |
| marketplace card. Resolution order: exact id -> product_key -> UIN | |
| (so a marketing-rename / variant id maps onto its canonical card). | |
| Returns {} only when the policy is unknown to the marketplace.""" | |
| if not policy_id: | |
| return {} | |
| idx = _mg_index() | |
| from backend.policy_identity import product_key as _pk | |
| cur = _load_curated_facts() | |
| pid = policy_id.strip() | |
| keys = [f"id:{pid}", f"pk:{_pk(pid)}"] | |
| cdata = cur.get(pid) or cur.get(_pk(pid)) or {} | |
| uin = _mg_norm_uin(cdata.get("uin_code") or cdata.get("uin")) | |
| if not uin: | |
| try: | |
| import json as _j | |
| for cand in (pid, _pk(pid)): | |
| ep = settings.EXTRACTED_DIR / f"{cand}.json" | |
| if ep.exists(): | |
| uin = _mg_norm_uin(_j.loads(ep.read_text()).get("uin_code")) | |
| if uin: | |
| break | |
| except Exception: # noqa: BLE001 | |
| uin = "" | |
| if uin: | |
| keys.append(f"uin:{uin}") | |
| for k in keys: | |
| if k in idx: | |
| g, o = idx[k] | |
| return {"_grade": g, "_overall_score": o} | |
| return {} | |