AI-PolicyTrace / src /arbiter.py
teja141290's picture
Deploy PolicyTrace Hugging Face Space
be54038
"""
arbiter.py — Hierarchy of Truth merge for UK Motor Insurance.
The PolicyArbiter takes one Schedule extraction and one Certificate extraction
and produces a single authoritative UKMotorGoldenRecord.
Document          Authoritative for
────────────────  ──────────────────────────────────────────────────
Schedule          vehicle_details, excess_breakdown, financial_summary,
                  driver DOB / occupation / license_type, NCB, cover_type
Certificate       class_of_use, driving_other_cars
"""
from __future__ import annotations
import logging
from typing import Optional
from schema import (
ConflictEntry,
CoverAndExcesses,
Driver,
ExcessBreakdown,
NoClaimsDiscount,
PeriodOfCover,
PolicyHeader,
UKMotorGoldenRecord,
)
# Module-level logger; merge outcomes and detected conflicts are reported through it.
logger = logging.getLogger(__name__)
# Minimum rapidfuzz token_sort_ratio to consider two driver names a match.
# The best candidate's score must be >= this value, otherwise
# _find_matching_driver reports no match.
_DRIVER_NAME_MATCH_THRESHOLD = 85
# ---------------------------------------------------------------------------
# PolicyArbiter
# ---------------------------------------------------------------------------
class PolicyArbiter:
    """
    Combine one Schedule extraction and one Certificate extraction into a
    single authoritative UKMotorGoldenRecord, following the Hierarchy of Truth.

    Example
    -------
    >>> arbiter = PolicyArbiter()
    >>> golden, conflicts = arbiter.merge_records(
    ...     schedule_record, "Schedule of Insurance (1).pdf",
    ...     certificate_record, "Certificate of Motor Insurance.pdf",
    ... )
    """

    def merge_records(
        self,
        schedule_record: UKMotorGoldenRecord,
        schedule_filename: str,
        certificate_record: UKMotorGoldenRecord,
        certificate_filename: str,
    ) -> tuple[UKMotorGoldenRecord, list[ConflictEntry]]:
        """
        Build the Golden Record from a Schedule and a Certificate extraction.

        Precedence applied here:

        * Taken verbatim from the Schedule: vehicle_details,
          financial_summary, additional_risk_data.
        * Arbitrated field by field by the private helpers: policy_header,
          driver_details, cover_and_excesses (where the Certificate is master
          for class_of_use and driving_other_cars).
        * field_citations from both records are combined; on a duplicate key
          the Schedule's citation replaces the Certificate's.

        The two filenames are used only in the closing log message.

        Returns
        -------
        tuple[UKMotorGoldenRecord, list[ConflictEntry]]
            The merged record and the list of fields on which the two
            documents disagreed (in the order they were detected).
        """
        disagreements: list[ConflictEntry] = []
        golden = UKMotorGoldenRecord()

        # Sections are filled in a fixed order so that conflict entries are
        # appended in the sequence header -> drivers -> cover.
        golden.policy_header = _merge_policy_header(
            schedule_record, certificate_record, disagreements
        )
        golden.vehicle_details = schedule_record.vehicle_details
        golden.driver_details = _merge_drivers(
            schedule_record, certificate_record, disagreements
        )
        golden.cover_and_excesses = _merge_cover_and_excesses(
            schedule_record, certificate_record, disagreements
        )
        golden.financial_summary = schedule_record.financial_summary
        golden.additional_risk_data = schedule_record.additional_risk_data

        # Either record may lack field_citations (missing attribute or None);
        # both cases collapse to an empty mapping.
        schedule_citations = dict(getattr(schedule_record, "field_citations", None) or {})
        certificate_citations = dict(getattr(certificate_record, "field_citations", None) or {})
        # Start from the Certificate and overlay the Schedule so the Schedule
        # wins on shared keys.
        # NOTE(review): the original comment says these citations are excluded
        # from JSON output; that exclusion is not visible in this method.
        combined_citations = dict(certificate_citations)
        combined_citations.update(schedule_citations)
        if combined_citations:
            golden.field_citations = combined_citations

        conflict_count = len(disagreements)
        if conflict_count:
            logger.info(
                "Merge conflicts (%d): %s",
                conflict_count,
                [entry.field for entry in disagreements],
            )
        logger.info(
            "Merge complete: schedule='%s' + certificate='%s' — %d conflict(s)",
            schedule_filename, certificate_filename, conflict_count,
        )
        return golden, disagreements
