kennethzychew's picture
feat(docling): Docling parser + native-PDF acquire path (T3)
e2eec89
Raw
History Blame Contribute Delete
14.2 kB
"""The reusable core pipeline: ``process_document(path) -> ExtractionResult``.
This is the one piece of logic both entry points share (architecture section 1).
It chains the six stages of the data flow (architecture section 7) -- detect
modality, acquire a payload, call the backend, validate, score, route -- and
returns a result object. It performs **no** side effects: no file moves, no
database writes (CLAUDE.md architectural rule 1). The watcher persists and moves
files; the web demo renders and discards; the core only computes.
Two seams keep the core decoupled from its environment:
- **The backend** (``ExtractionBackend``) is injected or built from config via
the factory. The core never imports a provider SDK (rule 2).
- **Acquisition** (``Acquire``) -- turning a path into a ``DocumentPayload`` with
Docling/OCR -- is a callable the core depends on but does not implement itself.
The real parser is build-plan phases 2.2/2.3 (a supervised task). So far the
default acquirer reads raw bytes for images under ``vision_direct`` and
delegates native PDFs to the Docling parser; every other combination (the OCR
path) still raises a clear error, so callers that need it inject an acquirer
(the smoke test a fixed payload, the web demo its uploaded bytes).
Robustness follows the precision posture and rule 6 ("the loop never dies on one
document"): a failure anywhere in the per-document pipeline is caught, logged
with full context, and turned into a ``review`` result rather than propagating.
A missing field is cheap (review catches it); a crash that halts the batch, or a
confidently-wrong number written on auto-accept, are the costly outcomes this
guards against. Misconfiguration (an unbuildable backend) still fails fast,
before the per-document loop -- that is a startup error, not a document error.
See ``docs/02_architecture.md`` sections 3 and 7.
"""
from __future__ import annotations
import logging
from collections.abc import Callable
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from doc_agent.backends.base import (
DocumentPayload,
ExtractionBackend,
create_backend,
)
from doc_agent.config import Settings, load_config
from doc_agent.parsing.detect import Modality, detect_modality
from doc_agent.routing.score import route, score
from doc_agent.schema.models import Decision, Document
from doc_agent.validation.rules import ValidationReport, validate
# Module-level logger, named after this module so records can be filtered by it.
logger = logging.getLogger(__name__)
# The acquisition seam: map a document path + its detected modality to the
# payload a backend consumes. The core depends on this interface rather than on
# a concrete parser, keeping acquisition swappable and the core testable
# offline: callers may inject any callable with this signature. The default
# built by ``_make_acquire`` covers images under ``vision_direct`` and native
# PDFs (delegated to the Docling parser module); other combinations raise
# ``NotImplementedError`` there.
# NOTE(review): the build plan (phases 2.2/2.3) tracks the remaining OCR path --
# confirm its status against the plan.
Acquire = Callable[[Path, Modality], DocumentPayload]
@dataclass(frozen=True)
class ExtractionResult:
    """Immutable record of one run of the core pipeline over one document.

    This is a plain value object. An entry point gets everything it needs to
    persist, render, or queue the document without recomputing anything: the
    populated ``document`` (``field_confidence``, ``validation`` and
    ``decision`` filled in) is the record to store, and the other fields expose
    the routing rationale for logs and for the web demo's explanation panel.

    Attributes:
        document: The validated ``Document`` with its pipeline fields set. When
            a stage failed this is an empty document so downstream code can
            still rely on its shape; check ``error`` to tell the cases apart.
        report: Structured validation report (hard/soft rule outcomes).
        confidence: Document-level confidence score in ``[0, 1]``.
        decision: Routing decision ("accept" | "review"); mirrors
            ``document.decision``.
        modality: Detected parse path, or ``None`` if detection itself failed.
        backend_name: Backend that produced (or would have produced) the data.
        source_path: Original document path, kept for logging/diagnostics.
        model_signal: Aggregated backend confidence handed to scoring, or
            ``None`` when the backend exposed none (scoring treated it as
            neutral).
        error: ``None`` on success; a short message when a stage raised and the
            document was routed to review (rule 6).
    """

    # Required fields first; their order is part of the constructor signature.
    document: Document
    report: ValidationReport
    confidence: float
    decision: Decision
    modality: Modality | None
    backend_name: str
    # Optional diagnostics, defaulting to "not provided".
    source_path: Path | None = None
    model_signal: float | None = None
    error: str | None = None

    @property
    def accepted(self) -> bool:
        """Return ``True`` when the routing decision is "accept"."""
        routed_to = self.decision
        return routed_to == "accept"
