Spaces:
Running on Zero
Running on Zero
| import re | |
| import zipfile | |
| import xml.etree.ElementTree as ET | |
| from pathlib import Path | |
| from utils import are_keys_sequential, toSnakeCase | |
| from docs import headerReplacements | |
# Clark-notation namespace prefix for SpreadsheetML elements; prepended to tag
# names (e.g. f"{S}t", f"{S}c", f"{S}sheet") in the ElementTree lookups below.
S = "{http://schemas.openxmlformats.org/spreadsheetml/2006/main}"
# Clark-notation namespace prefix for the relationship-id attribute that is read
# from a workbook <sheet> element as f"{R}id".
R = "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}"
| def _col_num(letters: str) -> int: | |
| n = 0 | |
| for ch in letters: | |
| n = n * 26 + (ord(ch.upper()) - 64) | |
| return n - 1 | |
| def _as_number(text: str) -> int | float: | |
| f = float(text) | |
| return int(f) if f.is_integer() else f | |
def _shared_strings(z: zipfile.ZipFile) -> list:
    """Load the workbook's shared-string table as a list of plain strings.

    Args:
        z: An open XLSX archive.

    Returns:
        One string per ``<si>`` entry of ``xl/sharedStrings.xml``, in document
        order, so a list position matches the index stored in a ``t="s"``
        cell. Entries made of several ``<t>`` runs are flattened by
        concatenating the run texts. An empty list is returned when the
        archive has no ``xl/sharedStrings.xml`` member.
    """
    if "xl/sharedStrings.xml" not in z.namelist():
        return []
    root = ET.fromstring(z.read("xl/sharedStrings.xml"))

    strings = []
    for si in root.findall(f"{S}si"):
        # <rPh> phonetic runs (reading hints such as furigana) carry their own
        # <t> children; those are not part of the displayed text, so leave
        # them out of the concatenation.
        phonetic = {id(t) for rph in si.iter(f"{S}rPh") for t in rph.iter(f"{S}t")}
        strings.append(
            "".join(t.text or "" for t in si.iter(f"{S}t") if id(t) not in phonetic)
        )
    return strings
def _worksheet_path(z: zipfile.ZipFile) -> str:
    """Return the archive member name of the workbook's first worksheet.

    The first ``<sheet>`` element of ``xl/workbook.xml`` is resolved through
    ``xl/_rels/workbook.xml.rels``; when its relationship id is not listed
    there, ``worksheets/sheet1.xml`` is assumed. A target starting with "/"
    is taken relative to the archive root, any other target relative to
    ``xl/``. If the resulting member is not in the archive, the
    alphabetically first ``xl/worksheets/*.xml`` member is returned instead.
    """
    workbook = ET.fromstring(z.read("xl/workbook.xml"))
    first_sheet = next(iter(workbook.iter(f"{S}sheet")))
    relationships = ET.fromstring(z.read("xl/_rels/workbook.xml.rels"))

    # Relationship id -> target part, as declared by the workbook rels file.
    targets = {}
    for rel in relationships:
        targets[rel.get("Id")] = rel.get("Target")

    rel_id = first_sheet.get(f"{R}id")
    target = targets.get(rel_id, "worksheets/sheet1.xml")

    if target.startswith("/"):
        member = target[1:]
    else:
        member = "xl/" + target

    if member in z.namelist():
        return member

    # Declared target is missing from the archive: fall back to whatever
    # worksheet part sorts first.
    candidates = [
        name
        for name in z.namelist()
        if name.startswith("xl/worksheets/") and name.endswith(".xml")
    ]
    candidates.sort()
    return candidates[0]
def _cell_value(c: ET.Element, shared: list) -> str | int | float:
    """Decode one worksheet ``<c>`` element into a Python value.

    Args:
        c: The cell element.
        shared: Shared-string table, indexed by the ``<v>`` value of cells
            whose type attribute is ``"s"``.

    Returns:
        * ``t="inlineStr"``: the concatenated ``<t>`` texts inside the cell,
          excluding phonetic (``<rPh>``) runs.
        * ``t="s"``: the entry of ``shared`` at the index given by ``<v>``.
        * ``t`` in ``"str"``, ``"b"``, ``"e"``, ``"d"``: the raw ``<v>`` text.
        * anything else: the ``<v>`` text parsed by ``_as_number``; if it is
          not numeric the raw text is returned instead.

        An empty string is returned whenever the cell has no ``<v>`` text.

    Raises:
        IndexError: if a ``t="s"`` cell references an index outside ``shared``.
    """
    t = c.get("t")
    if t == "inlineStr":
        # Phonetic runs hold reading hints, not cell text; skip their <t> nodes.
        phonetic = {id(x) for rph in c.iter(f"{S}rPh") for x in rph.iter(f"{S}t")}
        return "".join(
            x.text or "" for x in c.iter(f"{S}t") if id(x) not in phonetic
        )

    v = c.find(f"{S}v")
    if v is None or v.text is None:
        return ""
    if t == "s":
        return shared[int(v.text)]
    if t in ("str", "b", "e", "d"):
        # Error codes (e.g. "#DIV/0!") and ISO dates are not numbers; passing
        # them to float() would abort the whole sheet read.
        return v.text
    try:
        return _as_number(v.text)
    except ValueError:
        # Unknown or mislabelled cell type with non-numeric content.
        return v.text
