Spaces:
Running
Running
| """ | |
| pipeline.py — Shared PDF-routing and arbitration logic. | |
| Both the CLI (main.py / DocumentPipeline) and the API (api.py / process_documents) | |
| run the same extraction loop: route PDFs to Schedule/Certificate slots, call | |
| the PolicyArbiter, and return the merged record plus any detected conflicts. | |
| Extracting this logic here eliminates the duplication that previously existed | |
| between those two entry-points and makes the behaviour easy to test in isolation. | |
| Usage | |
| ----- | |
| from pipeline import run_extraction_pipeline | |
| golden, conflicts, corpora = run_extraction_pipeline( | |
| pdf_paths=pdf_paths, | |
| agent=agent, | |
| with_provenance=True, | |
| ) | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from pathlib import Path | |
| from typing import Any | |
| from agents import ExtractionFailedError, InsuranceExtractionAgents | |
| from arbiter import PolicyArbiter | |
| from schema import ConflictEntry, DocumentType, UKMotorGoldenRecord | |
| logger = logging.getLogger(__name__) | |
def run_extraction_pipeline(
    pdf_paths: list[Path],
    agent: InsuranceExtractionAgents,
    *,
    with_provenance: bool = False,
) -> tuple[UKMotorGoldenRecord, list[ConflictEntry], list[Any]]:
    """
    Route PDFs to Schedule/Certificate slots, arbitrate, and return the results.

    Each PDF is extracted by *agent*. The first document classified as a
    Schedule and the first classified as a Certificate are kept; any further
    document (a second Schedule/Certificate, or another document type) is
    logged and not used in the merge. A PDF whose extraction raises is logged
    and skipped so that one bad file does not abort the whole batch.

    Parameters
    ----------
    pdf_paths : list[Path]
        Paths to the PDF documents to process. Order matters: the first
        Schedule and the first Certificate encountered win.
    agent : InsuranceExtractionAgents
        Configured extraction agent (carries masker, debug_dir, prompts, etc.).
    with_provenance : bool
        When True, builds and returns ProvenanceCorpus objects for each PDF.
        Set to True when running via the API (Visual Audit UI needs geometry data).
        Set to False for the CLI path (faster, no corpus overhead).

    Returns
    -------
    tuple[UKMotorGoldenRecord, list[ConflictEntry], list[ProvenanceCorpus]]
        * golden_record — the merged authoritative policy record
        * conflicts — fields where Schedule and Certificate disagreed
        * corpora — non-empty ProvenanceCorpus objects
          (always an empty list when with_provenance=False)

    Raises
    ------
    RuntimeError
        When *pdf_paths* is empty, or when neither a Schedule nor a
        Certificate could be extracted from any PDF.
    """
    # Fail fast with an accurate message; otherwise an empty input would
    # surface as the misleading "check GROQ_API_KEY" error below.
    if not pdf_paths:
        raise RuntimeError("No PDF paths supplied to the extraction pipeline.")

    schedule_record: UKMotorGoldenRecord | None = None
    schedule_filename = "unknown_schedule.pdf"
    certificate_record: UKMotorGoldenRecord | None = None
    certificate_filename = "unknown_certificate.pdf"
    corpora: list[Any] = []
    failed: list[str] = []

    for pdf_path in pdf_paths:
        # Only the extraction itself (and corpus collection) is guarded, so a
        # bug in the routing code below is not mislabelled as a PDF failure.
        try:
            if with_provenance:
                record, doc_type_str, corpus = agent.process_with_provenance(pdf_path)
                # Only corpora that actually contain items are returned.
                if corpus is not None and corpus.items:
                    corpora.append(corpus)
            else:
                record, doc_type_str = agent.process(pdf_path)
        except ExtractionFailedError as exc:
            # Expected, already-described failure: a one-line message suffices.
            logger.error(" ✗ Extraction failed for %s: %s", pdf_path.name, exc)
            failed.append(pdf_path.name)
            continue
        except Exception as exc:  # noqa: BLE001
            # Deliberate best-effort boundary: one bad PDF must not abort the
            # batch. logger.exception keeps the traceback for diagnosis.
            logger.exception(" ✗ %s failed: %s", pdf_path.name, exc)
            failed.append(pdf_path.name)
            continue

        logger.info(" ✓ %s → %s", pdf_path.name, doc_type_str)

        # First Schedule / first Certificate win; later ones are ignored.
        if doc_type_str == DocumentType.SCHEDULE.value and schedule_record is None:
            schedule_record = record
            schedule_filename = pdf_path.name
        elif doc_type_str == DocumentType.CERTIFICATE.value and certificate_record is None:
            certificate_record = record
            certificate_filename = pdf_path.name
        else:
            logger.info(" ~ %s (%s) — not used in merge", pdf_path.name, doc_type_str)

    if failed:
        logger.warning("Skipped %d document(s): %s", len(failed), failed)

    if schedule_record is None and certificate_record is None:
        raise RuntimeError(
            "No Schedule or Certificate extracted. "
            "Check GROQ_API_KEY and that the PDFs are readable."
        )

    # Explicit `is None` checks rather than `record or default`: truthiness
    # would also replace a genuinely extracted record if its type were falsy.
    if schedule_record is None:
        logger.warning("No Schedule found — using empty record as fallback")
        schedule_record = UKMotorGoldenRecord()
    if certificate_record is None:
        logger.warning("No Certificate found — using empty record as fallback")
        certificate_record = UKMotorGoldenRecord()

    logger.info("Merging Schedule + Certificate via PolicyArbiter…")
    arbiter = PolicyArbiter()
    golden, conflicts = arbiter.merge_records(
        schedule_record, schedule_filename,
        certificate_record, certificate_filename,
    )
    if conflicts:
        logger.info(
            "Arbiter detected %d conflict(s): %s",
            len(conflicts),
            [c.field for c in conflicts],
        )
    return golden, conflicts, corpora