File size: 11,331 Bytes
51e5253
 
 
 
 
 
 
 
 
8cddb04
96d8d92
 
 
 
51e5253
 
 
 
8cddb04
51e5253
 
 
 
 
860c252
51e5253
 
 
 
 
 
8cddb04
51e5253
 
 
 
8cddb04
51e5253
 
 
 
 
 
 
96d8d92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51e5253
 
 
 
 
 
860c252
 
51e5253
 
 
 
 
 
 
 
 
 
8cddb04
51e5253
 
 
 
 
 
 
 
 
 
 
 
 
8cddb04
51e5253
 
 
 
 
 
 
 
 
 
 
8cddb04
51e5253
 
8cddb04
51e5253
 
 
 
860c252
8cddb04
 
860c252
 
 
 
8cddb04
860c252
 
8cddb04
860c252
 
 
 
 
51e5253
 
 
 
860c252
 
 
 
 
51e5253
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96d8d92
51e5253
 
 
 
 
 
 
 
 
 
 
 
 
 
860c252
 
96d8d92
 
51e5253
 
 
 
 
96d8d92
 
 
 
 
 
 
51e5253
 
 
 
96d8d92
 
 
 
 
 
 
51e5253
860c252
 
 
 
8cddb04
51e5253
 
96d8d92
51e5253
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8cddb04
51e5253
 
 
 
 
860c252
 
 
51e5253
96d8d92
51e5253
 
 
 
8cddb04
51e5253
 
 
8cddb04
 
51e5253
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
from __future__ import annotations

import json
import traceback
from pathlib import Path
from uuid import uuid4

from pydantic import ValidationError

from app.domain.tabular_stream import iter_mapped_article_file_rows
from app.repositories.article_bulk_repository import (
    REFERENCE_MAIN_CONFLICT,
    ArticleBulkRepository,
)
from app.repositories.import_error_repository import ImportErrorRepository
from app.repositories.import_repository import ArticleUpsertRecord, ImportRepository
from app.schemas.article_import import ArticleImportMapping, ArticleImportOptions, ArticleImportRow
from app.schemas.import_job import ImportLineErrorCreate
from app.services.import_file_storage import ImportFileStorage


