# Source: ApplyCRs/scripts/ts_applicator.py — commit 2b96123 ("fix minor errors", author: heymenn)
#!/usr/bin/env python3
"""
ts_applicator.py β€” Apply a CR change manifest to a TS DOCX as tracked changes.
Reads a JSON manifest produced by cr_parser.py and applies every change
to the target TS using docx_helpers tracked-change primitives.
Usage:
python3 ts_applicator.py <ts.docx> <manifest.json> [--author NAME] [--output path]
# or import: from ts_applicator import apply_manifest
"""
import argparse
import json
import re
import sys
from pathlib import Path
import docx
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
sys.path.insert(0, str(Path(__file__).parent))
_MIN_LEN_ALLCOL_FALLBACK = 8 # old text shorter than this is too ambiguous for any-column search
_WARN_CONF = 0.8 # confidence below this emits WARN instead of OK
from docx_helpers import (
RevCounter,
tracked_modify_para,
tracked_insert_paras_after,
AUTHOR as DEFAULT_AUTHOR,
DATE as DEFAULT_DATE,
)
# ── Text normalisation ────────────────────────────────────────────────────────
_UNICODE_REPLACEMENTS = (
('\xa0', ' '), # non-breaking space
('\u202f', ' '), # narrow no-break space
('\u2007', ' '), # figure space
('\u2060', ''), # word joiner (invisible)
('\u200b', ''), # zero-width space
('\u00ad', ''), # soft hyphen (invisible)
('\u2011', '-'), # non-breaking hyphen
('\u2013', '-'), # en dash
('\u2014', '-'), # em dash
('\u2212', '-'), # minus sign
('\u2018', "'"), # left single quote
('\u2019', "'"), # right single quote
('\u201c', '"'), # left double quote
('\u201d', '"'), # right double quote
('\u2026', '...'), # horizontal ellipsis β†’ three dots
)
def _norm(text):
"""Normalise common Unicode invisible/whitespace/punctuation variants for comparison."""
for old, new in _UNICODE_REPLACEMENTS:
text = text.replace(old, new)
return text.strip()
def _norm_ws(text):
    """
    Normalise Unicode variants and then drop ALL whitespace — structural match.

    ETSI TS files store structured paragraphs (references, abbreviations,
    headings) with a TAB between the code and the body text, e.g.:
        '[27]\\tGlobalPlatform: ...'
        'CLT\\tContactLess Tunnelling'
        '8.3\\tRAM implementation over HTTPS'
    The CR's text extraction concatenates runs directly, losing the tab:
        '[27]GlobalPlatform: ...'
        'CLTContactLess Tunnelling'
        '8.3RAM implementation over HTTPS'
    Stripping every whitespace character from both sides before comparing makes
    the two forms identical. Used as a third-level fallback (confidence 0.8)
    after exact and NBSP-normalised matching.
    """
    # _norm applies the same replacement table; its trailing .strip() is
    # subsumed by the whitespace removal below.
    return re.sub(r'\s+', '', _norm(text))
def _norm_alnum(text):
"""Keep only lowercase alphanumeric characters β€” last-resort matching.
Strips all punctuation, spaces, and Unicode variants so that only the
raw word/number content is compared. Used as a confidence-0.6 fallback
in _find_row when even whitespace-stripped matching fails (e.g. different
bracket styles, quote variants, or punctuation differences between the CR
and the TS).
"""
return re.sub(r'[^a-z0-9]', '', text.lower())
def _clean_prefix(text: str) -> str:
"""Return the longest leading substring that contains only standard printable
ASCII characters (ord 32–126).
Non-breaking spaces, curly quotes, and other Unicode characters embedded
mid-text (e.g. between spec number components like 'TS\xa0102\xa0226')
make the full anchor unmatchable. The clean prefix β€” the part before the
first such character β€” is still reliable and specific enough to locate the
correct row.
"""
end = 0
for ch in text:
if ord(ch) < 32 or ord(ch) > 126:
break
end += 1
return text[:end].strip()
# ── Document search helpers ───────────────────────────────────────────────────
def _full_para_text(para):
    """Concatenate every text run in the paragraph: all w:t runs
    (normal/inserted) followed by all w:delText runs (tracked-deleted).

    NOTE: the two groups are concatenated, not interleaved in document order —
    useful as a containment check, not for reading the paragraph."""
    elem = para._element
    parts = [t.text or '' for t in elem.findall('.//' + qn('w:t'))]
    parts.extend(t.text or '' for t in elem.findall('.//' + qn('w:delText')))
    return ''.join(parts)
def _original_para_text(para):
    """Reconstruct paragraph text as it was before tracked changes.

    Iterates in document order, keeping:
      - w:t runs that are NOT inside a w:ins element (stable text)
      - w:delText runs (deleted-but-original text)
    Skipping:
      - w:t runs inside w:ins (newly inserted text)

    This allows anchors that reference original phrasing (e.g. 'SCP81Connection')
    to still match after a tracked '1'→'X' replacement has been applied to that
    paragraph — where _full_para_text would return the concatenation out of order.
    """
    el = para._element
    result = []
    for node in el.iter():
        if node.tag == qn('w:t'):
            # Skip if this w:t is wrapped in a w:ins element.
            # Ancestor walk stops at the paragraph element itself, so any
            # wrappers OUTSIDE the paragraph are never considered.
            is_inserted = False
            for anc in node.iterancestors():
                if anc is el:
                    break
                if anc.tag == qn('w:ins'):
                    is_inserted = True
                    break
            if not is_inserted:
                result.append(node.text or '')
        elif node.tag == qn('w:delText'):
            # Deleted runs carry original (pre-change) text — always kept.
            result.append(node.text or '')
    return ''.join(result)
