Spaces:
Sleeping
Sleeping
File size: 5,461 Bytes
ac9eab1 3fdb7b0 ac9eab1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 | import zipfile
import xml.etree.ElementTree as ET
from pathlib import Path
from utils import are_keys_sequential, getMarkedFields, toSnakeCase
# WordprocessingML namespace in ElementTree's "{uri}" form; it is prefixed to
# local tag names (for example f"{W}t") when matching elements.
W = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"

# [old, new] substring pairs applied, in this order, with str.replace to each
# snake-cased header cell so that several header spellings share one key.
headerReplacements = [
    ["next_message_id", "next"],
    ["option_next", "options_next"],
    ["option_next_id", "options_next"],
    ["options_next_id", "options_next"],
]

# Marker table handed to getMarkedFields; the result is used as the set of
# first-column headers whose row keys are kept verbatim (not snake-cased).
avoidTransformMarkers = [
    {
        "identifier": "pixabowl/posts",
        "fields": ["id"],
    },
]

# Marker table handed to getMarkedFields; the result is used as the set of
# columns whose cell text is split into a list of lines.
splitMarkers = [
    {
        "identifier": "sidetrails/connections/items",
        "fields": ["bio"],
    },
]
# --- local .docx -> Google Docs API body.content bridge ---
def _para_text(p: ET.Element) -> str:
    """Return the visible text of a ``w:p`` paragraph element.

    Walks every descendant of ``p`` in document order. Each ``w:t`` node
    contributes its text (an empty string when it has none) and each ``w:br``
    node contributes a newline; all other nodes are ignored.
    """
    text_tag = f"{W}t"
    break_tag = f"{W}br"
    return ''.join(
        '\n' if node.tag == break_tag else (node.text or '')
        for node in p.iter()
        if node.tag in (text_tag, break_tag)
    )
def _cell_content(tc: ET.Element) -> list:
    """Convert a ``w:tc`` table cell into Docs-API-style cell content.

    Each direct ``w:p`` child becomes one ``{'paragraph': {'elements': [...]}}``
    entry holding a single ``textRun``.

    The text run of every paragraph ends with ``"\\n"``, which is how the
    Google Docs API terminates a paragraph. Readers that concatenate the runs
    of a cell therefore see paragraph boundaries as line breaks instead of
    getting the paragraphs fused together ("ab" rather than "a\\nb"), and
    ``str.splitlines`` on the joined text splits per paragraph. The trailing
    newline of the last paragraph is removed by the ``rstrip()`` that the
    table readers in this file apply to cell text.

    Args:
        tc: The ``w:tc`` element of one table cell.

    Returns:
        A list with one paragraph dict per direct ``w:p`` child, in order.
    """
    return [
        {'paragraph': {'elements': [{'textRun': {'content': _para_text(p) + '\n'}}]}}
        for p in tc.findall(f"{W}p")
    ]
def _docx_to_body_content(path: Path) -> list:
    """Read a .docx file and return its body as Docs-API-style elements.

    Parses ``word/document.xml`` from the archive at ``path`` and walks the
    direct children of ``w:body``. A ``w:p`` child becomes a ``'paragraph'``
    element with one text run; a ``w:tbl`` child becomes a ``'table'`` element
    whose ``tableRows`` / ``tableCells`` mirror its direct ``w:tr`` / ``w:tc``
    children. Any other child is skipped.
    """
    with zipfile.ZipFile(path) as archive:
        document = ET.fromstring(archive.read("word/document.xml"))

    paragraph_tag = f"{W}p"
    table_tag = f"{W}tbl"
    row_tag = f"{W}tr"
    cell_tag = f"{W}tc"

    structural = []
    for node in document.find(f"{W}body"):
        if node.tag == paragraph_tag:
            run = {'textRun': {'content': _para_text(node)}}
            structural.append({'paragraph': {'elements': [run]}})
        elif node.tag == table_tag:
            table_rows = []
            for tr in node.findall(row_tag):
                cells = [{'content': _cell_content(tc)} for tc in tr.findall(cell_tag)]
                table_rows.append({'tableCells': cells})
            structural.append({'table': {'tableRows': table_rows}})
    return structural
# --- semantic data extraction (unchanged from owner's docs.py) ---
def read_paragraph_element(element):
    """Return the text carried by one paragraph element.

    Args:
        element: A mapping that may hold a ``'textRun'`` mapping with a
            ``'content'`` string.

    Returns:
        The run's ``'content'`` string, or ``''`` when the element has no
        (or an empty) ``'textRun'`` or the run has no content. The result is
        always a string, so callers can concatenate it without checking for
        ``None``.
    """
    text_run = element.get('textRun')
    if not text_run:
        return ''
    # A run without a 'content' key used to yield None, which breaks string
    # concatenation in the caller; normalise it to the empty string.
    return text_run.get('content') or ''
def readCell(elements):
    """Concatenate the text of every paragraph in a cell's content list.

    Entries of ``elements`` without a ``'paragraph'`` key are ignored. For the
    others, the text of each paragraph element (as returned by
    ``read_paragraph_element``) is appended in order, with no separator.
    """
    return ''.join(
        read_paragraph_element(run)
        for block in elements
        if 'paragraph' in block
        for run in block.get('paragraph').get('elements')
    )
