document-extract-agent / tests /test_watcher.py
kennethzychew's picture
fix(lint): clear 5 ruff F401/F841 errors found by verifier
d4cf766
Raw
History Blame Contribute Delete
8.82 kB
"""Unit tests for the folder watcher / batch runner (build-plan task 4.2).
All tests are fully offline: they use ``tmp_path`` for isolated directories,
the ``StubBackend`` for deterministic extraction, and inject an ``acquire``
callable that skips real parsing. No network, no Docling, no real models.
Acceptance criteria (T8):
- A mixed batch (PDF + image + corrupt file) all process without stopping.
- Valid files move to processed/ or review/ based on their decision.
- A corrupt file (unreadable) routes to review/ with a logged reason.
- A duplicate (same content hash) is persisted only once.
- Unsupported file types are skipped, not crashed on.
"""
from __future__ import annotations
from datetime import date
from pathlib import Path
from unittest.mock import patch
from doc_agent.backends.base import DocumentPayload
from doc_agent.backends.stub import DEFAULT_STUB_DOCUMENT, StubBackend
from doc_agent.config import load_config
from doc_agent.ingest.watcher import _process_one, process_inbox
from doc_agent.store.db import record_count
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _settings(tmp_path: Path):
    """Load a config whose inbox, processed, review and DB paths sit under *tmp_path*.

    The backend and image strategy are pinned to fixed values so every test
    starts from the same configuration.
    """
    overrides = {
        "extraction_backend": "ollama",
        "image_strategy": "ocr_then_text",
        "inbox_dir": str(tmp_path / "inbox"),
        "processed_dir": str(tmp_path / "processed"),
        "review_dir": str(tmp_path / "review"),
        "db_path": str(tmp_path / "agent.db"),
    }
    return load_config(**overrides)
def _stub_backend_accept():
    """Return a ``StubBackend`` built with its defaults (used for the accept path)."""
    backend = StubBackend()
    return backend
def _stub_backend_review():
    """Return a stub backend whose document fails H2, so it is routed to review.

    The default stub document is copied with ``total`` overwritten, and an
    empty field-confidence mapping is supplied.
    """
    tampered = dict(DEFAULT_STUB_DOCUMENT, total=999.99)
    return StubBackend(data=tampered, field_confidence={})
def _make_pdf(inbox: Path, name: str = "doc.pdf") -> Path:
    """Create a stand-in PDF called *name* inside *inbox* and return its path.

    Only the ``%PDF-1.4`` header is PDF-like; the rest is filler bytes.
    """
    target = inbox.joinpath(name)
    target.write_bytes(b"%PDF-1.4 fake content")
    return target
def _make_image(inbox: Path, name: str = "photo.jpeg") -> Path:
    """Create a stand-in JPEG called *name* inside *inbox* and return its path.

    The payload starts with the ``FF D8 FF`` bytes followed by filler.
    """
    target = inbox.joinpath(name)
    target.write_bytes(b"\xff\xd8\xff fake jpeg")
    return target
def _make_corrupt(inbox: Path, name: str = "corrupt.pdf") -> Path:
    """Create an empty placeholder file called *name* in *inbox* and return its path.

    The file itself is readable, so hashing it succeeds; a test that wants a
    hashing failure has to inject it at process time (e.g. by mocking
    ``file_sha256``).
    """
    target = inbox.joinpath(name)
    # Zero bytes on disk: the "corruption" is simulated later, not stored here.
    target.write_bytes(b"")
    return target