class ArticleImportProcessor:
    """Run one ``articles`` import job end to end.

    The processor loads the job row, downloads the stored file to a temporary
    path, validates every mapped row, upserts the valid rows in batches,
    records per-line errors, and finally writes the terminal status of the job.
    """

    # Validated rows are upserted once this many have been buffered.
    _BATCH_SIZE = 10_000
    # Buffered line errors are persisted once this many have accumulated.
    _ERROR_FLUSH = 1_000
    # The processed-rows progress counter is pushed every this many rows read.
    _ROWS_PROCESSED_FLUSH = 250

    def __init__(
        self,
        import_repo: ImportRepository,
        error_repo: ImportErrorRepository,
        bulk_repo: ArticleBulkRepository,
        file_storage: ImportFileStorage,
    ) -> None:
        """Store the collaborators used while processing a job."""
        self._import_repo = import_repo
        self._error_repo = error_repo
        self._bulk_repo = bulk_repo
        self._file_storage = file_storage

    def _dedupe_batch(self, records: list[ArticleUpsertRecord]) -> list[ArticleUpsertRecord]:
        """Keep one record per ``reference_number``; the last occurrence wins.

        The result is ordered by the first appearance of each reference.
        Superseded duplicates are dropped silently: they are counted neither as
        succeeded nor as failed.
        """
        return list({record.reference_number: record for record in records}.values())

    async def _flush_errors(
        self,
        import_id: str,
        error_buffer: list[ImportLineErrorCreate],
    ) -> None:
        """Persist the buffered line errors, add them to the failed count, empty the buffer.

        The buffer is only cleared after both repository calls succeeded, so a
        failure leaves the pending errors available to the caller.
        """
        await self._error_repo.create_many(list(error_buffer))
        await self._import_repo.add_row_counts(
            import_id, row_succeeded_delta=0, row_failed_delta=len(error_buffer)
        )
        error_buffer.clear()

    async def _fail_without_rows(self, import_id: str, error_message: str) -> None:
        """Finish the job as ``failed`` before any row has been read."""
        await self._import_repo.set_finished(
            import_id,
            status="failed",
            row_count=None,
            total_rows=None,
            row_succeeded=0,
            row_failed=0,
            error_message=error_message,
        )

    async def _apply_batch(
        self,
        import_id: str,
        deduped: list[ArticleUpsertRecord],
        error_buffer: list[ImportLineErrorCreate],
    ) -> tuple[int, int]:
        """Upsert batch; return (row_succeeded_delta, row_failed_delta).

        The value returned by ``upsert_batch`` is treated as a collection of
        ``(row_number, error_code)`` pairs describing rejected rows. Each one
        is appended to ``error_buffer`` as a line error; the buffer is flushed
        when it reaches ``_ERROR_FLUSH`` entries.
        """
        conflicts = await self._bulk_repo.upsert_batch(deduped)
        conflict_by_row = dict(conflicts)
        succeeded_delta = len(deduped) - len(conflicts)
        failed_delta = len(conflicts)
        records_by_row = {record.row_number: record for record in deduped}
        conflict_message = (
            "Target main reference is already used by another article "
            "or duplicated in this batch"
        )[:2000]

        for row_number, error_code in conflict_by_row.items():
            rec = records_by_row.get(row_number)
            raw = None
            if rec:
                # Only the two reference columns are relevant to this conflict.
                raw = json.dumps(
                    {
                        "reference_number": rec.reference_number,
                        "reference_new": rec.reference_new,
                    },
                    ensure_ascii=False,
                )[:2000]
            error_buffer.append(
                ImportLineErrorCreate(
                    import_id=import_id,
                    row_number=row_number,
                    column_name="reference_new",
                    raw_value=raw,
                    error_code=error_code or REFERENCE_MAIN_CONFLICT,
                    error_message=conflict_message,
                )
            )

        if len(error_buffer) >= self._ERROR_FLUSH:
            await self._flush_errors(import_id, error_buffer)
        return succeeded_delta, failed_delta

    async def process(self, import_id: str) -> None:
        """Process the import job identified by ``import_id``.

        Returns silently when the job does not exist. Otherwise the job always
        ends through ``set_finished``: ``failed`` for an unsupported job
        (wrong type, ``has_header`` false, missing path, download error) or an
        unexpected exception, else ``succeeded`` / ``partial`` / ``failed``
        depending on the row counters. The temporary file is removed in every
        case.
        """
        error_buffer: list[ImportLineErrorCreate] = []
        batch: list[ArticleUpsertRecord] = []
        row_succeeded = 0
        row_failed = 0
        row_count: int | None = None
        path: Path | None = None
        counted_rows = 0
        pending_processed_flush = 0

        async def apply_pending_batch() -> None:
            """Dedupe and upsert ``batch``, update the counters, then empty it."""
            nonlocal row_succeeded, row_failed
            ok_delta, fail_delta = await self._apply_batch(
                import_id, self._dedupe_batch(batch), error_buffer
            )
            # Update the local counters before the repository call so that the
            # failure handler still reports them if that call raises.
            row_succeeded += ok_delta
            row_failed += fail_delta
            if ok_delta > 0:
                await self._import_repo.add_row_counts(
                    import_id, row_succeeded_delta=ok_delta, row_failed_delta=0
                )
            batch.clear()

        try:
            row = await self._import_repo.get_by_id(import_id)
            if row is None:
                return
            if row.type != "articles":
                await self._fail_without_rows(import_id, "Import job type is not articles")
                return

            mapping = ArticleImportMapping.model_validate(row.mapping or {})
            options = ArticleImportOptions.model_validate(row.options or {})
            if not options.has_header:
                await self._fail_without_rows(
                    import_id, "has_header=false is not supported for article imports"
                )
                return

            if not row.path:
                await self._fail_without_rows(import_id, "Missing import storage path")
                return

            await self._import_repo.set_processing(import_id)
            try:
                path = await self._file_storage.download_to_temp(row.path, import_id)
            except Exception as dl_exc:  # noqa: BLE001
                await self._fail_without_rows(
                    import_id,
                    f"Storage download failed: {type(dl_exc).__name__}: {dl_exc}"[:8000],
                )
                return

            for file_row_number, payload in iter_mapped_article_file_rows(path, mapping, options):
                counted_rows += 1
                pending_processed_flush += 1
                if pending_processed_flush >= self._ROWS_PROCESSED_FLUSH:
                    await self._import_repo.add_rows_processed(import_id, pending_processed_flush)
                    pending_processed_flush = 0

                try:
                    ar = ArticleImportRow.model_validate({"row_number": file_row_number, **payload})
                except ValidationError as exc:
                    row_failed += 1
                    error_buffer.append(
                        ImportLineErrorCreate(
                            import_id=import_id,
                            row_number=file_row_number,
                            column_name=None,
                            raw_value=json.dumps(payload, ensure_ascii=False)[:2000],
                            error_code="ROW_VALIDATION",
                            error_message=str(exc.errors())[:2000],
                        )
                    )
                    if len(error_buffer) >= self._ERROR_FLUSH:
                        await self._flush_errors(import_id, error_buffer)
                    continue

                batch.append(
                    ArticleUpsertRecord(
                        article_id=str(uuid4()),
                        row_number=ar.row_number,
                        reference_number=ar.reference_number,
                        label_fr=ar.label_fr,
                        label_en=ar.label_en,
                        category_id=str(ar.category_id) if ar.category_id is not None else None,
                        moq=ar.moq,
                        origin=ar.origin,
                        reference_old=ar.reference_old,
                        reference_new=ar.reference_new,
                    )
                )

                if len(batch) >= self._BATCH_SIZE:
                    await apply_pending_batch()

            # Rows left over after the last full batch.
            if batch:
                await apply_pending_batch()

            if pending_processed_flush > 0:
                await self._import_repo.add_rows_processed(import_id, pending_processed_flush)
                # Reset so the failure handler does not push the same rows again.
                pending_processed_flush = 0

            row_count = counted_rows

            if error_buffer:
                await self._flush_errors(import_id, error_buffer)

            final_status = "succeeded"
            if row_failed > 0 and row_succeeded > 0:
                final_status = "partial"
            elif row_failed > 0 and row_succeeded == 0:
                final_status = "failed"

            await self._import_repo.set_finished(
                import_id,
                status=final_status,
                row_count=row_count,
                total_rows=row_count,
                row_succeeded=row_succeeded,
                row_failed=row_failed,
                error_message=None,
            )
        except Exception as exc:  # noqa: BLE001
            # Format the primary failure first: the recovery steps below may
            # raise themselves and must not replace what gets reported.
            error_message = f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}"[:8000]

            # Best-effort flushes. Each is guarded on its own so that a failing
            # flush (possibly the very call that raised above) can neither
            # block the other one nor prevent the job from being marked as
            # failed, which would leave it stuck in its processing state.
            recovery_notes: list[str] = []
            try:
                if pending_processed_flush > 0:
                    await self._import_repo.add_rows_processed(import_id, pending_processed_flush)
            except Exception as flush_exc:  # noqa: BLE001
                recovery_notes.append(
                    f"progress flush failed: {type(flush_exc).__name__}: {flush_exc}"
                )
            try:
                if error_buffer:
                    await self._flush_errors(import_id, error_buffer)
            except Exception as flush_exc:  # noqa: BLE001
                recovery_notes.append(
                    f"error flush failed: {type(flush_exc).__name__}: {flush_exc}"
                )
            if recovery_notes:
                note = ("\n[cleanup errors] " + "; ".join(recovery_notes))[:1000]
                # Keep the overall 8000-character cap while making room for the note.
                error_message = error_message[: 8000 - len(note)] + note

            final_row_count = counted_rows if counted_rows > 0 else row_count
            await self._import_repo.set_finished(
                import_id,
                status="failed",
                row_count=final_row_count,
                total_rows=final_row_count,
                row_succeeded=row_succeeded,
                row_failed=row_failed,
                error_message=error_message,
            )
        finally:
            if path is not None:
                try:
                    path.unlink(missing_ok=True)
                except OSError:
                    # Deliberate best effort: a leftover temp file must not
                    # change the outcome of the import.
                    pass