Vernacular / docs.py
bhardwaj08sarthak's picture
Update docs.py
41a19f0 verified
Raw
History Blame Contribute Delete
5.46 kB
import zipfile
import xml.etree.ElementTree as ET
from pathlib import Path
from utils import are_keys_sequential, getMarkedFields, toSnakeCase
# Namespace prefix (in ElementTree's "{uri}" form) prepended to local names
# such as "p", "t", "br", "tbl", "tr" and "tc" to build WordprocessingML tags.
W = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"

# [old, new] substring pairs. They are applied with str.replace, in this
# order, to every header cell after it has gone through toSnakeCase.
headerReplacements = [
    ["next_message_id", "next"],
    ["option_next", "options_next"],
    ["option_next_id", "options_next"],
    ["options_next_id", "options_next"],
]

# Passed to getMarkedFields together with the document's file path; the
# resulting field list names first-column headers whose row keys are kept
# verbatim instead of being snake-cased.
# NOTE(review): "identifier" is presumably matched against the file path --
# confirm in utils.getMarkedFields.
avoidTransformMarkers = [
    {
        "identifier": "pixabowl/posts",
        "fields": ["id"],
    },
]

# Passed to getMarkedFields together with the document's file path; the
# resulting field list names columns whose cell text is stored as a list of
# lines (str.splitlines) rather than a single string.
splitMarkers = [
    {
        "identifier": "sidetrails/connections/items",
        "fields": ["bio"],
    },
]
# --- local .docx -> Google Docs API body.content bridge ---
def _para_text(p: ET.Element) -> str:
    """Flatten a WordprocessingML paragraph element into plain text.

    ``p`` and all of its descendants are visited in document order. Each
    ``w:t`` node contributes its text (an empty string when it has none),
    each ``w:br`` node contributes a newline, and every other node is
    ignored.

    Args:
        p: The element to flatten.

    Returns:
        The concatenated text of the visited nodes.
    """
    text_tag = f"{W}t"
    break_tag = f"{W}br"
    return ''.join(
        '\n' if node.tag == break_tag else (node.text or '')
        for node in p.iter()
        if node.tag in (text_tag, break_tag)
    )
def _cell_content(tc: ET.Element) -> list:
    """Convert a table cell element into a list of paragraph dicts.

    Only ``w:p`` elements that are direct children of ``tc`` are converted;
    each becomes ``{'paragraph': {'elements': [{'textRun': {'content': ...}}]}}``
    with the paragraph's flattened text as the content.

    Args:
        tc: The table cell element.

    Returns:
        One paragraph dict per direct-child paragraph, in document order.
    """
    paragraphs = []
    for para in tc.findall(f"{W}p"):
        run = {'textRun': {'content': _para_text(para)}}
        paragraphs.append({'paragraph': {'elements': [run]}})
    return paragraphs
def _docx_to_body_content(path: Path) -> list:
    """Read a .docx file and return its body as a list of element dicts.

    ``word/document.xml`` is read from the archive and parsed. Each direct
    child of the ``w:body`` element is then mapped as follows:

    * ``w:p``   -> ``{'paragraph': {'elements': [{'textRun': {'content': text}}]}}``
    * ``w:tbl`` -> ``{'table': {'tableRows': [{'tableCells': [{'content': [...]}]}]}}``

    Any other child is dropped. Only rows that are direct children of the
    table, and cells that are direct children of a row, are included.

    NOTE(review): the XML is parsed with the standard ElementTree parser; if
    documents can come from untrusted sources, check the XML-vulnerability
    notes in the Python documentation.

    Args:
        path: Location of the .docx archive.

    Returns:
        The converted body elements, in document order.
    """
    with zipfile.ZipFile(path) as archive:
        document_xml = archive.read("word/document.xml")
    root = ET.fromstring(document_xml)
    body = root.find(f"{W}body")

    para_tag = f"{W}p"
    table_tag = f"{W}tbl"
    row_path = f"{W}tr"
    cell_path = f"{W}tc"

    content = []
    for node in body:
        if node.tag == para_tag:
            run = {'textRun': {'content': _para_text(node)}}
            content.append({'paragraph': {'elements': [run]}})
        elif node.tag == table_tag:
            table_rows = []
            for tr in node.findall(row_path):
                cells = [{'content': _cell_content(tc)} for tc in tr.findall(cell_path)]
                table_rows.append({'tableCells': cells})
            content.append({'table': {'tableRows': table_rows}})
    return content