def getArrayFields(elements):
    """Collect the header names of columns that hold multi-row (list) values.

    For every element with a ``'table'`` entry, the first row is read as the
    header: each cell is snake-cased and then passed through
    ``headerReplacements``. A later row whose first cell snake-cases to ``''``
    is treated as a continuation row; every non-empty cell in such a row marks
    its column as an array field.

    Returns:
        Header names in first-seen order, without duplicates, accumulated
        across all tables.
    """
    collected = []
    for element in elements:
        if 'table' not in element:
            continue
        columns = []
        continuation = False
        for row_no, row in enumerate(element.get('table').get('tableRows')):
            for col_no, cell in enumerate(row.get('tableCells')):
                text = readCell(cell.get('content')).rstrip()
                if row_no == 0:
                    # Header row: normalise the column name.
                    name = toSnakeCase(text)
                    for old, new in headerReplacements:
                        name = name.replace(old, new)
                    columns.append(name)
                    continue
                if col_no == 0:
                    # An empty key cell means this row continues the previous one.
                    continuation = toSnakeCase(text) == ''
                    continue
                if not continuation or text == '':
                    continue
                if columns[col_no] not in collected:
                    collected.append(columns[col_no])
    return collected
def readDocElements(elements, splitFields, avoidTransformFields):
    """Convert the tables among ``elements`` into keyed row data.

    Each element with a ``'table'`` entry produces one dict:

    * Row 0 is the header; every cell is snake-cased and passed through
      ``headerReplacements``.
    * In later rows the first cell is the row key. It is snake-cased unless
      its header is listed in ``avoidTransformFields``. An empty key marks a
      continuation row that adds to the previous keyed row.
    * Other cells are stored under their header name. Columns with an empty
      header are skipped, as are empty cells of continuation rows. A cell
      containing only ``-`` is stored as ``None``.
    * Columns reported by ``getArrayFields`` collect their non-``None`` values
      in a list. Columns listed in ``splitFields`` store
      ``cell.splitlines()`` (nothing is stored for an empty cell).
    * When the header has more than two non-empty columns a row maps to its
      data dict; otherwise it maps to the first stored value, or ``None``
      when nothing has been stored yet.

    Args:
        elements: Docs-API-style structural elements (dicts).
        splitFields: Header names whose text is split into lines, or ``None``.
        avoidTransformFields: Header names whose row keys are kept verbatim,
            or ``None``.

    Returns:
        A list with one dict per table. When there is exactly one table, that
        dict itself, or the list of its values when ``are_keys_sequential``
        reports true for it.
    """
    fileData = []
    arrayFields = getArrayFields(elements)
    for value in elements:
        if 'table' not in value:
            continue
        tableData = {}
        header = []
        rowData = {}
        rowKey = ''
        multi = False
        for rowIndex, row in enumerate(value.get('table').get('tableRows')):
            for cellIndex, cell in enumerate(row.get('tableCells')):
                cellContent = readCell(cell.get('content')).rstrip()
                if rowIndex == 0:
                    cellContent = toSnakeCase(cellContent)
                    for replacement in headerReplacements:
                        cellContent = cellContent.replace(replacement[0], replacement[1])
                    header.append(cellContent)
                elif cellIndex == 0:
                    if avoidTransformFields is not None and header[cellIndex] in avoidTransformFields:
                        newRowKey = cellContent
                    else:
                        newRowKey = toSnakeCase(cellContent)
                    if newRowKey != '':
                        # A new keyed row starts: reset the accumulator.
                        multi = False
                        rowKey = newRowKey
                        rowData = {}
                    else:
                        # Empty key: keep adding to the previous row's data.
                        multi = True
                else:
                    if multi and cellContent == '':
                        continue
                    column = header[cellIndex]
                    if column == '':
                        continue
                    # "-" is the placeholder for an explicit null value.
                    if cellContent == '-':
                        cellContent = None
                    if column in arrayFields:
                        rowData.setdefault(column, [])
                        if cellContent is not None:
                            rowData[column].append(cellContent)
                    elif splitFields is not None and column in splitFields:
                        if cellContent is None:
                            # Previously this called None.splitlines() and
                            # raised AttributeError; store the null instead,
                            # as the plain-column branch does.
                            rowData[column] = None
                        elif cellContent != '':
                            rowData[column] = cellContent.splitlines()
                    else:
                        rowData[column] = cellContent
            if rowIndex != 0 and rowKey != '':
                non_empty_cols = sum(1 for h in header if h != '')
                if non_empty_cols > 2:
                    tableData[rowKey] = rowData
                else:
                    # Key/value table: collapse the row to its first value.
                    tableData[rowKey] = next(iter(rowData.values())) if rowData else None
        fileData.append(tableData)
    if len(fileData) == 1:
        fileData = fileData[0]
        if are_keys_sequential(fileData):
            fileData = list(fileData.values())
    return fileData
def readLocalDoc(path: Path, file_path: str):
    """Parse the .docx at ``path`` and return its table data.

    The document body is converted with ``_docx_to_body_content`` and passed
    to ``readDocElements``. The split and keep-verbatim field lists are looked
    up with ``getMarkedFields`` using ``file_path`` against ``splitMarkers``
    and ``avoidTransformMarkers`` respectively.
    """
    elements = _docx_to_body_content(path)
    split_fields = getMarkedFields(file_path, splitMarkers)
    verbatim_key_fields = getMarkedFields(file_path, avoidTransformMarkers)
    return readDocElements(elements, split_fields, verbatim_key_fields)
|