# ---------------------------------------------------------------------------
# Private merge helpers
# ---------------------------------------------------------------------------
def _first(*values):
    """
    Pick the earliest argument that is not None.

    Falsy values such as ``0``, ``""`` or ``False`` count as present; only
    ``None`` is skipped. Returns None when called without arguments or when
    every argument is None.
    """
    return next((candidate for candidate in values if candidate is not None), None)
def _check_conflict(
    conflicts: list[ConflictEntry],
    field: str,
    sched_val,
    cert_val,
    winner: str,
):
    """
    Compare the Schedule and Certificate values of one scalar field and
    return the value of the winning document.

    A ConflictEntry is appended to *conflicts* only when both values are
    present (non-None) and their string forms differ after trimming
    whitespace and lower-casing. The entry stores the untrimmed ``str()`` of
    each value together with *winner*.

    ``winner`` is expected to be ``"schedule"`` or ``"certificate"``; any
    value other than ``"certificate"`` is treated as Schedule-wins. If the
    winning document has no value (None), the other document's value is
    returned instead.
    """
    both_present = sched_val is not None and cert_val is not None
    if both_present:
        schedule_key = str(sched_val).strip().lower()
        certificate_key = str(cert_val).strip().lower()
        if schedule_key != certificate_key:
            entry = ConflictEntry(
                field=field,
                schedule_value=str(sched_val),
                certificate_value=str(cert_val),
                winner=winner,
            )
            conflicts.append(entry)

    # Order the two values by precedence, then fall back to the loser only
    # when the preferred side is missing.
    if winner == "certificate":
        preferred, fallback = cert_val, sched_val
    else:
        preferred, fallback = sched_val, cert_val
    return preferred if preferred is not None else fallback
def _find_matching_driver(name: str, candidates: list[Driver]) -> Driver | None:
    """
    Find the best-matching driver from *candidates* using fuzzy name matching.

    Names are compared after trimming whitespace and upper-casing, on both
    code paths, so documents that print names in different letter case
    ("JOHN SMITH" vs "John Smith") still match.

    When rapidfuzz is installed, ``rapidfuzz.fuzz.token_sort_ratio`` is used so
    middle-name or word-order differences (e.g. "JOHN A SMITH" vs
    "SMITH JOHN") still match; the highest-scoring candidate is returned if
    its score reaches ``_DRIVER_NAME_MATCH_THRESHOLD`` (the first candidate
    wins a tie). Without rapidfuzz, the first candidate whose normalised name
    is exactly equal is returned.

    Returns None when *name* is None/blank, when no candidate has a usable
    name, or when nothing matches well enough. Candidates with a None/blank
    name are never matched.
    """
    wanted = (name or "").strip().upper()
    if not wanted:
        # A missing name cannot identify anyone; do not pair it with another
        # nameless record.
        return None

    # Normalise each candidate once, dropping those without a usable name.
    named_candidates = []
    for candidate in candidates or []:
        candidate_name = (candidate.name or "").strip().upper()
        if candidate_name:
            named_candidates.append((candidate, candidate_name))

    try:
        from rapidfuzz import fuzz as rfuzz
    except ImportError:
        # Graceful fallback: exact match on the normalised names.
        return next(
            (candidate for candidate, candidate_name in named_candidates
             if candidate_name == wanted),
            None,
        )

    # rapidfuzz 3.x scorers do not lower-case or clean their inputs by
    # default, so the strings handed over here are already normalised.
    best_score = 0
    best_driver: Driver | None = None
    for candidate, candidate_name in named_candidates:
        score = rfuzz.token_sort_ratio(wanted, candidate_name)
        if score > best_score:
            best_score = score
            best_driver = candidate
    return best_driver if best_score >= _DRIVER_NAME_MATCH_THRESHOLD else None
def _merge_policy_header(
    sched: UKMotorGoldenRecord,
    cert: UKMotorGoldenRecord,
    conflicts: list[ConflictEntry],
) -> Optional[PolicyHeader]:
    """
    Build the merged policy header with the Schedule as master.

    A record without a policy_header is treated as an empty PolicyHeader.
    policy_number, insurer and product_name go through ``_check_conflict``
    (Schedule wins, the Certificate fills gaps, disagreements are appended
    to *conflicts*). period_of_cover is taken whole from the Schedule when
    present, otherwise from the Certificate, with no conflict check.
    """
    schedule_header = sched.policy_header or PolicyHeader()
    certificate_header = cert.policy_header or PolicyHeader()

    period: Optional[PeriodOfCover] = _first(
        schedule_header.period_of_cover,
        certificate_header.period_of_cover,
    )

    # Resolve the scalar fields in a fixed order so conflict entries keep a
    # stable sequence: policy_number, insurer, product_name.
    resolved = {}
    for attr in ("policy_number", "insurer", "product_name"):
        resolved[attr] = _check_conflict(
            conflicts,
            f"policy_header.{attr}",
            getattr(schedule_header, attr),
            getattr(certificate_header, attr),
            "schedule",
        )

    return PolicyHeader(**resolved, period_of_cover=period)