def readLocalSheet(path: Path, file_path: str) -> dict | list:
    """Read the first worksheet of an XLSX file into keyed records.

    Row 1 is the header: each header cell is passed through ``toSnakeCase``
    and then through every ``(old, new)`` pair in ``headerReplacements``.
    From row 2 on, a row whose first column is non-empty (after
    ``toSnakeCase``) starts a new record keyed by that value; a row whose
    first column is empty is a continuation row that appends its non-empty
    cells to the current record. Any column that has a value in at least one
    continuation row is an "array field" and is stored as a list.

    Cell handling: numbers are converted to ``str``; a cell equal to ``"-"``
    becomes ``None`` (and is left out of array fields); columns with a blank
    header are ignored.

    Args:
        path: Location of the ``.xlsx`` file.
        file_path: Not used by this function; kept so existing callers keep
            working.

    Returns:
        A dict mapping each record key to its record. A record is a dict of
        field -> value when the sheet has more than two non-blank header
        columns; otherwise it is the first field's value (or ``None`` when
        there are no fields). If ``are_keys_sequential`` reports true for the
        dict, only its values are returned, as a list.
    """
    with zipfile.ZipFile(path) as z:
        shared = _shared_strings(z)
        ws = ET.fromstring(z.read(_worksheet_path(z)))

    # Sparse grid of non-empty cells keyed by (1-based row, 0-based column).
    grid: dict = {}
    for c in ws.iter(f"{S}c"):
        ref = c.get("r")
        if not ref:
            continue
        m = re.match(r"([A-Za-z]+)(\d+)", ref)
        if m is None:
            # Malformed cell reference: skip the cell instead of crashing.
            continue
        val = _cell_value(c, shared)
        if val != "":
            grid[(int(m.group(2)), _col_num(m.group(1)))] = val

    header_width = max((col for (row, col) in grid if row == 1), default=-1) + 1
    raw_header = [str(grid.get((1, k), "")) for k in range(header_width)]
    while raw_header and raw_header[-1] == "":
        raw_header.pop()
    ncols = len(raw_header)

    header = []
    for h in raw_header:
        h = toSnakeCase(h)
        for replacement in headerReplacements:
            h = h.replace(replacement[0], replacement[1])
        header.append(h)

    # Last data row that has a value inside the header's column range.
    last_row = max(
        (row for (row, col) in grid if row >= 2 and col < ncols),
        default=1,
    )
    rows = [
        {header[k]: grid.get((r, k), "") for k in range(ncols)}
        for r in range(2, last_row + 1)
    ]

    # With an empty header there are no rows, so the key column is never read.
    key_field = header[0] if header else ""
    row_keys = [toSnakeCase(str(row.get(key_field, ""))) for row in rows]

    # detect array fields: columns that have values in continuation rows (empty first col)
    array_fields = set()
    for row, key in zip(rows, row_keys):
        if key == "":
            array_fields.update(
                field for field in header[1:] if row.get(field, "") != ""
            )

    def _normalise(val):
        # Numbers are stored as text; a lone "-" marks "no value".
        cell = str(val) if isinstance(val, (int, float)) else val
        return None if cell == "-" else cell

    data_fields = [field for field in header[1:] if field != ""]
    # Records stay dicts only when there are more than two named columns;
    # otherwise each key maps straight to its first field's value.
    keep_dict = sum(1 for h in header if h != "") > 2

    tableData = {}
    rowKey = ""
    rowData: dict = {}
    for row, new_key in zip(rows, row_keys):
        if new_key != "":
            # Key row: start a fresh record.
            rowKey = new_key
            rowData = {}
            for field in data_fields:
                cell = _normalise(row.get(field, ""))
                if field in array_fields:
                    rowData[field] = [cell] if cell is not None else []
                else:
                    rowData[field] = cell
        elif rowKey != "":
            # Continuation row: extend the current record's array fields.
            for field in data_fields:
                val = row.get(field, "")
                if val == "":
                    continue
                cell = _normalise(val)
                if field in array_fields and cell is not None:
                    rowData.setdefault(field, []).append(cell)
        if rowKey != "":
            if keep_dict:
                tableData[rowKey] = rowData
            else:
                tableData[rowKey] = list(rowData.values())[0] if rowData else None

    if are_keys_sequential(tableData):
        return list(tableData.values())
    return tableData