Spaces:
Running
Running
| """The reusable core pipeline: ``process_document(path) -> ExtractionResult``. | |
| This is the one piece of logic both entry points share (architecture section 1). | |
| It chains the six stages of the data flow (architecture section 7) -- detect | |
| modality, acquire a payload, call the backend, validate, score, route -- and | |
| returns a result object. It performs **no** side effects: no file moves, no | |
| database writes (CLAUDE.md architectural rule 1). The watcher persists and moves | |
| files; the web demo renders and discards; the core only computes. | |
| Two seams keep the core decoupled from its environment: | |
| - **The backend** (``ExtractionBackend``) is injected or built from config via | |
| the factory. The core never imports a provider SDK (rule 2). | |
| - **Acquisition** (``Acquire``) -- turning a path into a ``DocumentPayload`` with | |
| Docling/OCR -- is a callable the core depends on. The real parser is build-plan | |
| phases 2.2/2.3 (a supervised TOMORROW task) and is only partly wired: the default | |
| acquirer parses native PDFs with Docling and reads raw bytes for images under | |
| ``IMAGE_STRATEGY=vision_direct``, while the OCR path still raises a clear error. | |
| Callers may inject their own acquirer (the smoke test a fixed payload, the web | |
| demo its uploaded bytes). | |
| Robustness follows the precision posture and rule 6 ("the loop never dies on one | |
| document"): a failure anywhere in the per-document pipeline is caught, logged | |
| with full context, and turned into a ``review`` result rather than propagating. | |
| A missing field is cheap (review catches it); a crash that halts the batch, or a | |
| confidently-wrong number written on auto-accept, are the costly outcomes this | |
| guards against. Misconfiguration (an unbuildable backend) still fails fast, | |
| before the per-document loop -- that is a startup error, not a document error. | |
| See ``docs/02_architecture.md`` sections 3 and 7. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from collections.abc import Callable | |
| from dataclasses import dataclass | |
| from datetime import date | |
| from pathlib import Path | |
| from doc_agent.backends.base import ( | |
| DocumentPayload, | |
| ExtractionBackend, | |
| create_backend, | |
| ) | |
| from doc_agent.config import Settings, load_config | |
| from doc_agent.parsing.detect import Modality, detect_modality | |
| from doc_agent.routing.score import route, score | |
| from doc_agent.schema.models import Decision, Document | |
| from doc_agent.validation.rules import ValidationReport, validate | |
logger = logging.getLogger(__name__)

# The acquisition seam: map a document path + its detected modality to the
# payload a backend consumes. The core depends on this interface rather than on
# a concrete parser, keeping acquisition swappable and the core testable
# offline. The default implementation built by ``_make_acquire`` parses native
# PDFs with Docling and reads raw image bytes under IMAGE_STRATEGY=vision_direct.
# The OCR path (build-plan phases 2.2/2.3 -- a supervised TOMORROW task) is not
# wired yet and raises.
Acquire = Callable[[Path, Modality], DocumentPayload]
# ``@dataclass`` generates the keyword ``__init__`` that ``process_document`` and
# ``_review_on_error`` construct results with. Without it the class accepts no
# constructor arguments, and every call site raises ``TypeError``.
@dataclass
class ExtractionResult:
    """The outcome of running the core pipeline over one document.

    A plain value object: it carries everything an entry point needs to persist,
    render, or queue the document, and nothing it needs to recompute. The
    populated ``document`` (with ``field_confidence``, ``validation``, and
    ``decision`` filled in) is the record to store. The remaining fields surface
    the routing rationale for logging and the web demo's explanation panel.

    Attributes:
        document: The validated ``Document`` with its pipeline fields populated.
            On an error this is an empty document (so downstream code can rely on
            the shape); inspect ``error`` to distinguish.
        report: The structured validation report (hard/soft rule outcomes).
        confidence: The document-level confidence score in ``[0, 1]``.
        decision: The routing decision ("accept" | "review"); mirrors
            ``document.decision``.
        modality: The detected parse path, or ``None`` if detection itself failed.
        backend_name: The backend that produced (or would have produced) the data.
        source_path: The original document path, for logging/diagnostics.
        model_signal: The aggregated backend confidence fed to scoring, or
            ``None`` when the backend exposed none (scoring treated it as neutral).
        error: ``None`` on success; a short message when the document was routed
            to review because a stage raised (rule 6).
    """

    document: Document
    report: ValidationReport
    confidence: float
    decision: Decision
    modality: Modality | None
    backend_name: str
    source_path: Path | None = None
    model_signal: float | None = None
    error: str | None = None

    def accepted(self) -> bool:
        """Return ``True`` when the document was auto-accepted (``decision == "accept"``).

        NOTE(review): this is a method and must be *called*. Writing
        ``if result.accepted:`` would test the bound method object, which is
        always truthy.
        """
        return self.decision == "accept"
| # MIME types for image modality, keyed by lower-case file extension. | |
| _MIME_BY_SUFFIX: dict[str, str] = { | |
| ".jpg": "image/jpeg", | |
| ".jpeg": "image/jpeg", | |
| ".png": "image/png", | |
| ".gif": "image/gif", | |
| ".webp": "image/webp", | |
| ".tif": "image/tiff", | |
| ".tiff": "image/tiff", | |
| ".bmp": "image/bmp", | |
| } | |
def _load_image_payload(path: Path) -> DocumentPayload:
    """Read an image file into a payload for vision-direct extraction.

    The MIME type is looked up from the lower-cased file extension in
    ``_MIME_BY_SUFFIX``. Extensions missing from that table fall back to
    ``image/jpeg``.

    Args:
        path: Location of the image file to read.

    Returns:
        A ``DocumentPayload`` for the ``image`` modality whose ``image_bytes``
        holds the file's raw bytes and whose ``image_mime`` holds the resolved
        type.
    """
    suffix = path.suffix.lower()
    raw = path.read_bytes()
    return DocumentPayload(
        modality="image",
        source_path=path,
        image_bytes=raw,
        image_mime=_MIME_BY_SUFFIX.get(suffix, "image/jpeg"),
    )
| def _make_acquire(settings: Settings) -> Acquire: | |
| """Create the default acquire callable wired to current settings. | |
| Handles ``image`` + ``vision_direct`` by reading raw bytes (T2). All other | |
| paths (``native_pdf``, ``ocr_then_text``) raise until T3/T4/T5 wire them in. | |
| Args: | |
| settings: Validated runtime configuration (``image_strategy`` is read). | |
| Returns: | |
| An ``Acquire`` callable that maps (path, modality) to a payload. | |
| """ | |
| def _acquire(path: Path, modality: Modality) -> DocumentPayload: | |
| if modality == "image" and settings.image_strategy == "vision_direct": | |
| return _load_image_payload(path) | |
| if modality == "native_pdf": | |
| from doc_agent.parsing.docling_parser import parse_pdf | |
| text = parse_pdf(path) | |
| return DocumentPayload( | |
| modality="native_pdf", | |
| source_path=path, | |
| text=text, | |
| metadata={"parser": "docling"}, | |
| ) | |
| raise NotImplementedError( | |
| f"Acquisition for modality={modality!r} with " | |
| f"IMAGE_STRATEGY={settings.image_strategy!r} is not yet wired. " | |
| "The OCR path (image+ocr_then_text) needs T4. " | |
| "Inject an acquire callable or use IMAGE_STRATEGY=vision_direct with an image." | |
| ) | |
| return _acquire | |
| def _aggregate_model_signal(field_confidence: dict[str, float] | None) -> float | None: | |
| """Reduce a backend's per-field confidence to one document-level signal. | |
| Scoring consumes a single scalar (``routing.score.score``), so the per-field | |
| map a backend exposes is averaged into one number. The mean is a documented, | |
| tunable default; the operative precision lever is the arithmetic cross-check | |
| in validation, not this signal, and the auto-accept threshold is set | |
| empirically by the evaluation harness. An absent or empty map returns | |
| ``None`` so the scorer falls back to its neutral prior rather than inventing | |
| confidence. | |
| Args: | |
| field_confidence: The backend's per-field confidence, or ``None``. | |
| Returns: | |
| The mean confidence in ``[0, 1]``, or ``None`` when no signal was given. | |
| """ | |
| if not field_confidence: | |
| return None | |
| values = list(field_confidence.values()) | |
| return sum(values) / len(values) | |
def process_document(
    path: str | Path,
    *,
    settings: Settings | None = None,
    backend: ExtractionBackend | None = None,
    acquire: Acquire | None = None,
    today: date | None = None,
) -> ExtractionResult:
    """Run the full extraction pipeline over one document.

    Chains detect -> acquire -> ``backend.extract`` -> validate -> score ->
    route and returns an :class:`ExtractionResult`. The function has no side
    effects (no file moves, no persistence); callers own those. Any failure in
    the per-document stages is caught and turned into a ``review`` result with
    the error recorded. One bad document therefore never crashes a batch, and
    the web demo never shows a stack trace (rule 6, precision posture).

    Backend selection follows the factory: an injected ``backend`` is used as-is
    (tests, an entry point forcing one); otherwise it is built from ``settings``.
    A misconfigured/unbuildable backend raises *before* the per-document try
    block. That is a startup error and should fail fast, not silently route a
    document to review.

    Args:
        path: The document to process. Only the path is needed; the file is
            opened (if at all) by the acquirer, not here.
        settings: Validated configuration; loaded from the environment when
            ``None``. Supplies ``confidence_threshold``. When ``backend`` or
            ``acquire`` is not injected, it also supplies the backend selection
            and ``image_strategy``.
        backend: An explicit backend to use; built from ``settings`` via the
            factory when ``None``.
        acquire: The acquisition callable mapping ``(path, modality)`` to a
            payload. When ``None``, the default acquirer is used. It parses
            native PDFs with Docling and reads raw bytes for images under
            ``IMAGE_STRATEGY=vision_direct``. Paths not yet wired (such as OCR)
            raise ``NotImplementedError``, which routes the document to review
            like any other stage failure.
        today: Reference date for the validation future-date check; defaults to
            ``date.today()``. Injected in tests for determinism.

    Returns:
        An :class:`ExtractionResult` with a populated document and a decision.
        Never raises for a document-level problem; those route to review.

    Raises:
        ConfigError: If configuration is invalid or the configured backend
            cannot be built (a startup error, surfaced before processing).
    """
    # Explicit ``is None`` checks (not ``or``) so an injected collaborator is
    # always honoured, even one that happens to be falsy. This matches how
    # ``today`` is defaulted below.
    if settings is None:
        settings = load_config()
    if backend is None:
        backend = create_backend(settings)
    if acquire is None:
        acquire = _make_acquire(settings)
    reference_date = today if today is not None else date.today()

    source_path = Path(path)
    backend_name = backend.name
    # Pre-set so the error path can report "detection itself failed" as None.
    modality: Modality | None = None
    try:
        modality = detect_modality(source_path)
        payload = acquire(source_path, modality)
        backend_result = backend.extract(payload, Document)
        # Structured output is enforced, not regex-parsed (rule 4): the backend's
        # data dict is validated into a Document, which raises on malformed input
        # and routes the document to review via the except below.
        document = Document.model_validate(backend_result.data)
        report = validate(document, today=reference_date)
        model_signal = _aggregate_model_signal(backend_result.field_confidence)
        confidence = score(document, report, model_signal)
        decision = route(confidence, report, threshold=settings.confidence_threshold)
        populated = document.model_copy(
            update={
                "field_confidence": dict(backend_result.field_confidence or {}),
                "validation": report.to_dict(),
                "decision": decision,
            }
        )
        logger.info(
            "processed document path=%s modality=%s backend=%s decision=%s "
            "confidence=%.3f model_signal=%s hard_failures=%s soft_failures=%s",
            source_path,
            modality,
            backend_name,
            decision,
            confidence,
            f"{model_signal:.3f}" if model_signal is not None else "none",
            [r.code for r in report.hard_failures],
            [r.code for r in report.soft_failures],
        )
        return ExtractionResult(
            document=populated,
            report=report,
            confidence=confidence,
            decision=decision,
            modality=modality,
            backend_name=backend_name,
            source_path=source_path,
            model_signal=model_signal,
        )
    except Exception as exc:  # noqa: BLE001 -- rule 6: never let one document halt the loop.
        return _review_on_error(source_path, modality, backend_name, exc, reference_date)
def _review_on_error(
    source_path: Path,
    modality: Modality | None,
    backend_name: str,
    exc: Exception,
    today: date,
) -> ExtractionResult:
    """Build a ``review`` result for a document whose pipeline stage raised.

    Implements rule 6: log the failure with full context (so the bug is visible)
    and route to review with the reason recorded, rather than propagating. The
    document is an empty one validated through the same rules, so its shape and
    ``validation`` report are consistent with the success path. An absent
    ``total`` hard-fails ``H4``, which already forces review.

    Args:
        source_path: The document that failed.
        modality: The detected modality, or ``None`` if detection failed first.
        backend_name: The backend in use, for the log record.
        exc: The exception raised by a pipeline stage.
        today: Reference date passed through to validation.

    Returns:
        An :class:`ExtractionResult` with ``decision="review"`` and a non-empty
        ``error`` message.
    """
    # ``str(exc)`` is "" for exceptions raised without a message (e.g. a bare
    # ``ValueError()``). An empty ``error`` would be falsy and look like success
    # to callers, so fall back to the exception's class name.
    message = str(exc) or type(exc).__name__
    logger.error(
        "document processing failed; routing to review path=%s modality=%s backend=%s error=%s",
        source_path,
        modality,
        backend_name,
        message,
        # Pass the exception itself (not ``True``) so the traceback is attached
        # even if this helper is ever called outside an ``except`` block.
        exc_info=exc,
    )
    document = Document()
    report = validate(document, today=today)
    populated = document.model_copy(
        update={
            "validation": {**report.to_dict(), "error": message},
            "decision": "review",
        }
    )
    return ExtractionResult(
        document=populated,
        report=report,
        confidence=0.0,
        decision="review",
        modality=modality,
        backend_name=backend_name,
        source_path=source_path,
        model_signal=None,
        error=message,
    )