""" arbiter.py — Hierarchy of Truth merge for UK Motor Insurance. The PolicyArbiter takes one Schedule extraction and one Certificate extraction and produces a single authoritative UKMotorGoldenRecord. Document Authoritative for ──────────────── ────────────────────────────────────────────────── Schedule vehicle_details, excess_breakdown, financial_summary, driver DOB / occupation / license_type, NCB, cover_type Certificate class_of_use, driving_other_cars """ from __future__ import annotations import logging from typing import Optional from schema import ( ConflictEntry, CoverAndExcesses, Driver, ExcessBreakdown, NoClaimsDiscount, PeriodOfCover, PolicyHeader, UKMotorGoldenRecord, ) logger = logging.getLogger(__name__) # Minimum rapidfuzz token_sort_ratio to consider two driver names a match. _DRIVER_NAME_MATCH_THRESHOLD = 85 # --------------------------------------------------------------------------- # PolicyArbiter # --------------------------------------------------------------------------- class PolicyArbiter: """ Merges a Schedule extraction and a Certificate extraction into one authoritative UKMotorGoldenRecord using the Hierarchy of Truth. Usage ----- >>> arbiter = PolicyArbiter() >>> golden, conflicts = arbiter.merge_records( ... schedule_record, "Schedule of Insurance (1).pdf", ... certificate_record, "Certificate of Motor Insurance.pdf", ... ) """ def merge_records( self, schedule_record: UKMotorGoldenRecord, schedule_filename: str, certificate_record: UKMotorGoldenRecord, certificate_filename: str, ) -> tuple[UKMotorGoldenRecord, list[ConflictEntry]]: """ Merge Schedule and Certificate extractions into one Golden Record. Schedule is master for: vehicle_details, excess_breakdown, financial_summary, driver DOB/occupation/license_type, NCB, cover_type. Certificate is master for: class_of_use, driving_other_cars. Returns ------- tuple[UKMotorGoldenRecord, list[ConflictEntry]] (golden_record, list of fields where the two documents disagreed) """ conflicts: list[ConflictEntry] = [] merged = UKMotorGoldenRecord() # ── Policy header ─────────────────────────────────────────────────── merged.policy_header = _merge_policy_header(schedule_record, certificate_record, conflicts) # ── Vehicle details: Schedule is authoritative ────────────────────── merged.vehicle_details = schedule_record.vehicle_details # ── Drivers: Schedule has DOB/occupation/licence ──────────────────── merged.driver_details = _merge_drivers(schedule_record, certificate_record, conflicts) # ── Cover and excesses: hybrid ────────────────────────────────────── # class_of_use + driving_other_cars → Certificate # cover_type + NCB + excess_breakdown → Schedule merged.cover_and_excesses = _merge_cover_and_excesses( schedule_record, certificate_record, conflicts ) # ── Financial summary: Schedule is authoritative ──────────────────── merged.financial_summary = schedule_record.financial_summary # ── Additional risk data: Schedule is authoritative ───────────────── merged.additional_risk_data = schedule_record.additional_risk_data # ── Merge field_citations from both source records ────────────────── # Schedule wins on key conflicts (consistent with merge hierarchy). # Stored on the merged record for provenance matching; excluded from JSON output. sched_fc = dict(getattr(schedule_record, "field_citations", None) or {}) cert_fc = dict(getattr(certificate_record, "field_citations", None) or {}) merged_fc = {**cert_fc, **sched_fc} if merged_fc: merged.field_citations = merged_fc if conflicts: logger.info( "Merge conflicts (%d): %s", len(conflicts), [c.field for c in conflicts], ) logger.info( "Merge complete: schedule='%s' + certificate='%s' — %d conflict(s)", schedule_filename, certificate_filename, len(conflicts), ) return merged, conflicts # --------------------------------------------------------------------------- # Private merge helpers # --------------------------------------------------------------------------- def _first(*values): """Return the first non-None value, or None if all are None.""" for v in values: if v is not None: return v return None def _check_conflict( conflicts: list[ConflictEntry], field: str, sched_val, cert_val, winner: str, ): """ Detect a conflict between two scalar values, record it, and return the winner's value. A conflict is logged only when both values are non-None *and* differ. ``winner`` must be ``"schedule"`` or ``"certificate"``. """ if sched_val is not None and cert_val is not None: if str(sched_val).strip().lower() != str(cert_val).strip().lower(): conflicts.append(ConflictEntry( field=field, schedule_value=str(sched_val), certificate_value=str(cert_val), winner=winner, )) if winner == "certificate": return _first(cert_val, sched_val) return _first(sched_val, cert_val) # schedule wins (default) def _find_matching_driver(name: str, candidates: list[Driver]) -> Driver | None: """ Find the best-matching driver from *candidates* using fuzzy name matching. Uses ``rapidfuzz.fuzz.token_sort_ratio`` so middle-name or word-order differences (e.g. "JOHN A SMITH" vs "SMITH JOHN") still match. Returns None when the best score is below ``_DRIVER_NAME_MATCH_THRESHOLD``. """ try: from rapidfuzz import fuzz as rfuzz except ImportError: # Graceful fallback: exact uppercase match (original behaviour) upper = name.strip().upper() return next((d for d in candidates if d.name.strip().upper() == upper), None) best_score = 0 best_driver: Driver | None = None for candidate in candidates: score = rfuzz.token_sort_ratio(name.strip(), candidate.name.strip()) if score > best_score: best_score = score best_driver = candidate return best_driver if best_score >= _DRIVER_NAME_MATCH_THRESHOLD else None def _merge_policy_header( sched: UKMotorGoldenRecord, cert: UKMotorGoldenRecord, conflicts: list[ConflictEntry], ) -> Optional[PolicyHeader]: """Schedule is master; fill any gap from Certificate.""" sh = sched.policy_header or PolicyHeader() ch = cert.policy_header or PolicyHeader() poc: Optional[PeriodOfCover] = _first(sh.period_of_cover, ch.period_of_cover) return PolicyHeader( policy_number=_check_conflict(conflicts, "policy_header.policy_number", sh.policy_number, ch.policy_number, "schedule"), insurer=_check_conflict(conflicts, "policy_header.insurer", sh.insurer, ch.insurer, "schedule"), product_name=_check_conflict(conflicts, "policy_header.product_name", sh.product_name, ch.product_name, "schedule"), period_of_cover=poc, ) def _merge_drivers( sched: UKMotorGoldenRecord, cert: UKMotorGoldenRecord, conflicts: list[ConflictEntry], ) -> list[Driver]: """ Schedule drivers are the base (they carry DOB, occupation, license_type). For each Schedule driver, fuzzy-match against Certificate drivers and enrich with relationship or is_main_driver if the Schedule record lacks them. Falls back to the Certificate list when Schedule has no drivers. Uses rapidfuzz ``token_sort_ratio`` with an 85-point threshold so minor name variations (initials, hyphenation, word order) still merge correctly. """ sched_drivers = sched.driver_details or [] cert_drivers = cert.driver_details or [] if not sched_drivers: return cert_drivers merged: list[Driver] = [] for sd in sched_drivers: cd = _find_matching_driver(sd.name, cert_drivers) if cd is not None and sd.is_main_driver != cd.is_main_driver: conflicts.append(ConflictEntry( field=f"driver_details[{sd.name}].is_main_driver", schedule_value=str(sd.is_main_driver), certificate_value=str(cd.is_main_driver), winner="schedule", )) merged.append(Driver( name=sd.name, dob=_first(sd.dob, cd.dob if cd else None), relationship=_first(sd.relationship, cd.relationship if cd else None), occupation=_first(sd.occupation, cd.occupation if cd else None), license_type=_first(sd.license_type, cd.license_type if cd else None), is_main_driver=sd.is_main_driver or (cd.is_main_driver if cd else False), specific_excess=_first(sd.specific_excess, cd.specific_excess if cd else None), )) return merged def _merge_cover_and_excesses( sched: UKMotorGoldenRecord, cert: UKMotorGoldenRecord, conflicts: list[ConflictEntry], ) -> Optional[CoverAndExcesses]: """ Hybrid merge: - class_of_use, driving_other_cars → Certificate is master - cover_type, NCB, excess_breakdown → Schedule is master """ sc = sched.cover_and_excesses or CoverAndExcesses() cc = cert.cover_and_excesses or CoverAndExcesses() return CoverAndExcesses( cover_type=_check_conflict(conflicts, "cover_and_excesses.cover_type", sc.cover_type, cc.cover_type, "schedule"), no_claims_discount=_first(sc.no_claims_discount, cc.no_claims_discount), excess_breakdown=_first(sc.excess_breakdown, cc.excess_breakdown), # Certificate is authoritative for legal-use fields class_of_use=_check_conflict(conflicts, "cover_and_excesses.class_of_use", sc.class_of_use, cc.class_of_use, "certificate"), driving_other_cars=_check_conflict(conflicts, "cover_and_excesses.driving_other_cars", sc.driving_other_cars, cc.driving_other_cars, "certificate"), )