# --- semantic data extraction (unchanged from owner's docs.py) ---
def read_paragraph_element(element):
    """Return the text carried by a paragraph element dict.

    Args:
        element: A dict that may hold a ``'textRun'`` dict with a
            ``'content'`` entry.

    Returns:
        The text run's content, or ``''`` when the element has no text run
        or the text run has no content. The result is always a string so
        that callers can concatenate it safely.
    """
    text_run = element.get('textRun')
    if not text_run:
        return ''
    # A text run without 'content' (or with content None) used to leak None
    # to the caller, which then failed when concatenating strings.
    return text_run.get('content') or ''
def readCell(elements):
    """Concatenate the text of every paragraph in a cell's content list.

    Args:
        elements: Iterable of element dicts. Entries without a
            ``'paragraph'`` key are skipped.

    Returns:
        The text of all paragraph elements joined in order, with no
        separator added between them.
    """
    pieces = []
    for block in elements:
        if 'paragraph' not in block:
            continue
        for run in block.get('paragraph').get('elements'):
            pieces.append(read_paragraph_element(run))
    return ''.join(pieces)
def getArrayFields(elements):
    """Collect the headers of columns that hold values on continuation rows.

    For every table in ``elements`` the first row is read as the header:
    each cell is snake-cased and then passed through ``headerReplacements``.
    A later row whose first cell snake-cases to ``''`` is a continuation
    row; every non-empty cell on such a row marks its column's header as an
    array field.

    Args:
        elements: Iterable of element dicts; entries without a ``'table'``
            key are ignored.

    Returns:
        Header names in first-seen order, without duplicates, accumulated
        across all tables.
    """
    arrayFields = []
    for value in elements:
        if 'table' not in value:
            continue
        header = []
        multi = False
        for rowIndex, row in enumerate(value.get('table').get('tableRows')):
            for cellIndex, cell in enumerate(row.get('tableCells')):
                cellContent = readCell(cell.get('content')).rstrip()
                if rowIndex == 0:
                    cellContent = toSnakeCase(cellContent)
                    for old, new in headerReplacements:
                        cellContent = cellContent.replace(old, new)
                    header.append(cellContent)
                elif cellIndex == 0:
                    # An empty first cell marks a continuation row.
                    multi = toSnakeCase(cellContent) == ''
                elif multi and cellContent != '' and cellIndex < len(header):
                    # The bounds check skips cells that have no header
                    # column (a row wider than the header row) instead of
                    # raising IndexError.
                    field = header[cellIndex]
                    if field not in arrayFields:
                        arrayFields.append(field)
    return arrayFields
