Spaces:
Running
Running
| from __future__ import annotations | |
| import json | |
| import traceback | |
| from pathlib import Path | |
| from uuid import uuid4 | |
| from pydantic import ValidationError | |
| from app.domain.tabular_stream import iter_mapped_article_file_rows | |
| from app.repositories.article_bulk_repository import ( | |
| REFERENCE_MAIN_CONFLICT, | |
| ArticleBulkRepository, | |
| ) | |
| from app.repositories.import_error_repository import ImportErrorRepository | |
| from app.repositories.import_repository import ArticleUpsertRecord, ImportRepository | |
| from app.schemas.article_import import ArticleImportMapping, ArticleImportOptions, ArticleImportRow | |
| from app.schemas.import_job import ImportLineErrorCreate | |
| from app.services.import_file_storage import ImportFileStorage | |
class ArticleImportProcessor:
    """Run an article import job end to end.

    The processor downloads the uploaded file to a temporary path, streams its
    mapped rows, validates each row, upserts the valid rows in batches, buffers
    one line error per rejected row, and keeps the job's counters and final
    status up to date through the import repository.
    """

    # Validated rows accumulated before they are upserted together.
    _BATCH_SIZE = 10_000
    # Number of buffered line errors that triggers a write to the error repository.
    _ERROR_FLUSH = 1_000
    # Rows read between two "rows processed" progress updates.
    _ROWS_PROCESSED_FLUSH = 250
    # Maximum length of the text fields stored with a line error.
    _LINE_TEXT_MAX = 2_000
    # Maximum length of the error message stored on the import job itself.
    _JOB_MESSAGE_MAX = 8_000

    def __init__(
        self,
        import_repo: ImportRepository,
        error_repo: ImportErrorRepository,
        bulk_repo: ArticleBulkRepository,
        file_storage: ImportFileStorage,
    ) -> None:
        """Store the collaborators used while processing a job.

        Args:
            import_repo: Reads the job and records its status and counters.
            error_repo: Persists per-row import errors.
            bulk_repo: Performs the batched article upsert.
            file_storage: Downloads the uploaded file to a temporary path.
        """
        self._import_repo = import_repo
        self._error_repo = error_repo
        self._bulk_repo = bulk_repo
        self._file_storage = file_storage

    def _dedupe_batch(self, records: list[ArticleUpsertRecord]) -> list[ArticleUpsertRecord]:
        """Keep one record per ``reference_number``; the last occurrence wins.

        The result is ordered by the first appearance of each reference.
        NOTE: rows dropped here are neither counted as succeeded nor as failed.
        """
        return list({record.reference_number: record for record in records}.values())

    async def _flush_errors(
        self,
        import_id: str,
        error_buffer: list[ImportLineErrorCreate],
    ) -> None:
        """Persist the buffered line errors, add them to the failed counter, empty the buffer."""
        if not error_buffer:
            return
        await self._error_repo.create_many(list(error_buffer))
        await self._import_repo.add_row_counts(
            import_id, row_succeeded_delta=0, row_failed_delta=len(error_buffer)
        )
        error_buffer.clear()

    async def _finish(
        self,
        import_id: str,
        status: str,
        *,
        row_count: int | None = None,
        row_succeeded: int = 0,
        row_failed: int = 0,
        error_message: str | None = None,
    ) -> None:
        """Record the final state of the job (``total_rows`` mirrors ``row_count``)."""
        await self._import_repo.set_finished(
            import_id,
            status=status,
            row_count=row_count,
            total_rows=row_count,
            row_succeeded=row_succeeded,
            row_failed=row_failed,
            error_message=error_message,
        )

    async def _apply_batch(
        self,
        import_id: str,
        deduped: list[ArticleUpsertRecord],
        error_buffer: list[ImportLineErrorCreate],
    ) -> tuple[int, int]:
        """Upsert one batch and buffer a line error for every conflicting row.

        Args:
            import_id: Job the batch belongs to.
            deduped: Records to upsert, already de-duplicated by reference.
            error_buffer: Shared buffer of pending line errors; it is appended to
                and flushed whenever it reaches ``_ERROR_FLUSH`` entries.

        Returns:
            ``(row_succeeded_delta, row_failed_delta)`` for this batch.
        """
        conflicts = await self._bulk_repo.upsert_batch(deduped)
        # Key by row number so that a row reported more than once still counts as
        # a single failure, matching the single error record created for it.
        conflict_by_row = dict(conflicts)
        failed_delta = len(conflict_by_row)
        succeeded_delta = len(deduped) - failed_delta

        records_by_row = {record.row_number: record for record in deduped}
        for row_number, error_code in conflict_by_row.items():
            rec = records_by_row.get(row_number)
            raw = None
            if rec is not None:
                raw = json.dumps(
                    {
                        "reference_number": rec.reference_number,
                        "reference_new": rec.reference_new,
                    },
                    ensure_ascii=False,
                )[: self._LINE_TEXT_MAX]
            error_buffer.append(
                ImportLineErrorCreate(
                    import_id=import_id,
                    row_number=row_number,
                    column_name="reference_new",
                    raw_value=raw,
                    error_code=error_code or REFERENCE_MAIN_CONFLICT,
                    error_message=(
                        "Target main reference is already used by another article "
                        "or duplicated in this batch"
                    ),
                )
            )
            if len(error_buffer) >= self._ERROR_FLUSH:
                await self._flush_errors(import_id, error_buffer)
        return succeeded_delta, failed_delta

    async def process(self, import_id: str) -> None:
        """Process the import job identified by ``import_id``.

        Nothing happens when the job does not exist. The job is finished as
        ``"failed"`` when it is not an article import, when ``has_header`` is
        false, when it has no storage path, when the download fails, or when an
        unexpected exception occurs. Otherwise the final status is
        ``"succeeded"`` (no failed row), ``"partial"`` (failed and succeeded
        rows) or ``"failed"`` (failed rows only). The temporary file is removed
        in every case.
        """
        error_buffer: list[ImportLineErrorCreate] = []
        row_succeeded = 0
        row_failed = 0
        row_count: int | None = None
        path: Path | None = None
        counted_rows = 0
        pending_processed_flush = 0
        try:
            row = await self._import_repo.get_by_id(import_id)
            if row is None:
                return
            if row.type != "articles":
                await self._finish(
                    import_id, "failed", error_message="Import job type is not articles"
                )
                return

            mapping = ArticleImportMapping.model_validate(row.mapping or {})
            options = ArticleImportOptions.model_validate(row.options or {})
            if not options.has_header:
                await self._finish(
                    import_id,
                    "failed",
                    error_message="has_header=false is not supported for article imports",
                )
                return
            if not row.path:
                await self._finish(
                    import_id, "failed", error_message="Missing import storage path"
                )
                return

            await self._import_repo.set_processing(import_id)
            try:
                path = await self._file_storage.download_to_temp(row.path, import_id)
            except Exception as dl_exc:  # noqa: BLE001
                await self._finish(
                    import_id,
                    "failed",
                    error_message=(
                        f"Storage download failed: {type(dl_exc).__name__}: {dl_exc}"
                    )[: self._JOB_MESSAGE_MAX],
                )
                return

            batch: list[ArticleUpsertRecord] = []
            for file_row_number, payload in iter_mapped_article_file_rows(path, mapping, options):
                counted_rows += 1
                pending_processed_flush += 1
                if pending_processed_flush >= self._ROWS_PROCESSED_FLUSH:
                    await self._import_repo.add_rows_processed(import_id, pending_processed_flush)
                    pending_processed_flush = 0

                try:
                    ar = ArticleImportRow.model_validate({"row_number": file_row_number, **payload})
                except ValidationError as exc:
                    row_failed += 1
                    error_buffer.append(
                        ImportLineErrorCreate(
                            import_id=import_id,
                            row_number=file_row_number,
                            column_name=None,
                            # default=str: this value is diagnostic only, so a cell that
                            # is not JSON-native must not abort the whole import.
                            raw_value=json.dumps(payload, ensure_ascii=False, default=str)[
                                : self._LINE_TEXT_MAX
                            ],
                            error_code="ROW_VALIDATION",
                            error_message=str(exc.errors())[: self._LINE_TEXT_MAX],
                        )
                    )
                    if len(error_buffer) >= self._ERROR_FLUSH:
                        await self._flush_errors(import_id, error_buffer)
                    continue

                batch.append(
                    ArticleUpsertRecord(
                        article_id=str(uuid4()),
                        row_number=ar.row_number,
                        reference_number=ar.reference_number,
                        label_fr=ar.label_fr,
                        label_en=ar.label_en,
                        category_id=str(ar.category_id) if ar.category_id is not None else None,
                        moq=ar.moq,
                        origin=ar.origin,
                        reference_old=ar.reference_old,
                        reference_new=ar.reference_new,
                    )
                )
                if len(batch) >= self._BATCH_SIZE:
                    ok_delta, fail_delta = await self._apply_batch(
                        import_id, self._dedupe_batch(batch), error_buffer
                    )
                    # Update the local totals before the next await so the failure
                    # handler reports them even if the counter update below raises.
                    row_succeeded += ok_delta
                    row_failed += fail_delta
                    if ok_delta > 0:
                        await self._import_repo.add_row_counts(
                            import_id, row_succeeded_delta=ok_delta, row_failed_delta=0
                        )
                    batch.clear()

            # Rows left over after the last full batch.
            if batch:
                ok_delta, fail_delta = await self._apply_batch(
                    import_id, self._dedupe_batch(batch), error_buffer
                )
                row_succeeded += ok_delta
                row_failed += fail_delta
                if ok_delta > 0:
                    await self._import_repo.add_row_counts(
                        import_id, row_succeeded_delta=ok_delta, row_failed_delta=0
                    )

            if pending_processed_flush > 0:
                await self._import_repo.add_rows_processed(import_id, pending_processed_flush)
                pending_processed_flush = 0
            row_count = counted_rows
            await self._flush_errors(import_id, error_buffer)

            if row_failed > 0:
                final_status = "partial" if row_succeeded > 0 else "failed"
            else:
                final_status = "succeeded"
            await self._finish(
                import_id,
                final_status,
                row_count=row_count,
                row_succeeded=row_succeeded,
                row_failed=row_failed,
            )
        except Exception as exc:  # noqa: BLE001
            # Capture the primary failure first: inside the nested handler below,
            # format_exc() would describe the secondary exception instead.
            detail = f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}"
            try:
                if pending_processed_flush > 0:
                    await self._import_repo.add_rows_processed(import_id, pending_processed_flush)
                await self._flush_errors(import_id, error_buffer)
            except Exception as flush_exc:  # noqa: BLE001
                # Best effort only: a failing flush must not prevent the job from
                # being marked as failed, otherwise it would stay "processing".
                note = (
                    "\n[flushing pending progress/errors also failed: "
                    f"{type(flush_exc).__name__}: {flush_exc}]"
                )[:500]
                detail = detail[: self._JOB_MESSAGE_MAX - len(note)] + note
            final_row_count = counted_rows if counted_rows > 0 else row_count
            await self._finish(
                import_id,
                "failed",
                row_count=final_row_count,
                row_succeeded=row_succeeded,
                row_failed=row_failed,
                error_message=detail[: self._JOB_MESSAGE_MAX],
            )
        finally:
            if path is not None:
                # Best-effort removal of the temporary download.
                try:
                    path.unlink(missing_ok=True)
                except OSError:
                    pass