File size: 16,662 Bytes
be54038
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
"""
provenance.py β€” Post-extraction provenance mapping for the Visual Audit UI.

After the LLM extracts a flat Golden Record, this module walks the record and
fuzzy-matches each extracted value against a ProvenanceCorpus built from the
Docling document IR.  The LLM is never asked to self-report geometry β€” that
would cause hallucinations; this module handles localisation as a pure
post-processing step.

Coordinate convention
─────────────────────
  Docling bbox  : PDF space β€” origin bottom-left, y increases upward, unit = pt
  Stored bbox   : Browser % β€” origin top-left, y increases downward, range 0–100

  Conversion (per axis):
      x0% = bbox.l / page_width  * 100
      y0% = (page_height - bbox.t) / page_height * 100   # top of element
      x1% = bbox.r / page_width  * 100
      y1% = (page_height - bbox.b) / page_height * 100   # bottom of element
"""
from __future__ import annotations

import logging
import re
from dataclasses import dataclass
from typing import Any, Iterator

logger = logging.getLogger(__name__)

# ── Matching parameters ──────────────────────────────────────────────────────
_MATCH_THRESHOLD = 78    # minimum rapidfuzz WRatio (0–100) for normalised-value fallback
_CITATION_THRESHOLD = 88 # minimum partial_ratio for LLM-supplied verbatim citation quotes
_MIN_VALUE_LEN = 4       # skip matching for values shorter than this (too ambiguous)

# Leaf field names whose values are boolean-like and would match too broadly
_SKIP_LEAF_NAMES = {
    "is_main_driver", "protected", "has_security_device",
    "tracker_fitted", "driving_other_cars",
}

# Top-level section names to skip entirely.
# `source_document` and `field_citations` are internal provenance fields β€”
# they don't contain verbatim PDF values so matching against them is meaningless.
_SKIP_SECTION_NAMES = {"source_document", "field_citations"}

# Document types whose corpora are unreliable for field-level matching.
# Policy Booklets contain generic boilerplate β€” matching against them produces
# false positives for almost every field ("Full", "UK", date digits, etc.).
_EXCLUDE_FROM_MATCHING: set[str] = {"PolicyBooklet", "Unknown"}

# Padding added to each bbox for display.  The Docling bbox is a tight text
# box (~1% page height per line) which is hard to see.  We expand it so the
# highlight is clearly visible without losing positional accuracy.
_BBOX_PAD_X = 0.4   # % to expand left/right
_BBOX_PAD_Y = 0.6   # % to expand top/bottom
_BBOX_MIN_H = 2.0   # % minimum height after padding


# ---------------------------------------------------------------------------
# Corpus data structures
# ---------------------------------------------------------------------------


@dataclass
class CorpusItem:
    """One text element from a Docling DoclingDocument, with browser % geometry."""

    text: str
    page: int
    bbox: list[float]       # [x0%, y0%, x1%, y1%] β€” top-left origin, 0–100
    source_filename: str


class ProvenanceCorpus:
    """All extractable text elements from one PDF, with their page geometry."""

    def __init__(self, source_filename: str = "", doc_type: str = "Unknown") -> None:
        self.source_filename = source_filename
        self.doc_type = doc_type   # e.g. "Schedule", "Certificate", "PolicyBooklet"
        self.items: list[CorpusItem] = []

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def add_from_docling(self, doc: Any, filename: str) -> None:
        """
        Populate the corpus from a Docling DoclingDocument.

        Safely handles API variations across docling versions β€” logs a warning
        rather than propagating exceptions, so the calling pipeline stays alive
        even if provenance extraction fails.
        """
        self.source_filename = filename
        try:
            self._extract_items(doc, filename)
            logger.debug(
                "Corpus '%s': %d items, %d pages",
                filename, len(self.items), self._count_pages(doc),
            )
        except Exception as exc:  # noqa: BLE001
            logger.warning(
                "Provenance extraction skipped for '%s': %s", filename, exc
            )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _extract_items(self, doc: Any, filename: str) -> None:
        page_sizes = _build_page_sizes(doc)
        if not page_sizes:
            logger.debug("No page size data for '%s' β€” provenance skipped", filename)
            return

        for item in _iter_items(doc):
            text = _item_text(item)
            if not text or len(text) < 2:
                continue
            for prov in getattr(item, "prov", []):
                self._add_prov_item(prov, text, filename, page_sizes)

    def _add_prov_item(
        self,
        prov: Any,
        text: str,
        filename: str,
        page_sizes: dict[int, tuple[float, float]],
    ) -> None:
        page_no = getattr(prov, "page_no", None)
        if page_no is None:
            return
        page_no = int(page_no)
        if page_no not in page_sizes:
            return

        pw, ph = page_sizes[page_no]
        bbox = getattr(prov, "bbox", None)
        if bbox is None:
            return

        l   = float(getattr(bbox, "l", 0))
        t_v = float(getattr(bbox, "t", ph))  # top in PDF space  (high y value)
        r   = float(getattr(bbox, "r", pw))
        b   = float(getattr(bbox, "b", 0))   # bottom in PDF space (low y value)

        # Convert: PDF (bottom-left origin, pts) β†’ browser % (top-left origin)
        x0 = _clamp(l   / pw * 100)
        y0 = _clamp((ph - t_v) / ph * 100)  # top of element in browser coords
        x1 = _clamp(r   / pw * 100)
        y1 = _clamp((ph - b)   / ph * 100)  # bottom of element in browser coords

        self.items.append(CorpusItem(
            text=text,
            page=page_no,
            bbox=[round(x0, 3), round(y0, 3), round(x1, 3), round(y1, 3)],
            source_filename=filename,
        ))

    @staticmethod
    def _count_pages(doc: Any) -> int:
        return len(getattr(doc, "pages", {}))


