"""Unit tests for the folder watcher / batch runner (build-plan task 4.2). All tests are fully offline: they use ``tmp_path`` for isolated directories, the ``StubBackend`` for deterministic extraction, and inject an ``acquire`` callable that skips real parsing. No network, no Docling, no real models. Acceptance criteria (T8): - A mixed batch (PDF + image + corrupt file) all process without stopping. - Valid files move to processed/ or review/ based on their decision. - A corrupt file (unreadable) routes to review/ with a logged reason. - A duplicate (same content hash) is persisted only once. - Unsupported file types are skipped, not crashed on. """ from __future__ import annotations from datetime import date from pathlib import Path from unittest.mock import patch from doc_agent.backends.base import DocumentPayload from doc_agent.backends.stub import DEFAULT_STUB_DOCUMENT, StubBackend from doc_agent.config import load_config from doc_agent.ingest.watcher import _process_one, process_inbox from doc_agent.store.db import record_count # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- def _settings(tmp_path: Path): """Settings pointing all paths into a tmp_path tree.""" return load_config( extraction_backend="ollama", image_strategy="ocr_then_text", inbox_dir=str(tmp_path / "inbox"), processed_dir=str(tmp_path / "processed"), review_dir=str(tmp_path / "review"), db_path=str(tmp_path / "agent.db"), ) def _stub_backend_accept(): return StubBackend() def _stub_backend_review(): """Stub whose data fails H2 so the pipeline routes to review.""" broken = dict(DEFAULT_STUB_DOCUMENT) broken["total"] = 999.99 return StubBackend(data=broken, field_confidence={}) def _make_pdf(inbox: Path, name: str = "doc.pdf") -> Path: """Write a minimal valid-looking PDF file.""" p = inbox / name p.write_bytes(b"%PDF-1.4 fake content") return p def _make_image(inbox: Path, name: str = "photo.jpeg") -> Path: p = inbox / name p.write_bytes(b"\xff\xd8\xff fake jpeg") return p def _make_corrupt(inbox: Path, name: str = "corrupt.pdf") -> Path: """Write a file that will cause file_sha256 to fail (directory trick via mock).""" p = inbox / name p.write_bytes(b"") # exists but empty; sha256 succeeds; we corrupt at process time return p def _patched_process_inbox(settings, backend, today=date(2024, 6, 1)): """Run process_inbox with an injected backend and acquire stub.""" def _acquire(path: Path, modality): return DocumentPayload(modality=modality, source_path=path, text="stub text") with patch("doc_agent.ingest.watcher.create_backend", return_value=backend), \ patch("doc_agent.core._make_acquire", return_value=_acquire), \ patch("doc_agent.core.date") as mock_date: mock_date.today.return_value = today return process_inbox(settings) # --------------------------------------------------------------------------- # _process_one: per-document behaviour # --------------------------------------------------------------------------- def test_accepted_document_moves_to_processed(tmp_path: Path) -> None: """An auto-accepted document ends up in processed/ and is persisted.""" settings = _settings(tmp_path) inbox = Path(settings.inbox_dir) inbox.mkdir(parents=True) src = _make_pdf(inbox) def _acquire(path, modality): return DocumentPayload(modality=modality, source_path=path, text="stub") with patch("doc_agent.core._make_acquire", return_value=_acquire): _process_one(src, settings, _stub_backend_accept()) assert not src.exists() assert (Path(settings.processed_dir) / src.name).exists() assert record_count(Path(settings.db_path)) == 1 def test_review_document_moves_to_review(tmp_path: Path) -> None: """A document routed to review ends up in review/ and is NOT persisted.""" settings = _settings(tmp_path) inbox = Path(settings.inbox_dir) inbox.mkdir(parents=True) src = _make_pdf(inbox) def _acquire(path, modality): return DocumentPayload(modality=modality, source_path=path, text="stub") with patch("doc_agent.core._make_acquire", return_value=_acquire): _process_one(src, settings, _stub_backend_review()) assert not src.exists() assert (Path(settings.review_dir) / src.name).exists() assert record_count(Path(settings.db_path)) == 0 def test_corrupt_file_routes_to_review_loop_continues(tmp_path: Path) -> None: """A file that raises during hashing is moved to review; no exception propagates.""" settings = _settings(tmp_path) inbox = Path(settings.inbox_dir) inbox.mkdir(parents=True) src = _make_pdf(inbox, "corrupt.pdf") with patch("doc_agent.ingest.watcher.file_sha256", side_effect=OSError("unreadable")): _process_one(src, settings, _stub_backend_accept()) # must not raise assert not src.exists() assert (Path(settings.review_dir) / src.name).exists() assert record_count(Path(settings.db_path)) == 0 def test_already_moved_file_is_skipped_silently(tmp_path: Path) -> None: """_process_one is a no-op when the file no longer exists (race guard).""" settings = _settings(tmp_path) ghost = tmp_path / "inbox" / "ghost.pdf" # never created _process_one(ghost, settings, _stub_backend_accept()) # must not raise assert record_count(Path(settings.db_path)) == 0 # --------------------------------------------------------------------------- # process_inbox: batch mode # --------------------------------------------------------------------------- def test_batch_processes_mixed_files(tmp_path: Path) -> None: """A batch of PDF + image files all process; counts add up.""" settings = _settings(tmp_path) inbox = Path(settings.inbox_dir) inbox.mkdir(parents=True) _make_pdf(inbox, "a.pdf") _make_image(inbox, "b.jpeg") counts = _patched_process_inbox(settings, _stub_backend_accept()) assert counts["processed"] == 2 assert counts["skipped"] == 0 def test_batch_unsupported_files_are_skipped(tmp_path: Path) -> None: """Files with unsupported extensions are skipped, not crashed on.""" settings = _settings(tmp_path) inbox = Path(settings.inbox_dir) inbox.mkdir(parents=True) (inbox / "readme.txt").write_text("hello") (inbox / "data.csv").write_text("a,b") _make_pdf(inbox, "invoice.pdf") counts = _patched_process_inbox(settings, _stub_backend_accept()) assert counts["skipped"] == 2 assert counts["processed"] == 1 def test_batch_corrupt_file_does_not_stop_loop(tmp_path: Path) -> None: """A corrupt file routes to review and the rest of the batch continues.""" settings = _settings(tmp_path) inbox = Path(settings.inbox_dir) inbox.mkdir(parents=True) _make_pdf(inbox, "good.pdf") _make_pdf(inbox, "corrupt.pdf") _make_image(inbox, "photo.jpeg") def _acquire(path, modality): return DocumentPayload(modality=modality, source_path=path, text="stub") def _flaky_hash(path): if "corrupt" in path.name: raise OSError("disk error") from doc_agent.utils.hash import file_sha256 as _real return _real(path) with patch("doc_agent.ingest.watcher.create_backend", return_value=_stub_backend_accept()), \ patch("doc_agent.core._make_acquire", return_value=_acquire), \ patch("doc_agent.ingest.watcher.file_sha256", side_effect=_flaky_hash): counts = process_inbox(settings) assert counts["processed"] == 3 # all three attempted assert (Path(settings.review_dir) / "corrupt.pdf").exists() def test_batch_duplicate_hash_persisted_once(tmp_path: Path) -> None: """Two files with identical content produce only one DB record.""" settings = _settings(tmp_path) inbox = Path(settings.inbox_dir) inbox.mkdir(parents=True) content = b"%PDF-1.4 identical" (inbox / "a.pdf").write_bytes(content) (inbox / "b.pdf").write_bytes(content) counts = _patched_process_inbox(settings, _stub_backend_accept()) assert counts["processed"] == 2 assert record_count(Path(settings.db_path)) == 1 def test_batch_creates_directories(tmp_path: Path) -> None: """process_inbox creates inbox, processed, and review dirs if absent.""" settings = _settings(tmp_path) # Directories do not exist yet. assert not Path(settings.inbox_dir).exists() _patched_process_inbox(settings, _stub_backend_accept()) assert Path(settings.inbox_dir).exists() assert Path(settings.processed_dir).exists() assert Path(settings.review_dir).exists()