Spaces:
Running
Running
File size: 10,882 Bytes
be54038 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 | """
arbiter.py — Hierarchy of Truth merge for UK Motor Insurance.
The PolicyArbiter takes one Schedule extraction and one Certificate extraction
and produces a single authoritative UKMotorGoldenRecord.
Document        Authoritative for
--------------  --------------------------------------------------
Schedule        vehicle_details, excess_breakdown, financial_summary,
                driver DOB / occupation / license_type, NCB, cover_type
Certificate     class_of_use, driving_other_cars
"""
from __future__ import annotations
import logging
from typing import Optional
from schema import (
ConflictEntry,
CoverAndExcesses,
Driver,
ExcessBreakdown,
NoClaimsDiscount,
PeriodOfCover,
PolicyHeader,
UKMotorGoldenRecord,
)
logger = logging.getLogger(__name__)
# Minimum rapidfuzz token_sort_ratio to consider two driver names a match.
_DRIVER_NAME_MATCH_THRESHOLD = 85
# ---------------------------------------------------------------------------
# PolicyArbiter
# ---------------------------------------------------------------------------
class PolicyArbiter:
    """
    Merge a Schedule extraction and a Certificate extraction into one
    authoritative UKMotorGoldenRecord using the Hierarchy of Truth.

    Usage
    -----
    >>> arbiter = PolicyArbiter()
    >>> golden, conflicts = arbiter.merge_records(
    ...     schedule_record, "Schedule of Insurance (1).pdf",
    ...     certificate_record, "Certificate of Motor Insurance.pdf",
    ... )
    """

    def merge_records(
        self,
        schedule_record: UKMotorGoldenRecord,
        schedule_filename: str,
        certificate_record: UKMotorGoldenRecord,
        certificate_filename: str,
    ) -> tuple[UKMotorGoldenRecord, list[ConflictEntry]]:
        """
        Merge Schedule and Certificate extractions into one Golden Record.

        Sections copied straight from the Schedule: vehicle_details,
        financial_summary, additional_risk_data.  The policy header, the
        driver list and cover_and_excesses are built by the module's
        ``_merge_*`` helpers, which also append to the conflict list.

        Parameters
        ----------
        schedule_record, certificate_record
            The two extractions to combine.
        schedule_filename, certificate_filename
            Source file names; used only in the completion log line.

        Returns
        -------
        tuple[UKMotorGoldenRecord, list[ConflictEntry]]
            (golden_record, list of fields where the two documents disagreed)
        """
        conflicts: list[ConflictEntry] = []
        merged = UKMotorGoldenRecord()

        # The helpers are called in a fixed order because each one appends to
        # ``conflicts``; the order below is the order conflicts are reported in.

        # Policy header: Schedule first, gaps filled from the Certificate.
        merged.policy_header = _merge_policy_header(
            schedule_record, certificate_record, conflicts
        )

        # Vehicle details: taken from the Schedule as-is.
        merged.vehicle_details = schedule_record.vehicle_details

        # Drivers: Schedule entries enriched from matching Certificate entries.
        merged.driver_details = _merge_drivers(
            schedule_record, certificate_record, conflicts
        )

        # Cover and excesses: hybrid.
        #   class_of_use, driving_other_cars    -> Certificate
        #   cover_type, NCB, excess_breakdown   -> Schedule
        merged.cover_and_excesses = _merge_cover_and_excesses(
            schedule_record, certificate_record, conflicts
        )

        # Financial summary and additional risk data: taken from the Schedule.
        merged.financial_summary = schedule_record.financial_summary
        merged.additional_risk_data = schedule_record.additional_risk_data

        # Combine field_citations from both records.  The Schedule mapping is
        # unpacked last so it wins on duplicate keys, mirroring the merge
        # hierarchy.  ``getattr`` tolerates records without the attribute.
        # NOTE(review): the original comment says these citations are kept for
        # provenance matching and excluded from JSON output; that is decided
        # by the schema, not visible here.
        sched_fc = dict(getattr(schedule_record, "field_citations", None) or {})
        cert_fc = dict(getattr(certificate_record, "field_citations", None) or {})
        merged_fc = {**cert_fc, **sched_fc}
        if merged_fc:
            merged.field_citations = merged_fc

        if conflicts:
            logger.info(
                "Merge conflicts (%d): %s",
                len(conflicts),
                [c.field for c in conflicts],
            )

        # The separator in this message was a garbled character in the
        # source; an ASCII arrow keeps the log line readable in any encoding.
        logger.info(
            "Merge complete: schedule='%s' + certificate='%s' -> %d conflict(s)",
            schedule_filename, certificate_filename, len(conflicts),
        )
        return merged, conflicts
