from __future__ import annotations

import json
import traceback
from dataclasses import dataclass
from pathlib import Path
from uuid import uuid4

from pydantic import ValidationError

from app.domain.tabular_stream import iter_mapped_article_file_rows
from app.repositories.article_bulk_repository import (
    REFERENCE_MAIN_CONFLICT,
    ArticleBulkRepository,
)
from app.repositories.import_error_repository import ImportErrorRepository
from app.repositories.import_repository import ArticleUpsertRecord, ImportRepository
from app.schemas.article_import import ArticleImportMapping, ArticleImportOptions, ArticleImportRow
from app.schemas.import_job import ImportLineErrorCreate
from app.services.import_file_storage import ImportFileStorage


@dataclass
class _ImportTally:
    """Mutable counters shared by the steps of a single import run."""

    counted_rows: int = 0  # rows yielded by the file reader so far
    row_succeeded: int = 0  # rows reported as upserted without conflict
    row_failed: int = 0  # rows rejected by validation or by the upsert
    pending_processed: int = 0  # rows read but not yet sent to add_rows_processed


class ArticleImportProcessor:
    """Process one article import job end to end.

    The job's file is downloaded to a temporary path, streamed row by row,
    validated, upserted in batches, and the job is finally marked
    ``succeeded``, ``partial`` or ``failed``. Per-row errors are buffered and
    written in chunks.
    """

    # Number of validated records accumulated before one bulk upsert.
    _BATCH_SIZE = 10_000
    # Number of buffered line errors that triggers a write to the error repo.
    _ERROR_FLUSH = 1_000
    # Number of rows read between two add_rows_processed progress updates.
    _ROWS_PROCESSED_FLUSH = 250

    def __init__(
        self,
        import_repo: ImportRepository,
        error_repo: ImportErrorRepository,
        bulk_repo: ArticleBulkRepository,
        file_storage: ImportFileStorage,
    ) -> None:
        """Store the repositories and file storage used by :meth:`process`."""
        self._import_repo = import_repo
        self._error_repo = error_repo
        self._bulk_repo = bulk_repo
        self._file_storage = file_storage

    def _dedupe_batch(self, records: list[ArticleUpsertRecord]) -> list[ArticleUpsertRecord]:
        """Return one record per ``reference_number``; the last occurrence wins.

        Records dropped here are neither upserted nor counted as failed.
        """
        return list({record.reference_number: record for record in records}.values())

    @staticmethod
    def _final_status(row_succeeded: int, row_failed: int) -> str:
        """Map the final counters to the job status string."""
        if row_failed > 0 and row_succeeded > 0:
            return "partial"
        if row_failed > 0 and row_succeeded == 0:
            return "failed"
        # Also covers an empty file (no failure and no success).
        return "succeeded"

    async def _flush_errors(
        self,
        import_id: str,
        error_buffer: list[ImportLineErrorCreate],
        *,
        only_if_full: bool = False,
    ) -> None:
        """Persist buffered line errors and add them to the job's failed count.

        With ``only_if_full`` the buffer is written only once it holds at least
        ``_ERROR_FLUSH`` entries; otherwise any non-empty buffer is written.
        The buffer is cleared only after both repository calls succeed.
        """
        threshold = self._ERROR_FLUSH if only_if_full else 1
        if len(error_buffer) < threshold:
            return
        await self._error_repo.create_many(list(error_buffer))
        await self._import_repo.add_row_counts(
            import_id, row_succeeded_delta=0, row_failed_delta=len(error_buffer)
        )
        error_buffer.clear()

    async def _flush_rows_processed(self, import_id: str, tally: _ImportTally) -> None:
        """Report rows read since the last progress update, if any."""
        if tally.pending_processed > 0:
            await self._import_repo.add_rows_processed(import_id, tally.pending_processed)
            # Reset only after the call succeeded so a failure can be retried.
            tally.pending_processed = 0

    async def _fail_without_rows(self, import_id: str, message: str) -> None:
        """Mark the job failed before any row has been read."""
        await self._import_repo.set_finished(
            import_id,
            status="failed",
            row_count=None,
            total_rows=None,
            row_succeeded=0,
            row_failed=0,
            error_message=message,
        )

    async def _apply_batch(
        self,
        import_id: str,
        deduped: list[ArticleUpsertRecord],
        error_buffer: list[ImportLineErrorCreate],
    ) -> tuple[int, int]:
        """Upsert batch; return (row_succeeded_delta, row_failed_delta).

        Every conflict reported by the bulk repository is appended to
        ``error_buffer`` (flushed when full). The caller is responsible for
        adding the returned deltas to its own counters.
        """
        conflicts = await self._bulk_repo.upsert_batch(deduped)
        # Conflicts are consumed as (row_number, error_code) pairs.
        conflict_by_row = dict(conflicts)
        # Count from the mapping so a row reported twice is one failed row,
        # matching the number of errors recorded below.
        failed_delta = len(conflict_by_row)
        succeeded_delta = len(deduped) - failed_delta
        records_by_row = {record.row_number: record for record in deduped}
        for row_number, error_code in conflict_by_row.items():
            record = records_by_row.get(row_number)
            raw = None
            if record is not None:
                raw = json.dumps(
                    {
                        "reference_number": record.reference_number,
                        "reference_new": record.reference_new,
                    },
                    ensure_ascii=False,
                )[:2000]
            error_buffer.append(
                ImportLineErrorCreate(
                    import_id=import_id,
                    row_number=row_number,
                    column_name="reference_new",
                    raw_value=raw,
                    error_code=error_code or REFERENCE_MAIN_CONFLICT,
                    error_message=(
                        "Target main reference is already used by another article "
                        "or duplicated in this batch"
                    ),
                )
            )
            await self._flush_errors(import_id, error_buffer, only_if_full=True)
        return succeeded_delta, failed_delta

    async def _upsert_batch(
        self,
        import_id: str,
        batch: list[ArticleUpsertRecord],
        tally: _ImportTally,
        error_buffer: list[ImportLineErrorCreate],
    ) -> None:
        """De-duplicate and upsert ``batch``, then record the outcome."""
        deduped = self._dedupe_batch(batch)
        ok_delta, fail_delta = await self._apply_batch(import_id, deduped, error_buffer)
        # Update the tally before the next await so a failure there still
        # reports this batch in the final counters.
        tally.row_succeeded += ok_delta
        tally.row_failed += fail_delta
        if ok_delta > 0:
            await self._import_repo.add_row_counts(
                import_id, row_succeeded_delta=ok_delta, row_failed_delta=0
            )

    async def _import_rows(
        self,
        import_id: str,
        path: Path,
        mapping: ArticleImportMapping,
        options: ArticleImportOptions,
        tally: _ImportTally,
        error_buffer: list[ImportLineErrorCreate],
    ) -> None:
        """Stream the file, validate each row and upsert the valid ones in batches."""
        batch: list[ArticleUpsertRecord] = []
        for file_row_number, payload in iter_mapped_article_file_rows(path, mapping, options):
            tally.counted_rows += 1
            tally.pending_processed += 1
            if tally.pending_processed >= self._ROWS_PROCESSED_FLUSH:
                await self._flush_rows_processed(import_id, tally)

            try:
                ar = ArticleImportRow.model_validate({"row_number": file_row_number, **payload})
            except ValidationError as exc:
                tally.row_failed += 1
                error_buffer.append(
                    ImportLineErrorCreate(
                        import_id=import_id,
                        row_number=file_row_number,
                        column_name=None,
                        raw_value=json.dumps(payload, ensure_ascii=False)[:2000],
                        error_code="ROW_VALIDATION",
                        error_message=str(exc.errors())[:2000],
                    )
                )
                await self._flush_errors(import_id, error_buffer, only_if_full=True)
                continue

            batch.append(
                ArticleUpsertRecord(
                    article_id=str(uuid4()),
                    row_number=ar.row_number,
                    reference_number=ar.reference_number,
                    label_fr=ar.label_fr,
                    label_en=ar.label_en,
                    category_id=str(ar.category_id) if ar.category_id is not None else None,
                    moq=ar.moq,
                    origin=ar.origin,
                    reference_old=ar.reference_old,
                    reference_new=ar.reference_new,
                )
            )
            if len(batch) >= self._BATCH_SIZE:
                await self._upsert_batch(import_id, batch, tally, error_buffer)
                batch.clear()

        # Upsert whatever is left after the last full batch.
        if batch:
            await self._upsert_batch(import_id, batch, tally, error_buffer)

    async def process(self, import_id: str) -> None:
        """Run the import job ``import_id`` and record its final status.

        Returns silently when the job does not exist. Any exception raised
        while processing is caught and stored on the job as a ``failed``
        status with the exception and traceback as error message. The
        downloaded temporary file is removed in all cases.
        """
        error_buffer: list[ImportLineErrorCreate] = []
        tally = _ImportTally()
        row_count: int | None = None
        path: Path | None = None
        try:
            job = await self._import_repo.get_by_id(import_id)
            if job is None:
                return
            if job.type != "articles":
                await self._fail_without_rows(import_id, "Import job type is not articles")
                return

            mapping = ArticleImportMapping.model_validate(job.mapping or {})
            options = ArticleImportOptions.model_validate(job.options or {})
            if not options.has_header:
                await self._fail_without_rows(
                    import_id, "has_header=false is not supported for article imports"
                )
                return
            if not job.path:
                await self._fail_without_rows(import_id, "Missing import storage path")
                return

            await self._import_repo.set_processing(import_id)
            try:
                path = await self._file_storage.download_to_temp(job.path, import_id)
            except Exception as dl_exc:  # noqa: BLE001
                await self._fail_without_rows(
                    import_id,
                    f"Storage download failed: {type(dl_exc).__name__}: {dl_exc}"[:8000],
                )
                return

            await self._import_rows(import_id, path, mapping, options, tally, error_buffer)
            await self._flush_rows_processed(import_id, tally)
            row_count = tally.counted_rows
            await self._flush_errors(import_id, error_buffer)

            await self._import_repo.set_finished(
                import_id,
                status=self._final_status(tally.row_succeeded, tally.row_failed),
                row_count=row_count,
                total_rows=row_count,
                row_succeeded=tally.row_succeeded,
                row_failed=tally.row_failed,
                error_message=None,
            )
        except Exception as exc:  # noqa: BLE001
            message = f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}"
            # Best effort: a failure while flushing progress must not prevent
            # the job from being marked as failed below.
            try:
                await self._flush_rows_processed(import_id, tally)
                await self._flush_errors(import_id, error_buffer)
            except Exception as flush_exc:  # noqa: BLE001
                message = (
                    f"{message}\n"
                    f"Progress flush also failed: {type(flush_exc).__name__}: {flush_exc}"
                )
            final_row_count = tally.counted_rows if tally.counted_rows > 0 else row_count
            await self._import_repo.set_finished(
                import_id,
                status="failed",
                row_count=final_row_count,
                total_rows=final_row_count,
                row_succeeded=tally.row_succeeded,
                row_failed=tally.row_failed,
                error_message=message[:8000],
            )
        finally:
            if path is not None:
                try:
                    path.unlink(missing_ok=True)
                except OSError:
                    # Deliberate best effort: a leftover temp file must not
                    # change the outcome of the job.
                    pass