def _match_paragraphs(paragraphs, search_text, prefer_not_in_table=False):
    """Core 5-tier matching logic. Operates on any iterable of Paragraph objects.
    Returns (para, confidence) or (None, 0.0).

    Tiers (first non-empty pool wins; confidence in parentheses):
      1. exact substring in the paragraph's visible text            (1.0)
      2. Unicode-normalised substring, via _norm                    (0.9)
      3. whitespace-stripped substring, via _norm_ws                (0.8)
      4. match against pre-tracked-changes text (_original_para_text) (0.7)
      5. match against all text incl. deleted runs (_full_para_text)  (0.6)

    When prefer_not_in_table is True, a body-paragraph hit at any tier beats a
    table-cell hit; the highest-tier table hit is kept as a last fallback.
    """
    norm_search = _norm(search_text)
    ws_search = _norm_ws(search_text)
    candidates_exact = []
    candidates_norm = []
    candidates_ws = []
    candidates_orig = []
    candidates_del = []
    for para in paragraphs:
        pt = para.text
        if search_text in pt:
            candidates_exact.append(para)
        elif norm_search and norm_search in _norm(pt):
            candidates_norm.append(para)
        elif ws_search and ws_search in _norm_ws(pt):
            candidates_ws.append(para)
        else:
            # Visible text missed — try the reconstructed pre-change text.
            orig_pt = _original_para_text(para)
            if (search_text in orig_pt
                    or (norm_search and norm_search in _norm(orig_pt))):
                candidates_orig.append(para)
            elif ws_search and ws_search in _norm_ws(orig_pt):
                candidates_orig.append(para)
            else:
                # Last resort: every run including tracked-deleted text.
                full_pt = _full_para_text(para)
                if search_text in full_pt:
                    candidates_del.append(para)
                elif ws_search and ws_search in _norm_ws(full_pt):
                    candidates_del.append(para)

    def _in_table(para):
        # A paragraph sits inside a table iff it has a w:tc ancestor.
        p = para._element
        return any(a.tag == qn('w:tc') for a in p.iterancestors())

    if not prefer_not_in_table:
        for pool, conf in [(candidates_exact, 1.0), (candidates_norm, 0.9),
                           (candidates_ws, 0.8), (candidates_orig, 0.7),
                           (candidates_del, 0.6)]:
            if pool:
                return pool[0], conf
        return None, 0.0
    # prefer_not_in_table: walk tiers returning the first body-text hit;
    # remember the highest-tier table hit in case no body hit exists at all.
    best_table_match = (None, 0.0)
    for pool, conf in [(candidates_exact, 1.0), (candidates_norm, 0.9),
                       (candidates_ws, 0.8), (candidates_orig, 0.7),
                       (candidates_del, 0.6)]:
        if not pool:
            continue
        body_only = [p for p in pool if not _in_table(p)]
        if body_only:
            return body_only[0], conf
        if best_table_match[0] is None:
            best_table_match = (pool[0], conf)
    return best_table_match if best_table_match[0] is not None else (None, 0.0)
def _find_para(doc, search_text, prefer_not_in_table=False):
    """Locate the first paragraph in the whole document that matches
    search_text, using the five-tier logic of _match_paragraphs.
    Returns (para, confidence) — (None, 0.0) when nothing matches."""
    all_paras = doc.paragraphs
    return _match_paragraphs(all_paras, search_text, prefer_not_in_table)
# ── Section-aware anchor search ───────────────────────────────────────────────
# Dotted section number at the start of a heading, e.g. '8.3 RAM ...'
_HEADING_NUM_RE = re.compile(r'^(\d+(?:\.\d+)*)\s+\S')


def _para_heading_number(para):
    """Return the dotted section number when this paragraph is a real TS
    heading, else None.

    The paragraph style must start with 'Heading' (case-insensitive); this
    rejects false positives from TOC entries (style 'toc N'), front-matter
    address lines (style 'FP'), change-history labels (style 'B3'), etc.
    ETSI/3GPP TS documents always style real headings 'Heading 1'..'Heading N'.
    """
    style = para.style
    style_name = (style.name if style is not None else '') or ''
    if not style_name.lower().startswith('heading'):
        return None
    match = _HEADING_NUM_RE.match(para.text.strip())
    if match:
        return match.group(1)
    return None
def _is_descendant_section(child, parent):
"""True if `child` is `parent` or nested under it (by dotted-prefix)."""
return child == parent or child.startswith(parent + '.')
def _section_range(doc, target):
    """Return (start_idx, end_idx) into doc.paragraphs spanning section `target`.

    start = index of the heading whose number equals target.
    end   = index of the next heading that is NOT a descendant of target
            (or len(doc.paragraphs) when the section runs to the end).
    Returns (None, None) when the target heading is absent. Recomputed per-call.
    """
    paras = doc.paragraphs
    start = None
    for idx, para in enumerate(paras):
        num = _para_heading_number(para)
        if num is None:
            continue
        if start is None:
            # Still looking for the opening heading.
            if num == target:
                start = idx
            continue
        # Inside the section: the first non-descendant heading closes it.
        if not _is_descendant_section(num, target):
            return (start, idx)
    if start is None:
        return (None, None)
    return (start, len(paras))