def _patched_process_inbox(settings, backend, today=date(2024, 6, 1)):
    """Call ``process_inbox(settings)`` with its collaborators patched out.

    While the call runs:
    - ``doc_agent.ingest.watcher.create_backend`` returns *backend*;
    - ``doc_agent.core._make_acquire`` returns a stub that builds a
      ``DocumentPayload`` with fixed text instead of parsing the file;
    - ``doc_agent.core.date`` is a mock whose ``today()`` returns *today*.

    Returns whatever ``process_inbox`` returns.
    """

    def _fake_acquire(path: Path, modality):
        return DocumentPayload(modality=modality, source_path=path, text="stub text")

    backend_patch = patch("doc_agent.ingest.watcher.create_backend", return_value=backend)
    acquire_patch = patch("doc_agent.core._make_acquire", return_value=_fake_acquire)
    date_patch = patch("doc_agent.core.date")

    # Patchers only take effect on entry, so the entry order is what matters.
    with backend_patch, acquire_patch, date_patch as fake_date:
        fake_date.today.return_value = today
        return process_inbox(settings)
# ---------------------------------------------------------------------------
# _process_one: per-document behaviour
# ---------------------------------------------------------------------------
def test_accepted_document_moves_to_processed(tmp_path: Path) -> None:
    """An auto-accepted document lands in processed/ and yields one DB record."""
    settings = _settings(tmp_path)
    inbox_dir = Path(settings.inbox_dir)
    inbox_dir.mkdir(parents=True)
    pdf = _make_pdf(inbox_dir)

    def _acquire(path, modality):
        # Skip real parsing: hand back a ready-made payload.
        return DocumentPayload(modality=modality, source_path=path, text="stub")

    acquire_patch = patch("doc_agent.core._make_acquire", return_value=_acquire)
    with acquire_patch:
        _process_one(pdf, settings, _stub_backend_accept())

    destination = Path(settings.processed_dir) / pdf.name
    assert not pdf.exists()
    assert destination.exists()
    assert record_count(Path(settings.db_path)) == 1
def test_review_document_moves_to_review(tmp_path: Path) -> None:
    """A document routed to review lands in review/ and nothing is persisted."""
    settings = _settings(tmp_path)
    inbox_dir = Path(settings.inbox_dir)
    inbox_dir.mkdir(parents=True)
    pdf = _make_pdf(inbox_dir)

    def _acquire(path, modality):
        # Skip real parsing: hand back a ready-made payload.
        return DocumentPayload(modality=modality, source_path=path, text="stub")

    acquire_patch = patch("doc_agent.core._make_acquire", return_value=_acquire)
    with acquire_patch:
        _process_one(pdf, settings, _stub_backend_review())

    destination = Path(settings.review_dir) / pdf.name
    assert not pdf.exists()
    assert destination.exists()
    assert record_count(Path(settings.db_path)) == 0
def test_corrupt_file_routes_to_review_loop_continues(tmp_path: Path) -> None:
    """When hashing raises, the file is moved to review/ and no exception escapes."""
    settings = _settings(tmp_path)
    inbox_dir = Path(settings.inbox_dir)
    inbox_dir.mkdir(parents=True)
    victim = _make_pdf(inbox_dir, "corrupt.pdf")

    hash_failure = patch(
        "doc_agent.ingest.watcher.file_sha256",
        side_effect=OSError("unreadable"),
    )
    with hash_failure:
        # The call itself is the check: it must return normally.
        _process_one(victim, settings, _stub_backend_accept())

    destination = Path(settings.review_dir) / victim.name
    assert not victim.exists()
    assert destination.exists()
    assert record_count(Path(settings.db_path)) == 0
def test_already_moved_file_is_skipped_silently(tmp_path: Path) -> None:
    """``_process_one`` does nothing for a path that no longer exists (race guard)."""
    settings = _settings(tmp_path)
    # Deliberately never written to disk.
    missing = tmp_path.joinpath("inbox", "ghost.pdf")

    # The call itself is the check: it must return normally.
    _process_one(missing, settings, _stub_backend_accept())

    persisted = record_count(Path(settings.db_path))
    assert persisted == 0
