Vernacular / sheets.py
bhardwaj08sarthak's picture
Update sheets.py
a718a43 verified
Raw
History Blame Contribute Delete
4.6 kB
import re
import zipfile
import xml.etree.ElementTree as ET
from pathlib import Path
from utils import are_keys_sequential, toSnakeCase
from docs import headerReplacements
# ElementTree-style "{namespace}" prefix for the SpreadsheetML main namespace.
# It is prepended to local tag names (e.g. f"{S}sheet", f"{S}c") when
# searching the parsed workbook, shared-string and worksheet XML.
S = "{http://schemas.openxmlformats.org/spreadsheetml/2006/main}"
# Same kind of prefix for the officeDocument relationships namespace; used to
# read the namespaced "id" attribute of a <sheet> element (f"{R}id").
R = "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}"
def _col_num(letters: str) -> int:
n = 0
for ch in letters:
n = n * 26 + (ord(ch.upper()) - 64)
return n - 1
def _as_number(text: str) -> int | float:
f = float(text)
return int(f) if f.is_integer() else f
def _shared_strings(z: zipfile.ZipFile) -> list:
    """Return the workbook's shared-string table as a list of plain strings.

    Returns an empty list when the archive has no ``xl/sharedStrings.xml``.
    Each ``<si>`` item contributes one entry, in document order, so a cell's
    shared-string index can be used directly as a list index.

    The text of an item is its direct ``<t>`` child plus the ``<t>`` elements
    of its rich-text runs (``<r>``), concatenated.  Phonetic runs (``<rPh>``)
    also contain ``<t>`` elements, but those hold pronunciation hints rather
    than cell text, so they are deliberately left out.
    """
    if "xl/sharedStrings.xml" not in z.namelist():
        return []
    root = ET.fromstring(z.read("xl/sharedStrings.xml"))
    text_tag = f"{S}t"
    run_tag = f"{S}r"
    strings = []
    for si in root.findall(f"{S}si"):
        parts = []
        for child in si:
            if child.tag == text_tag:
                parts.append(child.text or "")
            elif child.tag == run_tag:
                parts.extend(t.text or "" for t in child.iter(text_tag))
            # Any other child (rPh, phoneticPr, ...) carries no cell text.
        strings.append("".join(parts))
    return strings
def _worksheet_path(z: zipfile.ZipFile) -> str:
    """Return the archive member name of the workbook's first worksheet.

    The first ``<sheet>`` element of ``xl/workbook.xml`` is looked up in
    ``xl/_rels/workbook.xml.rels`` by its relationship id.  A target starting
    with "/" is taken as archive-absolute (the slash is dropped); any other
    target is taken relative to ``xl/``.  When the id has no relationship
    entry, "worksheets/sheet1.xml" is assumed.

    If the resolved name is not in the archive, the alphabetically first
    ``xl/worksheets/*.xml`` member is returned instead.

    Raises:
        StopIteration: if the workbook declares no sheet.
        IndexError: if the fallback finds no worksheet member at all.
    """
    workbook = ET.fromstring(z.read("xl/workbook.xml"))
    first_sheet = next(workbook.iter(f"{S}sheet"))

    relationships = ET.fromstring(z.read("xl/_rels/workbook.xml.rels"))
    targets = {}
    for rel in relationships:
        targets[rel.get("Id")] = rel.get("Target")

    rel_id = first_sheet.get(f"{R}id")
    target = targets.get(rel_id, "worksheets/sheet1.xml")
    if target.startswith("/"):
        member = target[1:]
    else:
        member = "xl/" + target

    members = z.namelist()
    if member in members:
        return member

    # Resolved name is missing: fall back to whatever worksheet part exists.
    candidates = [
        name for name in members
        if name.startswith("xl/worksheets/") and name.endswith(".xml")
    ]
    candidates.sort()
    return candidates[0]
def _cell_value(c: ET.Element, shared: list) -> str | int | float:
    """Return the value of worksheet cell element *c*.

    Args:
        c: a ``<c>`` element from the worksheet XML.
        shared: the shared-string table, indexed by the cell's ``<v>`` value
            when the cell type is "s".

    Returns:
        * type "inlineStr": the concatenated text of the cell's ``<t>``
          elements;
        * type "s": the shared string the cell points at;
        * types "str" and "b": the raw ``<v>`` text (booleans therefore come
          back as the text stored in the file, not as ``bool``);
        * any other type: the ``<v>`` text parsed as a number, or the raw
          text when it is not numeric (e.g. error cells such as "#DIV/0!"
          or ISO-date cells), so one such cell cannot abort the whole read;
        * "" whenever the cell has no ``<v>`` element or it has no text.

    Raises:
        IndexError / ValueError: if a type-"s" cell holds an index that is
            out of range for *shared* or is not an integer.
    """
    cell_type = c.get("t")
    if cell_type == "inlineStr":
        return "".join(x.text or "" for x in c.iter(f"{S}t"))

    v = c.find(f"{S}v")
    if v is None or v.text is None:
        return ""

    if cell_type == "s":
        return shared[int(v.text)]
    if cell_type in ("str", "b"):
        return v.text
    try:
        return _as_number(v.text)
    except ValueError:
        # Non-numeric payload in a cell that is not declared as a string.
        return v.text
