import re import zipfile import xml.etree.ElementTree as ET from pathlib import Path from utils import are_keys_sequential, toSnakeCase from docs import headerReplacements S = "{http://schemas.openxmlformats.org/spreadsheetml/2006/main}" R = "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}" def _col_num(letters: str) -> int: n = 0 for ch in letters: n = n * 26 + (ord(ch.upper()) - 64) return n - 1 def _as_number(text: str) -> int | float: f = float(text) return int(f) if f.is_integer() else f def _shared_strings(z: zipfile.ZipFile) -> list: if "xl/sharedStrings.xml" not in z.namelist(): return [] root = ET.fromstring(z.read("xl/sharedStrings.xml")) return ["".join(t.text or "" for t in si.iter(f"{S}t")) for si in root.findall(f"{S}si")] def _worksheet_path(z: zipfile.ZipFile) -> str: wb = ET.fromstring(z.read("xl/workbook.xml")) sheet = next(iter(wb.iter(f"{S}sheet"))) rels = ET.fromstring(z.read("xl/_rels/workbook.xml.rels")) relmap = {r.get("Id"): r.get("Target") for r in rels} target = relmap.get(sheet.get(f"{R}id"), "worksheets/sheet1.xml") path = target[1:] if target.startswith("/") else "xl/" + target if path not in z.namelist(): path = sorted(n for n in z.namelist() if n.startswith("xl/worksheets/") and n.endswith(".xml"))[0] return path def _cell_value(c: ET.Element, shared: list) -> str | int | float: t = c.get("t") if t == "s": v = c.find(f"{S}v") return shared[int(v.text)] if v is not None else "" if t == "inlineStr": return "".join(x.text or "" for x in c.iter(f"{S}t")) if t in ("str", "b"): v = c.find(f"{S}v") return v.text if v is not None and v.text is not None else "" v = c.find(f"{S}v") if v is None or v.text is None: return "" return _as_number(v.text) def readLocalSheet(path: Path, file_path: str): with zipfile.ZipFile(path) as z: shared = _shared_strings(z) ws_path = _worksheet_path(z) ws = ET.fromstring(z.read(ws_path)) grid: dict = {} max_row = 0 for c in ws.iter(f"{S}c"): ref = c.get("r") if not ref: continue m = re.match(r"([A-Za-z]+)(\d+)", ref) col, row = _col_num(m.group(1)), int(m.group(2)) val = _cell_value(c, shared) if val != "": grid[(row, col)] = val max_row = max(max_row, row) header_width = max((col for (row, col) in grid if row == 1), default=-1) + 1 raw_header = [str(grid.get((1, k), "")) for k in range(header_width)] while raw_header and raw_header[-1] == "": raw_header.pop() ncols = len(raw_header) header = [] for h in raw_header: h = toSnakeCase(h) for replacement in headerReplacements: h = h.replace(replacement[0], replacement[1]) header.append(h) last_row = 1 for (row, col), val in grid.items(): if row >= 2 and col < ncols and val != "": last_row = max(last_row, row) rows = [ {header[k]: grid.get((r, k), "") for k in range(ncols)} for r in range(2, last_row + 1) ] # detect array fields: columns that have values in continuation rows (empty first col) array_fields = [] for row in rows: first_val = str(row.get(header[0], "")) if toSnakeCase(first_val) == "": for field in header[1:]: val = row.get(field, "") if val != "" and field not in array_fields: array_fields.append(field) tableData = {} rowKey = "" rowData: dict = {} for row in rows: first_val = str(row.get(header[0], "")) new_key = toSnakeCase(first_val) if new_key != "": rowKey = new_key rowData = {} for field in header[1:]: if field == "": continue val = row.get(field, "") cell = str(val) if isinstance(val, (int, float)) else val cell = cell if cell != "-" else None if field in array_fields: rowData[field] = [cell] if cell is not None else [] else: rowData[field] = cell elif rowKey != "": for field in header[1:]: if field == "": continue val = row.get(field, "") if val == "": continue cell = str(val) if isinstance(val, (int, float)) else val cell = cell if cell != "-" else None if field in array_fields and cell is not None: rowData.setdefault(field, []).append(cell) if rowKey != "": tableData[rowKey] = rowData if len([h for h in header if h != ""]) > 2 else (list(rowData.values())[0] if rowData else None) if are_keys_sequential(tableData): return list(tableData.values()) return tableData