def _enclosing_heading(doc, para):
    """Walk backward from `para` to the nearest preceding heading paragraph.
    Returns that heading Paragraph, or None when `para` is not found or no
    heading precedes it. Used to produce HINT log lines."""
    paras = doc.paragraphs
    anchor = para._element
    idx = next((i for i, p in enumerate(paras) if p._element is anchor), None)
    if idx is None:
        return None
    for j in range(idx, -1, -1):
        if _para_heading_number(paras[j]) is not None:
            return paras[j]
    return None
def _find_para_in_section(doc, search_text, section_number, prefer_not_in_table=False):
    """Section-restricted variant of _find_para.

    Returns (para, conf, status) with status in {'in_section', 'no_section'}.
    On 'no_section' the caller should fall back to a global _find_para and
    emit a WARN log line."""
    if not section_number:
        return (None, 0.0, 'no_section')
    start, end = _section_range(doc, section_number)
    if start is None:
        return (None, 0.0, 'no_section')
    scoped = doc.paragraphs[start:end]
    para, conf = _match_paragraphs(scoped, search_text, prefer_not_in_table)
    return (para, conf, 'in_section')
def _find_para_with_section(doc, search_text, section_number, kind_label, log,
                            prefer_not_in_table=False):
    """Section-aware anchor search with WARN/ERROR logging.

    Behaviour:
      * section_number present + found in TS + anchor in range → return (para, conf).
      * section_number present + not in TS → WARN, fall back to global _find_para.
      * section_number present + anchor NOT in range → ERROR + HINT, return (None, 0).
      * section_number missing → WARN, fall back to global _find_para.
    Logs go to `log` (list of str); kind_label names the change type in them.
    """
    if section_number:
        para, conf, status = _find_para_in_section(
            doc, search_text, section_number, prefer_not_in_table)
        if status == 'in_section' and para is not None:
            return para, conf
        if status == 'no_section':
            # Declared section doesn't exist in this TS — search everywhere.
            log.append(f" WARN section '{section_number}' not found in TS — falling back to global search")
            return _find_para(doc, search_text, prefer_not_in_table)
        # in_section but anchor absent — check global for HINT
        g_para, _ = _find_para(doc, search_text, prefer_not_in_table)
        if g_para is not None:
            # Anchor exists, but under a different heading: report the mismatch.
            enc = _enclosing_heading(doc, g_para)
            actual = _para_heading_number(enc) if enc is not None else '?'
            log.append(f" ERROR {kind_label}: anchor {search_text[:60]!r} declared in section "
                       f"{section_number} but found in section {actual}")
            log.append(f" HINT nearest match: {g_para.text[:120]!r}")
        else:
            log.append(f" ERROR {kind_label}: anchor {search_text[:60]!r} not found in section "
                       f"{section_number} (or anywhere)")
        return None, 0.0
    log.append(f" WARN no section_number on change — global anchor search for {search_text[:60]!r}")
    return _find_para(doc, search_text, prefer_not_in_table)
def _find_table_by_section(doc, section_heading):
    """
    Find the table immediately following a paragraph that contains section_heading.

    Checks both w:t (plain/inserted) and w:delText (tracked-deleted) so the match
    survives even after the heading was wrapped in a tracked deletion.
    Empty paragraphs between the heading and the table are tolerated.
    Returns (table, confidence) or (None, 0.0).
    """
    if not section_heading:
        return None, 0.0
    norm_h = _norm(section_heading)
    ws_h = _norm_ws(section_heading)
    heading_seen = False
    # Walk body elements in document order; heading_seen tracks whether the
    # most recent non-empty paragraph matched the heading text.
    for element in doc.element.body:
        tag = element.tag.split('}')[-1] if '}' in element.tag else element.tag
        if tag == 'p':
            t_text = ''.join(t.text or '' for t in element.findall('.//' + qn('w:t')))
            d_text = ''.join(t.text or '' for t in element.findall('.//' + qn('w:delText')))
            full = (t_text + d_text).strip()
            if not full:
                continue  # skip empty paras, keep heading_seen state
            if (section_heading in full
                    or norm_h in _norm(full)
                    or ws_h in _norm_ws(full)):
                heading_seen = True
            else:
                heading_seen = False  # non-matching non-empty para resets
        elif tag == 'tbl':
            if heading_seen:
                # Map the raw <w:tbl> element back to its python-docx Table.
                for tbl in doc.tables:
                    if tbl._tbl is element:
                        return tbl, 1.0
            heading_seen = False
    return None, 0.0
def _find_table(doc, header_key):
    """
    Locate a table by its header cells: each normalised key in header_key must
    appear (as a substring) in the corresponding cell of one of the table's
    first three rows — the header is not always row 0.
    Returns (table, confidence) or (None, 0.0).
    """
    wanted = [_norm(h) for h in header_key]
    for tbl in doc.tables:
        if not tbl.rows:
            continue
        for row in tbl.rows[:3]:
            cell_texts = [_norm(c.text) for c in row.cells]
            hit = all(
                pos < len(cell_texts) and key in cell_texts[pos]
                for pos, key in enumerate(wanted)
            )
            if hit:
                return tbl, 1.0
    return None, 0.0
def _disambiguate_by_context(all_rows, candidates, context_rows_before):
    """Choose the candidate row whose preceding rows best match the expected
    context.

    context_rows_before lists expected col-0 texts, closest row first.
    Ties resolve to the earliest candidate (max() keeps the first maximum),
    which also covers the no-context-match case."""
    def _context_score(idx):
        score = 0
        for depth, expected in enumerate(context_rows_before, start=1):
            prev = idx - depth
            if prev < 0 or not expected:
                continue
            row = all_rows[prev]
            cell_text = row.cells[0].text if row.cells else ''
            if _norm(expected) in _norm(cell_text) or _norm_ws(expected) in _norm_ws(cell_text):
                score += 1
        return score

    return max(candidates, key=_context_score)
