# XHS / importer / excel.py
# Trae Bot
# Upload Spider_XHS project
# c481f8a
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from io import BytesIO
from typing import Any
import openpyxl
from .dedup import dedup_records
from .errors import ExcelParseError
from .mapper import compute_row_hash, map_row
from .parsers import select_parser
def _read_headers_from_workbook(workbook: openpyxl.Workbook) -> list[str]:
    """Return the header labels found in the first non-blank row of the active sheet.

    At most the first 20 rows are scanned. A row counts as non-blank when at
    least one of its cells holds something other than ``None`` or
    whitespace. Every cell of that row is converted with ``str()`` and
    stripped; cells that end up empty receive the positional placeholder
    ``col_<n>`` (1-based), so the result has one entry per sheet column.

    Args:
        workbook: Workbook whose ``active`` sheet is inspected.

    Returns:
        The header labels, or an empty list when the workbook has no active
        sheet or no non-blank row exists within the scanned range.
    """
    ws = workbook.active
    if ws is None:
        # ``Workbook.active`` can be None; without this guard the attribute
        # reads below would fail with AttributeError.
        return []

    max_scan = min(ws.max_row or 0, 20)
    # Read the column count once instead of once per scanned row.
    column_count = ws.max_column or 0

    for r in range(1, max_scan + 1):
        cell_values = (ws.cell(row=r, column=c).value for c in range(1, column_count + 1))
        # Stringify each cell a single time; "" marks a blank cell.
        labels = ["" if value is None else str(value).strip() for value in cell_values]
        if any(labels):
            return [label or f"col_{idx + 1}" for idx, label in enumerate(labels)]
    return []
def import_excel_bytes(
    data: bytes,
    *,
    operator: str,
    source_name: str | None = None,
    filename: str | None = None,
    limit: int = 50,
) -> dict[str, Any]:
    """Parse an in-memory Excel file into mapped, de-duplicated records.

    The workbook is loaded with ``data_only=True``; ``select_parser`` picks a
    parser from the detected header row, and every row the parser yields is
    hashed with ``compute_row_hash`` and mapped with ``map_row``. Rows whose
    normalized value is ``None`` are counted as errors and left out of the
    result; the remaining rows are passed to ``dedup_records``.

    Args:
        data: Raw bytes of the Excel file.
        operator: Stored under ``meta["operator"]`` for each row.
        source_name: Label stored under ``meta["source_name"]``; falls back
            to ``filename`` and then to ``"excel_import"``.
        filename: Stored unchanged under ``meta["filename"]``.
        limit: Maximum number of de-duplicated records returned; negative
            values are treated as zero.

    Returns:
        A dict with two keys: ``"stats"`` (counters for the whole import,
        not affected by ``limit``) and ``"records"`` (the first ``limit``
        entries of the list returned by ``dedup_records``).

    Raises:
        ExcelParseError: If openpyxl fails to load ``data``.
    """
    try:
        workbook = openpyxl.load_workbook(BytesIO(data), data_only=True)
    except Exception as exc:
        raise ExcelParseError(f"Excel 读取失败: {exc}") from exc

    parser = select_parser(_read_headers_from_workbook(workbook))
    parsed_rows = parser.parse(workbook)

    # Values shared by every record produced in this import run.
    import_id = uuid.uuid4().hex
    imported_at = datetime.now(timezone.utc).isoformat()
    resolved_source_name = str(source_name or filename or "excel_import")

    usable: list[dict[str, Any]] = []
    failed = 0
    for parsed in parsed_rows:
        raw = dict(parsed.values)
        # Hash before mapping; keep this order in case mapping touches `raw`.
        row_hash = compute_row_hash(raw)
        normalized, dedup_key, issues = map_row(raw)

        meta: dict[str, Any] = {
            "import_id": import_id,
            "imported_at": imported_at,
            "operator": operator,
            "source_name": resolved_source_name,
            "filename": filename,
            "parser": parser.name,
            "sheet": parsed.sheet,
            "row_number": parsed.row_number,
            "row_hash": row_hash,
            "dedup_key": dedup_key,
        }
        if issues:
            meta["issues"] = issues

        if normalized is None:
            # Unmappable rows only contribute to the error counter.
            failed += 1
            continue
        usable.append({"raw": raw, "normalized": normalized, "meta": meta})

    deduped, dedup_stats = dedup_records(usable)

    stats = {
        "import_id": import_id,
        "parser": parser.name,
        "total_rows": len(parsed_rows),
        "mapped_rows": len(usable),
        "error_rows": failed,
        "duplicates": dedup_stats["duplicates"],
        "deduped_rows": dedup_stats["deduped"],
    }

    cap = limit if limit > 0 else 0
    return {"stats": stats, "records": deduped[:cap]}