"""
provenance.py — Post-extraction provenance mapping for the Visual Audit UI.

After the LLM extracts a flat Golden Record, this module walks the record and
fuzzy-matches each extracted value against a ProvenanceCorpus built from the
Docling document IR. The LLM is never asked to self-report geometry — that
would cause hallucinations; this module handles localisation as a pure
post-processing step.

Coordinate convention
─────────────────────
  Docling bbox : PDF space — origin bottom-left, y increases upward, unit = pt
  Stored bbox  : Browser % — origin top-left, y increases downward, range 0–100

Conversion (per axis):
  x0% = bbox.l / page_width * 100
  y0% = (page_height - bbox.t) / page_height * 100   # top of element
  x1% = bbox.r / page_width * 100
  y1% = (page_height - bbox.b) / page_height * 100   # bottom of element
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from typing import Any, Iterator
logger = logging.getLogger(__name__)
# ── Matching parameters ──────────────────────────────────────────────────────
_MATCH_THRESHOLD = 78  # minimum rapidfuzz WRatio (0–100) for normalised-value fallback
_CITATION_THRESHOLD = 88  # minimum partial_ratio for LLM-supplied verbatim citation quotes
_MIN_VALUE_LEN = 4  # skip matching for values shorter than this (too ambiguous)

# Leaf field names whose values are boolean-like and would match too broadly
_SKIP_LEAF_NAMES = {
    "is_main_driver", "protected", "has_security_device",
    "tracker_fitted", "driving_other_cars",
}

# Top-level section names to skip entirely.
# `source_document` and `field_citations` are internal provenance fields —
# they don't contain verbatim PDF values so matching against them is meaningless.
_SKIP_SECTION_NAMES = {"source_document", "field_citations"}

# Document types whose corpora are unreliable for field-level matching.
# Policy Booklets contain generic boilerplate — matching against them produces
# false positives for almost every field ("Full", "UK", date digits, etc.).
_EXCLUDE_FROM_MATCHING: set[str] = {"PolicyBooklet", "Unknown"}

# Padding added to each bbox for display. The Docling bbox is a tight text
# box (~1% page height per line) which is hard to see. We expand it so the
# highlight is clearly visible without losing positional accuracy.
_BBOX_PAD_X = 0.4  # % to expand left/right
_BBOX_PAD_Y = 0.6  # % to expand top/bottom
_BBOX_MIN_H = 2.0  # % minimum height after padding
# ---------------------------------------------------------------------------
# Corpus data structures
# ---------------------------------------------------------------------------
@dataclass
class CorpusItem:
    """One text element from a Docling DoclingDocument, with browser % geometry."""

    text: str  # text of the element (for tables: the joined cell text)
    page: int  # page number taken from the element's provenance entry
    bbox: list[float]  # [x0%, y0%, x1%, y1%] — top-left origin, 0–100
    source_filename: str  # name of the file the element was extracted from
class ProvenanceCorpus:
    """All extractable text elements from one PDF, with their page geometry.

    ``items`` is filled by :meth:`add_from_docling`; each entry carries the
    element text, its page number and a bounding box expressed as browser
    percentages (top-left origin, 0–100).
    """

    def __init__(self, source_filename: str = "", doc_type: str = "Unknown") -> None:
        self.source_filename = source_filename
        self.doc_type = doc_type  # e.g. "Schedule", "Certificate", "PolicyBooklet"
        self.items: list[CorpusItem] = []

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def add_from_docling(self, doc: Any, filename: str) -> None:
        """
        Populate the corpus from a Docling DoclingDocument.

        Safely handles API variations across docling versions — logs a warning
        rather than propagating exceptions, so the calling pipeline stays alive
        even if provenance extraction fails.

        Args:
            doc: The Docling document to read text items and page sizes from.
            filename: Stored on the corpus and on every item it produces.
        """
        self.source_filename = filename
        try:
            self._extract_items(doc, filename)
            logger.debug(
                "Corpus '%s': %d items, %d pages",
                filename, len(self.items), self._count_pages(doc),
            )
        except Exception as exc:  # noqa: BLE001 — deliberate best-effort boundary
            logger.warning(
                "Provenance extraction skipped for '%s': %s", filename, exc
            )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _extract_items(self, doc: Any, filename: str) -> None:
        """Append one CorpusItem per (document item, provenance entry) pair."""
        page_sizes = _build_page_sizes(doc)
        if not page_sizes:
            logger.debug("No page size data for '%s' — provenance skipped", filename)
            return
        for item in _iter_items(doc):
            text = _item_text(item)
            if len(text) < 2:  # empty or single-character text is useless for matching
                continue
            # `or []` keeps one item with prov=None from aborting the whole corpus.
            for prov in getattr(item, "prov", None) or []:
                self._add_prov_item(prov, text, filename, page_sizes)

    def _add_prov_item(
        self,
        prov: Any,
        text: str,
        filename: str,
        page_sizes: dict[int, tuple[float, float]],
    ) -> None:
        """Convert one provenance entry to browser % geometry and store it.

        Entries without a page number, without a bbox, or on a page whose
        size is unknown are silently skipped.
        """
        page_no = getattr(prov, "page_no", None)
        if page_no is None:
            return
        page_no = int(page_no)
        if page_no not in page_sizes:
            return
        bbox = getattr(prov, "bbox", None)
        if bbox is None:
            return
        pw, ph = page_sizes[page_no]
        percentages = self._bbox_to_percent(bbox, pw, ph)
        self.items.append(CorpusItem(
            text=text,
            page=page_no,
            bbox=[round(_clamp(v), 3) for v in percentages],
            source_filename=filename,
        ))

    @staticmethod
    def _bbox_to_percent(
        bbox: Any, pw: float, ph: float
    ) -> tuple[float, float, float, float]:
        """Return ``(x0, y0, x1, y1)`` as unclamped page percentages, top-left origin.

        The bbox is treated as bottom-left-origin PDF space unless its
        ``coord_origin`` attribute says TOPLEFT, in which case the y axis is
        used as-is instead of being flipped. Each axis is ordered so that
        ``x0 <= x1`` and ``y0 <= y1`` even if the source box is inverted.
        """
        origin = getattr(bbox, "coord_origin", None)
        # Accept both an enum member (uses .value) and a plain string.
        origin_name = str(getattr(origin, "value", origin)).upper().replace("_", "")
        left = float(getattr(bbox, "l", 0))
        right = float(getattr(bbox, "r", pw))
        if "TOPLEFT" in origin_name:
            top = float(getattr(bbox, "t", 0))
            bottom = float(getattr(bbox, "b", ph))
        else:
            # PDF space: y grows upward, so distance from the page top is ph - y.
            top = ph - float(getattr(bbox, "t", ph))
            bottom = ph - float(getattr(bbox, "b", 0))
        x0, x1 = sorted((left / pw * 100, right / pw * 100))
        y0, y1 = sorted((top / ph * 100, bottom / ph * 100))
        return x0, y0, x1, y1

    @staticmethod
    def _count_pages(doc: Any) -> int:
        """Return the number of entries in ``doc.pages`` (0 when absent or None)."""
        return len(getattr(doc, "pages", None) or {})
# ---------------------------------------------------------------------------
# Module-level helpers for corpus building
# ---------------------------------------------------------------------------
def _build_page_sizes(doc: Any) -> dict[int, tuple[float, float]]:
sizes: dict[int, tuple[float, float]] = {}
for page_no, page_item in getattr(doc, "pages", {}).items():
size = getattr(page_item, "size", None)
if size:
w = float(getattr(size, "width", 0))
h = float(getattr(size, "height", 0))
if w > 0 and h > 0:
sizes[int(page_no)] = (w, h)
return sizes
def _iter_items(doc: Any):
"""Yield all document items, trying iterate_items() first then .texts/.tables."""
try:
for item, _level in doc.iterate_items():
yield item
except AttributeError:
for item in getattr(doc, "texts", []):
yield item
for item in getattr(doc, "tables", []):
yield item
def _item_text(item: Any) -> str:
"""Extract a string from a Docling TextItem or TableItem."""
text = getattr(item, "text", None)
if text is not None:
return str(text).strip()
# TableItem: concatenate all cell text into one searchable blob
data = getattr(item, "data", None)
if data is not None:
cells = [
str(getattr(cell, "text", "")).strip()
for row in getattr(data, "grid", [])
for cell in row
]
return " | ".join(c for c in cells if c)
return ""
def _clamp(v: float) -> float:
return max(0.0, min(100.0, v))
# ---------------------------------------------------------------------------
# Field-level provenance builder (main public function)
# ---------------------------------------------------------------------------
def build_provenance(
    record: Any,  # UKMotorGoldenRecord
    corpora: list[ProvenanceCorpus],
) -> list[Any]:  # list[FieldProvenance]
    """
    Walk the Golden Record and fuzzy-match each extracted value against all
    trusted corpora.

    Corpora whose ``doc_type`` is in ``_EXCLUDE_FROM_MATCHING`` are ignored —
    Policy Booklets contain generic boilerplate that produces false positives
    for almost every field value.

    For each leaf field the search string is the verbatim quote from
    ``record.field_citations`` when a usable one exists (a non-blank string),
    scored with ``partial_ratio`` against ``_CITATION_THRESHOLD``. Otherwise
    the normalised value is scored with ``WRatio`` against
    ``_MATCH_THRESHOLD``, and ISO dates are additionally tried in UK
    DD/MM/YYYY form. Citation entries that are None, blank or not strings are
    ignored so the field falls back to its normalised value.

    Args:
        record: Object exposing ``model_dump(exclude_none=True)`` and,
            optionally, a ``field_citations`` mapping of field path -> quote.
        corpora: Candidate corpora; only trusted ones are searched.

    Returns:
        A ``FieldProvenance`` entry for every field located at or above its
        threshold. Fields with no good corpus match are omitted — the UI
        shows them as "No location data". Returns ``[]`` when rapidfuzz is
        not installed or no trusted corpus is available.
    """
    from schema import FieldProvenance, Location  # local import avoids circular dep

    try:
        from rapidfuzz import fuzz as rfuzz
    except ImportError:
        logger.warning(
            "rapidfuzz not installed — provenance matching disabled. "
            "Run: pip install rapidfuzz"
        )
        return []

    # Filter to trusted corpora only (exclude Policy Booklet and Unknown docs)
    trusted_corpora = [
        c for c in corpora if c.doc_type not in _EXCLUDE_FROM_MATCHING
    ]
    if not trusted_corpora:
        logger.warning(
            "No trusted corpora available — all %d corpus/corpora are excluded "
            "(types: %s). Provenance will be empty.",
            len(corpora),
            [c.doc_type for c in corpora],
        )
        return []

    # LLM-supplied verbatim source quotes: field_path → raw text phrase.
    # These are always preferred over the normalised extracted value because
    # the LLM copies them directly from the document (e.g. "15/04/2026 at 00:00
    # hours" rather than the ISO "2026-04-15T00:00:00" we store in the record).
    supplied_citations = dict(getattr(record, "field_citations", None) or {})
    logger.info("  field_citations from LLM: %d entries", len(supplied_citations))
    # Drop unusable quotes up front: a None/non-string quote would crash on
    # len(), and a blank one would suppress the field instead of falling back.
    citation_map: dict[str, str] = {
        path: quote
        for path, quote in supplied_citations.items()
        if isinstance(quote, str) and quote.strip()
    }

    # Lower-case every corpus text and build its position key once, rather
    # than once per (field, item) pair. Order is corpus order, then item order.
    # Position key: (source_filename, page, x0, y0) — unpadded corpus position.
    candidates = [
        (item, item.text.lower(), (item.source_filename, item.page, item.bbox[0], item.bbox[1]))
        for corpus in trusted_corpora
        for item in corpus.items
    ]

    results: list[FieldProvenance] = []
    citation_hits = 0
    # Track assigned positions to avoid two fields pointing to the same corpus item.
    used_positions: set[tuple] = set()

    for field_path, value_str in _walk_record(record):
        leaf = field_path.split(".")[-1].strip("[]0123456789")
        if leaf in _SKIP_LEAF_NAMES:
            continue

        # Prefer the verbatim citation quote; fall back to the normalised value.
        # When matching a citation quote use partial_ratio — the quote is a
        # verbatim substring of the document and WRatio penalises length disparity.
        # For normalised fallback values use WRatio to avoid short false matches,
        # and for ISO dates/datetimes also try the UK DD/MM/YYYY form.
        using_citation = field_path in citation_map
        if using_citation:
            search_str = citation_map[field_path]
            alt_search: str | None = None
            score_fn, threshold = rfuzz.partial_ratio, _CITATION_THRESHOLD
        else:
            search_str = value_str
            alt_search = _iso_to_uk_date(value_str)
            score_fn, threshold = rfuzz.WRatio, _MATCH_THRESHOLD

        if len(search_str) < _MIN_VALUE_LEN:
            continue
        needle = search_str.lower()

        # Find best match, preferring positions not yet assigned to another field.
        best_score = 0
        best_item: CorpusItem | None = None
        best_unused_score = 0
        best_unused_item: CorpusItem | None = None

        for item, haystack, pos_key in candidates:
            score = score_fn(needle, haystack)
            # Also try UK-formatted date if available
            if alt_search and score < threshold:
                score = max(score, rfuzz.partial_ratio(alt_search, haystack))

            # Strict ">" keeps the first item encountered on ties.
            if score > best_score:
                best_score = score
                best_item = item
            if score > best_unused_score and pos_key not in used_positions:
                best_unused_score = score
                best_unused_item = item

        # Prefer an unused position if it scores above threshold,
        # otherwise fall back to best overall (may share a location).
        if best_unused_item is not None and best_unused_score >= threshold:
            chosen_item = best_unused_item
            chosen_score = best_unused_score
        elif best_item is not None and best_score >= threshold:
            chosen_item = best_item
            chosen_score = best_score
        else:
            continue

        used_positions.add(
            (chosen_item.source_filename, chosen_item.page, chosen_item.bbox[0], chosen_item.bbox[1])
        )
        if using_citation:
            citation_hits += 1

        results.append(FieldProvenance(
            field_path=field_path,
            extracted_value=value_str,
            matched_text=chosen_item.text[:200],  # truncate very long table blobs
            match_score=round(chosen_score / 100.0, 3),
            source_filename=chosen_item.source_filename,
            location=Location(
                page=chosen_item.page,
                bbox=_padded_bbox(chosen_item.bbox),
            ),
        ))

    total = _count_total_fields(record)
    logger.info(
        "Provenance: %d / %d fields located (%d via citation quotes, %d via fuzzy fallback) "
        "— trusted corpora: %s",
        len(results), total,
        citation_hits, len(results) - citation_hits,
        [c.source_filename for c in trusted_corpora],
    )
    return results
# ---------------------------------------------------------------------------
# Field-walking helpers
# ---------------------------------------------------------------------------
def _walk_record(record: Any) -> Iterator[tuple[str, str]]:
    """Yield (field_path, string_value) for all non-None leaf values in the record.

    The record is dumped with ``model_dump(exclude_none=True)`` and the
    resulting dict is walked from an empty path prefix.
    """
    dumped = record.model_dump(exclude_none=True)
    for path_and_value in _walk_dict(dumped, ""):
        yield path_and_value
def _walk_dict(d: dict, prefix: str) -> Iterator[tuple[str, str]]:
    """Recursively yield (dotted_path, str(value)) for each non-None leaf of *d*.

    A key is skipped when it, or the top-level section that *prefix* starts
    with, is listed in ``_SKIP_SECTION_NAMES``. Nested dicts and lists are
    descended into; list elements get an ``[index]`` suffix.
    """
    # With a non-empty prefix the top-level section is fixed for the whole
    # call: the text before the first "." with any "[index]" suffix removed.
    root_section = prefix.partition(".")[0].partition("[")[0] if prefix else None

    for name, value in d.items():
        # Skip whole sections that produce unreliable or irrelevant matches
        section = name if root_section is None else root_section
        if name in _SKIP_SECTION_NAMES or section in _SKIP_SECTION_NAMES:
            continue

        child_path = f"{prefix}.{name}" if prefix else name
        if isinstance(value, dict):
            yield from _walk_dict(value, child_path)
        elif isinstance(value, list):
            yield from _walk_list(value, child_path)
        elif value is not None:
            yield child_path, str(value)
def _walk_list(lst: list, prefix: str) -> Iterator[tuple[str, str]]:
for i, item in enumerate(lst):
path = f"{prefix}[{i}]"
if isinstance(item, dict):
yield from _walk_dict(item, path)
elif item is not None:
yield path, str(item)
def _count_total_fields(record: Any) -> int:
    """Return how many leaf fields the record walk yields (skipped sections excluded)."""
    dumped = record.model_dump(exclude_none=True)
    field_count = 0
    for _path, _value in _walk_dict(dumped, ""):
        field_count += 1
    return field_count
# ISO 8601 date/datetime patterns β UK DD/MM/YYYY
_ISO_DATE_RE = re.compile(r'^(\d{4})-(\d{2})-(\d{2})')
def _iso_to_uk_date(value: str) -> str | None:
"""Convert ISO date/datetime string to UK DD/MM/YYYY for document matching.
Returns the UK-format string (e.g. "15/04/2026") if value looks like an
ISO date, otherwise returns None.
"""
m = _ISO_DATE_RE.match(value.strip())
if m:
yyyy, mm, dd = m.group(1), m.group(2), m.group(3)
return f"{dd}/{mm}/{yyyy}"
return None
def _padded_bbox(bbox: list[float]) -> list[float]:
    """Expand a tight Docling text bbox so highlights are clearly visible in the UI.

    Each side is pushed outward by ``_BBOX_PAD_X`` / ``_BBOX_PAD_Y`` and
    clamped to the page. If the result is shorter than ``_BBOX_MIN_H`` it is
    grown around its vertical centre; when that would cross the top or bottom
    of the page the box is slid back inside instead of being cut short, so
    the minimum height also holds for text at the page edges.

    Args:
        bbox: ``[x0, y0, x1, y1]`` in page percentages, top-left origin.

    Returns:
        A new ``[x0, y0, x1, y1]`` list rounded to 3 decimals; the input is
        not modified.
    """
    x0, y0, x1, y1 = bbox
    x0 = _clamp(x0 - _BBOX_PAD_X)
    y0 = _clamp(y0 - _BBOX_PAD_Y)
    x1 = _clamp(x1 + _BBOX_PAD_X)
    y1 = _clamp(y1 + _BBOX_PAD_Y)

    # Enforce minimum height so single-line text is always visible
    if (y1 - y0) < _BBOX_MIN_H:
        mid = (y0 + y1) / 2
        y0 = mid - _BBOX_MIN_H / 2
        y1 = mid + _BBOX_MIN_H / 2
        # Clamping alone would shrink the box again at a page edge; shift it.
        if y0 < 0.0:
            y0, y1 = 0.0, _BBOX_MIN_H
        elif y1 > 100.0:
            y0, y1 = 100.0 - _BBOX_MIN_H, 100.0
        y0 = _clamp(y0)
        y1 = _clamp(y1)

    return [round(x0, 3), round(y0, 3), round(x1, 3), round(y1, 3)]