def _find_row(tbl, anchor_text, context_rows_before=None):
    """
    Find first row in tbl where col-0 cell text contains anchor_text.
    Returns (row_idx, confidence) or (-1, 0.0).
    When context_rows_before is provided and multiple rows match, uses the
    col-0 texts of the rows preceding each candidate to disambiguate.

    Matching levels, in order of confidence:
      1.0  — exact substring match
      0.9  — Unicode-normalised match (_norm: xa0, dashes, quotes, …)
      0.8  — whitespace-stripped match (_norm_ws: also removes tabs/newlines)
      0.6  — alphanumeric-only match (_norm_alnum: strips all non a-z0-9)
      0.55 — clean-prefix unique match: extract the leading ASCII-only part of
             the anchor and find the single row that contains it.
      0.5  — clean-prefix + token-overlap: when multiple rows share the prefix,
             pick the one whose col-0 tokens overlap most with the anchor tokens.
    """
    all_rows = list(tbl.rows)
    # Normalised forms are computed once; the tier lambdas close over them.
    norm_anchor = _norm(anchor_text)
    ws_anchor = _norm_ws(anchor_text)
    alnum_anchor = _norm_alnum(anchor_text)
    for match_fn, conf in [
        (lambda c: anchor_text in c, 1.0),
        (lambda c: bool(norm_anchor) and norm_anchor in _norm(c), 0.9),
        (lambda c: bool(ws_anchor) and ws_anchor in _norm_ws(c), 0.8),
        (lambda c: bool(alnum_anchor) and alnum_anchor in _norm_alnum(c), 0.6),
    ]:
        candidates = [
            idx for idx, row in enumerate(all_rows)
            if row.cells and match_fn(row.cells[0].text)
        ]
        if not candidates:
            continue
        if len(candidates) == 1 or not context_rows_before:
            return candidates[0], conf
        # Multiple hits at this tier: use preceding-row context to choose.
        return _disambiguate_by_context(all_rows, candidates, context_rows_before), conf
    # ── Prefix-based partial match ─────────────────────────────────────────
    prefix = _clean_prefix(anchor_text)
    if prefix and len(prefix) > 8:  # very short prefixes are too ambiguous
        prefix_low = prefix.lower()
        hits = [
            idx for idx, row in enumerate(all_rows)
            if row.cells and prefix_low in row.cells[0].text.lower()
        ]
        if len(hits) == 1:
            return hits[0], 0.55
        elif len(hits) > 1:
            # Tie-break by token overlap between the anchor and each cell.
            anchor_tokens = set(re.findall(r'[a-z0-9]+', anchor_text.lower()))
            best_score, best_idx = -1, -1
            for hit_idx in hits:
                cell_tokens = set(re.findall(r'[a-z0-9]+',
                                             all_rows[hit_idx].cells[0].text.lower()))
                score = len(anchor_tokens & cell_tokens)
                if score > best_score:
                    best_score, best_idx = score, hit_idx
            if best_idx >= 0:
                return best_idx, 0.5
    return (-1, 0.0)
# ── vMerge row insertion ──────────────────────────────────────────────────────
def _build_new_tr(cells_data, rev, author, date):
    """
    Build and return a new tracked-insert <w:tr> element (does NOT insert it).

    cells_data: list of dicts with keys: text, width, vmerge, style.
    rev: RevCounter supplying unique w:id values; author/date stamp each
    tracked-change element.
    """
    def _ins_attr():
        # Fresh dict per call — every w:ins element needs its own unique w:id.
        return {qn('w:id'): rev.next(), qn('w:author'): author, qn('w:date'): date}

    def _make_t(text, tag='w:t'):
        t = OxmlElement(tag)
        t.text = text or ''
        # Leading/trailing spaces or tabs would otherwise be dropped by Word;
        # xml:space="preserve" keeps them.
        if text and (text[0] in (' ', '\t') or text[-1] in (' ', '\t')):
            t.set('{http://www.w3.org/XML/1998/namespace}space', 'preserve')
        return t

    def _make_run(text):
        r = OxmlElement('w:r')
        r.append(_make_t(text))
        return r

    new_tr = OxmlElement('w:tr')
    # trPr: tracked row insertion
    trPr = OxmlElement('w:trPr')
    tr_ins = OxmlElement('w:ins')
    for k, v in _ins_attr().items():
        tr_ins.set(k, v)
    trPr.append(tr_ins)
    new_tr.append(trPr)
    for cd in cells_data:
        tc = OxmlElement('w:tc')
        tcPr = OxmlElement('w:tcPr')
        tcW = OxmlElement('w:tcW')
        if cd.get('width'):
            tcW.set(qn('w:w'), str(cd['width']))
            tcW.set(qn('w:type'), 'dxa')
        tcPr.append(tcW)
        if cd.get('vmerge'):
            # Continuation cell of a vertical merge — carries no text of its own.
            vm = OxmlElement('w:vMerge')
            tcPr.append(vm)
        tc.append(tcPr)
        p = OxmlElement('w:p')
        pPr = OxmlElement('w:pPr')
        if cd.get('style'):
            pStyle = OxmlElement('w:pStyle')
            pStyle.set(qn('w:val'), cd['style'])
            pPr.append(pStyle)
        # Mark the paragraph mark itself as inserted (w:rPr/w:ins inside w:pPr).
        rPr_para = OxmlElement('w:rPr')
        pm_ins = OxmlElement('w:ins')
        for k, v in _ins_attr().items():
            pm_ins.set(k, v)
        rPr_para.append(pm_ins)
        pPr.append(rPr_para)
        p.append(pPr)
        if cd.get('text') and not cd.get('vmerge'):
            # Wrap the visible run in w:ins so Word renders it as tracked.
            ins_el = OxmlElement('w:ins')
            for k, v in _ins_attr().items():
                ins_el.set(k, v)
            ins_el.append(_make_run(cd['text']))
            p.append(ins_el)
        tc.append(p)
        new_tr.append(tc)
    return new_tr
