# AI-PolicyTrace / src/provenance.py
# teja141290's picture
# Deploy PolicyTrace Hugging Face Space
# be54038
"""
provenance.py — Post-extraction provenance mapping for the Visual Audit UI.
After the LLM extracts a flat Golden Record, this module walks the record and
fuzzy-matches each extracted value against a ProvenanceCorpus built from the
Docling document IR. The LLM is never asked to self-report geometry — that
would cause hallucinations; this module handles localisation as a pure
post-processing step.
Coordinate convention
─────────────────────
Docling bbox : PDF space — origin bottom-left, y increases upward, unit = pt
Stored bbox : Browser % — origin top-left, y increases downward, range 0–100
Conversion (per axis):
x0% = bbox.l / page_width * 100
y0% = (page_height - bbox.t) / page_height * 100 # top of element
x1% = bbox.r / page_width * 100
y1% = (page_height - bbox.b) / page_height * 100 # bottom of element
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from typing import Any, Iterator
logger = logging.getLogger(__name__)
# ── Matching parameters ──────────────────────────────────────────────────────
# Scores are on rapidfuzz's 0–100 scale. A candidate is accepted only when its
# score is >= the threshold that applies to the search string in use.
_MATCH_THRESHOLD = 78 # minimum rapidfuzz WRatio (0–100) for normalised-value fallback
_CITATION_THRESHOLD = 88 # minimum partial_ratio for LLM-supplied verbatim citation quotes
_MIN_VALUE_LEN = 4 # skip matching for values shorter than this (too ambiguous)
# Leaf field names whose values are boolean-like and would match too broadly.
# Compared against the last dotted path segment with any list index stripped.
_SKIP_LEAF_NAMES = {
"is_main_driver", "protected", "has_security_device",
"tracker_fitted", "driving_other_cars",
}
# Section names to skip entirely while walking the record.
# `source_document` and `field_citations` are internal provenance fields —
# they don't contain verbatim PDF values so matching against them is meaningless.
_SKIP_SECTION_NAMES = {"source_document", "field_citations"}
# Document types whose corpora are unreliable for field-level matching.
# Policy Booklets contain generic boilerplate — matching against them produces
# false positives for almost every field ("Full", "UK", date digits, etc.).
# "Unknown" is also the default doc_type of a freshly constructed corpus.
_EXCLUDE_FROM_MATCHING: set[str] = {"PolicyBooklet", "Unknown"}
# Padding added to each bbox for display. The Docling bbox is a tight text
# box (~1% page height per line) which is hard to see. We expand it so the
# highlight is clearly visible without losing positional accuracy.
# All three values are in browser-percentage units (0–100 of the page).
_BBOX_PAD_X = 0.4 # % to expand left/right
_BBOX_PAD_Y = 0.6 # % to expand top/bottom
_BBOX_MIN_H = 2.0 # % minimum height after padding
# ---------------------------------------------------------------------------
# Corpus data structures
# ---------------------------------------------------------------------------
@dataclass
class CorpusItem:
    """One text element from a Docling DoclingDocument, with browser % geometry.

    Instances are created by ``ProvenanceCorpus`` — one per provenance entry of
    a document item — and are later scored against extracted field values.
    """

    # Stripped text of the element (for tables: cell texts joined with " | ").
    text: str
    # Page number as reported by the provenance entry's ``page_no``.
    page: int
    bbox: list[float]  # [x0%, y0%, x1%, y1%] — top-left origin, 0–100
    # Name of the file the element was extracted from.
    source_filename: str
class ProvenanceCorpus:
    """All extractable text elements from one PDF, with their page geometry.

    Attributes:
        source_filename: Name of the PDF the items came from.
        doc_type: Document classification, e.g. "Schedule", "Certificate",
            "PolicyBooklet". Defaults to "Unknown".
        items: Flat list of ``CorpusItem`` entries, in document order.
    """

    def __init__(self, source_filename: str = "", doc_type: str = "Unknown") -> None:
        self.source_filename = source_filename
        self.doc_type = doc_type  # e.g. "Schedule", "Certificate", "PolicyBooklet"
        self.items: list[CorpusItem] = []

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def add_from_docling(self, doc: Any, filename: str) -> None:
        """
        Populate the corpus from a Docling DoclingDocument.

        Safely handles API variations across docling versions — logs a warning
        rather than propagating exceptions, so the calling pipeline stays alive
        even if provenance extraction fails.

        Args:
            doc: The Docling document to read items and page sizes from.
            filename: Source file name; stored on the corpus and on every item.
        """
        self.source_filename = filename
        try:
            self._extract_items(doc, filename)
            logger.debug(
                "Corpus '%s': %d items, %d pages",
                filename, len(self.items), self._count_pages(doc),
            )
        except Exception as exc:  # noqa: BLE001 — deliberate best-effort boundary
            logger.warning(
                "Provenance extraction skipped for '%s': %s", filename, exc
            )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _extract_items(self, doc: Any, filename: str) -> None:
        """Append one ``CorpusItem`` per provenance entry of every text-bearing item."""
        page_sizes = _build_page_sizes(doc)
        if not page_sizes:
            logger.debug("No page size data for '%s' — provenance skipped", filename)
            return
        for item in _iter_items(doc):
            text = _item_text(item)
            if not text or len(text) < 2:
                continue
            # `prov` may be missing or explicitly None; neither should abort
            # extraction for the remaining items.
            for prov in getattr(item, "prov", None) or []:
                self._add_prov_item(prov, text, filename, page_sizes)

    def _add_prov_item(
        self,
        prov: Any,
        text: str,
        filename: str,
        page_sizes: dict[int, tuple[float, float]],
    ) -> None:
        """Convert one provenance entry into a ``CorpusItem`` in browser % space.

        Entries without a page number, without a bbox, or on a page whose size
        is unknown are silently skipped.

        The bbox is interpreted according to its ``coord_origin`` when present:
        a top-left origin is used as-is, anything else (including a missing
        attribute) is treated as PDF space with a bottom-left origin and is
        flipped vertically.
        """
        page_no = getattr(prov, "page_no", None)
        if page_no is None:
            return
        page_no = int(page_no)
        if page_no not in page_sizes:
            return
        pw, ph = page_sizes[page_no]
        bbox = getattr(prov, "bbox", None)
        if bbox is None:
            return

        # Docling's CoordOrigin is a str-valued enum; accept the enum member or
        # a plain string so the check survives API variations.
        origin = getattr(bbox, "coord_origin", None)
        origin_name = str(getattr(origin, "name", origin)).upper()
        top_left_origin = origin_name.endswith("TOPLEFT")

        left = float(getattr(bbox, "l", 0))
        right = float(getattr(bbox, "r", pw))
        if top_left_origin:
            # y already grows downward: `t` is the small value, `b` the large one.
            top = float(getattr(bbox, "t", 0))
            bottom = float(getattr(bbox, "b", ph))
            y_top = top / ph * 100
            y_bottom = bottom / ph * 100
        else:
            # PDF space: `t` is the high y value, `b` the low one — flip the axis.
            top = float(getattr(bbox, "t", ph))
            bottom = float(getattr(bbox, "b", 0))
            y_top = (ph - top) / ph * 100
            y_bottom = (ph - bottom) / ph * 100

        # Clamp to the page and guarantee x0 <= x1, y0 <= y1 so the stored
        # rectangle is always well-formed for the UI.
        x0, x1 = sorted((_clamp(left / pw * 100), _clamp(right / pw * 100)))
        y0, y1 = sorted((_clamp(y_top), _clamp(y_bottom)))

        self.items.append(CorpusItem(
            text=text,
            page=page_no,
            bbox=[round(x0, 3), round(y0, 3), round(x1, 3), round(y1, 3)],
            source_filename=filename,
        ))

    @staticmethod
    def _count_pages(doc: Any) -> int:
        """Return the number of pages on *doc*, or 0 when it has none."""
        return len(getattr(doc, "pages", None) or {})
# ---------------------------------------------------------------------------
# Module-level helpers for corpus building
# ---------------------------------------------------------------------------
def _build_page_sizes(doc: Any) -> dict[int, tuple[float, float]]:
sizes: dict[int, tuple[float, float]] = {}
for page_no, page_item in getattr(doc, "pages", {}).items():
size = getattr(page_item, "size", None)
if size:
w = float(getattr(size, "width", 0))
h = float(getattr(size, "height", 0))
if w > 0 and h > 0:
sizes[int(page_no)] = (w, h)
return sizes
def _iter_items(doc: Any):
"""Yield all document items, trying iterate_items() first then .texts/.tables."""
try:
for item, _level in doc.iterate_items():
yield item
except AttributeError:
for item in getattr(doc, "texts", []):
yield item
for item in getattr(doc, "tables", []):
yield item
def _item_text(item: Any) -> str:
"""Extract a string from a Docling TextItem or TableItem."""
text = getattr(item, "text", None)
if text is not None:
return str(text).strip()
# TableItem: concatenate all cell text into one searchable blob
data = getattr(item, "data", None)
if data is not None:
cells = [
str(getattr(cell, "text", "")).strip()
for row in getattr(data, "grid", [])
for cell in row
]
return " | ".join(c for c in cells if c)
return ""
def _clamp(v: float) -> float:
return max(0.0, min(100.0, v))
# ---------------------------------------------------------------------------
# Field-level provenance builder (main public function)
# ---------------------------------------------------------------------------
def build_provenance(
    record: Any,  # UKMotorGoldenRecord
    corpora: list[ProvenanceCorpus],
) -> list[Any]:  # list[FieldProvenance]
    """
    Walk the Golden Record and fuzzy-match each extracted value against all
    trusted corpora (Schedule, Certificate, StatementOfFact).

    Policy Booklet corpora are excluded — they contain generic boilerplate
    that produces false positives for almost every field value.

    For each field the search string is the LLM-supplied verbatim citation
    quote when one exists, otherwise the normalised extracted value (plus a
    UK ``DD/MM/YYYY`` rendering when the value is an ISO date). A citation
    that is blank or not a string is treated as absent, so the field still
    gets the normalised-value fallback instead of being dropped.

    Args:
        record: The extracted record; must expose ``model_dump`` and may
            expose a ``field_citations`` mapping of field path → quote.
        corpora: Candidate corpora; those whose ``doc_type`` is excluded from
            matching are ignored.

    Returns:
        A ``FieldProvenance`` entry for every field that can be located at or
        above the applicable match threshold. Fields with no good corpus match
        are omitted — the UI shows them as "No location data". Returns an
        empty list when rapidfuzz is unavailable or no trusted corpus exists.
    """
    from schema import FieldProvenance, Location  # local import avoids circular dep

    try:
        from rapidfuzz import fuzz as rfuzz
    except ImportError:
        logger.warning(
            "rapidfuzz not installed — provenance matching disabled. "
            "Run: pip install rapidfuzz"
        )
        return []

    # Filter to trusted corpora only (exclude Policy Booklet and Unknown docs)
    trusted_corpora = [
        c for c in corpora if c.doc_type not in _EXCLUDE_FROM_MATCHING
    ]
    if not trusted_corpora:
        logger.warning(
            "No trusted corpora available — all %d corpus/corpora are excluded "
            "(types: %s). Provenance will be empty.",
            len(corpora),
            [c.doc_type for c in corpora],
        )
        return []

    # LLM-supplied verbatim source quotes: field_path → raw text phrase.
    # These are always preferred over the normalised extracted value because
    # the LLM copies them directly from the document (e.g. "15/04/2026 at 00:00
    # hours" rather than the ISO "2026-04-15T00:00:00" we store in the record).
    raw_citations: dict[str, Any] = dict(getattr(record, "field_citations", None) or {})
    logger.info(" field_citations from LLM: %d entries", len(raw_citations))
    # Keep only usable quotes; a blank or non-string entry must not block the
    # normalised-value fallback for that field.
    citation_map: dict[str, str] = {
        path: quote
        for path, quote in raw_citations.items()
        if isinstance(quote, str) and quote.strip()
    }

    # Lower-case every corpus text and build its position key once, instead of
    # once per (field, item) pair. Order is corpus order, then item order.
    # Key: (source_filename, page, x0, y0) — unpadded, original corpus position.
    candidates = [
        (
            item,
            item.text.lower(),
            (item.source_filename, item.page, item.bbox[0], item.bbox[1]),
        )
        for corpus in trusted_corpora
        for item in corpus.items
    ]

    results: list[FieldProvenance] = []
    citation_hits = 0
    # Track assigned positions to avoid two fields pointing to the same corpus item.
    used_positions: set[tuple] = set()

    for field_path, value_str in _walk_record(record):
        leaf = field_path.split(".")[-1].strip("[]0123456789")
        if leaf in _SKIP_LEAF_NAMES:
            continue

        # Prefer the verbatim citation quote; fall back to the normalised value.
        # For ISO dates/datetimes also try UK DD/MM/YYYY format as a secondary fallback.
        using_citation = field_path in citation_map
        search_str = citation_map[field_path] if using_citation else value_str
        alt_search: str | None = None if using_citation else _iso_to_uk_date(value_str)
        if len(search_str) < _MIN_VALUE_LEN:
            continue

        # When matching a citation quote use partial_ratio — the quote is a
        # verbatim substring of the document and WRatio penalises length disparity.
        # For normalised fallback values use WRatio to avoid short false matches.
        score_fn = rfuzz.partial_ratio if using_citation else rfuzz.WRatio
        threshold = _CITATION_THRESHOLD if using_citation else _MATCH_THRESHOLD
        needle = search_str.lower()

        # Find best match, preferring positions not yet assigned to another field.
        best_score = 0
        best_item: CorpusItem | None = None
        best_unused_score = 0
        best_unused_item: CorpusItem | None = None
        for item, haystack, pos_key in candidates:
            score = score_fn(needle, haystack)
            # Also try UK-formatted date if available
            if alt_search and score < threshold:
                alt_score = rfuzz.partial_ratio(alt_search, haystack)
                if alt_score > score:
                    score = alt_score
            if score > best_score:
                best_score = score
                best_item = item
            if score > best_unused_score and pos_key not in used_positions:
                best_unused_score = score
                best_unused_item = item

        # Prefer an unused position if it scores above threshold,
        # otherwise fall back to best overall (may share a location).
        if best_unused_item is not None and best_unused_score >= threshold:
            chosen_item = best_unused_item
            chosen_score = best_unused_score
        elif best_item is not None and best_score >= threshold:
            chosen_item = best_item
            chosen_score = best_score
        else:
            continue

        used_positions.add(
            (chosen_item.source_filename, chosen_item.page, chosen_item.bbox[0], chosen_item.bbox[1])
        )
        if using_citation:
            citation_hits += 1
        results.append(FieldProvenance(
            field_path=field_path,
            extracted_value=value_str,
            matched_text=chosen_item.text[:200],  # truncate very long table blobs
            match_score=round(chosen_score / 100.0, 3),
            source_filename=chosen_item.source_filename,
            location=Location(
                page=chosen_item.page,
                bbox=_padded_bbox(chosen_item.bbox),
            ),
        ))

    total = _count_total_fields(record)
    logger.info(
        "Provenance: %d / %d fields located (%d via citation quotes, %d via fuzzy fallback) "
        "— trusted corpora: %s",
        len(results), total,
        citation_hits, len(results) - citation_hits,
        [c.source_filename for c in trusted_corpora],
    )
    return results
# ---------------------------------------------------------------------------
# Field-walking helpers
# ---------------------------------------------------------------------------
def _walk_record(record: Any) -> Iterator[tuple[str, str]]:
    """Yield ``(field_path, string_value)`` for each non-None leaf of *record*.

    The record is serialised with ``model_dump(exclude_none=True)`` and the
    resulting dict is traversed from the root (empty path prefix).
    """
    dumped = record.model_dump(exclude_none=True)
    for path_and_value in _walk_dict(dumped, ""):
        yield path_and_value
def _walk_dict(d: dict, prefix: str) -> Iterator[tuple[str, str]]:
    """Recursively yield ``(dotted_path, str(value))`` for the leaves of *d*.

    *prefix* is the path of *d* itself ("" at the root). Keys listed in
    ``_SKIP_SECTION_NAMES``, and anything beneath a root section listed there,
    are not descended into. ``None`` values are dropped.
    """
    # The root section of an existing prefix is the same for every key, so
    # work it out once: first dotted segment, minus any list index.
    inherited_section = prefix.split(".")[0].split("[")[0] if prefix else None

    for key, val in d.items():
        section = inherited_section if prefix else key
        # Skip whole sections that produce unreliable or irrelevant matches
        if key in _SKIP_SECTION_NAMES or section in _SKIP_SECTION_NAMES:
            continue
        if val is None:
            continue

        path = f"{prefix}.{key}" if prefix else key
        if isinstance(val, dict):
            yield from _walk_dict(val, path)
        elif isinstance(val, list):
            yield from _walk_list(val, path)
        else:
            yield path, str(val)
def _walk_list(lst: list, prefix: str) -> Iterator[tuple[str, str]]:
for i, item in enumerate(lst):
path = f"{prefix}[{i}]"
if isinstance(item, dict):
yield from _walk_dict(item, path)
elif item is not None:
yield path, str(item)
def _count_total_fields(record: Any) -> int:
    """Return how many leaf fields the record walk produces for *record*."""
    dumped = record.model_dump(exclude_none=True)
    total = 0
    for _ in _walk_dict(dumped, ""):
        total += 1
    return total
# ISO 8601 date/datetime patterns → UK DD/MM/YYYY
_ISO_DATE_RE = re.compile(r'^(\d{4})-(\d{2})-(\d{2})')
def _iso_to_uk_date(value: str) -> str | None:
"""Convert ISO date/datetime string to UK DD/MM/YYYY for document matching.
Returns the UK-format string (e.g. "15/04/2026") if value looks like an
ISO date, otherwise returns None.
"""
m = _ISO_DATE_RE.match(value.strip())
if m:
yyyy, mm, dd = m.group(1), m.group(2), m.group(3)
return f"{dd}/{mm}/{yyyy}"
return None
def _padded_bbox(bbox: list[float]) -> list[float]:
    """Grow a tight text bbox so its highlight is easy to see in the UI.

    Each side is pushed outward by the configured padding and clamped to the
    page. If the padded box is still shorter than the minimum height, it is
    re-centred on its vertical midpoint at that minimum height (again clamped).
    Coordinates are returned rounded to three decimals.
    """
    left, top, right, bottom = bbox

    left = _clamp(left - _BBOX_PAD_X)
    right = _clamp(right + _BBOX_PAD_X)
    top = _clamp(top - _BBOX_PAD_Y)
    bottom = _clamp(bottom + _BBOX_PAD_Y)

    # Enforce minimum height so single-line text is always visible
    if bottom - top < _BBOX_MIN_H:
        centre = (top + bottom) / 2
        top = _clamp(centre - _BBOX_MIN_H / 2)
        bottom = _clamp(centre + _BBOX_MIN_H / 2)

    return [round(coord, 3) for coord in (left, top, right, bottom)]