Spaces:
Sleeping
Sleeping
File size: 1,853 Bytes
8cddb04 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock
import pytest
from app.services.article_import_processor import ArticleImportProcessor
@pytest.mark.unit
@pytest.mark.asyncio
async def test_process_single_pass_sets_total_rows_at_finish(tmp_path) -> None:
csv_path = tmp_path / "job.csv"
csv_path.write_text("Ref,Nom\nR1,Label\n", encoding="utf-8")
import_id = "test-import-id"
object_key = f"catalogues/{import_id}.csv"
job_row = SimpleNamespace(
type="articles",
mapping={"reference_number": "Ref", "label_fr": "Nom"},
options={"has_header": True, "delimiter": ",", "encoding": "utf-8"},
path=object_key,
)
import_repo = AsyncMock()
import_repo.get_by_id.return_value = job_row
error_repo = AsyncMock()
bulk_repo = AsyncMock()
async def fake_download(_key: str, iid: str) -> Path:
dest = tmp_path / f"{iid}.csv"
dest.write_bytes(csv_path.read_bytes())
return dest
file_storage = AsyncMock()
file_storage.download_to_temp.side_effect = fake_download
processor = ArticleImportProcessor(import_repo, error_repo, bulk_repo, file_storage)
await processor.process(import_id)
import_repo.set_total_rows.assert_not_called()
file_storage.download_to_temp.assert_awaited_once_with(object_key, import_id)
import_repo.set_processing.assert_awaited_once_with(import_id)
bulk_repo.upsert_batch.assert_awaited_once()
import_repo.set_finished.assert_awaited_once()
_args, kwargs = import_repo.set_finished.call_args
assert kwargs["status"] == "succeeded"
assert kwargs["row_count"] == 1
assert kwargs["total_rows"] == 1
assert kwargs["row_succeeded"] == 1
assert kwargs["row_failed"] == 0
assert (tmp_path / f"{import_id}.csv").exists() is False
|