# afp-backend/app/services/article_import_processor.py
# cdupland
# feat: enhance article import functionality with reference management
# 96d8d92
from __future__ import annotations
import json
import traceback
from pathlib import Path
from uuid import uuid4
from pydantic import ValidationError
from app.domain.tabular_stream import iter_mapped_article_file_rows
from app.repositories.article_bulk_repository import (
REFERENCE_MAIN_CONFLICT,
ArticleBulkRepository,
)
from app.repositories.import_error_repository import ImportErrorRepository
from app.repositories.import_repository import ArticleUpsertRecord, ImportRepository
from app.schemas.article_import import ArticleImportMapping, ArticleImportOptions, ArticleImportRow
from app.schemas.import_job import ImportLineErrorCreate
from app.services.import_file_storage import ImportFileStorage
class ArticleImportProcessor:
    """Run one "articles" import job from stored file to final job status.

    ``process`` downloads the job's file to a temporary path, streams the
    mapped rows, validates each row, bulk-upserts valid rows in batches,
    persists per-line errors, and records progress counters and the final
    status on the import job.
    """

    # Number of validated rows accumulated before one bulk upsert.
    _BATCH_SIZE = 10_000
    # Buffered line errors are persisted once the buffer reaches this size.
    _ERROR_FLUSH = 1_000
    # The "rows processed" counter is pushed to the job every N rows read.
    _ROWS_PROCESSED_FLUSH = 250

    def __init__(
        self,
        import_repo: ImportRepository,
        error_repo: ImportErrorRepository,
        bulk_repo: ArticleBulkRepository,
        file_storage: ImportFileStorage,
    ) -> None:
        """Store the collaborators used while processing an import job."""
        self._import_repo = import_repo
        self._error_repo = error_repo
        self._bulk_repo = bulk_repo
        self._file_storage = file_storage

    def _dedupe_batch(self, records: list[ArticleUpsertRecord]) -> list[ArticleUpsertRecord]:
        """Keep one record per ``reference_number``; the last occurrence wins.

        The result is ordered by the first appearance of each reference.

        NOTE(review): rows superseded here are not reported as succeeded or
        failed by the caller, so ``row_succeeded + row_failed`` can be lower
        than ``row_count`` - confirm this is intended.
        """
        return list({record.reference_number: record for record in records}.values())

    async def _flush_errors(
        self,
        import_id: str,
        error_buffer: list[ImportLineErrorCreate],
        *,
        force: bool = False,
    ) -> None:
        """Persist buffered line errors and add them to the job's failed count.

        Does nothing when the buffer is empty, or when it holds fewer than
        ``_ERROR_FLUSH`` entries and ``force`` is false. The buffer is emptied
        in place.
        """
        if not error_buffer:
            return
        if not force and len(error_buffer) < self._ERROR_FLUSH:
            return
        pending = list(error_buffer)
        await self._error_repo.create_many(pending)
        # Clear as soon as the errors are stored: if the counter update below
        # raises, a later retry must not store the same errors a second time.
        error_buffer.clear()
        await self._import_repo.add_row_counts(
            import_id, row_succeeded_delta=0, row_failed_delta=len(pending)
        )

    async def _reject(self, import_id: str, error_message: str) -> None:
        """Mark the job as failed with no row counts and the given message."""
        await self._import_repo.set_finished(
            import_id,
            status="failed",
            row_count=None,
            total_rows=None,
            row_succeeded=0,
            row_failed=0,
            error_message=error_message,
        )

    async def _apply_batch(
        self,
        import_id: str,
        deduped: list[ArticleUpsertRecord],
        error_buffer: list[ImportLineErrorCreate],
    ) -> tuple[int, int]:
        """Upsert batch; return (row_succeeded_delta, row_failed_delta).

        Every conflict reported by the bulk repository is appended to
        ``error_buffer`` as a line error on ``reference_new``; the buffer is
        flushed afterwards if it has reached ``_ERROR_FLUSH`` entries.
        """
        conflicts = await self._bulk_repo.upsert_batch(deduped)
        conflict_by_row = dict(conflicts)
        # Count distinct rows so the delta matches the number of line errors
        # appended below, even if a row were reported more than once.
        failed_delta = len(conflict_by_row)
        succeeded_delta = len(deduped) - failed_delta
        records_by_row = {r.row_number: r for r in deduped}
        for row_number, error_code in conflict_by_row.items():
            rec = records_by_row.get(row_number)
            raw = None
            if rec is not None:
                raw = json.dumps(
                    {
                        "reference_number": rec.reference_number,
                        "reference_new": rec.reference_new,
                    },
                    ensure_ascii=False,
                )[:2000]
            error_buffer.append(
                ImportLineErrorCreate(
                    import_id=import_id,
                    row_number=row_number,
                    column_name="reference_new",
                    raw_value=raw,
                    error_code=error_code or REFERENCE_MAIN_CONFLICT,
                    error_message=(
                        "Target main reference is already used by another article "
                        "or duplicated in this batch"
                    ),
                )
            )
        await self._flush_errors(import_id, error_buffer)
        return succeeded_delta, failed_delta

    async def process(self, import_id: str) -> None:
        """Process the import job identified by ``import_id``.

        Returns silently when the job does not exist. Pre-flight problems
        (wrong job type, ``has_header`` false, missing storage path, download
        failure) finish the job as "failed" with an explanatory message.
        Otherwise the final status is "succeeded", "partial" or "failed"
        depending on the succeeded/failed row counts. Any unexpected exception
        finishes the job as "failed" with the exception and traceback as the
        message. The temporary file is always removed.
        """
        error_buffer: list[ImportLineErrorCreate] = []
        row_succeeded = 0
        row_failed = 0
        row_count: int | None = None
        path: Path | None = None
        counted_rows = 0
        pending_processed_flush = 0

        async def upsert_pending(batch: list[ArticleUpsertRecord]) -> None:
            """Upsert the accumulated batch, update counters, and empty it."""
            nonlocal row_succeeded, row_failed
            deduped = self._dedupe_batch(batch)
            ok_delta, fail_delta = await self._apply_batch(import_id, deduped, error_buffer)
            # Update the local totals before the counter write so they are
            # still reported if that write raises.
            row_succeeded += ok_delta
            row_failed += fail_delta
            if ok_delta > 0:
                await self._import_repo.add_row_counts(
                    import_id, row_succeeded_delta=ok_delta, row_failed_delta=0
                )
            batch.clear()

        try:
            row = await self._import_repo.get_by_id(import_id)
            if row is None:
                return
            if row.type != "articles":
                await self._reject(import_id, "Import job type is not articles")
                return
            mapping = ArticleImportMapping.model_validate(row.mapping or {})
            options = ArticleImportOptions.model_validate(row.options or {})
            if not options.has_header:
                await self._reject(
                    import_id, "has_header=false is not supported for article imports"
                )
                return
            if not row.path:
                await self._reject(import_id, "Missing import storage path")
                return

            await self._import_repo.set_processing(import_id)
            try:
                path = await self._file_storage.download_to_temp(row.path, import_id)
            except Exception as dl_exc:  # noqa: BLE001
                await self._reject(
                    import_id,
                    f"Storage download failed: {type(dl_exc).__name__}: {dl_exc}"[:8000],
                )
                return

            batch: list[ArticleUpsertRecord] = []
            for file_row_number, payload in iter_mapped_article_file_rows(path, mapping, options):
                counted_rows += 1
                pending_processed_flush += 1
                if pending_processed_flush >= self._ROWS_PROCESSED_FLUSH:
                    await self._import_repo.add_rows_processed(import_id, pending_processed_flush)
                    pending_processed_flush = 0

                try:
                    ar = ArticleImportRow.model_validate({"row_number": file_row_number, **payload})
                except ValidationError as validation_exc:
                    row_failed += 1
                    error_buffer.append(
                        ImportLineErrorCreate(
                            import_id=import_id,
                            row_number=file_row_number,
                            column_name=None,
                            raw_value=json.dumps(payload, ensure_ascii=False)[:2000],
                            error_code="ROW_VALIDATION",
                            error_message=str(validation_exc.errors())[:2000],
                        )
                    )
                    await self._flush_errors(import_id, error_buffer)
                    continue

                batch.append(
                    ArticleUpsertRecord(
                        article_id=str(uuid4()),
                        row_number=ar.row_number,
                        reference_number=ar.reference_number,
                        label_fr=ar.label_fr,
                        label_en=ar.label_en,
                        category_id=str(ar.category_id) if ar.category_id is not None else None,
                        moq=ar.moq,
                        origin=ar.origin,
                        reference_old=ar.reference_old,
                        reference_new=ar.reference_new,
                    )
                )
                if len(batch) >= self._BATCH_SIZE:
                    await upsert_pending(batch)

            # Trailing partial batch.
            if batch:
                await upsert_pending(batch)
            if pending_processed_flush > 0:
                await self._import_repo.add_rows_processed(import_id, pending_processed_flush)
                pending_processed_flush = 0
            row_count = counted_rows
            await self._flush_errors(import_id, error_buffer, force=True)

            final_status = "succeeded"
            if row_failed > 0 and row_succeeded > 0:
                final_status = "partial"
            elif row_failed > 0 and row_succeeded == 0:
                final_status = "failed"
            await self._import_repo.set_finished(
                import_id,
                status=final_status,
                row_count=row_count,
                total_rows=row_count,
                row_succeeded=row_succeeded,
                row_failed=row_failed,
                error_message=None,
            )
        except Exception as exc:  # noqa: BLE001
            # Capture the traceback first; it describes the primary failure.
            failure_head = f"{type(exc).__name__}: {exc}"
            failure_trace = traceback.format_exc()

            # Best-effort bookkeeping. Each step is guarded separately so that a
            # failure here (e.g. the same outage that caused ``exc``) cannot
            # prevent the job from being marked as failed below.
            bookkeeping_failures: list[str] = []
            if pending_processed_flush > 0:
                try:
                    await self._import_repo.add_rows_processed(import_id, pending_processed_flush)
                    pending_processed_flush = 0
                except Exception as flush_exc:  # noqa: BLE001
                    bookkeeping_failures.append(f"{type(flush_exc).__name__}: {flush_exc}")
            try:
                await self._flush_errors(import_id, error_buffer, force=True)
            except Exception as flush_exc:  # noqa: BLE001
                bookkeeping_failures.append(f"{type(flush_exc).__name__}: {flush_exc}")
            if bookkeeping_failures:
                # Surface the secondary failures instead of dropping them.
                failure_head += (
                    " [bookkeeping flush also failed: " + "; ".join(bookkeeping_failures) + "]"
                )

            final_row_count = counted_rows if counted_rows > 0 else row_count
            await self._import_repo.set_finished(
                import_id,
                status="failed",
                row_count=final_row_count,
                total_rows=final_row_count,
                row_succeeded=row_succeeded,
                row_failed=row_failed,
                error_message=f"{failure_head}\n{failure_trace}"[:8000],
            )
        finally:
            if path is not None:
                try:
                    path.unlink(missing_ok=True)
                except OSError:
                    # Temp-file cleanup is best-effort; the job outcome is
                    # already recorded at this point.
                    pass