| from __future__ import annotations |
|
|
| import uuid |
| from datetime import datetime, timezone |
| from io import BytesIO |
| from typing import Any |
|
|
| import openpyxl |
|
|
| from .dedup import dedup_records |
| from .errors import ExcelParseError |
| from .mapper import compute_row_hash, map_row |
| from .parsers import select_parser |
|
|
|
|
| def _read_headers_from_workbook(workbook: openpyxl.Workbook) -> list[str]: |
| ws = workbook.active |
| max_scan = min(ws.max_row or 0, 20) |
| for r in range(1, max_scan + 1): |
| row_values = [ws.cell(row=r, column=c).value for c in range(1, (ws.max_column or 0) + 1)] |
| non_empty = [v for v in row_values if v is not None and str(v).strip() != ""] |
| if len(non_empty) >= 1: |
| headers: list[str] = [] |
| for idx, v in enumerate(row_values): |
| s = "" if v is None else str(v).strip() |
| headers.append(s if s else f"col_{idx + 1}") |
| return headers |
| return [] |
|
|
|
|
def import_excel_bytes(
    data: bytes,
    *,
    operator: str,
    source_name: str | None = None,
    filename: str | None = None,
    limit: int = 50,
) -> dict[str, Any]:
    """Parse an in-memory Excel file, map its rows and return a deduplicated preview.

    The workbook is loaded with openpyxl, a parser is chosen from the headers
    of the active sheet, and every parsed row is hashed and mapped. Rows whose
    mapping yields no normalized record are counted as errors and dropped; the
    remaining records are deduplicated.

    Args:
        data: Raw bytes of the Excel file.
        operator: Stored verbatim in each record's ``meta["operator"]``.
        source_name: Label stored in ``meta["source_name"]``. Falls back to
            ``filename`` and then to ``"excel_import"`` when empty or omitted.
        filename: Original file name, stored in ``meta["filename"]``.
        limit: Maximum number of deduplicated records to return; negative
            values are treated as 0. The statistics are not affected by it.

    Returns:
        A dict with two keys: ``"stats"`` (import id, parser name and row
        counters covering the whole file) and ``"records"`` (at most ``limit``
        deduplicated records, each with ``raw``, ``normalized`` and ``meta``).

    Raises:
        ExcelParseError: If openpyxl cannot load ``data`` as a workbook.
    """
    try:
        # data_only=True asks openpyxl for cached cell values, not formula text.
        workbook = openpyxl.load_workbook(BytesIO(data), data_only=True)
    except Exception as e:
        # Any loader failure is surfaced as the package's own error type, with
        # the original exception chained as the cause.
        raise ExcelParseError(f"Excel 读取失败: {e}") from e

    headers = _read_headers_from_workbook(workbook)
    parser = select_parser(headers)
    parsed_rows = parser.parse(workbook)

    # Values shared by every record produced by this import run.
    import_id = uuid.uuid4().hex
    imported_at = datetime.now(timezone.utc).isoformat()
    resolved_source_name = str(source_name or filename or "excel_import")

    mapped_records: list[dict[str, Any]] = []
    # Counted while iterating so the parser may return any iterable, not only
    # a sized sequence.
    total_rows = 0
    error_rows = 0

    for row in parsed_rows:
        total_rows += 1
        raw = dict(row.values)
        row_hash = compute_row_hash(raw)
        normalized, dedup_key, issues = map_row(raw)
        if normalized is None:
            # Unmappable rows only contribute to the error counter.
            error_rows += 1
            continue

        meta: dict[str, Any] = {
            "import_id": import_id,
            "imported_at": imported_at,
            "operator": operator,
            "source_name": resolved_source_name,
            "filename": filename,
            "parser": parser.name,
            "sheet": row.sheet,
            "row_number": row.row_number,
            "row_hash": row_hash,
            "dedup_key": dedup_key,
        }
        if issues:
            meta["issues"] = issues
        mapped_records.append({"raw": raw, "normalized": normalized, "meta": meta})

    deduped, dedup_stats = dedup_records(mapped_records)

    stats = {
        "import_id": import_id,
        "parser": parser.name,
        "total_rows": total_rows,
        "mapped_rows": len(mapped_records),
        "error_rows": error_rows,
        "duplicates": dedup_stats["duplicates"],
        "deduped_rows": dedup_stats["deduped"],
    }

    # Only the returned preview is truncated; the stats describe the full file.
    output_records = deduped[: max(0, limit)]
    return {"stats": stats, "records": output_records}
|
|