def _merge_drivers(
    sched: UKMotorGoldenRecord,
    cert: UKMotorGoldenRecord,
    conflicts: list[ConflictEntry],
) -> list[Driver]:
    """
    Merge the driver lists, using the Schedule drivers as the base.

    For each Schedule driver the Certificate drivers are searched with
    ``_find_matching_driver``. dob, relationship, occupation, license_type
    and specific_excess come from the Schedule, falling back to the matched
    Certificate driver when the Schedule value is None.

    is_main_driver is truthy if either document marks the driver as main.
    When both documents state a value and the values differ, a ConflictEntry
    is appended to *conflicts* whose ``winner`` names the document that
    supplied the merged value ("schedule" when the Schedule said True,
    otherwise "certificate"). No conflict is recorded when either side is
    None, matching the rule used by ``_check_conflict``.

    Falls back to a copy of the Certificate list when the Schedule has no
    drivers. Certificate drivers with no Schedule counterpart are otherwise
    not included.

    NOTE(review): a Certificate driver is not removed once matched, so two
    Schedule drivers could be enriched from the same Certificate driver.
    """
    sched_drivers = sched.driver_details or []
    cert_drivers = cert.driver_details or []
    if not sched_drivers:
        # Copy so later mutation of the merged list cannot alter the
        # Certificate record's own list.
        return list(cert_drivers)

    gap_fill_fields = ("dob", "relationship", "occupation", "license_type", "specific_excess")

    merged: list[Driver] = []
    for sd in sched_drivers:
        cd = _find_matching_driver(sd.name, cert_drivers)

        if cd is None:
            is_main = sd.is_main_driver or False
        else:
            is_main = sd.is_main_driver or cd.is_main_driver
            if (
                sd.is_main_driver is not None
                and cd.is_main_driver is not None
                and sd.is_main_driver != cd.is_main_driver
            ):
                # The merged flag is an OR of both documents, so the winner
                # is whichever document asserted main-driver status.
                conflicts.append(ConflictEntry(
                    field=f"driver_details[{sd.name}].is_main_driver",
                    schedule_value=str(sd.is_main_driver),
                    certificate_value=str(cd.is_main_driver),
                    winner="schedule" if sd.is_main_driver else "certificate",
                ))

        # Schedule value first; the Certificate only fills None gaps.
        enriched = {
            attr: _first(
                getattr(sd, attr),
                getattr(cd, attr) if cd is not None else None,
            )
            for attr in gap_fill_fields
        }
        merged.append(Driver(name=sd.name, is_main_driver=is_main, **enriched))
    return merged
def _merge_cover_and_excesses(
    sched: UKMotorGoldenRecord,
    cert: UKMotorGoldenRecord,
    conflicts: list[ConflictEntry],
) -> Optional[CoverAndExcesses]:
    """
    Merge the cover section with split authority between the documents.

    - cover_type: Schedule is master; disagreements are recorded.
    - no_claims_discount, excess_breakdown: Schedule value when present,
      otherwise the Certificate's; no conflict check.
    - class_of_use, driving_other_cars: Certificate is master;
      disagreements are recorded.

    A record without a cover_and_excesses section is treated as an empty
    CoverAndExcesses.
    """
    schedule_cover = sched.cover_and_excesses or CoverAndExcesses()
    certificate_cover = cert.cover_and_excesses or CoverAndExcesses()

    # Schedule-led fields.
    cover_type = _check_conflict(
        conflicts,
        "cover_and_excesses.cover_type",
        schedule_cover.cover_type,
        certificate_cover.cover_type,
        "schedule",
    )
    ncd = _first(schedule_cover.no_claims_discount, certificate_cover.no_claims_discount)
    excesses = _first(schedule_cover.excess_breakdown, certificate_cover.excess_breakdown)

    # Certificate is authoritative for legal-use fields.
    class_of_use = _check_conflict(
        conflicts,
        "cover_and_excesses.class_of_use",
        schedule_cover.class_of_use,
        certificate_cover.class_of_use,
        "certificate",
    )
    driving_other_cars = _check_conflict(
        conflicts,
        "cover_and_excesses.driving_other_cars",
        schedule_cover.driving_other_cars,
        certificate_cover.driving_other_cars,
        "certificate",
    )

    return CoverAndExcesses(
        cover_type=cover_type,
        no_claims_discount=ncd,
        excess_breakdown=excesses,
        class_of_use=class_of_use,
        driving_other_cars=driving_other_cars,
    )