# ---------------------------------------------------------------------------
# Module-level helpers for corpus building
# ---------------------------------------------------------------------------


def _build_page_sizes(doc: Any) -> dict[int, tuple[float, float]]:
    sizes: dict[int, tuple[float, float]] = {}
    for page_no, page_item in getattr(doc, "pages", {}).items():
        size = getattr(page_item, "size", None)
        if size:
            w = float(getattr(size, "width", 0))
            h = float(getattr(size, "height", 0))
            if w > 0 and h > 0:
                sizes[int(page_no)] = (w, h)
    return sizes


def _iter_items(doc: Any):
    """Yield all document items, trying iterate_items() first then .texts/.tables."""
    try:
        for item, _level in doc.iterate_items():
            yield item
    except AttributeError:
        for item in getattr(doc, "texts", []):
            yield item
        for item in getattr(doc, "tables", []):
            yield item


def _item_text(item: Any) -> str:
    """Extract a string from a Docling TextItem or TableItem."""
    text = getattr(item, "text", None)
    if text is not None:
        return str(text).strip()
    # TableItem: concatenate all cell text into one searchable blob
    data = getattr(item, "data", None)
    if data is not None:
        cells = [
            str(getattr(cell, "text", "")).strip()
            for row in getattr(data, "grid", [])
            for cell in row
        ]
        return " | ".join(c for c in cells if c)
    return ""


def _clamp(v: float) -> float:
    return max(0.0, min(100.0, v))


# ---------------------------------------------------------------------------
# Field-level provenance builder (main public function)
# ---------------------------------------------------------------------------


def build_provenance(
    record: Any,                         # UKMotorGoldenRecord
    corpora: list[ProvenanceCorpus],
) -> list[Any]:                          # list[FieldProvenance]
    """
    Walk the Golden Record and fuzzy-match each extracted value against all
    trusted corpora (Schedule, Certificate, StatementOfFact).

    Policy Booklet corpora are excluded β€” they contain generic boilerplate
    that produces false positives for almost every field value.

    Returns a ``FieldProvenance`` entry for every field that can be located
    above the match threshold.  Fields with no good corpus match are omitted β€”
    the UI shows them as "No location data".
    """
    from schema import FieldProvenance, Location  # local import avoids circular dep

    try:
        from rapidfuzz import fuzz as rfuzz
    except ImportError:
        logger.warning(
            "rapidfuzz not installed β€” provenance matching disabled. "
            "Run: pip install rapidfuzz"
        )
        return []

    # Filter to trusted corpora only (exclude Policy Booklet and Unknown docs)
    trusted_corpora = [
        c for c in corpora if c.doc_type not in _EXCLUDE_FROM_MATCHING
    ]
    if not trusted_corpora:
        logger.warning(
            "No trusted corpora available β€” all %d corpus/corpora are excluded "
            "(types: %s). Provenance will be empty.",
            len(corpora),
            [c.doc_type for c in corpora],
        )
        return []

    # LLM-supplied verbatim source quotes: field_path β†’ raw text phrase.
    # These are always preferred over the normalised extracted value because
    # the LLM copies them directly from the document (e.g. "15/04/2026 at 00:00
    # hours" rather than the ISO "2026-04-15T00:00:00" we store in the record).
    citation_map: dict[str, str] = dict(getattr(record, "field_citations", None) or {})
    logger.info("  field_citations from LLM: %d entries", len(citation_map))

    results: list[FieldProvenance] = []
    citation_hits = 0
    # Track assigned positions to avoid two fields pointing to the same corpus item.
    # Key: (source_filename, page, x0, y0) β€” unpadded, original corpus position.
    used_positions: set[tuple] = set()

    for field_path, value_str in _walk_record(record):
        leaf = field_path.split(".")[-1].strip("[]0123456789")
        if leaf in _SKIP_LEAF_NAMES:
            continue

        # Prefer the verbatim citation quote; fall back to the normalised value.
        # For ISO dates/datetimes also try UK DD/MM/YYYY format as a secondary fallback.
        search_str = citation_map.get(field_path, value_str)
        alt_search: str | None = None
        if field_path not in citation_map:
            alt_search = _iso_to_uk_date(value_str)

        if len(search_str) < _MIN_VALUE_LEN:
            continue

        using_citation = field_path in citation_map
        # When matching a citation quote use partial_ratio β€” the quote is a
        # verbatim substring of the document and WRatio penalises length disparity.
        # For normalised fallback values use WRatio to avoid short false matches.
        score_fn = rfuzz.partial_ratio if using_citation else rfuzz.WRatio
        threshold = _CITATION_THRESHOLD if using_citation else _MATCH_THRESHOLD

        # Find best match, preferring positions not yet assigned to another field.
        best_score = 0
        best_item: CorpusItem | None = None
        best_unused_score = 0
        best_unused_item: CorpusItem | None = None

        for corpus in trusted_corpora:
            for item in corpus.items:
                score = score_fn(search_str.lower(), item.text.lower())
                # Also try UK-formatted date if available
                if alt_search and score < threshold:
                    alt_score = rfuzz.partial_ratio(alt_search, item.text.lower())
                    if alt_score > score:
                        score = alt_score
                pos_key = (item.source_filename, item.page, item.bbox[0], item.bbox[1])
                if score > best_score:
                    best_score = score
                    best_item = item
                if score > best_unused_score and pos_key not in used_positions:
                    best_unused_score = score
                    best_unused_item = item

        # Prefer an unused position if it scores above threshold,
        # otherwise fall back to best overall (may share a location).
        if best_unused_item is not None and best_unused_score >= threshold:
            chosen_item = best_unused_item
            chosen_score = best_unused_score
        elif best_item is not None and best_score >= threshold:
            chosen_item = best_item
            chosen_score = best_score
        else:
            continue

        pos_key = (chosen_item.source_filename, chosen_item.page, chosen_item.bbox[0], chosen_item.bbox[1])
        used_positions.add(pos_key)

        if using_citation:
            citation_hits += 1
        results.append(FieldProvenance(
            field_path=field_path,
            extracted_value=value_str,
            matched_text=chosen_item.text[:200],  # truncate very long table blobs
            match_score=round(chosen_score / 100.0, 3),
            source_filename=chosen_item.source_filename,
            location=Location(
                page=chosen_item.page,
                bbox=_padded_bbox(chosen_item.bbox),
            ),
        ))

    total = _count_total_fields(record)
    logger.info(
        "Provenance: %d / %d fields located (%d via citation quotes, %d via fuzzy fallback) "
        "β€” trusted corpora: %s",
        len(results), total,
        citation_hits, len(results) - citation_hits,
        [c.source_filename for c in trusted_corpora],
    )
    return results