# MIME types for image modality, keyed by lower-case file extension (including
# the leading dot, as returned by ``Path.suffix``). ``_load_image_payload``
# lower-cases the suffix before the lookup and falls back to "image/jpeg" for
# extensions that are not listed here.
_MIME_BY_SUFFIX: dict[str, str] = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".gif": "image/gif",
".webp": "image/webp",
".tif": "image/tiff",
".tiff": "image/tiff",
".bmp": "image/bmp",
}
def _load_image_payload(path: Path) -> DocumentPayload:
    """Read an image file into a payload for vision-direct extraction.

    The MIME type is looked up from the lower-cased file extension in
    ``_MIME_BY_SUFFIX``; an unlisted extension is reported as "image/jpeg".

    Args:
        path: Location of the image file; its bytes are read in full.

    Returns:
        A ``DocumentPayload`` with ``modality="image"``, the given
        ``source_path``, and ``image_bytes`` / ``image_mime`` populated.
    """
    extension = path.suffix.lower()
    if extension in _MIME_BY_SUFFIX:
        mime_type = _MIME_BY_SUFFIX[extension]
    else:
        # Unknown extension: same default the lookup table's consumers expect.
        mime_type = "image/jpeg"

    raw_bytes = path.read_bytes()
    return DocumentPayload(
        modality="image",
        source_path=path,
        image_bytes=raw_bytes,
        image_mime=mime_type,
    )
def _make_acquire(settings: Settings) -> Acquire:
    """Build the default acquirer, bound to the given settings.

    The returned callable supports two paths:

    - ``image`` modality with ``image_strategy == "vision_direct"``: the raw
      file bytes are loaded via ``_load_image_payload``.
    - ``native_pdf`` modality: the text produced by
      ``doc_agent.parsing.docling_parser.parse_pdf`` is wrapped in a payload
      tagged ``{"parser": "docling"}``.

    Any other combination raises ``NotImplementedError`` with a message naming
    the modality and the configured image strategy.

    Args:
        settings: Runtime configuration; only ``image_strategy`` is read, and
            only when the modality is ``image``.

    Returns:
        An ``Acquire`` callable mapping ``(path, modality)`` to a payload.
    """

    def _acquire(path: Path, modality: Modality) -> DocumentPayload:
        vision_direct_image = (
            modality == "image" and settings.image_strategy == "vision_direct"
        )
        if vision_direct_image:
            return _load_image_payload(path)

        if modality != "native_pdf":
            raise NotImplementedError(
                f"Acquisition for modality={modality!r} with "
                f"IMAGE_STRATEGY={settings.image_strategy!r} is not yet wired. "
                "The OCR path (image+ocr_then_text) needs T4. "
                "Inject an acquire callable or use IMAGE_STRATEGY=vision_direct with an image."
            )

        # Imported lazily so the parser module is only loaded when a native
        # PDF is actually acquired.
        from doc_agent.parsing.docling_parser import parse_pdf

        extracted_text = parse_pdf(path)
        return DocumentPayload(
            modality="native_pdf",
            source_path=path,
            text=extracted_text,
            metadata={"parser": "docling"},
        )

    return _acquire
def _aggregate_model_signal(field_confidence: dict[str, float] | None) -> float | None:
    """Collapse per-field backend confidence into a single document signal.

    ``routing.score.score`` takes one scalar, so the per-field map is reduced
    to its arithmetic mean. The mean is a documented, tunable default: the
    operative precision lever is the arithmetic cross-check in validation, and
    the auto-accept threshold is set empirically by the evaluation harness.
    When there is nothing to average, ``None`` is returned so the scorer uses
    its neutral prior instead of an invented confidence.

    Args:
        field_confidence: Per-field confidence from the backend, or ``None``.

    Returns:
        The mean of the map's values, or ``None`` for a ``None``/empty map.
    """
    if field_confidence:
        per_field = field_confidence.values()
        return sum(per_field) / len(per_field)
    # No map, or an empty one: report "no signal" rather than a number.
    return None
