Spaces:
Sleeping
Sleeping
| """Generate the Information Source Map β a detailed catalog of what knowledge | |
| the corpus contains, used for both reviewer-facing explainability and | |
| faithfulness verification at runtime. | |
| Outputs two artifacts after ingestion + extraction have run: | |
| 1. 70-docs/information_source_map.md | |
| Human-readable per-policy catalog: insurer, policy, doc type, chunk count, | |
| pages covered, extracted-field summary, source URL. The "what does the bot | |
| know" reference. | |
| 2. rag/source_map.json | |
| Machine-readable per-chunk index: {chunk_id, policy_id, page_range, | |
| extracted_terms, primary_topics}. Used by faithfulness verifier to look up | |
| whether a claim could plausibly trace to a chunk. | |
| Run: | |
| python -m rag.source_map | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import re | |
| import time | |
| from collections import defaultdict | |
| from pathlib import Path | |
| import chromadb | |
| import duckdb | |
| from chromadb.config import Settings as ChromaSettings | |
| from backend.config import settings | |
| ROOT = settings.CORPUS_DIR.parent.parent | |
| MD_OUTPUT = ROOT / "docs" / "information_source_map.md" | |
| JSON_OUTPUT = settings.VECTORS_DIR.parent / "source_map.json" | |
| # Topic keywords used to tag chunks with the high-level concepts they cover. | |
| # Used for the JSON catalog + chunk-routing in retrieval. | |
| TOPIC_KEYWORDS: dict[str, list[str]] = { | |
| "waiting_period": ["waiting period", "pre-existing", "PED", "specific waiting", "initial waiting"], | |
| "coverage_scope": ["covered", "covers", "inpatient", "outpatient", "OPD", "domiciliary"], | |
| "exclusions": ["exclusion", "excluded", "not covered", "shall not pay", "permanent exclusion"], | |
| "claim_process": ["claim", "settlement", "TAT", "turnaround time", "reimbursement", "cashless"], | |
| "sum_insured": ["sum insured", "sum assured", "policy limit", "annual limit"], | |
| "room_rent": ["room rent", "ICU", "private room", "single room"], | |
| "copayment": ["co-payment", "copay", "deductible", "patient share"], | |
| "maternity": ["maternity", "pregnancy", "delivery", "newborn"], | |
| "ayush": ["AYUSH", "Ayurveda", "Yoga", "Unani", "Siddha", "Homeopathy"], | |
| "critical_illness": ["critical illness", "cancer", "stroke", "heart attack", "kidney failure"], | |
| "network": ["network hospital", "cashless", "network of hospitals", "empanelled"], | |
| "ncb": ["no claim bonus", "NCB", "cumulative bonus", "renewal bonus"], | |
| "restoration": ["restoration", "refill", "recharge"], | |
| "geography": ["pan-india", "worldwide", "overseas", "geographic"], | |
| "tax_section_80d": ["80D", "tax benefit", "tax deduction", "income tax"], | |
| "renewal": ["renewal", "renewability", "lifelong", "guaranteed renewal"], | |
| } | |
| def chroma_collection(): | |
| client = chromadb.PersistentClient( | |
| path=str(settings.VECTORS_DIR), | |
| settings=ChromaSettings(anonymized_telemetry=False), | |
| ) | |
| return client.get_or_create_collection( | |
| name="policies", | |
| metadata={"hnsw:space": "cosine"}, | |
| ) | |
| def load_extracted_policies() -> dict[str, dict]: | |
| """Map policy_id -> extracted JSON from DuckDB.""" | |
| out: dict[str, dict] = {} | |
| db = settings.STRUCTURED_DB | |
| if not db.exists(): | |
| return out | |
| con = duckdb.connect(str(db), read_only=True) | |
| try: | |
| rows = con.execute("SELECT policy_id, data_json FROM policies").fetchall() | |
| for pid, data in rows: | |
| try: | |
| out[pid] = json.loads(data) | |
| except Exception: | |
| pass | |
| finally: | |
| con.close() | |
| return out | |
| def tag_topics(text: str) -> list[str]: | |
| """Return the topics this chunk text covers.""" | |
| t = text.lower() | |
| return [topic for topic, kws in TOPIC_KEYWORDS.items() if any(kw.lower() in t for kw in kws)] | |
| def summarize_fields(p: dict) -> dict: | |
| """Pick high-leverage fields for the per-policy summary in the markdown.""" | |
| def get(k, default="β"): | |
| v = p.get(k, default) | |
| if v is None or v == "" or v == []: | |
| return default | |
| return v | |
| return { | |
| "policy_name": get("policy_name"), | |
| "insurer_name": get("insurer_name"), | |
| "policy_type": get("policy_type"), | |
| "min_entry_age": get("min_entry_age"), | |
| "max_entry_age": get("max_entry_age"), | |
| "sum_insured_options": get("sum_insured_options"), | |
| "pre_existing_disease_waiting_months": get("pre_existing_disease_waiting_months"), | |
| "maternity_waiting_months": get("maternity_waiting_months"), | |
| "ayush_coverage": get("ayush_coverage"), | |
| "room_rent_capping": get("room_rent_capping"), | |
| "copayment_pct": get("copayment_pct"), | |
| "no_claim_bonus_pct": get("no_claim_bonus_pct"), | |
| "network_hospital_count": get("network_hospital_count"), | |
| "extraction_confidence_pct": get("extraction_confidence_pct"), | |
| } | |
| def build_machine_index() -> dict: | |
| """Per-chunk index used by faithfulness verifier.""" | |
| coll = chroma_collection() | |
| total = coll.count() | |
| if total == 0: | |
| return {"total_chunks": 0, "chunks": []} | |
| PAGE = 500 | |
| chunks_out: list[dict] = [] | |
| for offset in range(0, total, PAGE): | |
| res = coll.get(limit=PAGE, offset=offset, include=["documents", "metadatas"]) | |
| for cid, doc, meta in zip(res["ids"], res["documents"], res["metadatas"]): | |
| chunks_out.append({ | |
| "chunk_id": cid, | |
| "policy_id": meta.get("policy_id", ""), | |
| "insurer_slug": meta.get("insurer_slug", ""), | |
| "policy_name": meta.get("policy_name", ""), | |
| "doc_type": meta.get("doc_type", ""), | |
| "page_start": meta.get("page_start"), | |
| "page_end": meta.get("page_end"), | |
| "topics": tag_topics(doc), | |
| }) | |
| return { | |
| "generated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), | |
| "total_chunks": total, | |
| "chunks": chunks_out, | |
| } | |
| def build_markdown(machine: dict, extracted: dict[str, dict]) -> str: | |
| """Human-readable per-policy + per-topic catalog.""" | |
| # Group chunks by policy | |
| by_policy: dict[str, list[dict]] = defaultdict(list) | |
| for c in machine.get("chunks", []): | |
| by_policy[c["policy_id"]].append(c) | |
| # Per-policy summary | |
| policies_md = [] | |
| for pid in sorted(by_policy.keys()): | |
| chunks = by_policy[pid] | |
| meta = chunks[0] | |
| pages = sorted(set(c["page_start"] for c in chunks if c["page_start"])) | |
| topic_counts: dict[str, int] = defaultdict(int) | |
| for c in chunks: | |
| for t in c.get("topics", []): | |
| topic_counts[t] += 1 | |
| topic_summary = ", ".join(f"{t}({n})" for t, n in sorted(topic_counts.items(), key=lambda kv: -kv[1])[:8]) | |
| # Extracted-field summary | |
| ext = extracted.get(pid, {}) | |
| f = summarize_fields(ext) if ext else {} | |
| field_lines = [] | |
| for k, v in f.items(): | |
| if v not in ("β", None, ""): | |
| field_lines.append(f" - **{k}**: {v}") | |
| field_block = "\n".join(field_lines) if field_lines else " - (extraction not yet run for this policy)" | |
| policies_md.append( | |
| f"### {meta['policy_name']} \n" | |
| f"_{meta['insurer_slug']} Β· {meta['doc_type']} Β· {len(chunks)} chunks Β· pages {min(pages) if pages else '?'}-{max(pages) if pages else '?'}_\n\n" | |
| f"**Topics covered:** {topic_summary or '(none auto-tagged)'}\n\n" | |
| f"**Extracted fields:**\n{field_block}\n\n" | |
| f"`policy_id`: `{pid}`\n" | |
| ) | |
| # Per-topic inverted index | |
| topic_to_policies: dict[str, set[str]] = defaultdict(set) | |
| for c in machine.get("chunks", []): | |
| for t in c.get("topics", []): | |
| topic_to_policies[t].add(c["policy_id"]) | |
| topic_md = [] | |
| for topic in sorted(topic_to_policies.keys()): | |
| pols = sorted(topic_to_policies[topic]) | |
| topic_md.append(f"- **{topic}** β covered in {len(pols)} policies: {', '.join(pols[:8])}{', β¦' if len(pols) > 8 else ''}") | |
| md = f"""# Information Source Map | |
| | Field | Value | | |
| | --- | --- | | |
| | Generated | {machine.get('generated_at', 'never')} | | |
| | Total chunks in vector store | {machine.get('total_chunks', 0)} | | |
| | Policies indexed | {len(by_policy)} | | |
| | Topics auto-tagged | {len(TOPIC_KEYWORDS)} | | |
| ## 0. Purpose | |
| This document is the **authoritative catalog of what the bot can answer**. Every chunk in the Chroma vector store is summarized here, grouped by policy. For each policy, the high-value extracted fields are listed alongside. | |
| A reviewer can use this file to answer two questions: | |
| 1. **"Could the bot know this?"** β look up the policy + topic. | |
| 2. **"Is the bot's answer plausibly grounded?"** β cross-reference the policy_id and field in the runtime audit log. | |
| This artifact is regenerated after every ingestion or extraction run via `python -m rag.source_map`. | |
| ## 1. Topic inverted index β what is covered, where | |
| {chr(10).join(topic_md) if topic_md else '_(no topics indexed yet β has ingestion run?)_'} | |
| ## 2. Per-policy catalog | |
| {(chr(10) + chr(10)).join(policies_md) if policies_md else '_(no policies indexed yet)_'} | |
| --- | |
| ## 3. Machine-readable index | |
| A JSON form of this catalog is at `rag/source_map.json` β used by the faithfulness verifier to look up whether a claim could plausibly trace to a chunk before allowing it through. | |
| ## 4. Coverage gaps (transparent) | |
| These are areas where the corpus is thin. Bot questions on these should refuse: | |
| - **Regulatory documents (IRDAI):** Deferred β see `decisions.md` D-017. The bot's faithfulness Gate 1 (retrieval floor) refuses these correctly. | |
| - **Premium pricing:** Out of scope (advisor, not broker). See `decisions.md` D-007. | |
| - **Categories beyond Health (Life, Motor, Travel):** Out of scope v1. | |
| - **Star Health policies (11 PDFs):** Star Health's CDN actively blocks scripted downloads. Mitigation pending in v2. | |
| """ | |
| return md | |
| def main(): | |
| extracted = load_extracted_policies() | |
| machine = build_machine_index() | |
| JSON_OUTPUT.parent.mkdir(parents=True, exist_ok=True) | |
| JSON_OUTPUT.write_text(json.dumps(machine, indent=2)) | |
| md = build_markdown(machine, extracted) | |
| MD_OUTPUT.parent.mkdir(parents=True, exist_ok=True) | |
| MD_OUTPUT.write_text(md) | |
| print(f"Wrote:") | |
| print(f" {MD_OUTPUT.relative_to(ROOT)} ({len(md)} bytes)") | |
| print(f" {JSON_OUTPUT.relative_to(ROOT)} ({machine.get('total_chunks', 0)} chunks)") | |
| print(f"Policies indexed: {len({c['policy_id'] for c in machine.get('chunks', [])})}") | |
| if __name__ == "__main__": | |
| main() | |