def _insert_vmerge_row(tbl, after_row_idx, cells_data, rev, author, date):
    """
    Build a tracked-insert row and place it directly after tbl.rows[after_row_idx].

    cells_data: list of dicts with keys: text, width, vmerge, style.
    Returns the newly inserted <w:tr> element.
    """
    built_tr = _build_new_tr(cells_data, rev, author, date)
    anchor_tr = tbl.rows[after_row_idx]._tr
    anchor_tr.addnext(built_tr)
    return built_tr
# ── Section replace (direct XML transplant) ───────────────────────────────────
def _apply_section_replace(doc, change, rev, author, date, log):
    """
    Transplant a block of CR elements (del section + ins section) directly into
    the TS, replacing the old heading+table at the matching location.

    This mirrors what Word does on copy-paste: the exact XML from the CR is
    cloned into the TS, with only the tracked-change revision IDs remapped to
    avoid conflicts.

    Returns True on success; False (after appending ERROR/SKIP/HINT lines to
    `log`) when the anchor heading or insertion point cannot be resolved.

    Fixes vs previous revision: removed the dead `n_del` computation (value was
    never used) and the redundant inner `from lxml import etree as _etree`
    re-import — the function-level `etree` import already provides it.
    """
    from lxml import etree
    import copy
    loc = change['location']
    del_heading = loc.get('del_heading', '')
    has_del_table = loc.get('has_del_table', False)
    section_number = loc.get('section_number', '')
    elements_xml = change.get('elements_xml', [])
    if not elements_xml:
        log.append(' SKIP section_replace: no elements in manifest')
        return False
    # ── Resolve search scope: restrict to declared section if possible ─────────
    search_paras = doc.paragraphs
    section_status = 'no_section_required'
    if section_number:
        start, end = _section_range(doc, section_number)
        if start is not None:
            search_paras = doc.paragraphs[start:end]
            section_status = 'in_section'
        else:
            log.append(f" WARN section '{section_number}' not found in TS — falling back to global search")
            section_status = 'section_not_in_ts'
    else:
        log.append(" WARN no section_number on section_replace — global search")
    # ── Find the TS paragraph that matches the deleted heading ─────────────────
    ts_para_elem = None
    insert_after_anchor = False  # when True: insert after anchor, don't delete it
    if del_heading:
        for para in search_paras:
            pt = para.text
            if del_heading in pt or _norm(del_heading) in _norm(pt):
                ts_para_elem = para._element
                break
        if ts_para_elem is None:
            # Fallback: include paragraphs whose XML text (inc. del runs) matches
            for para in search_paras:
                if del_heading in _full_para_text(para):
                    ts_para_elem = para._element
                    break
    else:
        # No heading to delete — use anchor_text to find insertion point
        anchor_text = loc.get('anchor_text', '')
        if anchor_text:
            if section_status == 'in_section':
                anchor_para, _, _ = _find_para_in_section(
                    doc, anchor_text, section_number)
            else:
                anchor_para, _ = _find_para(doc, anchor_text)
            if anchor_para is not None:
                ts_para_elem = anchor_para._element
                insert_after_anchor = True
    if ts_para_elem is None:
        # Section mismatch check: if declared section exists, but del_heading
        # is found GLOBALLY in a different section, report that.
        if section_status == 'in_section' and del_heading:
            for para in doc.paragraphs:
                pt = para.text
                if del_heading in pt or del_heading in _full_para_text(para):
                    enc = _enclosing_heading(doc, para)
                    actual = _para_heading_number(enc) if enc is not None else '?'
                    log.append(f' ERROR section_replace: del_heading {del_heading!r} declared in section '
                               f'{section_number} but found in section {actual}')
                    log.append(f" HINT nearest match: {para.text[:120]!r}")
                    return False
        log.append(f' ERROR section_replace: del_heading {del_heading!r} not found in TS')
        tokens = del_heading.split()[:3] if del_heading else []
        if tokens:
            # Offer the closest-looking paragraphs as hints for manual review.
            _hints = sorted(
                [p for p in doc.paragraphs if any(tok in p.text for tok in tokens)],
                key=lambda p: -len(set(del_heading.split()) & set(p.text.split()))
            )[:3]
            for _h in _hints:
                log.append(f" HINT nearest match: {_h.text[:120]!r}")
        return False
    ts_body = ts_para_elem.getparent()
    # ── Find the table immediately after the heading (if applicable) ───────────
    ts_tbl_elem = None
    if has_del_table:
        found_para = False
        for sib in ts_body:
            if sib is ts_para_elem:
                found_para = True
                continue
            if not found_para:
                continue
            sib_tag = sib.tag.split('}')[-1] if '}' in sib.tag else sib.tag
            if sib_tag == 'p':
                # Allow empty paragraphs between heading and table
                if not (''.join(t.text or '' for t in sib.findall('.//' + qn('w:t')))).strip():
                    continue
                break  # non-empty paragraph before table → no table to remove
            elif sib_tag == 'tbl':
                ts_tbl_elem = sib
                break
            else:
                break
        # Validate the candidate table matches what the CR says should be deleted
        if ts_tbl_elem is not None and elements_xml:
            cr_tbl_xmls = [x for x in elements_xml if '<w:tbl' in x]
            if cr_tbl_xmls:
                # Cheap fingerprint: compare the first few text runs of the CR
                # table against the candidate TS table before removing anything.
                cr_tbl_el = etree.fromstring(cr_tbl_xmls[0].encode())
                cr_hdr = ''.join(t.text or '' for t in
                                 cr_tbl_el.findall('.//' + qn('w:t'))[:10]).lower()
                ts_hdr = ''.join(t.text or '' for t in
                                 ts_tbl_elem.findall('.//' + qn('w:t'))[:10]).lower()
                if cr_hdr and cr_hdr not in ts_hdr and ts_hdr not in cr_hdr:
                    log.append(' WARN section_replace: candidate table header mismatch'
                               ' — skipping table removal')
                    ts_tbl_elem = None
    # ── Clone and remap IDs on the CR elements ─────────────────────────────────
    cloned = []
    for xml_str in elements_xml:
        elem = etree.fromstring(xml_str)
        cloned_elem = copy.deepcopy(elem)
        # Remap w:id in all tracked-change elements (must be unique per document)
        for el in cloned_elem.iter():
            if el.get(qn('w:id')) is not None:
                el.set(qn('w:id'), rev.next())
        cloned.append(cloned_elem)
    # ── Insert cloned elements before (or after) the anchor paragraph ─────────
    insert_idx = list(ts_body).index(ts_para_elem)
    if insert_after_anchor:
        insert_idx += 1  # insert after anchor, not before it
    for i, elem in enumerate(cloned):
        ts_body.insert(insert_idx + i, elem)
    # ── Remove the now-replaced TS elements (only when a heading was deleted) ──
    if not insert_after_anchor:
        ts_body.remove(ts_para_elem)
        if ts_tbl_elem is not None:
            ts_body.remove(ts_tbl_elem)
    log.append(
        f' OK section_replace: {del_heading!r} → {len(elements_xml)} element(s) spliced in'
        f' (removed heading{"+ table" if has_del_table else ""})'
    )
    return True
