Spaces:
Sleeping
Sleeping
"""Curate every policy PDF in rag/corpus/ that doesn't have a matching
40-data/policy_facts/<policy_id>.json yet. Includes group/B2B/specialty plans
the batch-2 agent excluded.

Uses pdfplumber to read PDF text + regex patterns from curate_batch2 to
extract structured fields. Output JSON follows the same provenance schema
as batches 1 + 2.

Run:
    .venv/bin/python3 tools/curate_remaining.py
"""
| from __future__ import annotations | |
| import json | |
| import os | |
| import re | |
| import sys | |
| import time | |
| from pathlib import Path | |
# Project paths, all anchored two directory levels above this file.
BASE = Path(__file__).resolve().parents[1]
CORPUS = BASE.joinpath("rag", "corpus")
OUT_DIR = BASE.joinpath("40-data", "policy_facts")
# Create the output directory (and any missing parents) at import time.
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Put tools/ at the front of the import path so the batch-2 extractor module
# can be imported.
# NOTE(review): nothing in the visible code imports from tools/ -- confirm
# this path tweak is still needed.
sys.path.insert(0, str(BASE.joinpath("tools")))

# pdfplumber is a hard requirement: stop with a hint when it is missing.
try:
    import pdfplumber
except ImportError:
    print("ERROR: pdfplumber not installed. Run from venv.")
    sys.exit(1)
def existing_policy_ids() -> set[str]:
    """Return the file stems of the JSON documents already in ``OUT_DIR``.

    Files whose stem starts with an underscore are left out.
    """
    return {
        path.stem
        for path in OUT_DIR.glob("*.json")
        if not path.stem.startswith("_")
    }
def policy_id_for(pdf: Path) -> str:
    """Build the policy id ``<parent directory name>__<file stem>`` for *pdf*."""
    return "__".join((pdf.parent.name, pdf.stem))
def extract_text(pdf: Path, max_pages: int = 30) -> str:
    """Return the text of the first *max_pages* pages of *pdf*.

    Page texts are joined with a blank line; a page for which pdfplumber
    yields no text contributes an empty string.

    Extraction is best-effort: if the PDF cannot be opened or parsed, the
    failure is reported on stderr and an empty string is returned, so one bad
    file does not abort a whole batch.
    """
    try:
        with pdfplumber.open(pdf) as doc:
            pages = [page.extract_text() or "" for page in doc.pages[:max_pages]]
    except Exception as exc:  # noqa: BLE001 - any parser failure means "no text"
        # Previously swallowed silently; surface the reason so unreadable PDFs
        # can be told apart from genuinely empty ones.
        print(f"WARNING: could not extract text from {pdf}: {exc}", file=sys.stderr)
        return ""
    return "\n\n".join(pages)
| # ---- Field extractors (compact subset of batch_2 patterns) ----------------- | |
def find_first(patterns: list[str], text: str, flags: int = re.IGNORECASE) -> tuple[str | None, str | None]:
    """Return ``(matched_value, source_quote)`` for the first pattern that hits.

    Patterns are tried in order with ``re.search``. The value is capture
    group 1 when the pattern defines one and it took part in the match,
    otherwise the whole match. The quote is the whole match cut to 140
    characters. Both are stripped of surrounding whitespace.

    Returns ``(None, None)`` when no pattern matches.
    """
    for pattern in patterns:
        m = re.search(pattern, text, flags=flags)
        if m is None:
            continue
        # An optional group that did not participate yields None; fall back to
        # the whole match instead of crashing on ``None.strip()``.
        captured = m.group(1) if m.re.groups else None
        value = captured if captured is not None else m.group(0)
        return value.strip(), m.group(0)[:140].strip()
    return None, None
def extract_uin(text: str) -> tuple[str | None, str | None]:
    """Look for a 12-20 character alphanumeric code after a UIN label.

    Returns the ``(value, source_quote)`` pair produced by ``find_first``.
    """
    uin_patterns = [
        r"UIN[:\s]*([A-Z0-9]{12,20})",
        r"Product UIN[:\s]*([A-Z0-9]{12,20})",
        r"Unique Identification Number[:\s]*([A-Z0-9]{12,20})",
    ]
    return find_first(uin_patterns, text)