# ---------------------------------------------------------------------------
# Field-walking helpers
# ---------------------------------------------------------------------------


def _walk_record(record: Any) -> Iterator[tuple[str, str]]:
    """Yield (field_path, string_value) for all non-None leaf values in the record."""
    data = record.model_dump(exclude_none=True)
    yield from _walk_dict(data, "")


def _walk_dict(d: dict, prefix: str) -> Iterator[tuple[str, str]]:
    for key, val in d.items():
        # Skip whole sections that produce unreliable or irrelevant matches
        top_key = prefix.split(".")[0].split("[")[0] if prefix else key
        if key in _SKIP_SECTION_NAMES or top_key in _SKIP_SECTION_NAMES:
            continue
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(val, dict):
            yield from _walk_dict(val, path)
        elif isinstance(val, list):
            yield from _walk_list(val, path)
        elif val is not None:
            yield path, str(val)


def _walk_list(lst: list, prefix: str) -> Iterator[tuple[str, str]]:
    for i, item in enumerate(lst):
        path = f"{prefix}[{i}]"
        if isinstance(item, dict):
            yield from _walk_dict(item, path)
        elif item is not None:
            yield path, str(item)


def _count_total_fields(record: Any) -> int:
    data = record.model_dump(exclude_none=True)
    return sum(1 for _ in _walk_dict(data, ""))


# ISO 8601 date/datetime patterns β†’ UK DD/MM/YYYY
_ISO_DATE_RE = re.compile(r'^(\d{4})-(\d{2})-(\d{2})')


def _iso_to_uk_date(value: str) -> str | None:
    """Convert ISO date/datetime string to UK DD/MM/YYYY for document matching.

    Returns the UK-format string (e.g. "15/04/2026") if value looks like an
    ISO date, otherwise returns None.
    """
    m = _ISO_DATE_RE.match(value.strip())
    if m:
        yyyy, mm, dd = m.group(1), m.group(2), m.group(3)
        return f"{dd}/{mm}/{yyyy}"
    return None


def _padded_bbox(bbox: list[float]) -> list[float]:
    """Expand a tight Docling text bbox so highlights are clearly visible in the UI."""
    x0, y0, x1, y1 = bbox
    x0 = _clamp(x0 - _BBOX_PAD_X)
    y0 = _clamp(y0 - _BBOX_PAD_Y)
    x1 = _clamp(x1 + _BBOX_PAD_X)
    y1 = _clamp(y1 + _BBOX_PAD_Y)
    # Enforce minimum height so single-line text is always visible
    if (y1 - y0) < _BBOX_MIN_H:
        mid = (y0 + y1) / 2
        y0 = _clamp(mid - _BBOX_MIN_H / 2)
        y1 = _clamp(mid + _BBOX_MIN_H / 2)
    return [round(x0, 3), round(y0, 3), round(x1, 3), round(y1, 3)]