Spaces:
Running on Zero
Running on Zero
| import zipfile | |
| import xml.etree.ElementTree as ET | |
| from pathlib import Path | |
| from utils import are_keys_sequential, getMarkedFields, toSnakeCase | |
# WordprocessingML main namespace, written in ElementTree's "{uri}" form so a
# qualified tag can be built as f"{W}t", f"{W}p", ...
W = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"

# [old, new] substring replacements applied to every snake_cased header cell.
# They run in list order on the same string, so a later rule sees the output
# of the earlier ones.
headerReplacements = [
    ["next_message_id", "next"],
    ["option_next", "options_next"],
    ["option_next_id", "options_next"],
    ["options_next_id", "options_next"],
]

# Handed to getMarkedFields() together with the document's file_path; the
# result is used as the set of key columns whose row keys are kept verbatim
# instead of being snake_cased.
# NOTE(review): "identifier" is presumably matched against file_path -- confirm
# in utils.getMarkedFields.
avoidTransformMarkers = [
    {"identifier": "pixabowl/posts", "fields": ["id"]},
]

# Handed to getMarkedFields() together with the document's file_path; the
# result is used as the set of columns whose cell text is split into lines.
splitMarkers = [
    {"identifier": "sidetrails/connections/items", "fields": ["bio"]},
]
| # --- local .docx -> Google Docs API body.content bridge --- | |
def _para_text(p: ET.Element) -> str:
    """Return the text of paragraph element *p* as one string.

    Every node yielded by ``p.iter()`` is visited in order: a ``t`` element
    (in the ``W`` namespace) contributes its text, or ``''`` when it has none,
    and a ``br`` element contributes a newline. All other nodes are ignored.
    """
    text_tag = f"{W}t"
    break_tag = f"{W}br"
    return ''.join(
        '\n' if node.tag == break_tag else (node.text or '')
        for node in p.iter()
        if node.tag in (text_tag, break_tag)
    )
def _cell_content(tc: ET.Element) -> list:
    """Convert a table-cell element into Docs-API-style cell content.

    Each direct-child ``p`` element of *tc* becomes one
    ``{'paragraph': {'elements': [{'textRun': {'content': ...}}]}}`` entry.

    The Google Docs API ends the text of every paragraph with a newline.
    That terminator is reproduced here because ``readCell`` concatenates the
    runs of a cell with no separator: without it, a cell holding several
    paragraphs would be fused into one line and ``str.splitlines()`` on split
    fields could not separate them. Callers ``rstrip()`` the cell text, so the
    final terminator does not leak into the extracted values.

    Args:
        tc: A ``tc`` element in the ``W`` namespace.

    Returns:
        A list with one paragraph dict per direct-child paragraph, in
        document order (empty when the cell has no paragraphs).
    """
    return [
        {'paragraph': {'elements': [{'textRun': {'content': _para_text(p) + '\n'}}]}}
        for p in tc.findall(f"{W}p")
    ]
def _docx_to_body_content(path: Path) -> list:
    """Read a .docx file and return its body as Docs-API-style content.

    ``word/document.xml`` is read from the zip archive at *path* and parsed.
    Each direct child of its ``body`` element is mapped as follows:

    * ``p``   -> ``{'paragraph': {'elements': [{'textRun': {'content': text}}]}}``
    * ``tbl`` -> ``{'table': {'tableRows': [{'tableCells': [{'content': ...}]}]}}``

    Any other child is skipped. Only direct ``tr`` children of a table and
    direct ``tc`` children of a row are considered.
    """
    with zipfile.ZipFile(path) as archive:
        document_xml = archive.read("word/document.xml")
    body = ET.fromstring(document_xml).find(f"{W}body")

    paragraph_tag, table_tag = f"{W}p", f"{W}tbl"
    row_tag, cell_tag = f"{W}tr", f"{W}tc"

    elements = []
    for node in body:
        if node.tag == paragraph_tag:
            run = {'textRun': {'content': _para_text(node)}}
            elements.append({'paragraph': {'elements': [run]}})
            continue
        if node.tag != table_tag:
            continue
        table_rows = []
        for tr in node.findall(row_tag):
            cells = [{'content': _cell_content(tc)} for tc in tr.findall(cell_tag)]
            table_rows.append({'tableCells': cells})
        elements.append({'table': {'tableRows': table_rows}})
    return elements
| # --- semantic data extraction (unchanged from owner's docs.py) --- | |
def read_paragraph_element(element):
    """Return the text carried by one paragraph element.

    Args:
        element: A mapping; only its ``'textRun'`` entry is read.

    Returns:
        The ``'content'`` value of the text run, or ``''`` when the element
        has no (or an empty) ``'textRun'`` or the run has no content.
    """
    text_run = element.get('textRun')
    if not text_run:
        return ''
    # Always hand back a str: the caller concatenates the result, so a
    # missing or None 'content' must not surface as None.
    return text_run.get('content') or ''
def readCell(elements):
    """Concatenate the text of every paragraph in a cell's content list.

    Entries of *elements* without a ``'paragraph'`` key are ignored. For the
    others, each item of the paragraph's ``'elements'`` list is passed to
    ``read_paragraph_element`` and the results are joined with no separator.
    """
    pieces = []
    for block in elements:
        if 'paragraph' not in block:
            continue
        for run in block.get('paragraph').get('elements'):
            pieces.append(read_paragraph_element(run))
    return ''.join(pieces)