def extract_int_after(label_patterns: list[str], text: str, range_min: int = 0, range_max: int = 1000) -> tuple[int | None, str | None]:
    """Return ``(number, quote)`` for the first pattern giving an in-range integer.

    Each pattern is searched case-insensitively and only its first match is
    considered. Non-digit characters are removed from capture group 1 before
    conversion. A pattern is skipped when it does not match, has no usable
    group 1, or yields a number outside ``[range_min, range_max]``. The quote
    is the whole match cut to 120 characters.

    Returns ``(None, None)`` when no pattern qualifies.
    """
    for pattern in label_patterns:
        hit = re.search(pattern, text, flags=re.IGNORECASE)
        if hit is None:
            continue
        try:
            number = int(re.sub(r"\D", "", hit.group(1)))
        except (ValueError, IndexError):
            # No digits captured, or the pattern defines no group 1.
            continue
        if not (range_min <= number <= range_max):
            continue
        return number, hit.group(0)[:120]
    return None, None
def extract_field_pack(text: str, insurer_slug: str, pdf: Path) -> dict:
    """Run the regex extractors over the PDF text and assemble the field dict.

    Every entry of the returned dict has the shape
    ``{"value": ..., "source_pdf_path": ..., "source_quote": ...}``. A field
    that could not be extracted has ``value`` None (and usually an empty
    quote).

    Args:
        text: Text extracted from the PDF.
        insurer_slug: Not used by this function; kept for caller compatibility.
        pdf: Path of the source PDF. It must lie under ``BASE``, otherwise
            ``Path.relative_to`` raises ``ValueError``.

    Returns:
        Mapping of field name to provenance-wrapped value, in a fixed key order.
    """
    out: dict = {}
    pdf_rel = str(pdf.relative_to(BASE))

    def wrap(value, quote, limit=140):
        """Wrap a value with its provenance; the quote is cut to *limit* chars."""
        return {
            "value": value,
            "source_pdf_path": pdf_rel,
            "source_quote": (quote or "")[:limit],
        }

    def search(pattern):
        """Case-insensitive ``re.search`` over the PDF text."""
        return re.search(pattern, text, flags=re.IGNORECASE)

    def months_after(month_patterns, year_patterns, max_months):
        """Return ``(months, quote)`` for a waiting period.

        Month-denominated patterns are tried first; year-denominated ones are
        the fallback and are converted to months. The unit comes from which
        pattern matched, not from whether the quote happens to contain the
        word "year" -- that heuristic multiplied genuine month values whose
        quote mentioned "year", and missed year values of 10 or more.
        """
        months, quote = extract_int_after(month_patterns, text, 0, max_months)
        if months is not None:
            return months, quote
        years, quote = extract_int_after(year_patterns, text, 0, max_months // 12)
        if years is not None:
            return years * 12, quote
        return None, None

    # UIN
    out["uin_code"] = wrap(*extract_uin(text))

    # Entry age (min / max)
    out["min_entry_age"] = wrap(*extract_int_after([
        r"(?:Minimum|Entry).{0,30}?(?:Age)[:\s]*(\d{1,3})",
        r"Age (?:at )?Entry[:\s\-]*(?:Min(?:imum)?)?[:\s]*(\d{1,3})",
    ], text, 0, 99))
    out["max_entry_age"] = wrap(*extract_int_after([
        r"(?:Maximum|Max\.?).{0,30}?(?:Age|Entry Age)[:\s]*(\d{2,3})",
        r"Age (?:at )?Entry.{0,40}?Max[^:]*[:\s]*(\d{2,3})",
    ], text, 18, 200))

    # Renewal: a "lifelong renewal" statement is recorded as age 99.
    lifelong = search(r"(Lifelong|Life[- ]long).{0,40}(?:renew|Renewal)")
    if lifelong:
        out["max_renewal_age"] = wrap(99, lifelong.group(0), limit=120)
    else:
        out["max_renewal_age"] = wrap(*extract_int_after([
            r"Renewal[\s\S]{0,80}?up to[:\s]*(\d{2,3})",
            r"(?:Maximum renewal|Renew till|Renewal age)[:\s]*(\d{2,3})",
        ], text, 50, 120))

    # PED waiting (months; year figures are converted)
    ped_months, ped_q = months_after(
        [r"Pre[-\s]?existing[^\n]{0,200}?(\d{1,3})\s*(?:months|month|consecutive months)"],
        [r"Pre[-\s]?existing[^\n]{0,200}?(\d{1,2})\s*years"],
        120,
    )
    out["pre_existing_disease_waiting_months"] = wrap(ped_months, ped_q)

    # Initial waiting (days)
    out["initial_waiting_period_days"] = wrap(*extract_int_after([
        r"Initial waiting[\s\S]{0,80}?(\d{1,3})\s*days",
        r"first (\d{1,3}) days from[^\n]{0,50}commencement",
    ], text, 0, 365))

    # Maternity: an explicit exclusion wins over any waiting-period figure.
    mat_excluded = search(r"Maternity[\s\S]{0,300}?(?:not covered|excluded|no cover)")
    if mat_excluded:
        out["maternity_coverage"] = wrap(False, mat_excluded.group(0), limit=120)
        out["maternity_waiting_months"] = wrap(None, None)
    else:
        mat_months, mat_q = months_after(
            [r"Maternity[\s\S]{0,200}?(\d{1,3})\s*months"],
            [r"Maternity[\s\S]{0,200}?(\d{1,2})\s*years"],
            60,
        )
        if mat_months is not None:
            # A stated waiting period implies maternity is covered.
            out["maternity_coverage"] = wrap(True, mat_q, limit=120)
            out["maternity_waiting_months"] = wrap(mat_months, mat_q)
        else:
            out["maternity_coverage"] = wrap(None, None)
            out["maternity_waiting_months"] = wrap(None, None)

    # AYUSH: an exclusion takes precedence over a positive mention.
    ayush_yes = search(r"AYUSH[\s\S]{0,200}?(covered|included|payable|reimburs)")
    ayush_no = search(r"AYUSH[\s\S]{0,80}?(not covered|excluded)")
    if ayush_no:
        out["ayush_coverage"] = wrap(False, ayush_no.group(0), limit=120)
    elif ayush_yes:
        out["ayush_coverage"] = wrap(True, ayush_yes.group(0), limit=120)
    else:
        out["ayush_coverage"] = wrap(None, None)

    # Cashless: only ever True or unknown, never False.
    cashless = search(r"cashless[\s\S]{0,80}?(facility|treatment|hospital|network)")
    out["cashless_treatment_supported"] = wrap(
        True if cashless else None,
        cashless.group(0) if cashless else None,
        limit=120,
    )

    # Co-pay
    out["copayment_pct"] = wrap(*extract_int_after([
        r"Co-?[\s]?pay(?:ment)?[\s\S]{0,80}?(\d{1,2})\s*%",
    ], text, 0, 50))

    # NCB
    out["no_claim_bonus_pct"] = wrap(*extract_int_after([
        r"(?:No[-\s]?Claim Bonus|Cumulative Bonus|NCB)[\s\S]{0,100}?(\d{1,3})\s*%",
    ], text, 5, 100))

    # Restoration: the value is the first 80 characters of the matched passage.
    restoration = search(r"Restoration[\s\S]{0,180}?(\d+\s*%|once|unlimited|automatic|on full exhaustion)")
    if restoration:
        out["restoration_benefit"] = wrap(
            restoration.group(0)[:80].strip(), restoration.group(0), limit=120
        )
    else:
        out["restoration_benefit"] = wrap(None, None)

    # Room rent: same free-text treatment as restoration.
    room_rent = search(r"Room rent[\s\S]{0,200}?(no limit|single private|capped|up to|\d+\s*%)")
    if room_rent:
        out["room_rent_capping"] = wrap(
            room_rent.group(0)[:80].strip(), room_rent.group(0), limit=120
        )
    else:
        out["room_rent_capping"] = wrap(None, None)

    # Pre-/post-hospitalisation
    pre_h, pre_h_q = extract_int_after([
        r"Pre[-\s]?hospitali[sz]ation[\s\S]{0,100}?(\d{1,3})\s*days",
    ], text, 0, 365)
    post_h, post_h_q = extract_int_after([
        r"Post[-\s]?hospitali[sz]ation[\s\S]{0,100}?(\d{1,3})\s*days",
    ], text, 0, 365)
    out["pre_hospitalization_days"] = wrap(pre_h, pre_h_q)
    out["post_hospitalization_days"] = wrap(post_h, post_h_q)

    # Day care
    out["day_care_treatments_count"] = wrap(*extract_int_after([
        r"(\d{2,4})\s*(?:Day[\s-]?Care|day care procedures|daycare)",
    ], text, 50, 1500))

    # Network hospitals
    out["network_hospital_count"] = wrap(*extract_int_after([
        r"(\d{3,6})\s*\+?\s*(?:Network Hospitals|cashless hospitals|hospital network)",
    ], text, 500, 100000))

    # Sum insured options — heuristic, only when clearly enumerated.
    # Only the quote is recorded; the value is always left as None.
    si_match = search(r"Sum Insured.{0,30}?Options?[:\s]*([\d,\s/L/Lakh/Cr]+)")
    out["sum_insured_options"] = wrap(None, si_match.group(0)[:120] if si_match else None)

    # Universal fields that aren't in policy PDFs
    out["claim_settlement_ratio"] = wrap(None, None)
    out["tat_cashless_authorization_hours"] = wrap(None, None)

    # Policy-type heuristic: the first matching rule wins, default is indemnity.
    text_lower = text.lower()
    stem_lower = pdf.stem.lower()
    if "group health" in text_lower or "employer" in text_lower or pdf.stem.startswith("group"):
        ptype = "group"
    elif "top up" in text_lower or "top-up" in text_lower or "super top" in text_lower:
        ptype = "top_up"
    elif "hospital cash" in text_lower or "daily cash" in text_lower:
        ptype = "hospital_cash"
    elif "cancer" in stem_lower or "criti" in stem_lower or "critical illness" in text_lower:
        ptype = "critical_illness"
    elif "personal accident" in text_lower:
        ptype = "personal_accident"
    else:
        ptype = "indemnity"
    out["policy_type"] = {
        "value": ptype,
        "source_pdf_path": pdf_rel,
        "source_quote": f"classified as {ptype} from PDF heuristics",
    }
    return out
def derive_policy_name(pdf: Path) -> str:
    """Turn the PDF file name into a human-readable policy name.

    Everything from a ``__wordings`` / ``__brochure`` / ``__cis`` /
    ``__prospectus`` / ``__policy`` marker onwards is dropped. The rest is
    split on runs of hyphens and underscores, and each non-empty piece is
    capitalised and joined with single spaces.
    """
    base = re.sub(r"__(wordings|brochure|cis|prospectus|policy).*$", "", pdf.stem)
    words = filter(None, re.split(r"[-_]+", base))
    return " ".join(map(str.capitalize, words))
def _completeness_pct(doc: dict) -> int:
    """Return the rounded percentage of provenance-wrapped fields with a value.

    Only entries that are dicts containing a ``"value"`` key are counted; a
    value of None, ``""`` or ``[]`` counts as missing.
    """
    wrapped = [v for v in doc.values() if isinstance(v, dict) and "value" in v]
    filled = sum(1 for v in wrapped if v.get("value") not in (None, "", []))
    return round(filled / max(1, len(wrapped)) * 100)


def main():
    """Curate every corpus PDF that has no policy-facts JSON yet.

    Walks ``CORPUS`` recursively, skips PDFs whose parent directory is
    ``regulatory`` and PDFs already curated (under the full policy id or the
    id with a trailing document-type suffix removed), then writes one JSON
    document per remaining PDF into ``OUT_DIR``. PDFs yielding fewer than 200
    characters of text are skipped. Progress and a summary go to stdout.
    """
    existing = existing_policy_ids()
    print(f"Currently curated: {len(existing)}")

    suffix_re = re.compile(r"__(wordings|brochure|cis|prospectus|policy)$")
    work = []
    for pdf in sorted(CORPUS.rglob("*.pdf")):
        if pdf.parent.name == "regulatory":
            continue
        pid = policy_id_for(pdf)
        # Also try the id with the document-type suffix stripped.
        short = f"{pdf.parent.name}__{suffix_re.sub('', pdf.stem)}"
        if pid in existing or short in existing:
            continue
        work.append(pdf)
    print(f"Uncurated: {len(work)} PDFs")

    completeness_scores = []
    for i, pdf in enumerate(work, 1):
        pid = policy_id_for(pdf)
        text = extract_text(pdf)
        if len(text) < 200:
            print(f" [{i}/{len(work)}] SKIP {pid} — too short ({len(text)} chars)")
            continue
        fields = extract_field_pack(text, pdf.parent.name, pdf)
        # Identity fields come first, then the extracted fields.
        out_doc = {
            "policy_id": pid,
            "policy_name": derive_policy_name(pdf),
            "insurer_slug": pdf.parent.name,
            **fields,
        }
        # Completeness is computed before _meta is attached.
        completeness_pct = _completeness_pct(out_doc)
        out_doc["_meta"] = {
            "curated_at": time.strftime("%Y-%m-%d"),
            "primary_source_pdf": str(pdf.relative_to(BASE)),
            "completeness_pct": completeness_pct,
            "notes": "Curated by tools/curate_remaining.py — pattern-based extraction from local PDF",
        }
        out_path = OUT_DIR / f"{pid}.json"
        # Explicit encoding so the output does not depend on the platform default.
        out_path.write_text(json.dumps(out_doc, indent=2), encoding="utf-8")
        completeness_scores.append(completeness_pct)
        print(f" [{i}/{len(work)}] {pid}: {completeness_pct}%")

    avg = sum(completeness_scores) / max(1, len(completeness_scores))
    print(f"\nDone. {len(completeness_scores)} new JSONs, avg completeness {avg:.1f}%")
    print(f"Total curated now: {len(existing_policy_ids())}")
| if __name__ == "__main__": | |
| main() | |