# ── Per-change-type applicators ───────────────────────────────────────────────
def _apply_text_replace(doc, change, rev, author, date, log):
    """Apply one text_replace change as a tracked modification.

    Two location kinds:
      * 'table_cell' — resolve the table (by header key, or by section heading
        when the row anchor is empty), then the row/column, and replace `old`
        with `new` in the first matching cell paragraph. Falls back to scanning
        other tables, and finally all columns (guarded by a minimum length so
        short strings don't match ambiguously).
      * 'body_para'  — resolve a body paragraph with section-aware search,
        using `para_context` when `old` is too short to be unambiguous.

    Returns True when a tracked replacement was made; False (with an ERROR
    line appended to `log`) otherwise.
    """
    loc = change['location']
    old = change['old']
    new = change['new']
    if loc['kind'] == 'table_cell':
        tbl, t_conf = _find_table(doc, loc['table_header'])
        if tbl is None:
            log.append(f" ERROR text_replace: table not found {loc['table_header'][:2]!r}")
            return False
        col_idx = loc['col_idx']
        row_anchor = loc['row_anchor']
        if row_anchor:
            row_idx, r_conf = _find_row(tbl, row_anchor)
            if row_idx < 0:
                # Primary table doesn't contain this row anchor — the CR may be
                # targeting a different table than the one _find_table resolved.
                # Try every other table in the document before giving up.
                for alt_tbl in doc.tables:
                    if alt_tbl is tbl:
                        continue
                    row_idx, r_conf = _find_row(alt_tbl, row_anchor)
                    if row_idx >= 0:
                        tbl = alt_tbl
                        break
            if row_idx < 0:
                log.append(f" ERROR text_replace: row anchor not found {row_anchor!r}")
                return False
            row = tbl.rows[row_idx]
            if col_idx >= len(row.cells):
                log.append(f" ERROR text_replace: col_idx {col_idx} out of range")
                return False
            cell = row.cells[col_idx]
            for para in cell.paragraphs:
                if old in para.text:
                    tracked_modify_para(para, old, new, rev, author, date)
                    # Low confidence on either lookup downgrades OK to WARN.
                    _pfx = 'WARN' if min(t_conf, r_conf) < _WARN_CONF else 'OK '
                    log.append(f" {_pfx} text_replace (table_cell"
                               f" t_conf={t_conf:.1f} r_conf={r_conf:.1f}"
                               f" row={row_idx} col={col_idx}): {old!r} → {new!r}")
                    return True
            log.append(f" ERROR text_replace: old text {old!r} not in cell (row={row_idx} col={col_idx})")
            return False
        else:
            # Empty row anchor: scan all rows in col_idx.
            # Prefer the table that follows the section heading (e.g. "Thirty fifth byte:")
            # because all-empty table headers match any table.
            section_heading = loc.get('section_heading', '')
            tbl_by_section, _ = _find_table_by_section(doc, section_heading)
            if tbl_by_section is not None:
                tables_to_try = [tbl_by_section] + [t for t in doc.tables if t is not tbl_by_section]
            else:
                tables_to_try = [tbl] + [t for t in doc.tables if t is not tbl]
            for search_tbl in tables_to_try:
                for r_idx, row in enumerate(search_tbl.rows):
                    if col_idx >= len(row.cells):
                        continue
                    cell = row.cells[col_idx]
                    for para in cell.paragraphs:
                        if old in para.text:
                            tracked_modify_para(para, old, new, rev, author, date)
                            log.append(f" OK text_replace (table_cell scan row={r_idx} col={col_idx}): {old!r} → {new!r}")
                            return True
            # Final fallback: scan ALL columns of ALL tables (guarded by min length)
            if len(old) < _MIN_LEN_ALLCOL_FALLBACK:
                log.append(f" ERROR text_replace: {old!r} too short for all-column fallback"
                           f" (ambiguous — skipped)")
                return False
            _all_start = tbl_by_section if tbl_by_section is not None else tbl
            for search_tbl in [_all_start] + [t for t in doc.tables if t is not _all_start]:
                for r_idx, row in enumerate(search_tbl.rows):
                    for c_idx, cell in enumerate(row.cells):
                        for para in cell.paragraphs:
                            if old in para.text:
                                tracked_modify_para(para, old, new, rev, author, date)
                                log.append(f" WARN text_replace (table_cell any_col"
                                           f" row={r_idx} col={c_idx} — low confidence):"
                                           f" {old!r} → {new!r}")
                                return True
            log.append(f" ERROR text_replace: old text {old!r} not found in any table column")
            return False
    elif loc['kind'] == 'body_para':
        ctx = loc.get('para_context', '')
        section_number = loc.get('section_number', '')
        if len(old) < 4 and ctx:
            # Short old text matches too broadly (e.g. a single digit would hit
            # the title paragraph). Locate by context first, then verify old
            # text is present in that paragraph.
            para, conf = _find_para_with_section(
                doc, ctx, section_number, 'text_replace', log, prefer_not_in_table=True)
            if para is None or old not in para.text:
                para = None
        else:
            para, conf = _find_para_with_section(
                doc, old, section_number, 'text_replace', log, prefer_not_in_table=True)
            if para is None and ctx:
                # Old text alone failed — retry with the surrounding context.
                para, conf = _find_para_with_section(
                    doc, ctx, section_number, 'text_replace', log, prefer_not_in_table=True)
        if para is None:
            log.append(f" ERROR text_replace: old text {old!r} not found in TS")
            return False
        if old in para.text:
            tracked_modify_para(para, old, new, rev, author, date)
            log.append(f" OK text_replace (body_para conf={conf:.1f}): {old!r} → {new!r}")
            return True
        log.append(f" ERROR text_replace: old text {old!r} not in resolved paragraph")
        return False
    log.append(f" ERROR text_replace: unknown kind {loc['kind']!r}")
    return False