def readLocalSheet(path: Path, file_path: str):
    """Read the first worksheet of an .xlsx file into keyed records.

    Row 1 is the header: each header cell is passed through ``toSnakeCase``
    and then through every ``(old, new)`` pair in ``headerReplacements`` to
    form a field name.  From row 2 on, a row whose first column yields a
    non-empty ``toSnakeCase`` key starts a new record under that key; a row
    whose first column yields an empty key is a continuation row that appends
    values to the current record.  Any field that has a value in at least one
    continuation row is an "array field" and is stored as a list in every
    record; all other fields are stored as single values.

    Cell values are normalised as follows: numbers become their ``str()``
    form, and the literal "-" becomes ``None`` (dropped from array fields).

    Args:
        path: location of the .xlsx file to open.
        file_path: not used by this function; kept for interface
            compatibility with existing callers.

    Returns:
        A dict mapping record key to record.  When the header has more than
        two non-empty field names a record is a ``{field: value}`` dict;
        otherwise it is just the first field's value (``None`` if there is
        none).  If ``are_keys_sequential`` accepts the dict, only its values
        are returned, as a list.
    """
    with zipfile.ZipFile(path) as z:
        shared = _shared_strings(z)
        ws = ET.fromstring(z.read(_worksheet_path(z)))

    # Sparse grid of the non-empty cells: (1-based row, 0-based col) -> value.
    ref_pattern = re.compile(r"([A-Za-z]+)(\d+)")
    grid: dict = {}
    for c in ws.iter(f"{S}c"):
        ref = c.get("r")
        m = ref_pattern.match(ref) if ref else None
        if m is None:
            # Missing or malformed reference: the cell cannot be placed.
            continue
        val = _cell_value(c, shared)
        if val != "":
            grid[(int(m.group(2)), _col_num(m.group(1)))] = val

    # Header row, trimmed of trailing empty cells.
    header_width = max((col for (row, col) in grid if row == 1), default=-1) + 1
    raw_header = [str(grid.get((1, k), "")) for k in range(header_width)]
    while raw_header and raw_header[-1] == "":
        raw_header.pop()
    ncols = len(raw_header)

    header = []
    for h in raw_header:
        h = toSnakeCase(h)
        for replacement in headerReplacements:
            h = h.replace(replacement[0], replacement[1])
        header.append(h)

    # Last data row that has a value inside the header's column range.
    last_row = max(
        (row for (row, col) in grid if row >= 2 and col < ncols),
        default=1,
    )
    rows = [
        {header[k]: grid.get((r, k), "") for k in range(ncols)}
        for r in range(2, last_row + 1)
    ]
    # One key per data row; "" marks a continuation row.  `rows` is empty
    # whenever `header` is, so header[0] is never evaluated on an empty list.
    row_keys = [toSnakeCase(str(row.get(header[0], ""))) for row in rows]

    # Array fields: columns that have a value in some continuation row.
    array_fields = set()
    for key, row in zip(row_keys, rows):
        if key == "":
            array_fields.update(
                field for field in header[1:] if row.get(field, "") != ""
            )

    # Columns with an empty field name never reach the output.
    data_fields = [field for field in header[1:] if field != ""]
    # With at most two named columns a record collapses to a single value.
    keep_records = sum(1 for h in header if h != "") > 2

    def normalise(val):
        """Stringify numbers and map the "-" placeholder to None."""
        cell = str(val) if isinstance(val, (int, float)) else val
        return None if cell == "-" else cell

    table = {}
    current_key = ""
    record: dict = {}
    for new_key, row in zip(row_keys, rows):
        if new_key != "":
            current_key = new_key
            record = {}
            for field in data_fields:
                cell = normalise(row.get(field, ""))
                if field in array_fields:
                    # NOTE(review): an empty cell in the key row yields [""]
                    # here, whereas empty continuation cells are skipped;
                    # kept as-is, confirm whether that is intended.
                    record[field] = [cell] if cell is not None else []
                else:
                    record[field] = cell
        elif current_key != "":
            for field in data_fields:
                val = row.get(field, "")
                if val == "":
                    continue
                cell = normalise(val)
                if field in array_fields and cell is not None:
                    record.setdefault(field, []).append(cell)
        if current_key != "":
            if keep_records:
                table[current_key] = record
            else:
                table[current_key] = next(iter(record.values()), None)

    if are_keys_sequential(table):
        return list(table.values())
    return table