# ---------------------------------------------------------------------------
# process_inbox: batch mode
# ---------------------------------------------------------------------------
def test_batch_processes_mixed_files(tmp_path: Path) -> None:
    """A PDF and an image in the same batch are both processed; none are skipped."""
    settings = _settings(tmp_path)
    inbox_dir = Path(settings.inbox_dir)
    inbox_dir.mkdir(parents=True)
    _make_pdf(inbox_dir, "a.pdf")
    _make_image(inbox_dir, "b.jpeg")

    summary = _patched_process_inbox(settings, _stub_backend_accept())

    assert summary["processed"] == 2
    assert summary["skipped"] == 0
def test_batch_unsupported_files_are_skipped(tmp_path: Path) -> None:
    """Unsupported extensions are counted as skipped instead of crashing the run."""
    settings = _settings(tmp_path)
    inbox_dir = Path(settings.inbox_dir)
    inbox_dir.mkdir(parents=True)

    # Two files the watcher should ignore, plus one it should handle.
    unsupported = (("readme.txt", "hello"), ("data.csv", "a,b"))
    for filename, body in unsupported:
        inbox_dir.joinpath(filename).write_text(body)
    _make_pdf(inbox_dir, "invoice.pdf")

    summary = _patched_process_inbox(settings, _stub_backend_accept())

    assert summary["skipped"] == 2
    assert summary["processed"] == 1
def test_batch_corrupt_file_does_not_stop_loop(tmp_path: Path) -> None:
    """One corrupt file goes to review/ while the batch keeps going."""
    # Bound before patching; the patch below replaces only the watcher's name.
    from doc_agent.utils.hash import file_sha256 as real_sha256

    settings = _settings(tmp_path)
    inbox_dir = Path(settings.inbox_dir)
    inbox_dir.mkdir(parents=True)
    _make_pdf(inbox_dir, "good.pdf")
    _make_pdf(inbox_dir, "corrupt.pdf")
    _make_image(inbox_dir, "photo.jpeg")

    def _acquire(path, modality):
        # Skip real parsing: hand back a ready-made payload.
        return DocumentPayload(modality=modality, source_path=path, text="stub")

    def _flaky_hash(path):
        # Fail only for the file named as corrupt; hash the rest for real.
        if "corrupt" not in path.name:
            return real_sha256(path)
        raise OSError("disk error")

    backend_patch = patch(
        "doc_agent.ingest.watcher.create_backend",
        return_value=_stub_backend_accept(),
    )
    acquire_patch = patch("doc_agent.core._make_acquire", return_value=_acquire)
    hash_patch = patch("doc_agent.ingest.watcher.file_sha256", side_effect=_flaky_hash)

    with backend_patch, acquire_patch, hash_patch:
        summary = process_inbox(settings)

    assert summary["processed"] == 3  # all three attempted
    assert (Path(settings.review_dir) / "corrupt.pdf").exists()
def test_batch_duplicate_hash_persisted_once(tmp_path: Path) -> None:
    """Two byte-identical files are both processed but leave a single DB record."""
    settings = _settings(tmp_path)
    inbox_dir = Path(settings.inbox_dir)
    inbox_dir.mkdir(parents=True)

    payload = b"%PDF-1.4 identical"
    for filename in ("a.pdf", "b.pdf"):
        inbox_dir.joinpath(filename).write_bytes(payload)

    summary = _patched_process_inbox(settings, _stub_backend_accept())

    assert summary["processed"] == 2
    assert record_count(Path(settings.db_path)) == 1
def test_batch_creates_directories(tmp_path: Path) -> None:
    """``process_inbox`` creates the inbox, processed and review dirs when missing."""
    settings = _settings(tmp_path)

    # Precondition: nothing has been created yet.
    assert not Path(settings.inbox_dir).exists()

    _patched_process_inbox(settings, _stub_backend_accept())

    expected_dirs = (settings.inbox_dir, settings.processed_dir, settings.review_dir)
    for directory in expected_dirs:
        assert Path(directory).exists()