File size: 2,936 Bytes
c481f8a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from __future__ import annotations

import uuid
from datetime import datetime, timezone
from io import BytesIO
from typing import Any

import openpyxl

from .dedup import dedup_records
from .errors import ExcelParseError
from .mapper import compute_row_hash, map_row
from .parsers import select_parser


def _read_headers_from_workbook(workbook: openpyxl.Workbook) -> list[str]:
    """Return the header names found on the active sheet of *workbook*.

    At most the first 20 rows are scanned. The first row that contains at
    least one non-blank cell is treated as the header row. Every column up
    to the sheet's ``max_column`` contributes one entry: the stripped string
    form of the cell value, or ``col_<n>`` (1-based column position) when
    the cell is empty or whitespace-only.

    Returns:
        The list of header names, or an empty list when the workbook has no
        active sheet or none of the scanned rows holds a value.
    """
    ws = workbook.active
    if ws is None:
        # A workbook without any sheet reports ``active`` as None; there is
        # nothing to scan, so behave like a sheet with no header row.
        return []

    # Read the sheet bounds once instead of re-querying them for every row.
    last_row = min(ws.max_row or 0, 20)
    last_col = ws.max_column or 0

    for r in range(1, last_row + 1):
        values = [ws.cell(row=r, column=c).value for c in range(1, last_col + 1)]
        texts = ["" if v is None else str(v).strip() for v in values]
        if any(texts):
            # Blank cells inside the header row get a positional placeholder.
            return [text or f"col_{pos}" for pos, text in enumerate(texts, start=1)]
    return []


def import_excel_bytes(
    data: bytes,
    *,
    operator: str,
    source_name: str | None = None,
    filename: str | None = None,
    limit: int = 50,
) -> dict[str, Any]:
    """Parse an in-memory Excel file, map its rows and de-duplicate them.

    The workbook is loaded with ``data_only=True``, a parser is selected
    from the detected headers, and every parsed row is hashed and passed
    through ``map_row``. Rows whose normalized form is ``None`` are counted
    as errors and left out of de-duplication.

    Args:
        data: Raw bytes of the Excel file.
        operator: Value stored under ``meta["operator"]`` of every record.
        source_name: Preferred source label; falls back to *filename* and
            then to ``"excel_import"``.
        filename: Original file name, stored as-is in the record metadata.
        limit: Maximum number of de-duplicated records to return; values
            below zero are treated as zero.

    Returns:
        A dict with ``"stats"`` (counts for the whole import) and
        ``"records"`` (the first *limit* de-duplicated records).

    Raises:
        ExcelParseError: If the workbook cannot be loaded from *data*.
    """
    try:
        workbook = openpyxl.load_workbook(BytesIO(data), data_only=True)
    except Exception as exc:
        raise ExcelParseError(f"Excel 读取失败: {exc}") from exc

    parser = select_parser(_read_headers_from_workbook(workbook))
    parsed_rows = parser.parse(workbook)

    import_id = uuid.uuid4().hex
    # Metadata shared by every record produced by this import run.
    batch_meta: dict[str, Any] = {
        "import_id": import_id,
        "imported_at": datetime.now(timezone.utc).isoformat(),
        "operator": operator,
        "source_name": str(source_name or filename or "excel_import"),
        "filename": filename,
    }

    usable: list[dict[str, Any]] = []
    failed = 0

    for row in parsed_rows:
        raw = dict(row.values)
        row_hash = compute_row_hash(raw)
        normalized, dedup_key, issues = map_row(raw)

        meta: dict[str, Any] = {
            **batch_meta,
            "parser": parser.name,
            "sheet": row.sheet,
            "row_number": row.row_number,
            "row_hash": row_hash,
            "dedup_key": dedup_key,
        }
        if issues:
            meta["issues"] = issues

        if normalized is None:
            # Unmappable rows only contribute to the error count.
            failed += 1
            continue
        usable.append({"raw": raw, "normalized": normalized, "meta": meta})

    deduped, dedup_stats = dedup_records(usable)

    stats = {
        "import_id": import_id,
        "parser": parser.name,
        "total_rows": len(parsed_rows),
        "mapped_rows": len(usable),
        "error_rows": failed,
        "duplicates": dedup_stats["duplicates"],
        "deduped_rows": dedup_stats["deduped"],
    }

    # A negative limit yields no records rather than slicing from the end.
    cap = max(0, limit)
    return {"stats": stats, "records": deduped[:cap]}