from __future__ import annotations

import uuid
from datetime import datetime, timezone
from io import BytesIO
from typing import Any

import openpyxl

from .dedup import dedup_records
from .errors import ExcelParseError
from .mapper import compute_row_hash, map_row
from .parsers import select_parser

# Header detection only inspects this many leading rows of the active sheet.
_HEADER_SCAN_ROWS = 20


def _read_headers_from_workbook(workbook: openpyxl.Workbook) -> list[str]:
    """Return the header labels found in the active sheet's first non-blank row.

    Only the first ``_HEADER_SCAN_ROWS`` rows are inspected. A row counts as
    non-blank when at least one cell holds something other than ``None`` or
    whitespace. Cell values are stringified and stripped; blank cells inside
    the header row are named ``col_<n>`` using their 1-based column position.

    Args:
        workbook: The loaded workbook whose active sheet is scanned.

    Returns:
        One label per column of the sheet, or an empty list when the workbook
        has no active sheet or no non-blank row within the scanned range.
    """
    ws = workbook.active
    if ws is None:
        # No sheet to inspect: report "no headers" instead of failing with an
        # AttributeError on the attribute accesses below.
        return []

    column_count = ws.max_column or 0
    last_row = min(ws.max_row or 0, _HEADER_SCAN_ROWS)

    for row_idx in range(1, last_row + 1):
        cell_values = [
            ws.cell(row=row_idx, column=col_idx).value
            for col_idx in range(1, column_count + 1)
        ]
        labels = ["" if value is None else str(value).strip() for value in cell_values]
        if not any(labels):
            continue  # entirely blank row; keep looking for the header row
        return [label or f"col_{pos}" for pos, label in enumerate(labels, start=1)]

    return []


def import_excel_bytes(
    data: bytes,
    *,
    operator: str,
    source_name: str | None = None,
    filename: str | None = None,
    limit: int = 50,
) -> dict[str, Any]:
    """Parse an in-memory Excel file into mapped, de-duplicated records.

    The workbook is loaded with ``data_only=True``, a parser is selected from
    the detected header row, and every parsed row is hashed and mapped. Rows
    whose mapping yields no normalized value are counted as errors and are
    not returned. The remaining records are de-duplicated, and at most
    ``limit`` of them are included in the result.

    Args:
        data: Raw bytes of the Excel file.
        operator: Stored verbatim in each record's ``meta["operator"]``.
        source_name: Label stored in ``meta["source_name"]``; falls back to
            ``filename`` and then to ``"excel_import"`` when empty.
        filename: Original file name, stored in ``meta["filename"]``.
        limit: Maximum number of de-duplicated records to return. Negative
            values are treated as 0.

    Returns:
        A dict with two keys:

        * ``"stats"``: import id, parser name and row counters. The counters
          describe the whole import, not just the truncated record list.
        * ``"records"``: up to ``limit`` de-duplicated records, each a dict
          with ``"raw"``, ``"normalized"`` and ``"meta"`` entries.

    Raises:
        ExcelParseError: If openpyxl cannot load ``data`` as a workbook.
    """
    try:
        workbook = openpyxl.load_workbook(BytesIO(data), data_only=True)
    except Exception as e:
        # Boundary translation: any loader failure surfaces as the package's
        # own error type, with the original exception chained as the cause.
        raise ExcelParseError(f"Excel 读取失败: {e}") from e

    headers = _read_headers_from_workbook(workbook)
    parser = select_parser(headers)
    parsed_rows = parser.parse(workbook)

    # Values shared by every record produced by this import run.
    import_id = uuid.uuid4().hex
    imported_at = datetime.now(timezone.utc).isoformat()
    resolved_source_name = str(source_name or filename or "excel_import")

    mapped_records: list[dict[str, Any]] = []
    error_rows = 0

    for row in parsed_rows:
        raw = dict(row.values)
        row_hash = compute_row_hash(raw)
        normalized, dedup_key, issues = map_row(raw)

        if normalized is None:
            # Unmappable rows only contribute to the error counter; they are
            # never de-duplicated or returned, so no record is built for them.
            error_rows += 1
            continue

        meta: dict[str, Any] = {
            "import_id": import_id,
            "imported_at": imported_at,
            "operator": operator,
            "source_name": resolved_source_name,
            "filename": filename,
            "parser": parser.name,
            "sheet": row.sheet,
            "row_number": row.row_number,
            "row_hash": row_hash,
            "dedup_key": dedup_key,
        }
        if issues:
            meta["issues"] = issues

        mapped_records.append({"raw": raw, "normalized": normalized, "meta": meta})

    # Capture the count before handing the list to the de-duplication step.
    mapped_rows = len(mapped_records)
    deduped, dedup_stats = dedup_records(mapped_records)

    stats = {
        "import_id": import_id,
        "parser": parser.name,
        "total_rows": len(parsed_rows),
        "mapped_rows": mapped_rows,
        "error_rows": error_rows,
        "duplicates": dedup_stats["duplicates"],
        "deduped_rows": dedup_stats["deduped"],
    }

    output_records = deduped[: max(0, limit)]
    return {"stats": stats, "records": output_records}