import zipfile
import xml.etree.ElementTree as ET
from pathlib import Path

from utils import are_keys_sequential, getMarkedFields, toSnakeCase

# Clark-notation namespace prefix used to build WordprocessingML tag names.
W = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"

# Ordered [old, new] substring replacements applied to every snake_cased
# header cell; later entries see the result of earlier ones.
headerReplacements = [
    ["next_message_id", "next"],
    ["option_next", "options_next"],
    ["option_next_id", "options_next"],
    ["options_next_id", "options_next"],
]

# Marker tables handed to utils.getMarkedFields together with the document's
# file path. The resulting field list is used as readDocElements'
# avoidTransformFields (row keys under these headers are kept verbatim).
avoidTransformMarkers = [{
    "identifier": "pixabowl/posts",
    "fields": ["id"],
}]

# Same mechanism; the resulting field list is used as readDocElements'
# splitFields (cell text under these headers is split into lines).
splitMarkers = [{
    "identifier": "sidetrails/connections/items",
    "fields": ["bio"],
}]


# --- local .docx -> Google Docs API body.content bridge ---

def _paragraph(text: str) -> dict:
    """Wrap *text* in a Docs-API-shaped paragraph structural element.

    The content is terminated with a newline, as the Google Docs API does for
    every paragraph. Without it, readCell would glue the paragraphs of a
    multi-paragraph cell together with no separator; callers already
    ``rstrip()`` the joined text, so single-paragraph cells are unaffected.
    """
    return {'paragraph': {'elements': [{'textRun': {'content': text + '\n'}}]}}


def _para_text(p: ET.Element) -> str:
    """Return the text of a ``w:p`` element.

    Concatenates every descendant ``w:t`` text node and emits a newline for
    every descendant ``w:br``.
    """
    parts = []
    for node in p.iter():
        if node.tag == f"{W}t":
            parts.append(node.text or '')
        elif node.tag == f"{W}br":
            parts.append('\n')
    return ''.join(parts)


def _cell_content(tc: ET.Element) -> list:
    """Convert the direct ``w:p`` children of a ``w:tc`` cell to paragraphs."""
    return [_paragraph(_para_text(p)) for p in tc.findall(f"{W}p")]


def _docx_to_body_content(path: Path) -> list:
    """Read a .docx file and return a Docs-API-like ``body.content`` list.

    Only the direct ``w:p`` and ``w:tbl`` children of the document body are
    converted; any other child element is ignored.

    Args:
        path: Location of the .docx archive.

    Returns:
        A list of ``{'paragraph': ...}`` and ``{'table': ...}`` dicts in
        document order.

    Raises:
        ValueError: If ``word/document.xml`` contains no w:body element.
    """
    with zipfile.ZipFile(path) as archive:
        root = ET.fromstring(archive.read("word/document.xml"))
    body = root.find(f"{W}body")
    if body is None:
        # Fail with a readable message instead of "NoneType is not iterable".
        raise ValueError(f"{path}: word/document.xml has no w:body element")

    content = []
    for child in body:
        if child.tag == f"{W}p":
            content.append(_paragraph(_para_text(child)))
        elif child.tag == f"{W}tbl":
            rows = [
                {'tableCells': [
                    {'content': _cell_content(tc)}
                    for tc in tr.findall(f"{W}tc")
                ]}
                for tr in child.findall(f"{W}tr")
            ]
            content.append({'table': {'tableRows': rows}})
    return content


# --- semantic data extraction ---

def read_paragraph_element(element):
    """Return the text of one paragraph element, or '' if it has none.

    An element without a ``textRun``, or a text run without ``content``,
    yields the empty string so callers can always concatenate the result.
    """
    text_run = element.get('textRun')
    if not text_run:
        return ''
    return text_run.get('content') or ''


def readCell(elements):
    """Concatenate the text of every paragraph in a cell's content list.

    Entries without a ``'paragraph'`` key are skipped.
    """
    parts = []
    for value in elements:
        if 'paragraph' in value:
            for elem in value.get('paragraph').get('elements'):
                parts.append(read_paragraph_element(elem))
    return ''.join(parts)


def _cell_text(cell):
    """Return a table cell's text with trailing whitespace removed."""
    return readCell(cell.get('content')).rstrip()


def _normalize_header(text):
    """Snake-case a header cell and apply headerReplacements in order."""
    name = toSnakeCase(text)
    for old, new in headerReplacements:
        name = name.replace(old, new)
    return name


