# AI-PolicyTrace / src/pipeline.py
# Author: teja141290
# Commit: "Deploy PolicyTrace Hugging Face Space" (be54038)
"""
pipeline.py — Shared PDF-routing and arbitration logic.

Both the CLI (main.py / DocumentPipeline) and the API (api.py / process_documents)
run the same extraction loop: route PDFs to Schedule/Certificate slots, call
the PolicyArbiter, and return the merged record plus any detected conflicts.

Extracting this logic here eliminates the duplication that previously existed
between those two entry points and makes the behaviour easy to test in isolation.

Usage
-----
    from pipeline import run_extraction_pipeline

    golden, conflicts, corpora = run_extraction_pipeline(
        pdf_paths=pdf_paths,
        agent=agent,
        with_provenance=True,
    )
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any
from agents import ExtractionFailedError, InsuranceExtractionAgents
from arbiter import PolicyArbiter
from schema import ConflictEntry, DocumentType, UKMotorGoldenRecord
# Module-level logger named after this module; all progress, skip and
# conflict messages emitted by the pipeline go through it.
logger: logging.Logger = logging.getLogger(__name__)
def run_extraction_pipeline(
    pdf_paths: list[Path],
    agent: InsuranceExtractionAgents,
    *,
    with_provenance: bool = False,
) -> tuple[UKMotorGoldenRecord, list[ConflictEntry], list[Any]]:
    """
    Route PDFs to Schedule/Certificate slots, arbitrate, and return the results.

    Each PDF is handed to ``agent``. The first document classified as a
    Schedule and the first classified as a Certificate fill their slots.
    Any further documents (duplicates or other types) are logged and left
    out of the merge. Per-document failures are logged and skipped, so one
    bad PDF does not abort the whole run.

    Parameters
    ----------
    pdf_paths : list[Path]
        Paths to the PDF documents to process. Plain ``str`` paths are also
        accepted and converted to :class:`pathlib.Path`.
    agent : InsuranceExtractionAgents
        Configured extraction agent (carries masker, debug_dir, prompts, etc.).
    with_provenance : bool
        When True, builds and returns ProvenanceCorpus objects for each PDF.
        Set to True when running via the API (Visual Audit UI needs geometry data).
        Set to False for the CLI path (faster, no corpus overhead).

    Returns
    -------
    tuple[UKMotorGoldenRecord, list[ConflictEntry], list[ProvenanceCorpus]]
        * golden_record — the merged authoritative policy record
        * conflicts — fields where Schedule and Certificate disagreed
        * corpora — non-empty ProvenanceCorpus objects (empty list when
          with_provenance=False)

    Raises
    ------
    RuntimeError
        When neither a Schedule nor a Certificate could be extracted from any PDF.
    """
    schedule_record: UKMotorGoldenRecord | None = None
    schedule_filename = "unknown_schedule.pdf"
    certificate_record: UKMotorGoldenRecord | None = None
    certificate_filename = "unknown_certificate.pdf"
    corpora: list[Any] = []
    failed: list[str] = []

    for raw_path in pdf_paths:
        # Normalise plain strings so ``.name`` is always available. Otherwise
        # a str path raises AttributeError inside the except handlers below
        # and aborts the whole run instead of skipping one document.
        pdf_path = Path(raw_path) if isinstance(raw_path, str) else raw_path

        # Keep the try body limited to the extraction call itself, so that
        # only extraction errors are treated as "skip this document".
        try:
            if with_provenance:
                record, doc_type_str, corpus = agent.process_with_provenance(pdf_path)
            else:
                record, doc_type_str = agent.process(pdf_path)
                corpus = None
        except ExtractionFailedError as exc:
            # Expected, already-diagnosed failure: a one-line error is enough.
            logger.error(" ✗ Extraction failed for %s: %s", pdf_path.name, exc)
            failed.append(pdf_path.name)
            continue
        except Exception as exc:  # noqa: BLE001
            # Unexpected failure: keep the traceback for debugging, but
            # continue with the remaining documents (best-effort batch).
            logger.exception(" ✗ %s failed: %s", pdf_path.name, exc)
            failed.append(pdf_path.name)
            continue

        # Only keep corpora that actually carry provenance items.
        if corpus is not None and corpus.items:
            corpora.append(corpus)

        logger.info(" ✓ %s → %s", pdf_path.name, doc_type_str)

        # First document of each type wins; later ones are reported but unused.
        if doc_type_str == DocumentType.SCHEDULE.value and schedule_record is None:
            schedule_record = record
            schedule_filename = pdf_path.name
        elif doc_type_str == DocumentType.CERTIFICATE.value and certificate_record is None:
            certificate_record = record
            certificate_filename = pdf_path.name
        else:
            logger.info(" ~ %s (%s) — not used in merge", pdf_path.name, doc_type_str)

    if failed:
        logger.warning("Skipped %d document(s): %s", len(failed), failed)

    if schedule_record is None and certificate_record is None:
        raise RuntimeError(
            "No Schedule or Certificate extracted. "
            "Check GROQ_API_KEY and that the PDFs are readable."
        )

    # Fill a missing side with an empty record so the arbiter always receives
    # two records. Use explicit None checks rather than model truthiness.
    if schedule_record is None:
        logger.warning("No Schedule found — using empty record as fallback")
        schedule_record = UKMotorGoldenRecord()
    if certificate_record is None:
        logger.warning("No Certificate found — using empty record as fallback")
        certificate_record = UKMotorGoldenRecord()

    logger.info("Merging Schedule + Certificate via PolicyArbiter…")
    arbiter = PolicyArbiter()
    golden, conflicts = arbiter.merge_records(
        schedule_record, schedule_filename,
        certificate_record, certificate_filename,
    )

    if conflicts:
        logger.info(
            "Arbiter detected %d conflict(s): %s",
            len(conflicts),
            [c.field for c in conflicts],
        )

    return golden, conflicts, corpora