def readDocElements(elements, splitFields, avoidTransformFields):
    """Turn the tables found in ``elements`` into keyed data.

    Each table is read as follows:

    * Row 0 is the header: every cell is snake-cased and then passed
      through ``headerReplacements``.
    * In later rows the first cell is the row key. It is snake-cased unless
      the first column's header is listed in ``avoidTransformFields``. A
      non-empty key starts a new record; an empty key makes the row a
      continuation of the previous record.
    * Remaining cells are stored under their column header. Columns with a
      blank header are skipped, as are empty cells on continuation rows. A
      cell containing only ``-`` is stored as ``None``.
    * Columns reported by ``getArrayFields`` accumulate values in a list
      (``None`` values are not appended). Columns listed in ``splitFields``
      store ``cellContent.splitlines()``; an empty cell in such a column
      stores nothing.
    * When the header has more than two non-blank columns a record is the
      dict of its fields; otherwise it collapses to the first stored value
      (``None`` if nothing was stored).

    Args:
        elements: Iterable of element dicts; entries without a ``'table'``
            key are ignored.
        splitFields: Headers whose cell text is split into lines, or None.
        avoidTransformFields: First-column headers whose row keys are kept
            verbatim, or None.

    Returns:
        A list with one dict per table; when there is exactly one table,
        that table's dict; or the list of that dict's values when
        ``are_keys_sequential`` reports its keys as sequential.
    """
    fileData = []
    arrayFields = getArrayFields(elements)
    for value in elements:
        if 'table' not in value:
            continue
        tableData = {}
        header = []
        rowData = {}
        rowKey = ''
        multi = False
        for rowIndex, row in enumerate(value.get('table').get('tableRows')):
            for cellIndex, cell in enumerate(row.get('tableCells')):
                cellContent = readCell(cell.get('content')).rstrip()

                if rowIndex == 0:
                    cellContent = toSnakeCase(cellContent)
                    for old, new in headerReplacements:
                        cellContent = cellContent.replace(old, new)
                    header.append(cellContent)
                    continue

                # A row may be wider than the header row; cells without a
                # header column are treated like blank-header columns
                # instead of raising IndexError.
                field = header[cellIndex] if cellIndex < len(header) else ''

                if cellIndex == 0:
                    keepVerbatim = (
                        avoidTransformFields is not None
                        and field in avoidTransformFields
                    )
                    newRowKey = cellContent if keepVerbatim else toSnakeCase(cellContent)
                    if newRowKey != '':
                        # New record: reset the accumulator.
                        multi = False
                        rowKey = newRowKey
                        rowData = {}
                    else:
                        # Continuation row: keep filling the current record.
                        multi = True
                    continue

                if field == '' or (multi and cellContent == ''):
                    continue
                if cellContent == '-':
                    cellContent = None

                if field in arrayFields:
                    bucket = rowData.setdefault(field, [])
                    if cellContent is not None:
                        bucket.append(cellContent)
                elif splitFields is not None and field in splitFields:
                    if cellContent is None:
                        # '-' placeholder: store None as plain fields do,
                        # rather than calling splitlines() on None.
                        rowData[field] = None
                    elif cellContent != '':
                        rowData[field] = cellContent.splitlines()
                else:
                    rowData[field] = cellContent

            if rowIndex != 0 and rowKey != '':
                non_empty_cols = sum(1 for h in header if h != '')
                if non_empty_cols > 2:
                    tableData[rowKey] = rowData
                else:
                    # Key/value table: collapse the record to its first value.
                    tableData[rowKey] = next(iter(rowData.values()), None)
        fileData.append(tableData)

    if len(fileData) == 1:
        fileData = fileData[0]
    # Only the single-table dict has .values(); a multi-table list is
    # returned unchanged.
    if isinstance(fileData, dict) and are_keys_sequential(fileData):
        fileData = list(fileData.values())
    return fileData
def readLocalDoc(path: Path, file_path: str):
    """Extract the table data of a local .docx file.

    The document is first converted to body-content element dicts; the
    split and avoid-transform field lists are then looked up for
    ``file_path`` via ``getMarkedFields`` and everything is handed to
    ``readDocElements``.

    Args:
        path: Location of the .docx file on disk.
        file_path: Path string passed to ``getMarkedFields`` to select the
            marker-specific field lists.

    Returns:
        Whatever ``readDocElements`` returns for the document.
    """
    elements = _docx_to_body_content(path)
    split_fields = getMarkedFields(file_path, splitMarkers)
    verbatim_fields = getMarkedFields(file_path, avoidTransformMarkers)
    return readDocElements(elements, split_fields, verbatim_fields)