# ---------------------------------------------------------------------------
# Private merge helpers
# ---------------------------------------------------------------------------
def _first(*values):
    """
    Pick the earliest argument that is not ``None``.

    Falsy values such as ``0``, ``""`` or ``False`` count as present; only
    ``None`` is skipped.  Returns ``None`` when no argument qualifies
    (including when called with no arguments).
    """
    return next((candidate for candidate in values if candidate is not None), None)
def _check_conflict(
    conflicts: list[ConflictEntry],
    field: str,
    sched_val,
    cert_val,
    winner: str,
):
    """
    Resolve one scalar field between the two documents.

    When both values are present and their stripped, lower-cased string forms
    differ, a ``ConflictEntry`` is appended to *conflicts* (mutated in place).

    ``winner`` selects which document is preferred: ``"certificate"`` prefers
    the Certificate value; any other string prefers the Schedule value.  The
    non-preferred value is used only when the preferred one is ``None``.

    Returns
    -------
    The preferred non-None value, or ``None`` when both inputs are ``None``.
    """
    both_present = sched_val is not None and cert_val is not None
    if both_present:
        # Compare case- and whitespace-insensitively so purely cosmetic
        # differences are not reported as conflicts.
        sched_key = str(sched_val).strip().lower()
        cert_key = str(cert_val).strip().lower()
        if sched_key != cert_key:
            entry = ConflictEntry(
                field=field,
                schedule_value=str(sched_val),
                certificate_value=str(cert_val),
                winner=winner,
            )
            conflicts.append(entry)

    if winner == "certificate":
        preferred, fallback = cert_val, sched_val
    else:
        # Schedule is the default authority.
        preferred, fallback = sched_val, cert_val
    return _first(preferred, fallback)
def _find_matching_driver(
    name: str,
    candidates: list[Driver],
    *,
    threshold: float | None = None,
) -> Driver | None:
    """
    Find the best-matching driver from *candidates* by name.

    Names are stripped and upper-cased before comparison on both code paths,
    so matching is case-insensitive whether or not rapidfuzz is installed.
    With rapidfuzz available, ``rapidfuzz.fuzz.token_sort_ratio`` is used so
    word-order differences still match; without it, only an exact match on
    the normalised name is accepted.

    Parameters
    ----------
    name
        Name to look for.  ``None``, empty or whitespace-only never matches.
    candidates
        Drivers to search.  Entries without a usable ``name`` are skipped.
    threshold
        Minimum fuzzy score to accept.  Defaults to the module-level
        ``_DRIVER_NAME_MATCH_THRESHOLD`` when omitted.  Ignored on the
        exact-match fallback path.

    Returns
    -------
    The highest-scoring candidate (the first one on ties), or ``None`` when
    nothing reaches the threshold.
    """
    wanted = (name or "").strip().upper()
    if not wanted:
        # A blank name carries no information; never let it "match".
        return None

    # Normalise every usable candidate name once, up front.
    named = [
        (candidate, candidate.name.strip().upper())
        for candidate in (candidates or ())
        if candidate.name and candidate.name.strip()
    ]

    try:
        from rapidfuzz import fuzz as rfuzz
    except ImportError:
        # Graceful fallback: exact match on the normalised name.
        return next((cand for cand, key in named if key == wanted), None)

    if threshold is None:
        threshold = _DRIVER_NAME_MATCH_THRESHOLD

    best_score = 0
    best_driver: Driver | None = None
    for cand, key in named:
        score = rfuzz.token_sort_ratio(wanted, key)
        if score > best_score:  # strict '>' keeps the first candidate on ties
            best_score = score
            best_driver = cand
    return best_driver if best_score >= threshold else None
def _merge_policy_header(
    sched: UKMotorGoldenRecord,
    cert: UKMotorGoldenRecord,
    conflicts: list[ConflictEntry],
) -> Optional[PolicyHeader]:
    """
    Build the merged policy header with the Schedule as master.

    ``policy_number``, ``insurer`` and ``product_name`` go through
    ``_check_conflict`` (Schedule preferred), so disagreements are appended
    to *conflicts* in that order.  ``period_of_cover`` is taken from the
    Schedule, falling back to the Certificate, without a conflict check.
    A missing header on either side is treated as an empty ``PolicyHeader``.
    """
    schedule_header = sched.policy_header or PolicyHeader()
    certificate_header = cert.policy_header or PolicyHeader()

    # Dict comprehension preserves insertion order, so conflicts are recorded
    # in the same field order as the tuple below.
    scalar_fields = ("policy_number", "insurer", "product_name")
    resolved = {
        attr: _check_conflict(
            conflicts,
            f"policy_header.{attr}",
            getattr(schedule_header, attr),
            getattr(certificate_header, attr),
            "schedule",
        )
        for attr in scalar_fields
    }

    period: Optional[PeriodOfCover] = _first(
        schedule_header.period_of_cover,
        certificate_header.period_of_cover,
    )
    return PolicyHeader(**resolved, period_of_cover=period)