def process_document(
    path: str | Path,
    *,
    settings: Settings | None = None,
    backend: ExtractionBackend | None = None,
    acquire: Acquire | None = None,
    today: date | None = None,
) -> ExtractionResult:
    """Run the full extraction pipeline over one document.

    Chains detect -> acquire -> ``backend.extract`` -> validate -> score ->
    route and returns an :class:`ExtractionResult`. The function has no side
    effects (no file moves, no persistence); callers own those. Any failure in
    the per-document stages is caught and turned into a ``review`` result with
    the error recorded, so one bad document never crashes a batch and the web
    demo never shows a stack trace (rule 6, precision posture).

    Backend selection follows the factory: an injected ``backend`` is used as-is
    (tests, an entry point forcing one); otherwise it is built from ``settings``.
    A misconfigured/unbuildable backend raises *before* the per-document try
    block -- that is a startup error and should fail fast, not silently route a
    document to review.

    Args:
        path: The document to process. Only the path is needed here; the file
            is opened (if at all) by the acquirer, not by this function.
        settings: Validated configuration; loaded from the environment when
            ``None``. Supplies ``confidence_threshold`` and, when ``backend`` /
            ``acquire`` are not injected, the inputs used to build them.
        backend: An explicit backend to use; built from ``settings`` via the
            factory when ``None``.
        acquire: The acquisition callable mapping ``(path, modality)`` to a
            payload. When ``None``, the default from ``_make_acquire`` is used,
            which raises ``NotImplementedError`` for combinations it does not
            support (that error is caught below and routes to review).
        today: Reference date for the validation future-date check; defaults to
            ``date.today()``. Injected in tests for determinism.

    Returns:
        An :class:`ExtractionResult` with a populated document and a decision.
        Never raises for a document-level problem -- those route to review.

    Raises:
        ConfigError: If configuration is invalid or the configured backend
            cannot be built (a startup error, surfaced before processing).
    """
    # Use explicit ``is None`` checks (as for ``today``) rather than ``x or
    # default``: an injected object that happens to be falsy (e.g. it defines
    # ``__len__`` or ``__bool__``) must still be honoured, not replaced.
    if settings is None:
        settings = load_config()
    if backend is None:
        backend = create_backend(settings)
    if acquire is None:
        acquire = _make_acquire(settings)
    reference_date = today if today is not None else date.today()

    source_path = Path(path)
    backend_name = backend.name
    # Stays ``None`` if detection itself raises, so the error result can say so.
    modality: Modality | None = None

    try:
        modality = detect_modality(source_path)
        payload = acquire(source_path, modality)
        backend_result = backend.extract(payload, Document)

        # Structured output is enforced, not regex-parsed (rule 4): the backend's
        # data dict is validated into a Document, which raises on malformed input
        # and routes the document to review via the except below.
        document = Document.model_validate(backend_result.data)

        report = validate(document, today=reference_date)
        model_signal = _aggregate_model_signal(backend_result.field_confidence)
        confidence = score(document, report, model_signal)
        decision = route(confidence, report, threshold=settings.confidence_threshold)

        # Copy rather than mutate: the stored record carries the pipeline
        # fields alongside the extracted data.
        populated = document.model_copy(
            update={
                "field_confidence": dict(backend_result.field_confidence or {}),
                "validation": report.to_dict(),
                "decision": decision,
            }
        )

        logger.info(
            "processed document path=%s modality=%s backend=%s decision=%s "
            "confidence=%.3f model_signal=%s hard_failures=%s soft_failures=%s",
            source_path,
            modality,
            backend_name,
            decision,
            confidence,
            f"{model_signal:.3f}" if model_signal is not None else "none",
            [r.code for r in report.hard_failures],
            [r.code for r in report.soft_failures],
        )

        return ExtractionResult(
            document=populated,
            report=report,
            confidence=confidence,
            decision=decision,
            modality=modality,
            backend_name=backend_name,
            source_path=source_path,
            model_signal=model_signal,
        )
    except Exception as exc:  # noqa: BLE001 -- rule 6: never let one document halt the loop.
        return _review_on_error(source_path, modality, backend_name, exc, reference_date)
def _review_on_error(
    source_path: Path,
    modality: Modality | None,
    backend_name: str,
    exc: Exception,
    today: date,
) -> ExtractionResult:
    """Build a ``review`` result for a document whose pipeline stage raised.

    Implements rule 6: log the failure with full context (so the bug is visible)
    and route to review with the reason recorded, rather than propagating. The
    document is an empty one validated through the same rules (so its shape and
    ``validation`` report are consistent with the success path); an absent
    ``total`` hard-fails ``H4``, which already forces review.

    The recorded error text is ``str(exc)``, or the exception's type name when
    that string is empty, so ``error`` is always a non-empty message.

    Args:
        source_path: The document that failed.
        modality: The detected modality, or ``None`` if detection failed first.
        backend_name: The backend in use, for the log record.
        exc: The exception raised by a pipeline stage.
        today: Reference date passed through to validation.

    Returns:
        An :class:`ExtractionResult` with ``decision="review"`` and ``error`` set.
    """
    # Exceptions raised without a message (``TimeoutError()``, ``ValueError()``)
    # stringify to "". An empty ``error`` is falsy and would look like success
    # to ``if result.error:``, so fall back to the exception type name.
    message = str(exc) or type(exc).__name__

    logger.error(
        "document processing failed; routing to review path=%s modality=%s backend=%s error=%s",
        source_path,
        modality,
        backend_name,
        message,
        # Pass the exception itself so the traceback is attached even when this
        # helper is not called from inside an active ``except`` block.
        exc_info=exc,
    )

    document = Document()
    report = validate(document, today=today)
    populated = document.model_copy(
        update={
            "validation": {**report.to_dict(), "error": message},
            "decision": "review",
        }
    )

    return ExtractionResult(
        document=populated,
        report=report,
        confidence=0.0,
        decision="review",
        modality=modality,
        backend_name=backend_name,
        source_path=source_path,
        model_signal=None,
        error=message,
    )