def getArrayFields(elements):
    """Collect the header names of columns that hold multiple values.

    A row whose first cell snake-cases to '' is a continuation of the row
    above it. Any column with non-empty text in a continuation row is
    reported as an array field.

    Args:
        elements: Docs-API-like ``body.content`` list.

    Returns:
        Header names in first-seen order, without duplicates, accumulated
        across all tables in *elements*.
    """
    array_fields = []
    for value in elements:
        if 'table' not in value:
            continue
        header = []
        multi = False
        for row_index, row in enumerate(value.get('table').get('tableRows')):
            for cell_index, cell in enumerate(row.get('tableCells')):
                text = _cell_text(cell)
                if row_index == 0:
                    header.append(_normalize_header(text))
                elif cell_index == 0:
                    multi = toSnakeCase(text) == ''
                elif multi and text != '' and cell_index < len(header):
                    # cell_index < len(header): rows wider than the header
                    # row have no column name, so they cannot name a field.
                    name = header[cell_index]
                    if name not in array_fields:
                        array_fields.append(name)
    return array_fields


def _read_table(table, array_fields, split_fields, avoid_transform_fields):
    """Convert one table into a dict keyed by each row's first cell.

    The first row is the header. A row with an empty key continues the
    previous keyed row. Cells under an empty (or missing) header are ignored.
    """
    table_data = {}
    header = []
    named_columns = 0
    row_data = {}
    row_key = ''
    multi = False

    for row_index, row in enumerate(table.get('tableRows')):
        for cell_index, cell in enumerate(row.get('tableCells')):
            text = _cell_text(cell)
            if row_index == 0:
                header.append(_normalize_header(text))
                continue

            # Rows wider than the header row are treated like unnamed columns.
            name = header[cell_index] if cell_index < len(header) else ''

            if cell_index == 0:
                keep_verbatim = (
                    avoid_transform_fields is not None
                    and name in avoid_transform_fields
                )
                new_key = text if keep_verbatim else toSnakeCase(text)
                if new_key != '':
                    # A new keyed row starts a fresh record.
                    multi = False
                    row_key = new_key
                    row_data = {}
                else:
                    multi = True
                continue

            if (multi and text == '') or name == '':
                continue

            # A lone "-" is an explicit "no value" marker.
            cell_value = None if text == '-' else text

            if name in array_fields:
                values = row_data.setdefault(name, [])
                if cell_value is not None:
                    values.append(cell_value)
            elif split_fields is not None and name in split_fields:
                if cell_value is None:
                    # Store None like the scalar branch does, rather than
                    # calling splitlines() on None.
                    row_data[name] = None
                elif cell_value != '':
                    row_data[name] = cell_value.splitlines()
            else:
                row_data[name] = cell_value

        if row_index == 0:
            # The header is complete now; count its named columns once.
            named_columns = sum(1 for name in header if name != '')
            continue
        if row_key == '':
            continue
        if named_columns > 2:
            table_data[row_key] = row_data
        else:
            # Key/value table: store the single value instead of a dict.
            table_data[row_key] = next(iter(row_data.values()), None)
    return table_data


def readDocElements(elements, splitFields, avoidTransformFields):
    """Extract structured data from the tables of a document body.

    Args:
        elements: Docs-API-like ``body.content`` list.
        splitFields: Header names whose cell text is split into a list of
            lines, or None.
        avoidTransformFields: Header names; when the first column's header is
            among them the row key is used verbatim instead of snake-cased.
            May be None.

    Returns:
        A list with one dict per table. If the document has exactly one
        table, that table's dict is returned instead, or the list of its
        values when utils.are_keys_sequential reports true for it.
    """
    array_fields = getArrayFields(elements)
    file_data = [
        _read_table(
            value.get('table'), array_fields, splitFields, avoidTransformFields
        )
        for value in elements
        if 'table' in value
    ]
    if len(file_data) == 1:
        file_data = file_data[0]
        # NOTE(review): the sequential-keys check is applied only to the
        # single-table dict; confirm against the upstream docs.py.
        if are_keys_sequential(file_data):
            file_data = list(file_data.values())
    return file_data


def readLocalDoc(path: Path, file_path: str):
    """Parse a local .docx file into structured data.

    Args:
        path: Location of the .docx file on disk.
        file_path: Path string passed to utils.getMarkedFields to look up the
            split and avoid-transform fields for this document.

    Returns:
        Whatever readDocElements returns for the document's tables.
    """
    body_content = _docx_to_body_content(path)
    return readDocElements(
        body_content,
        getMarkedFields(file_path, splitMarkers),
        getMarkedFields(file_path, avoidTransformMarkers),
    )