def _apply_para_insert(doc, change, rev, author, date, log):
    """Insert the change's paragraphs after an anchor paragraph as tracked
    insertions.

    The anchor is resolved with section-aware search. Returns True on success
    (vacuously True when the change carries no paragraphs); False when the
    anchor cannot be found.
    """
    loc = change['location']
    anchor_text = loc.get('anchor_text', '')
    section_number = loc.get('section_number', '')
    paras_data = change.get('paragraphs', [])
    if not paras_data:
        return True
    anchor_para, conf = _find_para_with_section(
        doc, anchor_text, section_number, 'para_insert', log)
    if anchor_para is None:
        # When no section_number context, emit the legacy ERROR + HINT lines
        if not section_number:
            log.append(f" ERROR para_insert: anchor not found {anchor_text[:60]!r}")
            tokens = anchor_text.split()[:3]
            # Rank paragraphs by word overlap with the anchor for HINT output.
            _hints = sorted(
                [p for p in doc.paragraphs if any(tok in p.text for tok in tokens)],
                key=lambda p: -len(set(anchor_text.split()) & set(p.text.split()))
            )[:3]
            for _h in _hints:
                log.append(f" HINT nearest match: {_h.text[:120]!r}")
        return False
    # Each item is (text, style); a missing style falls back to 'Normal'.
    items = [(p['text'], p['style'] or 'Normal') for p in paras_data]
    tracked_insert_paras_after(anchor_para, items, rev, author, date)
    first_text = paras_data[0]['text'][:50] if paras_data else ''
    log.append(f" OK para_insert ({len(paras_data)} para(s) after anchor conf={conf:.1f}): {first_text!r}...")
    return True