def _merge_drivers(
    sched: UKMotorGoldenRecord,
    cert: UKMotorGoldenRecord,
    conflicts: list[ConflictEntry],
) -> list[Driver]:
    """
    Merge the driver lists, using the Schedule drivers as the base.

    Each Schedule driver is matched by name against the Certificate drivers
    via ``_find_matching_driver``.  For ``dob``, ``relationship``,
    ``occupation``, ``license_type`` and ``specific_excess`` the Schedule
    value is kept and the matched Certificate value only fills a ``None``.

    ``is_main_driver`` is the logical OR of the two documents, so a driver
    flagged as main in either one is main in the result.  When both documents
    state a value and they differ, a ``ConflictEntry`` is appended to
    *conflicts* whose ``winner`` names the document that said ``True``,
    because that is the value the merged record carries.

    Returns
    -------
    list[Driver]
        A new list.  When the Schedule has no drivers this is a shallow copy
        of the Certificate's driver list.
    """
    sched_drivers = sched.driver_details or []
    cert_drivers = cert.driver_details or []

    if not sched_drivers:
        # Copy so the merged record does not alias the Certificate's list.
        return list(cert_drivers)

    merged: list[Driver] = []
    for sd in sched_drivers:
        # A driver with no name cannot be matched; treat it as unmatched
        # rather than letting the name lookup fail.
        cd = _find_matching_driver(sd.name, cert_drivers) if sd.name else None
        has_match = cd is not None

        sched_main = sd.is_main_driver
        cert_main = cd.is_main_driver if has_match else None

        # Same rule as _check_conflict: a missing value on either side is a
        # gap to fill, not a disagreement.
        if (
            sched_main is not None
            and cert_main is not None
            and sched_main != cert_main
        ):
            conflicts.append(ConflictEntry(
                field=f"driver_details[{sd.name}].is_main_driver",
                schedule_value=str(sched_main),
                certificate_value=str(cert_main),
                # The merged flag is an OR, so the document that said True is
                # the one whose value ends up in the golden record.
                winner="schedule" if sched_main else "certificate",
            ))

        merged.append(Driver(
            name=sd.name,
            dob=_first(sd.dob, cd.dob if has_match else None),
            relationship=_first(sd.relationship, cd.relationship if has_match else None),
            occupation=_first(sd.occupation, cd.occupation if has_match else None),
            license_type=_first(sd.license_type, cd.license_type if has_match else None),
            is_main_driver=sched_main or (cert_main if has_match else False),
            specific_excess=_first(sd.specific_excess, cd.specific_excess if has_match else None),
        ))
    return merged
def _merge_cover_and_excesses(
    sched: UKMotorGoldenRecord,
    cert: UKMotorGoldenRecord,
    conflicts: list[ConflictEntry],
) -> Optional[CoverAndExcesses]:
    """
    Hybrid merge of the cover section.

    - ``class_of_use``, ``driving_other_cars``: Certificate is master.
    - ``cover_type``: Schedule is master.
    - ``no_claims_discount``, ``excess_breakdown``: Schedule value, with the
      Certificate value filling a ``None``; no conflict is recorded.

    Disagreements on the three conflict-checked fields are appended to
    *conflicts* in the order cover_type, class_of_use, driving_other_cars.
    A missing section on either side is treated as an empty
    ``CoverAndExcesses``.
    """
    schedule_cover = sched.cover_and_excesses or CoverAndExcesses()
    certificate_cover = cert.cover_and_excesses or CoverAndExcesses()

    def resolve(attr: str, winner: str):
        # Run one attribute through the shared conflict check.
        return _check_conflict(
            conflicts,
            f"cover_and_excesses.{attr}",
            getattr(schedule_cover, attr),
            getattr(certificate_cover, attr),
            winner,
        )

    # Resolved in this order so conflict entries keep their original sequence.
    cover_type = resolve("cover_type", "schedule")
    # Certificate is authoritative for legal-use fields.
    class_of_use = resolve("class_of_use", "certificate")
    driving_other_cars = resolve("driving_other_cars", "certificate")

    return CoverAndExcesses(
        cover_type=cover_type,
        no_claims_discount=_first(
            schedule_cover.no_claims_discount,
            certificate_cover.no_claims_discount,
        ),
        excess_breakdown=_first(
            schedule_cover.excess_breakdown,
            certificate_cover.excess_breakdown,
        ),
        class_of_use=class_of_use,
        driving_other_cars=driving_other_cars,
    )
|