def getArrayFields(elements):
    """Return the header names of columns that hold multiple values.

    Only entries of *elements* with a ``'table'`` key are examined. The first
    row of each table supplies the column names (snake_cased, then passed
    through ``headerReplacements``). A later row whose first cell snake_cases
    to ``''`` is a continuation row; any column that has non-empty text on a
    continuation row is reported. Names are returned once each, in the order
    they were first seen across all tables.
    """
    found = []
    for element in elements:
        if 'table' not in element:
            continue
        columns = []
        continuation = False
        rows = element.get('table').get('tableRows')
        for row_no, row in enumerate(rows):
            for col_no, cell in enumerate(row.get('tableCells')):
                text = readCell(cell.get('content')).rstrip()
                if row_no == 0:
                    # Header row: normalise the column name.
                    name = toSnakeCase(text)
                    for old, new in headerReplacements:
                        name = name.replace(old, new)
                    columns.append(name)
                    continue
                if col_no == 0:
                    # An empty key cell marks a continuation of the row above.
                    continuation = toSnakeCase(text) == ''
                    continue
                if continuation and text != '' and columns[col_no] not in found:
                    found.append(columns[col_no])
    return found
def readDocElements(elements, splitFields, avoidTransformFields):
    """Extract keyed row data from the tables of a Docs-API-style body.

    Only entries of *elements* with a ``'table'`` key are processed. Within a
    table:

    * Row 0 is the header; each cell is snake_cased and passed through
      ``headerReplacements``.
    * In later rows the first cell is the row key. It is snake_cased unless
      the first header name is listed in *avoidTransformFields*. An empty key
      marks a continuation row, which adds to the row above.
    * In the remaining cells ``'-'`` means ``None``. Cells under an empty
      header, and empty cells on continuation rows, are skipped.
    * Columns reported by ``getArrayFields`` collect their non-``None``
      values in a list; columns listed in *splitFields* store
      ``cellContent.splitlines()`` (or ``None`` for ``'-'``; empty text is
      skipped); every other column stores the cell text.
    * When the header has more than two non-empty names a row maps to its
      dict of columns; otherwise it maps to its first stored value (or
      ``None`` when nothing was stored).

    Args:
        elements: Body content list of dicts.
        splitFields: Column names whose text is split into lines, or ``None``.
        avoidTransformFields: Key-column names whose keys are kept verbatim,
            or ``None``.

    Returns:
        A list with one ``{row_key: value}`` dict per table. When there is
        exactly one table, that dict itself is returned, or the list of its
        values if ``are_keys_sequential`` accepts it.
    """
    fileData = []
    arrayFields = getArrayFields(elements)
    for value in elements:
        if 'table' not in value:
            continue
        tableData = {}
        header = []
        rowData = {}
        rowKey = ''
        multi = False
        for rowIndex, row in enumerate(value.get('table').get('tableRows')):
            for cellIndex, cell in enumerate(row.get('tableCells')):
                cellContent = readCell(cell.get('content')).rstrip()
                if rowIndex == 0:
                    # Header row: normalise the column name.
                    cellContent = toSnakeCase(cellContent)
                    for replacement in headerReplacements:
                        cellContent = cellContent.replace(replacement[0], replacement[1])
                    header.append(cellContent)
                elif cellIndex == 0:
                    # Key column: a non-empty key starts a new record, an
                    # empty one continues the previous record.
                    if avoidTransformFields is not None and header[cellIndex] in avoidTransformFields:
                        newRowKey = cellContent
                    else:
                        newRowKey = toSnakeCase(cellContent)
                    if newRowKey != '':
                        multi = False
                        rowKey = newRowKey
                        rowData = {}
                    else:
                        multi = True
                else:
                    if multi and cellContent == '':
                        continue
                    # Looked up only after the check above, so an empty cell
                    # on a continuation row never indexes the header.
                    column = header[cellIndex]
                    if column == '':
                        continue
                    if cellContent == '-':
                        cellContent = None
                    if column in arrayFields:
                        collected = rowData.setdefault(column, [])
                        if cellContent is not None:
                            collected.append(cellContent)
                    elif splitFields is not None and column in splitFields:
                        if cellContent is None:
                            # '-' in a split field: store None like a scalar
                            # field instead of calling None.splitlines().
                            rowData[column] = None
                        elif cellContent != '':
                            rowData[column] = cellContent.splitlines()
                    else:
                        rowData[column] = cellContent
            if rowIndex != 0 and rowKey != '':
                non_empty_cols = sum(1 for h in header if h != '')
                if non_empty_cols > 2:
                    tableData[rowKey] = rowData
                else:
                    # Key/value table: collapse the record to its first value.
                    tableData[rowKey] = next(iter(rowData.values()), None)
        fileData.append(tableData)
    if len(fileData) == 1:
        fileData = fileData[0]
        if are_keys_sequential(fileData):
            fileData = list(fileData.values())
    return fileData
def readLocalDoc(path: Path, file_path: str):
    """Extract table data from the local .docx at *path*.

    The document is converted with ``_docx_to_body_content`` and passed to
    ``readDocElements``. The split and avoid-transform field sets are looked
    up with ``getMarkedFields`` from *file_path* and the module's
    ``splitMarkers`` / ``avoidTransformMarkers``.
    """
    body_content = _docx_to_body_content(path)
    split_fields = getMarkedFields(file_path, splitMarkers)
    verbatim_key_fields = getMarkedFields(file_path, avoidTransformMarkers)
    return readDocElements(
        elements=body_content,
        splitFields=split_fields,
        avoidTransformFields=verbatim_key_fields,
    )