def _apply_row_insert(doc, change, rev, author, date, log, last_inserted=None):
    """Insert one table row as a tracked insertion.

    Table resolution prefers the section-heading lookup (robust against
    all-empty header keys) and falls back to header matching. `last_inserted`
    is a mutable dict shared across calls, mapping (table id, anchor row idx)
    to the last inserted <w:tr>, so several row_inserts sharing one anchor
    land in forward order rather than reversed.
    """
    loc = change['location']
    # Prefer table located by section heading (handles ambiguous all-empty headers)
    section_heading = loc.get('section_heading', '')
    tbl_by_section, _ = _find_table_by_section(doc, section_heading)
    if tbl_by_section is not None:
        tbl = tbl_by_section
        t_conf = 1.0
    else:
        tbl, t_conf = _find_table(doc, loc['table_header'])
        if tbl is None:
            log.append(f" ERROR row_insert: table not found {loc['table_header'][:2]!r}")
            return False
    after_anchor = loc.get('after_row_anchor', '')
    context_rows_before = loc.get('context_rows_before', [])
    row_idx, r_conf = _find_row(tbl, after_anchor, context_rows_before)
    if row_idx < 0:
        log.append(f" ERROR row_insert: anchor row not found {after_anchor!r}")
        return False
    cells_data = change.get('cells', [])
    # Fix insertion ordering: when multiple rows target the same (tbl, row_idx),
    # each new row should go AFTER the previously inserted one, not after row_idx.
    # last_inserted maps (tbl._tbl id, row_idx) → last w:tr element inserted there.
    key = (id(tbl._tbl), row_idx)
    if last_inserted is not None and key in last_inserted:
        # Insert after the previously inserted row to maintain forward order
        prev_tr = last_inserted[key]
        new_tr = _build_new_tr(cells_data, rev, author, date)
        prev_tr.addnext(new_tr)
        last_inserted[key] = new_tr
    else:
        new_tr = _insert_vmerge_row(tbl, row_idx, cells_data, rev, author, date)
        if last_inserted is not None:
            last_inserted[key] = new_tr
    # Cell 1 typically carries the human-readable description for the log.
    desc = cells_data[1]['text'] if len(cells_data) > 1 else '?'
    _pfx = 'WARN' if min(t_conf, r_conf) < _WARN_CONF else 'OK '
    log.append(f" {_pfx} row_insert (t_conf={t_conf:.1f} r_conf={r_conf:.1f})"
               f" after row[{row_idx}] ({after_anchor!r}): {desc!r}")
    return True
# ── Manifest pre-processing ───────────────────────────────────────────────────
def _merge_para_inserts(manifest):
"""
Merge consecutive para_insert entries that share the same anchor_text.
When the CR parser emits multiple para_insert entries for the same anchor
(because [...] context markers were transparent and kept prev_stable_text
unchanged), each would call tracked_insert_paras_after independently.
Since each call starts from the same anchor element and uses addnext(),
later groups push earlier groups down β€” producing reversed order.
Merging them into one entry ensures a single tracked_insert_paras_after
call that inserts all paragraphs in the correct forward order.
"""
result = []
for change in manifest:
if (change.get('type') == 'para_insert'
and result
and result[-1].get('type') == 'para_insert'
and result[-1]['location']['anchor_text'] == change['location']['anchor_text']):
result[-1]['paragraphs'].extend(change['paragraphs'])
else:
merged = dict(change)
if change.get('type') == 'para_insert':
merged['paragraphs'] = list(change['paragraphs'])
result.append(merged)
return result
# ── Main apply function ───────────────────────────────────────────────────────
def apply_manifest(ts_path, manifest, out_path, author=DEFAULT_AUTHOR, date=DEFAULT_DATE):
    """
    Apply every change in `manifest` to the DOCX at `ts_path` and save the
    result to `out_path`.

    Returns (n_ok, n_skipped, log_lines, n_parsed, n_merged_groups), where
    n_parsed counts the raw manifest entries and n_merged_groups counts them
    after consecutive same-anchor para_inserts are merged.
    """
    doc = docx.Document(str(ts_path))
    rev = RevCounter(doc)
    log = []
    n_parsed = len(manifest)
    merged = _merge_para_inserts(manifest)
    n_merged = len(merged)
    # Shared state so repeated row_inserts at one anchor keep forward order:
    # maps (tbl_id, anchor_row_idx) → last inserted <w:tr>.
    last_inserted = {}
    n_ok = 0
    n_skip = 0
    for change in merged:
        ctype = change.get('type')
        if ctype == 'section_replace':
            applied = _apply_section_replace(doc, change, rev, author, date, log)
        elif ctype == 'text_replace':
            applied = _apply_text_replace(doc, change, rev, author, date, log)
        elif ctype == 'para_insert':
            applied = _apply_para_insert(doc, change, rev, author, date, log)
        elif ctype == 'row_insert':
            applied = _apply_row_insert(doc, change, rev, author, date, log, last_inserted=last_inserted)
        else:
            log.append(f" SKIP unknown change type: {ctype!r}")
            applied = False
        if applied:
            n_ok += 1
        else:
            n_skip += 1
    doc.save(str(out_path))
    return n_ok, n_skip, log, n_parsed, n_merged
# ── CLI ───────────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: parse arguments, load the JSON manifest, apply it to
    the TS document, and print the per-change log plus a summary."""
    parser = argparse.ArgumentParser(description='Apply CR manifest to TS DOCX as tracked changes.')
    parser.add_argument('ts_docx', help='Target TS DOCX file')
    parser.add_argument('manifest', help='JSON manifest from cr_parser.py')
    parser.add_argument('--author', default=DEFAULT_AUTHOR, help='Tracked change author')
    parser.add_argument('--output', default=None, help='Output path (default: <ts>_applied.docx)')
    args = parser.parse_args()
    ts_path = Path(args.ts_docx)
    if args.output:
        out_path = Path(args.output)
    else:
        out_path = ts_path.parent / (ts_path.stem + '_applied.docx')
    with open(args.manifest, encoding='utf-8') as fh:
        manifest = json.load(fh)
    print(f'Applying {len(manifest)} change(s) from manifest to {ts_path.name}...')
    n_ok, n_skip, log, n_parsed, n_merged = apply_manifest(ts_path, manifest, out_path, author=args.author)
    for entry in log:
        print(entry)
    print(f'\nParsed: {n_parsed} body changes (merged to {n_merged} groups) → Applied: {n_ok} Skipped: {n_skip}')
    print(f'Output: {out_path}')


if __name__ == '__main__':
    main()