Spaces:
Running
Running
| """ | |
| provenance.py — Post-extraction provenance mapping for the Visual Audit UI. | |
| After the LLM extracts a flat Golden Record, this module walks the record and | |
| fuzzy-matches each extracted value against a ProvenanceCorpus built from the | |
| Docling document IR. The LLM is never asked to self-report geometry — that | |
| would cause hallucinations; this module handles localisation as a pure | |
| post-processing step. | |
| Coordinate convention | |
| ───────────────────── | |
| Docling bbox : PDF space — origin bottom-left, y increases upward, unit = pt | |
| Stored bbox : Browser % — origin top-left, y increases downward, range 0–100 | |
| Conversion (per axis): | |
| x0% = bbox.l / page_width * 100 | |
| y0% = (page_height - bbox.t) / page_height * 100 # top of element | |
| x1% = bbox.r / page_width * 100 | |
| y1% = (page_height - bbox.b) / page_height * 100 # bottom of element | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import re | |
| from dataclasses import dataclass | |
| from typing import Any, Iterator | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ── Matching parameters ──────────────────────────────────────────────────────
# Scores are on rapidfuzz's 0–100 scale. build_provenance() selects the scorer
# and the threshold per field, depending on whether a citation quote exists.
_MATCH_THRESHOLD = 78  # minimum rapidfuzz WRatio (0–100) for normalised-value fallback
_CITATION_THRESHOLD = 88  # minimum partial_ratio for LLM-supplied verbatim citation quotes
# Compared against the search string actually used for a field (the citation
# quote when one exists, otherwise the stringified value).
_MIN_VALUE_LEN = 4  # skip matching for values shorter than this (too ambiguous)

# Leaf field names whose values are boolean-like and would match too broadly
_SKIP_LEAF_NAMES = {
    "is_main_driver", "protected", "has_security_device",
    "tracker_fitted", "driving_other_cars",
}

# Top-level section names to skip entirely.
# `source_document` and `field_citations` are internal provenance fields —
# they don't contain verbatim PDF values so matching against them is meaningless.
# NOTE: _walk_dict() compares every key it visits against this set, so a nested
# key with one of these names is skipped as well, not only top-level sections.
_SKIP_SECTION_NAMES = {"source_document", "field_citations"}

# Document types whose corpora are unreliable for field-level matching.
# Policy Booklets contain generic boilerplate — matching against them produces
# false positives for almost every field ("Full", "UK", date digits, etc.).
# Compared against ProvenanceCorpus.doc_type in build_provenance().
_EXCLUDE_FROM_MATCHING: set[str] = {"PolicyBooklet", "Unknown"}

# Padding added to each bbox for display. The Docling bbox is a tight text
# box (~1% page height per line) which is hard to see. We expand it so the
# highlight is clearly visible without losing positional accuracy.
# All three values are in browser-% units and are applied by _padded_bbox().
_BBOX_PAD_X = 0.4  # % to expand left/right
_BBOX_PAD_Y = 0.6  # % to expand top/bottom
_BBOX_MIN_H = 2.0  # % minimum height after padding
| # --------------------------------------------------------------------------- | |
| # Corpus data structures | |
| # --------------------------------------------------------------------------- | |
@dataclass
class CorpusItem:
    """One text element from a Docling DoclingDocument, with browser % geometry.

    Declared as a dataclass so that it can be built with keyword arguments
    (as ``ProvenanceCorpus._add_prov_item`` does) and compared by value.
    """

    # Text content of the element (for tables: all cell text joined together).
    text: str
    # Page number the element was found on, as reported by the provenance entry.
    page: int
    # [x0%, y0%, x1%, y1%] — top-left origin, 0–100
    bbox: list[float]
    # Name of the PDF file the element came from.
    source_filename: str
class ProvenanceCorpus:
    """All extractable text elements from one PDF, with their page geometry.

    Attributes:
        source_filename: Name of the PDF the items were taken from.
        doc_type: Document classification, e.g. "Schedule", "Certificate",
            "PolicyBooklet".
        items: One ``CorpusItem`` per (text element, provenance entry) pair.
    """

    def __init__(self, source_filename: str = "", doc_type: str = "Unknown") -> None:
        self.source_filename = source_filename
        self.doc_type = doc_type  # e.g. "Schedule", "Certificate", "PolicyBooklet"
        self.items: list[CorpusItem] = []

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def add_from_docling(self, doc: Any, filename: str) -> None:
        """
        Populate the corpus from a Docling DoclingDocument.

        Safely handles API variations across docling versions — logs a warning
        rather than propagating exceptions, so the calling pipeline stays alive
        even if provenance extraction fails.

        Args:
            doc: The Docling document to read text elements and page sizes from.
            filename: Source PDF name; stored on the corpus and on every item.
        """
        self.source_filename = filename
        try:
            self._extract_items(doc, filename)
            logger.debug(
                "Corpus '%s': %d items, %d pages",
                filename, len(self.items), self._count_pages(doc),
            )
        except Exception as exc:  # noqa: BLE001
            # Deliberate best-effort boundary: provenance is optional, so any
            # failure is reported and the (possibly partial) corpus is kept.
            logger.warning(
                "Provenance extraction skipped for '%s': %s", filename, exc
            )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _extract_items(self, doc: Any, filename: str) -> None:
        """Append a CorpusItem for every provenance entry of every text element."""
        page_sizes = _build_page_sizes(doc)
        if not page_sizes:
            logger.debug("No page size data for '%s' — provenance skipped", filename)
            return
        for item in _iter_items(doc):
            text = _item_text(item)
            if not text or len(text) < 2:
                continue
            # `prov` may be missing or None on some items; treat both as empty.
            for prov in getattr(item, "prov", None) or []:
                self._add_prov_item(prov, text, filename, page_sizes)

    def _add_prov_item(
        self,
        prov: Any,
        text: str,
        filename: str,
        page_sizes: dict[int, tuple[float, float]],
    ) -> None:
        """Convert one provenance entry to browser-% geometry and store it.

        Entries without a page number, on a page with no known size, or
        without a bbox are ignored.
        """
        page_no = getattr(prov, "page_no", None)
        if page_no is None:
            return
        page_no = int(page_no)
        if page_no not in page_sizes:
            return
        pw, ph = page_sizes[page_no]
        bbox = getattr(prov, "bbox", None)
        if bbox is None:
            return

        left = float(getattr(bbox, "l", 0))
        right = float(getattr(bbox, "r", pw))

        # NOTE(review): Docling bounding boxes are expected to expose a
        # `coord_origin` (TOPLEFT / BOTTOMLEFT) — confirm against the installed
        # docling-core version. When the attribute is absent the historical
        # bottom-left (PDF space) assumption is kept.
        origin = getattr(bbox, "coord_origin", None)
        origin_name = str(getattr(origin, "value", origin)).upper()
        if origin_name.endswith("TOPLEFT"):
            # y already grows downward from the top edge: no flip required.
            top = float(getattr(bbox, "t", 0))
            bottom = float(getattr(bbox, "b", ph))
            y0 = _clamp(top / ph * 100)
            y1 = _clamp(bottom / ph * 100)
        else:
            top = float(getattr(bbox, "t", ph))   # top in PDF space (high y value)
            bottom = float(getattr(bbox, "b", 0))  # bottom in PDF space (low y value)
            # Convert: PDF (bottom-left origin, pts) → browser % (top-left origin)
            y0 = _clamp((ph - top) / ph * 100)     # top of element in browser coords
            y1 = _clamp((ph - bottom) / ph * 100)  # bottom of element in browser coords

        x0 = _clamp(left / pw * 100)
        x1 = _clamp(right / pw * 100)

        self.items.append(CorpusItem(
            text=text,
            page=page_no,
            bbox=[round(x0, 3), round(y0, 3), round(x1, 3), round(y1, 3)],
            source_filename=filename,
        ))

    @staticmethod
    def _count_pages(doc: Any) -> int:
        """Return the number of entries in ``doc.pages`` (0 when absent or None).

        Static because it does not use instance state; it is invoked as
        ``self._count_pages(doc)`` from ``add_from_docling``.
        """
        return len(getattr(doc, "pages", None) or {})
| # --------------------------------------------------------------------------- | |
| # Module-level helpers for corpus building | |
| # --------------------------------------------------------------------------- | |
| def _build_page_sizes(doc: Any) -> dict[int, tuple[float, float]]: | |
| sizes: dict[int, tuple[float, float]] = {} | |
| for page_no, page_item in getattr(doc, "pages", {}).items(): | |
| size = getattr(page_item, "size", None) | |
| if size: | |
| w = float(getattr(size, "width", 0)) | |
| h = float(getattr(size, "height", 0)) | |
| if w > 0 and h > 0: | |
| sizes[int(page_no)] = (w, h) | |
| return sizes | |
| def _iter_items(doc: Any): | |
| """Yield all document items, trying iterate_items() first then .texts/.tables.""" | |
| try: | |
| for item, _level in doc.iterate_items(): | |
| yield item | |
| except AttributeError: | |
| for item in getattr(doc, "texts", []): | |
| yield item | |
| for item in getattr(doc, "tables", []): | |
| yield item | |
| def _item_text(item: Any) -> str: | |
| """Extract a string from a Docling TextItem or TableItem.""" | |
| text = getattr(item, "text", None) | |
| if text is not None: | |
| return str(text).strip() | |
| # TableItem: concatenate all cell text into one searchable blob | |
| data = getattr(item, "data", None) | |
| if data is not None: | |
| cells = [ | |
| str(getattr(cell, "text", "")).strip() | |
| for row in getattr(data, "grid", []) | |
| for cell in row | |
| ] | |
| return " | ".join(c for c in cells if c) | |
| return "" | |
| def _clamp(v: float) -> float: | |
| return max(0.0, min(100.0, v)) | |
| # --------------------------------------------------------------------------- | |
| # Field-level provenance builder (main public function) | |
| # --------------------------------------------------------------------------- | |
def build_provenance(
    record: Any,  # UKMotorGoldenRecord
    corpora: list[ProvenanceCorpus],
) -> list[Any]:  # list[FieldProvenance]
    """
    Walk the Golden Record and fuzzy-match each extracted value against all
    trusted corpora (Schedule, Certificate, StatementOfFact).

    Policy Booklet corpora are excluded — they contain generic boilerplate
    that produces false positives for almost every field value.

    Args:
        record: The extracted record. It must provide ``model_dump()`` and may
            carry a ``field_citations`` mapping of field path → verbatim quote.
        corpora: Candidate corpora; those whose ``doc_type`` is listed in
            ``_EXCLUDE_FROM_MATCHING`` are ignored.

    Returns:
        A ``FieldProvenance`` entry for every field that can be located
        above the match threshold. Fields with no good corpus match are
        omitted — the UI shows them as "No location data". An empty list is
        returned when rapidfuzz is not installed or no trusted corpus exists.
    """
    from schema import FieldProvenance, Location  # local import avoids circular dep

    try:
        from rapidfuzz import fuzz as rfuzz
    except ImportError:
        logger.warning(
            "rapidfuzz not installed — provenance matching disabled. "
            "Run: pip install rapidfuzz"
        )
        return []

    # Filter to trusted corpora only (exclude Policy Booklet and Unknown docs)
    trusted_corpora = [
        c for c in corpora if c.doc_type not in _EXCLUDE_FROM_MATCHING
    ]
    if not trusted_corpora:
        logger.warning(
            "No trusted corpora available — all %d corpus/corpora are excluded "
            "(types: %s). Provenance will be empty.",
            len(corpora),
            [c.doc_type for c in corpora],
        )
        return []

    # LLM-supplied verbatim source quotes: field_path → raw text phrase.
    # These are always preferred over the normalised extracted value because
    # the LLM copies them directly from the document (e.g. "15/04/2026 at 00:00
    # hours" rather than the ISO "2026-04-15T00:00:00" we store in the record).
    # Blank or non-string entries are dropped so that such fields use the
    # normalised-value fallback instead of failing or being silently skipped.
    raw_citations = dict(getattr(record, "field_citations", None) or {})
    citation_map: dict[str, str] = {
        path: quote
        for path, quote in raw_citations.items()
        if isinstance(quote, str) and quote.strip()
    }
    logger.info(" field_citations from LLM: %d entries", len(citation_map))

    # Lower-case every corpus text once instead of once per (field, item) pair.
    # Iteration order is corpus order, then item order within each corpus.
    candidates = [
        (item, item.text.lower())
        for corpus in trusted_corpora
        for item in corpus.items
    ]

    results: list[FieldProvenance] = []
    citation_hits = 0
    # Track assigned positions to avoid two fields pointing to the same corpus item.
    # Key: (source_filename, page, x0, y0) — unpadded, original corpus position.
    used_positions: set[tuple] = set()

    for field_path, value_str in _walk_record(record):
        leaf = field_path.split(".")[-1].strip("[]0123456789")
        if leaf in _SKIP_LEAF_NAMES:
            continue

        # Prefer the verbatim citation quote; fall back to the normalised value.
        # For ISO dates/datetimes also try UK DD/MM/YYYY format as a secondary fallback.
        citation = citation_map.get(field_path)
        using_citation = citation is not None
        search_str = citation if citation is not None else value_str
        alt_search: str | None = None
        if not using_citation:
            alt_search = _iso_to_uk_date(value_str)

        if len(search_str) < _MIN_VALUE_LEN:
            continue

        # When matching a citation quote use partial_ratio — the quote is a
        # verbatim substring of the document and WRatio penalises length disparity.
        # For normalised fallback values use WRatio to avoid short false matches.
        score_fn = rfuzz.partial_ratio if using_citation else rfuzz.WRatio
        threshold = _CITATION_THRESHOLD if using_citation else _MATCH_THRESHOLD
        needle = search_str.lower()

        # Find best match, preferring positions not yet assigned to another field.
        best_score = 0
        best_item: CorpusItem | None = None
        best_unused_score = 0
        best_unused_item: CorpusItem | None = None

        for item, haystack in candidates:
            score = score_fn(needle, haystack)
            # Also try UK-formatted date if available
            if alt_search and score < threshold:
                alt_score = rfuzz.partial_ratio(alt_search, haystack)
                if alt_score > score:
                    score = alt_score

            if score > best_score:
                best_score = score
                best_item = item
            if score > best_unused_score:
                pos_key = (item.source_filename, item.page, item.bbox[0], item.bbox[1])
                if pos_key not in used_positions:
                    best_unused_score = score
                    best_unused_item = item

        # Prefer an unused position if it scores above threshold,
        # otherwise fall back to best overall (may share a location).
        if best_unused_item is not None and best_unused_score >= threshold:
            chosen_item = best_unused_item
            chosen_score = best_unused_score
        elif best_item is not None and best_score >= threshold:
            chosen_item = best_item
            chosen_score = best_score
        else:
            continue

        used_positions.add(
            (chosen_item.source_filename, chosen_item.page,
             chosen_item.bbox[0], chosen_item.bbox[1])
        )
        if using_citation:
            citation_hits += 1

        results.append(FieldProvenance(
            field_path=field_path,
            extracted_value=value_str,
            matched_text=chosen_item.text[:200],  # truncate very long table blobs
            match_score=round(chosen_score / 100.0, 3),
            source_filename=chosen_item.source_filename,
            location=Location(
                page=chosen_item.page,
                bbox=_padded_bbox(chosen_item.bbox),
            ),
        ))

    total = _count_total_fields(record)
    logger.info(
        "Provenance: %d / %d fields located (%d via citation quotes, %d via fuzzy fallback) "
        "— trusted corpora: %s",
        len(results), total,
        citation_hits, len(results) - citation_hits,
        [c.source_filename for c in trusted_corpora],
    )
    return results
| # --------------------------------------------------------------------------- | |
| # Field-walking helpers | |
| # --------------------------------------------------------------------------- | |
def _walk_record(record: Any) -> Iterator[tuple[str, str]]:
    """Yield ``(field_path, string_value)`` for each non-None leaf of *record*.

    The record is serialised with ``model_dump(exclude_none=True)`` and the
    resulting dict is traversed from an empty path prefix.
    """
    dumped = record.model_dump(exclude_none=True)
    for entry in _walk_dict(dumped, ""):
        yield entry
| def _walk_dict(d: dict, prefix: str) -> Iterator[tuple[str, str]]: | |
| for key, val in d.items(): | |
| # Skip whole sections that produce unreliable or irrelevant matches | |
| top_key = prefix.split(".")[0].split("[")[0] if prefix else key | |
| if key in _SKIP_SECTION_NAMES or top_key in _SKIP_SECTION_NAMES: | |
| continue | |
| path = f"{prefix}.{key}" if prefix else key | |
| if isinstance(val, dict): | |
| yield from _walk_dict(val, path) | |
| elif isinstance(val, list): | |
| yield from _walk_list(val, path) | |
| elif val is not None: | |
| yield path, str(val) | |
| def _walk_list(lst: list, prefix: str) -> Iterator[tuple[str, str]]: | |
| for i, item in enumerate(lst): | |
| path = f"{prefix}[{i}]" | |
| if isinstance(item, dict): | |
| yield from _walk_dict(item, path) | |
| elif item is not None: | |
| yield path, str(item) | |
def _count_total_fields(record: Any) -> int:
    """Return how many leaf fields ``_walk_dict`` yields for *record*.

    The record is dumped with ``exclude_none=True``, so unset fields are not
    counted.
    """
    dumped = record.model_dump(exclude_none=True)
    total = 0
    for _ in _walk_dict(dumped, ""):
        total += 1
    return total
| # ISO 8601 date/datetime patterns → UK DD/MM/YYYY | |
| _ISO_DATE_RE = re.compile(r'^(\d{4})-(\d{2})-(\d{2})') | |
| def _iso_to_uk_date(value: str) -> str | None: | |
| """Convert ISO date/datetime string to UK DD/MM/YYYY for document matching. | |
| Returns the UK-format string (e.g. "15/04/2026") if value looks like an | |
| ISO date, otherwise returns None. | |
| """ | |
| m = _ISO_DATE_RE.match(value.strip()) | |
| if m: | |
| yyyy, mm, dd = m.group(1), m.group(2), m.group(3) | |
| return f"{dd}/{mm}/{yyyy}" | |
| return None | |
def _padded_bbox(bbox: list[float]) -> list[float]:
    """Expand a tight Docling text bbox so highlights are clearly visible in the UI.

    Args:
        bbox: ``[x0, y0, x1, y1]`` in browser % (top-left origin, 0–100).

    Returns:
        A new ``[x0, y0, x1, y1]`` list, padded by ``_BBOX_PAD_X`` /
        ``_BBOX_PAD_Y``, kept inside 0–100 and rounded to 3 decimals. The box
        is at least ``_BBOX_MIN_H`` tall, also when it touches the top or
        bottom edge of the page.
    """
    x0, y0, x1, y1 = bbox
    x0 = _clamp(x0 - _BBOX_PAD_X)
    y0 = _clamp(y0 - _BBOX_PAD_Y)
    x1 = _clamp(x1 + _BBOX_PAD_X)
    y1 = _clamp(y1 + _BBOX_PAD_Y)

    # Enforce minimum height so single-line text is always visible
    if (y1 - y0) < _BBOX_MIN_H:
        half = _BBOX_MIN_H / 2
        mid = (y0 + y1) / 2
        y0, y1 = mid - half, mid + half
        # Clamping a centred box at a page edge would shrink it below the
        # minimum again, so slide the whole box back inside the page instead.
        if y0 < 0.0:
            y0, y1 = 0.0, _BBOX_MIN_H
        elif y1 > 100.0:
            y0, y1 = 100.0 - _BBOX_MIN_H, 100.0
        y0 = _clamp(y0)
        y1 = _clamp(y1)

    return [round(x0, 3), round(y0, 3), round(x1, 3), round(y1, 3)]