#!/usr/bin/env python3
"""
ts_applicator.py — Apply a CR change manifest to a TS DOCX as tracked changes.

Reads a JSON manifest produced by cr_parser.py and applies every change to the
target TS using docx_helpers tracked-change primitives.

Usage:
    python3 ts_applicator.py <ts.docx> <manifest.json> [--author NAME] [--output path]
    # or import: from ts_applicator import apply_manifest
"""
import argparse
import json
import re
import sys
from pathlib import Path

import docx
from docx.oxml import OxmlElement
from docx.oxml.ns import qn

sys.path.insert(0, str(Path(__file__).parent))

_MIN_LEN_ALLCOL_FALLBACK = 8  # old text shorter than this is too ambiguous for any-column search
_MIN_PREFIX_LEN = 8           # clean-prefix shorter than this is too ambiguous for row matching
_WARN_CONF = 0.8              # confidence below this emits WARN instead of OK

from docx_helpers import (
    RevCounter,
    tracked_modify_para,
    tracked_insert_paras_after,
    AUTHOR as DEFAULT_AUTHOR,
    DATE as DEFAULT_DATE,
)

# ── Text normalisation ────────────────────────────────────────────────────────

_UNICODE_REPLACEMENTS = (
    ('\xa0', ' '),      # non-breaking space
    ('\u202f', ' '),    # narrow no-break space
    ('\u2007', ' '),    # figure space
    ('\u2060', ''),     # word joiner (invisible)
    ('\u200b', ''),     # zero-width space
    ('\u00ad', ''),     # soft hyphen (invisible)
    ('\u2011', '-'),    # non-breaking hyphen
    ('\u2013', '-'),    # en dash
    ('\u2014', '-'),    # em dash
    ('\u2212', '-'),    # minus sign
    ('\u2018', "'"),    # left single quote
    ('\u2019', "'"),    # right single quote
    ('\u201c', '"'),    # left double quote
    ('\u201d', '"'),    # right double quote
    ('\u2026', '...'),  # horizontal ellipsis → three dots
)


def _norm(text):
    """Normalise common Unicode invisible/whitespace/punctuation variants for comparison."""
    for old, new in _UNICODE_REPLACEMENTS:
        text = text.replace(old, new)
    return text.strip()


def _norm_ws(text):
    """
    Strip all whitespace for structural matching.

    ETSI TS files store structured paragraphs (references, abbreviations,
    headings) with a TAB between the code and the body text, e.g.:
        '[27]\\tGlobalPlatform: ...'
        'CLT\\tContactLess Tunnelling'
        '8.3\\tRAM implementation over HTTPS'
    The CR's text extraction concatenates runs directly, losing the tab:
        '[27]GlobalPlatform: ...'
        'CLTContactLess Tunnelling'
        '8.3RAM implementation over HTTPS'
    Removing all whitespace from both sides before comparing solves this.
    Used as a third-level fallback (confidence 0.8) after exact and NBSP-norm.
    """
    for old, new in _UNICODE_REPLACEMENTS:
        text = text.replace(old, new)
    return re.sub(r'\s+', '', text)


def _norm_alnum(text):
    """Keep only lowercase alphanumeric characters — last-resort matching.

    Strips all punctuation, spaces, and Unicode variants so that only the
    raw word/number content is compared. Used as a confidence-0.6 fallback
    in _find_row when even whitespace-stripped matching fails (e.g. different
    bracket styles, quote variants, or punctuation differences between the
    CR and the TS).
    """
    return re.sub(r'[^a-z0-9]', '', text.lower())


def _clean_prefix(text: str) -> str:
    """Return the longest leading substring that contains only standard
    printable ASCII characters (ord 32–126).

    Non-breaking spaces, curly quotes, and other Unicode characters embedded
    mid-text (e.g. between spec number components like 'TS\xa0102\xa0226')
    make the full anchor unmatchable. The clean prefix — the part before the
    first such character — is still reliable and specific enough to locate
    the correct row.
    """
    end = 0
    for ch in text:
        if ord(ch) < 32 or ord(ch) > 126:
            break
        end += 1
    return text[:end].strip()


# ── Document search helpers ───────────────────────────────────────────────────

def _full_para_text(para):
    """All text content including w:t (normal/inserted) and w:delText (deleted runs).

    NOTE: w:t runs and w:delText runs are concatenated as two separate groups,
    so the result is NOT in document order — use only for containment checks.
    """
    el = para._element
    return ''.join(t.text or '' for t in el.findall('.//' + qn('w:t'))) + \
           ''.join(t.text or '' for t in el.findall('.//' + qn('w:delText')))


def _original_para_text(para):
    """Reconstruct paragraph text as it was before tracked changes.

    Iterates in document order, keeping:
      - w:t runs that are NOT inside a w:ins element (stable text)
      - w:delText runs (deleted-but-original text)
    Skipping:
      - w:t runs inside w:ins (newly inserted text)

    This allows anchors that reference original phrasing (e.g.
    'SCP81Connection') to still match after a tracked '1'→'X' replacement has
    been applied to that paragraph — where _full_para_text would return the
    concatenation out of order.
    """
    el = para._element
    result = []
    for node in el.iter():
        if node.tag == qn('w:t'):
            # Skip if this w:t is wrapped in a w:ins element
            is_inserted = False
            for anc in node.iterancestors():
                if anc is el:
                    break
                if anc.tag == qn('w:ins'):
                    is_inserted = True
                    break
            if not is_inserted:
                result.append(node.text or '')
        elif node.tag == qn('w:delText'):
            result.append(node.text or '')
    return ''.join(result)


def _match_paragraphs(paragraphs, search_text, prefer_not_in_table=False):
    """Core 5-tier matching logic. Operates on any iterable of Paragraph
    objects. Returns (para, confidence) or (None, 0.0).

    Tiers (highest confidence first): exact substring (1.0), Unicode-normalised
    (0.9), whitespace-stripped (0.8), pre-tracked-change original text (0.7),
    any-content including deleted runs (0.6). With prefer_not_in_table=True,
    a body-text match at any tier beats a table-cell match; the best table
    match is kept as a last resort.
    """
    norm_search = _norm(search_text)
    ws_search = _norm_ws(search_text)
    candidates_exact = []
    candidates_norm = []
    candidates_ws = []
    candidates_orig = []
    candidates_del = []
    for para in paragraphs:
        pt = para.text
        if search_text in pt:
            candidates_exact.append(para)
        elif norm_search and norm_search in _norm(pt):
            candidates_norm.append(para)
        elif ws_search and ws_search in _norm_ws(pt):
            candidates_ws.append(para)
        else:
            orig_pt = _original_para_text(para)
            if (search_text in orig_pt
                    or (norm_search and norm_search in _norm(orig_pt))):
                candidates_orig.append(para)
            elif ws_search and ws_search in _norm_ws(orig_pt):
                candidates_orig.append(para)
            else:
                full_pt = _full_para_text(para)
                if search_text in full_pt:
                    candidates_del.append(para)
                elif ws_search and ws_search in _norm_ws(full_pt):
                    candidates_del.append(para)

    def _in_table(para):
        p = para._element
        return any(a.tag == qn('w:tc') for a in p.iterancestors())

    # Single tier list shared by both branches (was duplicated in the original).
    tiers = [(candidates_exact, 1.0), (candidates_norm, 0.9),
             (candidates_ws, 0.8), (candidates_orig, 0.7),
             (candidates_del, 0.6)]

    if not prefer_not_in_table:
        for pool, conf in tiers:
            if pool:
                return pool[0], conf
        return None, 0.0

    best_table_match = (None, 0.0)
    for pool, conf in tiers:
        if not pool:
            continue
        body_only = [p for p in pool if not _in_table(p)]
        if body_only:
            return body_only[0], conf
        if best_table_match[0] is None:
            best_table_match = (pool[0], conf)
    return best_table_match if best_table_match[0] is not None else (None, 0.0)


def _find_para(doc, search_text, prefer_not_in_table=False):
    """Find the first paragraph containing search_text across the entire doc.
    Five-tier matching (see _match_paragraphs). Returns (para, confidence)."""
    return _match_paragraphs(doc.paragraphs, search_text, prefer_not_in_table)


# ── Section-aware anchor search ───────────────────────────────────────────────

_HEADING_NUM_RE = re.compile(r'^(\d+(?:\.\d+)*)\s+\S')


def _para_heading_number(para):
    """Dotted section number if this paragraph is a real TS heading, else None.

    Requires the paragraph style to start with 'Heading' (case-insensitive) —
    this rejects false positives from TOC entries (style 'toc N'), address
    lines in the front matter (style 'FP'), change history labels (style
    'B3'), etc. ETSI/3GPP TS documents always style real headings as
    'Heading 1'..'Heading N'."""
    style_name = (para.style.name if para.style is not None else '') or ''
    if not style_name.lower().startswith('heading'):
        return None
    m = _HEADING_NUM_RE.match(para.text.strip())
    return m.group(1) if m else None


def _is_descendant_section(child, parent):
    """True if `child` is `parent` or nested under it (by dotted-prefix)."""
    return child == parent or child.startswith(parent + '.')


def _section_range(doc, target):
    """Return (start_idx, end_idx) into doc.paragraphs spanning the target section.

    start = index of the heading whose number == target.
    end   = index of the next heading whose number is NOT a descendant of
            target (or len(doc.paragraphs) if none).
    Returns (None, None) if target heading not found. Recomputed per-call."""
    paras = doc.paragraphs
    start = None
    for i, p in enumerate(paras):
        n = _para_heading_number(p)
        if n is None:
            continue
        if start is None and n == target:
            start = i
            continue
        if start is not None and not _is_descendant_section(n, target):
            return (start, i)
    return (start, len(paras)) if start is not None else (None, None)


def _enclosing_heading(doc, para):
    """Walk backward from para to the first preceding heading paragraph.
    Returns the heading Paragraph or None. Used for HINT lines."""
    paras = doc.paragraphs
    target_elem = para._element
    start_idx = None
    for i, p in enumerate(paras):
        if p._element is target_elem:
            start_idx = i
            break
    if start_idx is None:
        return None
    for i in range(start_idx, -1, -1):
        if _para_heading_number(paras[i]) is not None:
            return paras[i]
    return None


def _find_para_in_section(doc, search_text, section_number,
                          prefer_not_in_table=False):
    """Section-restricted _find_para.

    Returns (para, conf, status) where status ∈ {"in_section", "no_section"}.
    On no_section, caller should fall back to global _find_para with a WARN
    log line."""
    if not section_number:
        return (None, 0.0, 'no_section')
    start, end = _section_range(doc, section_number)
    if start is None:
        return (None, 0.0, 'no_section')
    para, conf = _match_paragraphs(doc.paragraphs[start:end], search_text,
                                   prefer_not_in_table)
    return (para, conf, 'in_section')


def _find_para_with_section(doc, search_text, section_number, kind_label, log,
                            prefer_not_in_table=False):
    """Section-aware anchor search with WARN/ERROR logging.

    Behaviour:
      * section_number present + found in TS + anchor in range → return (para, conf).
      * section_number present + not in TS → WARN, fall back to global _find_para.
      * section_number present + anchor NOT in range → ERROR + HINT, return (None, 0).
      * section_number missing → WARN, fall back to global _find_para.
    Logs go to `log` (list of str)."""
    if section_number:
        para, conf, status = _find_para_in_section(
            doc, search_text, section_number, prefer_not_in_table)
        if status == 'in_section' and para is not None:
            return para, conf
        if status == 'no_section':
            log.append(f" WARN section '{section_number}' not found in TS — falling back to global search")
            return _find_para(doc, search_text, prefer_not_in_table)
        # in_section but anchor absent — check global for HINT
        g_para, _ = _find_para(doc, search_text, prefer_not_in_table)
        if g_para is not None:
            enc = _enclosing_heading(doc, g_para)
            actual = _para_heading_number(enc) if enc is not None else '?'
            log.append(f" ERROR {kind_label}: anchor {search_text[:60]!r} declared in section "
                       f"{section_number} but found in section {actual}")
            log.append(f" HINT nearest match: {g_para.text[:120]!r}")
        else:
            log.append(f" ERROR {kind_label}: anchor {search_text[:60]!r} not found in section "
                       f"{section_number} (or anywhere)")
        return None, 0.0
    log.append(f" WARN no section_number on change — global anchor search for {search_text[:60]!r}")
    return _find_para(doc, search_text, prefer_not_in_table)


def _find_table_by_section(doc, section_heading):
    """
    Find the table immediately following a paragraph that contains
    section_heading.

    Checks both w:t (plain/inserted) and w:delText (tracked-deleted) so the
    match survives even after the heading was wrapped in a tracked deletion.
    Empty paragraphs between the heading and the table are tolerated.
    Returns (table, confidence) or (None, 0.0).
    """
    if not section_heading:
        return None, 0.0
    norm_h = _norm(section_heading)
    ws_h = _norm_ws(section_heading)
    heading_seen = False
    for element in doc.element.body:
        tag = element.tag.split('}')[-1] if '}' in element.tag else element.tag
        if tag == 'p':
            t_text = ''.join(t.text or '' for t in element.findall('.//' + qn('w:t')))
            d_text = ''.join(t.text or '' for t in element.findall('.//' + qn('w:delText')))
            full = (t_text + d_text).strip()
            if not full:
                continue  # skip empty paras, keep heading_seen state
            if (section_heading in full
                    or norm_h in _norm(full)
                    or ws_h in _norm_ws(full)):
                heading_seen = True
            else:
                heading_seen = False  # non-matching non-empty para resets
        elif tag == 'tbl':
            if heading_seen:
                # Map the raw element back to its python-docx Table wrapper.
                for tbl in doc.tables:
                    if tbl._tbl is element:
                        return tbl, 1.0
            heading_seen = False
    return None, 0.0


def _find_table(doc, header_key):
    """
    Find a table whose first row cell texts start with header_key.
    Returns (table, confidence) or (None, 0.0).
    """
    norm_key = [_norm(h) for h in header_key]
    for tbl in doc.tables:
        if not tbl.rows:
            continue
        for row in tbl.rows[:3]:  # check first 3 rows — header may not be row 0
            row_texts = [_norm(c.text) for c in row.cells]
            match = all(
                i < len(row_texts) and norm_key[i] in row_texts[i]
                for i in range(len(norm_key))
            )
            if match:
                return tbl, 1.0
    return None, 0.0


def _disambiguate_by_context(all_rows, candidates, context_rows_before):
    """Pick the candidate whose preceding rows best match context_rows_before.

    context_rows_before: list of expected col-0 texts, closest-first.
    Returns the best candidate index; falls back to candidates[0] on tie."""
    best_score, best_idx = -1, candidates[0]
    for idx in candidates:
        score = 0
        for depth, expected in enumerate(context_rows_before, start=1):
            ctx_idx = idx - depth
            if ctx_idx < 0 or not expected:
                continue
            cell0 = all_rows[ctx_idx].cells[0].text if all_rows[ctx_idx].cells else ''
            if _norm(expected) in _norm(cell0) or _norm_ws(expected) in _norm_ws(cell0):
                score += 1
        if score > best_score:
            best_score, best_idx = score, idx
    return best_idx


def _find_row(tbl, anchor_text, context_rows_before=None):
    """
    Find first row in tbl where col-0 cell text contains anchor_text.
    Returns (row_idx, confidence) or (-1, 0.0).

    When context_rows_before is provided and multiple rows match, uses the
    col-0 texts of the rows preceding each candidate to disambiguate.

    Matching levels, in order of confidence:
      1.0  — exact substring match
      0.9  — Unicode-normalised match (_norm: xa0, dashes, quotes, …)
      0.8  — whitespace-stripped match (_norm_ws: also removes tabs/newlines)
      0.6  — alphanumeric-only match (_norm_alnum: strips all non a-z0-9)
      0.55 — clean-prefix unique match: extract the leading ASCII-only part
             of the anchor and find the single row that contains it.
      0.5  — clean-prefix + token-overlap: when multiple rows share the
             prefix, pick the one whose col-0 tokens overlap most with the
             anchor tokens.
    """
    all_rows = list(tbl.rows)
    norm_anchor = _norm(anchor_text)
    ws_anchor = _norm_ws(anchor_text)
    alnum_anchor = _norm_alnum(anchor_text)
    for match_fn, conf in [
        (lambda c: anchor_text in c, 1.0),
        (lambda c: bool(norm_anchor) and norm_anchor in _norm(c), 0.9),
        (lambda c: bool(ws_anchor) and ws_anchor in _norm_ws(c), 0.8),
        (lambda c: bool(alnum_anchor) and alnum_anchor in _norm_alnum(c), 0.6),
    ]:
        candidates = [
            idx for idx, row in enumerate(all_rows)
            if row.cells and match_fn(row.cells[0].text)
        ]
        if not candidates:
            continue
        if len(candidates) == 1 or not context_rows_before:
            return candidates[0], conf
        return _disambiguate_by_context(all_rows, candidates, context_rows_before), conf

    # ── Prefix-based partial match ─────────────────────────────────────────
    prefix = _clean_prefix(anchor_text)
    if prefix and len(prefix) > _MIN_PREFIX_LEN:
        prefix_low = prefix.lower()
        hits = [
            idx for idx, row in enumerate(all_rows)
            if row.cells and prefix_low in row.cells[0].text.lower()
        ]
        if len(hits) == 1:
            return hits[0], 0.55
        elif len(hits) > 1:
            anchor_tokens = set(re.findall(r'[a-z0-9]+', anchor_text.lower()))
            best_score, best_idx = -1, -1
            for hit_idx in hits:
                cell_tokens = set(re.findall(r'[a-z0-9]+',
                                             all_rows[hit_idx].cells[0].text.lower()))
                score = len(anchor_tokens & cell_tokens)
                if score > best_score:
                    best_score, best_idx = score, hit_idx
            if best_idx >= 0:
                return best_idx, 0.5
    return (-1, 0.0)


# ── vMerge row insertion ──────────────────────────────────────────────────────

def _build_new_tr(cells_data, rev, author, date):
    """
    Build and return a new tracked-insert <w:tr> element (does NOT insert it).
    cells_data: list of dicts with keys: text, width, vmerge, style.
    """
    def _ins_attr():
        # Each call consumes a fresh revision id from the counter.
        return {qn('w:id'): rev.next(), qn('w:author'): author, qn('w:date'): date}

    def _make_t(text, tag='w:t'):
        t = OxmlElement(tag)
        t.text = text or ''
        # Leading/trailing whitespace in w:t is dropped by Word unless
        # xml:space="preserve" is set.
        if text and (text[0] in (' ', '\t') or text[-1] in (' ', '\t')):
            t.set('{http://www.w3.org/XML/1998/namespace}space', 'preserve')
        return t

    def _make_run(text):
        r = OxmlElement('w:r')
        r.append(_make_t(text))
        return r

    new_tr = OxmlElement('w:tr')
    # trPr: tracked row insertion
    trPr = OxmlElement('w:trPr')
    tr_ins = OxmlElement('w:ins')
    for k, v in _ins_attr().items():
        tr_ins.set(k, v)
    trPr.append(tr_ins)
    new_tr.append(trPr)

    for cd in cells_data:
        tc = OxmlElement('w:tc')
        tcPr = OxmlElement('w:tcPr')
        tcW = OxmlElement('w:tcW')
        if cd.get('width'):
            tcW.set(qn('w:w'), str(cd['width']))
            tcW.set(qn('w:type'), 'dxa')
        tcPr.append(tcW)
        if cd.get('vmerge'):
            vm = OxmlElement('w:vMerge')
            tcPr.append(vm)
        tc.append(tcPr)

        p = OxmlElement('w:p')
        pPr = OxmlElement('w:pPr')
        if cd.get('style'):
            pStyle = OxmlElement('w:pStyle')
            pStyle.set(qn('w:val'), cd['style'])
            pPr.append(pStyle)
        # Paragraph-mark insertion record (rPr/ins inside pPr).
        rPr_para = OxmlElement('w:rPr')
        pm_ins = OxmlElement('w:ins')
        for k, v in _ins_attr().items():
            pm_ins.set(k, v)
        rPr_para.append(pm_ins)
        pPr.append(rPr_para)
        p.append(pPr)

        if cd.get('text') and not cd.get('vmerge'):
            ins_el = OxmlElement('w:ins')
            for k, v in _ins_attr().items():
                ins_el.set(k, v)
            ins_el.append(_make_run(cd['text']))
            p.append(ins_el)
        tc.append(p)
        new_tr.append(tc)
    return new_tr


def _insert_vmerge_row(tbl, after_row_idx, cells_data, rev, author, date):
    """
    Insert a tracked row after row[after_row_idx].
    cells_data: list of dicts with keys: text, width, vmerge, style.
    Returns the inserted element.
    """
    new_tr = _build_new_tr(cells_data, rev, author, date)
    ref_tr = tbl.rows[after_row_idx]._tr
    ref_tr.addnext(new_tr)
    return new_tr


# ── Section replace (direct XML transplant) ───────────────────────────────────

def _apply_section_replace(doc, change, rev, author, date, log):
    """
    Transplant a block of CR elements (del section + ins section) directly
    into the TS, replacing the old heading+table at the matching location.
    This mirrors what Word does on copy-paste: the exact XML from the CR is
    cloned into the TS, with only the tracked-change revision IDs remapped
    to avoid conflicts.
    """
    from lxml import etree

    loc = change['location']
    del_heading = loc.get('del_heading', '')
    has_del_table = loc.get('has_del_table', False)
    section_number = loc.get('section_number', '')
    elements_xml = change.get('elements_xml', [])
    if not elements_xml:
        log.append(' SKIP section_replace: no elements in manifest')
        return False

    # ── Resolve search scope: restrict to declared section if possible ─────────
    search_paras = doc.paragraphs
    section_status = 'no_section_required'
    if section_number:
        start, end = _section_range(doc, section_number)
        if start is not None:
            search_paras = doc.paragraphs[start:end]
            section_status = 'in_section'
        else:
            log.append(f" WARN section '{section_number}' not found in TS — falling back to global search")
            section_status = 'section_not_in_ts'
    else:
        log.append(" WARN no section_number on section_replace — global search")

    # ── Find the TS paragraph that matches the deleted heading ─────────────────
    ts_para_elem = None
    insert_after_anchor = False  # when True: insert after anchor, don't delete it
    if del_heading:
        for para in search_paras:
            pt = para.text
            if del_heading in pt or _norm(del_heading) in _norm(pt):
                ts_para_elem = para._element
                break
        if ts_para_elem is None:
            # Fallback: include paragraphs whose XML text (inc.
            # del runs) matches
            for para in search_paras:
                if del_heading in _full_para_text(para):
                    ts_para_elem = para._element
                    break
    else:
        # No heading to delete — use anchor_text to find insertion point
        anchor_text = loc.get('anchor_text', '')
        if anchor_text:
            if section_status == 'in_section':
                anchor_para, _, _ = _find_para_in_section(
                    doc, anchor_text, section_number)
            else:
                anchor_para, _ = _find_para(doc, anchor_text)
            if anchor_para is not None:
                ts_para_elem = anchor_para._element
                insert_after_anchor = True

    if ts_para_elem is None:
        # Section mismatch check: if declared section exists, but del_heading
        # is found GLOBALLY in a different section, report that.
        if section_status == 'in_section' and del_heading:
            for para in doc.paragraphs:
                pt = para.text
                if del_heading in pt or del_heading in _full_para_text(para):
                    enc = _enclosing_heading(doc, para)
                    actual = _para_heading_number(enc) if enc is not None else '?'
                    log.append(f' ERROR section_replace: del_heading {del_heading!r} declared in section '
                               f'{section_number} but found in section {actual}')
                    log.append(f" HINT nearest match: {para.text[:120]!r}")
                    return False
        log.append(f' ERROR section_replace: del_heading {del_heading!r} not found in TS')
        tokens = del_heading.split()[:3] if del_heading else []
        if tokens:
            _hints = sorted(
                [p for p in doc.paragraphs if any(tok in p.text for tok in tokens)],
                key=lambda p: -len(set(del_heading.split()) & set(p.text.split()))
            )[:3]
            for _h in _hints:
                log.append(f" HINT nearest match: {_h.text[:120]!r}")
        return False

    ts_body = ts_para_elem.getparent()

    # ── Find the table immediately after the heading (if applicable) ───────────
    ts_tbl_elem = None
    if has_del_table:
        found_para = False
        for sib in ts_body:
            if sib is ts_para_elem:
                found_para = True
                continue
            if not found_para:
                continue
            sib_tag = sib.tag.split('}')[-1] if '}' in sib.tag else sib.tag
            if sib_tag == 'p':
                # Allow empty paragraphs between heading and table
                if not (''.join(t.text or '' for t in sib.findall('.//' + qn('w:t')))).strip():
                    continue
                break  # non-empty paragraph before table → no table to remove
            elif sib_tag == 'tbl':
                ts_tbl_elem = sib
                break
            else:
                break

    # Validate the candidate table matches what the CR says should be deleted
    if ts_tbl_elem is not None and elements_xml:
        cr_tbl_xmls = [x for x in elements_xml if '<w:tbl' in x]
        # NOTE(review): the original body from this point to the head of
        # _apply_text_replace was lost to source corruption. The code below is
        # a conservative reconstruction — verify against version control.
        if not cr_tbl_xmls:
            # CR deletes no table after this heading — keep the TS table.
            ts_tbl_elem = None

    # ── Transplant: clone CR XML, remap revision ids, swap into the TS ─────────
    new_elements = []
    for xml in elements_xml:
        # NOTE(review): assumes the manifest stores self-contained XML
        # fragments (namespaces declared) as produced by etree.tostring.
        el = etree.fromstring(xml)
        for node in el.iter():
            if node.get(qn('w:id')) is not None:
                node.set(qn('w:id'), rev.next())
        new_elements.append(el)

    ref = ts_tbl_elem if ts_tbl_elem is not None else ts_para_elem
    for el in reversed(new_elements):
        ref.addnext(el)
    if not insert_after_anchor:
        if ts_tbl_elem is not None:
            ts_body.remove(ts_tbl_elem)
        ts_body.remove(ts_para_elem)
    log.append(f" OK section_replace ({len(new_elements)} element(s)): {del_heading[:60]!r}")
    return True


def _apply_text_replace(doc, change, rev, author, date, log):
    """Apply one tracked old→new replacement (table cell or body paragraph).

    NOTE(review): the head of this function (signature through the row-anchor
    lookup) was lost to source corruption and is reconstructed here; the
    manifest key names for the old/new strings are accepted under both
    plausible spellings — verify against cr_parser.py.
    """
    loc = change['location']
    old = change.get('old_text', change.get('old', ''))
    new = change.get('new_text', change.get('new', ''))

    if loc['kind'] == 'table_cell':
        tbl, t_conf = _find_table(doc, loc['table_header'])
        if tbl is None:
            log.append(f" ERROR text_replace: table not found {loc['table_header'][:2]!r}")
            return False
        row_anchor = loc.get('row_anchor', '')
        col_idx = loc.get('col_idx', 0)
        context_rows_before = loc.get('context_rows_before', [])
        if row_anchor:
            row_idx, r_conf = _find_row(tbl, row_anchor, context_rows_before)
            if row_idx < 0:
                # Retry in the other tables — header keys are often shared.
                for alt_tbl in doc.tables:
                    if alt_tbl is tbl:
                        continue
                    row_idx, r_conf = _find_row(alt_tbl, row_anchor, context_rows_before)
                    if row_idx >= 0:
                        tbl = alt_tbl
                        break
            if row_idx < 0:
                log.append(f" ERROR text_replace: row anchor not found {row_anchor!r}")
                return False
            row = tbl.rows[row_idx]
            if col_idx >= len(row.cells):
                log.append(f" ERROR text_replace: col_idx {col_idx} out of range")
                return False
            cell = row.cells[col_idx]
            for para in cell.paragraphs:
                if old in para.text:
                    tracked_modify_para(para, old, new, rev, author, date)
                    _pfx = 'WARN' if min(t_conf, r_conf) < _WARN_CONF else 'OK '
                    log.append(f" {_pfx} text_replace (table_cell"
                               f" t_conf={t_conf:.1f} r_conf={r_conf:.1f}"
                               f" row={row_idx} col={col_idx}): {old!r} → {new!r}")
                    return True
            log.append(f" ERROR text_replace: old text {old!r} not in cell (row={row_idx} col={col_idx})")
            return False
        else:
            # Empty row anchor: scan all rows in col_idx.
            # Prefer the table that follows the section heading (e.g. "Thirty fifth byte:")
            # because all-empty table headers match any table.
            section_heading = loc.get('section_heading', '')
            tbl_by_section, _ = _find_table_by_section(doc, section_heading)
            if tbl_by_section is not None:
                tables_to_try = [tbl_by_section] + [t for t in doc.tables if t is not tbl_by_section]
            else:
                tables_to_try = [tbl] + [t for t in doc.tables if t is not tbl]
            for search_tbl in tables_to_try:
                for r_idx, row in enumerate(search_tbl.rows):
                    if col_idx >= len(row.cells):
                        continue
                    cell = row.cells[col_idx]
                    for para in cell.paragraphs:
                        if old in para.text:
                            tracked_modify_para(para, old, new, rev, author, date)
                            log.append(f" OK text_replace (table_cell scan row={r_idx} col={col_idx}): {old!r} → {new!r}")
                            return True
            # Final fallback: scan ALL columns of ALL tables (guarded by min length)
            if len(old) < _MIN_LEN_ALLCOL_FALLBACK:
                log.append(f" ERROR text_replace: {old!r} too short for all-column fallback"
                           f" (ambiguous — skipped)")
                return False
            _all_start = tbl_by_section if tbl_by_section is not None else tbl
            for search_tbl in [_all_start] + [t for t in doc.tables if t is not _all_start]:
                for r_idx, row in enumerate(search_tbl.rows):
                    for c_idx, cell in enumerate(row.cells):
                        for para in cell.paragraphs:
                            if old in para.text:
                                tracked_modify_para(para, old, new, rev, author, date)
                                log.append(f" WARN text_replace (table_cell any_col"
                                           f" row={r_idx} col={c_idx} — low confidence):"
                                           f" {old!r} → {new!r}")
                                return True
            log.append(f" ERROR text_replace: old text {old!r} not found in any table column")
            return False

    elif loc['kind'] == 'body_para':
        ctx = loc.get('para_context', '')
        section_number = loc.get('section_number', '')
        if len(old) < 4 and ctx:
            # Short old text matches too broadly (e.g. a single digit would hit
            # the title paragraph). Locate by context first, then verify old
            # text is present in that paragraph.
            para, conf = _find_para_with_section(
                doc, ctx, section_number, 'text_replace', log,
                prefer_not_in_table=True)
            if para is None or old not in para.text:
                para = None
        else:
            para, conf = _find_para_with_section(
                doc, old, section_number, 'text_replace', log,
                prefer_not_in_table=True)
            if para is None and ctx:
                para, conf = _find_para_with_section(
                    doc, ctx, section_number, 'text_replace', log,
                    prefer_not_in_table=True)
        if para is None:
            log.append(f" ERROR text_replace: old text {old!r} not found in TS")
            return False
        if old in para.text:
            tracked_modify_para(para, old, new, rev, author, date)
            log.append(f" OK text_replace (body_para conf={conf:.1f}): {old!r} → {new!r}")
            return True
        log.append(f" ERROR text_replace: old text {old!r} not in resolved paragraph")
        return False

    log.append(f" ERROR text_replace: unknown kind {loc['kind']!r}")
    return False


def _apply_para_insert(doc, change, rev, author, date, log):
    """Insert tracked paragraphs after an anchor paragraph. Returns success."""
    loc = change['location']
    anchor_text = loc.get('anchor_text', '')
    section_number = loc.get('section_number', '')
    paras_data = change.get('paragraphs', [])
    if not paras_data:
        return True
    anchor_para, conf = _find_para_with_section(
        doc, anchor_text, section_number, 'para_insert', log)
    if anchor_para is None:
        # When no section_number context, emit the legacy ERROR + HINT lines
        if not section_number:
            log.append(f" ERROR para_insert: anchor not found {anchor_text[:60]!r}")
            tokens = anchor_text.split()[:3]
            _hints = sorted(
                [p for p in doc.paragraphs if any(tok in p.text for tok in tokens)],
                key=lambda p: -len(set(anchor_text.split()) & set(p.text.split()))
            )[:3]
            for _h in _hints:
                log.append(f" HINT nearest match: {_h.text[:120]!r}")
        return False
    # .get() instead of ['style'] — a minimal manifest entry may omit style.
    items = [(p['text'], p.get('style') or 'Normal') for p in paras_data]
    tracked_insert_paras_after(anchor_para, items, rev, author, date)
    first_text = paras_data[0]['text'][:50] if paras_data else ''
    log.append(f" OK para_insert ({len(paras_data)} para(s) after anchor conf={conf:.1f}): {first_text!r}...")
    return True


def _apply_row_insert(doc, change, rev, author, date, log, last_inserted=None):
    """Insert one tracked table row after an anchor row. Returns success.

    last_inserted: optional dict mapping (table id, anchor row idx) → the
    w:tr element most recently inserted there, so consecutive inserts at the
    same anchor keep forward order."""
    loc = change['location']
    # Prefer table located by section heading (handles ambiguous all-empty headers)
    section_heading = loc.get('section_heading', '')
    tbl_by_section, _ = _find_table_by_section(doc, section_heading)
    if tbl_by_section is not None:
        tbl = tbl_by_section
        t_conf = 1.0
    else:
        tbl, t_conf = _find_table(doc, loc['table_header'])
    if tbl is None:
        log.append(f" ERROR row_insert: table not found {loc['table_header'][:2]!r}")
        return False
    after_anchor = loc.get('after_row_anchor', '')
    context_rows_before = loc.get('context_rows_before', [])
    row_idx, r_conf = _find_row(tbl, after_anchor, context_rows_before)
    if row_idx < 0:
        log.append(f" ERROR row_insert: anchor row not found {after_anchor!r}")
        return False
    cells_data = change.get('cells', [])
    # Fix insertion ordering: when multiple rows target the same (tbl, row_idx),
    # each new row should go AFTER the previously inserted one, not after row_idx.
    # last_inserted maps (tbl._tbl id, row_idx) → last w:tr element inserted there.
    key = (id(tbl._tbl), row_idx)
    if last_inserted is not None and key in last_inserted:
        # Insert after the previously inserted row to maintain forward order
        prev_tr = last_inserted[key]
        new_tr = _build_new_tr(cells_data, rev, author, date)
        prev_tr.addnext(new_tr)
        last_inserted[key] = new_tr
    else:
        new_tr = _insert_vmerge_row(tbl, row_idx, cells_data, rev, author, date)
        if last_inserted is not None:
            last_inserted[key] = new_tr
    desc = cells_data[1]['text'] if len(cells_data) > 1 else '?'
    _pfx = 'WARN' if min(t_conf, r_conf) < _WARN_CONF else 'OK '
    log.append(f" {_pfx} row_insert (t_conf={t_conf:.1f} r_conf={r_conf:.1f})"
               f" after row[{row_idx}] ({after_anchor!r}): {desc!r}")
    return True


# ── Manifest pre-processing ───────────────────────────────────────────────────

def _merge_para_inserts(manifest):
    """
    Merge consecutive para_insert entries that share the same anchor_text.

    When the CR parser emits multiple para_insert entries for the same anchor
    (because [...] context markers were transparent and kept prev_stable_text
    unchanged), each would call tracked_insert_paras_after independently.
    Since each call starts from the same anchor element and uses addnext(),
    later groups push earlier groups down — producing reversed order.
    Merging them into one entry ensures a single tracked_insert_paras_after
    call that inserts all paragraphs in the correct forward order.
    """
    result = []
    for change in manifest:
        # .get() instead of ['anchor_text'] — every other consumer tolerates a
        # missing anchor_text; direct indexing raised KeyError here.
        if (change.get('type') == 'para_insert' and result
                and result[-1].get('type') == 'para_insert'
                and result[-1]['location'].get('anchor_text')
                    == change['location'].get('anchor_text')):
            result[-1]['paragraphs'].extend(change['paragraphs'])
        else:
            merged = dict(change)
            if change.get('type') == 'para_insert':
                # Copy so extending the merged entry never mutates the input.
                merged['paragraphs'] = list(change['paragraphs'])
            result.append(merged)
    return result


# ── Main apply function ───────────────────────────────────────────────────────

def apply_manifest(ts_path, manifest, out_path,
                   author=DEFAULT_AUTHOR, date=DEFAULT_DATE):
    """
    Apply all changes in manifest to ts_path, save to out_path.
    Returns (n_ok, n_skipped, log_lines, n_parsed, n_merged_groups).
    """
    doc = docx.Document(str(ts_path))
    rev = RevCounter(doc)
    log = []
    n_ok = 0
    n_skip = 0
    n_parsed = len(manifest)
    manifest = _merge_para_inserts(manifest)
    n_merged = len(manifest)
    # Track last inserted per (tbl_id, anchor_row_idx) to maintain
    # forward insertion order when multiple row_inserts target the same anchor.
    last_inserted = {}
    for change in manifest:
        ctype = change.get('type')
        ok = False
        if ctype == 'section_replace':
            ok = _apply_section_replace(doc, change, rev, author, date, log)
        elif ctype == 'text_replace':
            ok = _apply_text_replace(doc, change, rev, author, date, log)
        elif ctype == 'para_insert':
            ok = _apply_para_insert(doc, change, rev, author, date, log)
        elif ctype == 'row_insert':
            ok = _apply_row_insert(doc, change, rev, author, date, log,
                                   last_inserted=last_inserted)
        else:
            log.append(f" SKIP unknown change type: {ctype!r}")
        if ok:
            n_ok += 1
        else:
            n_skip += 1
    doc.save(str(out_path))
    return n_ok, n_skip, log, n_parsed, n_merged


# ── CLI ───────────────────────────────────────────────────────────────────────

def main():
    ap = argparse.ArgumentParser(description='Apply CR manifest to TS DOCX as tracked changes.')
    ap.add_argument('ts_docx', help='Target TS DOCX file')
    ap.add_argument('manifest', help='JSON manifest from cr_parser.py')
    ap.add_argument('--author', default=DEFAULT_AUTHOR, help='Tracked change author')
    ap.add_argument('--output', default=None, help='Output path (default: _applied.docx)')
    args = ap.parse_args()

    ts_path = Path(args.ts_docx)
    out_path = Path(args.output) if args.output else ts_path.parent / (ts_path.stem + '_applied.docx')
    with open(args.manifest, encoding='utf-8') as f:
        manifest = json.load(f)

    print(f'Applying {len(manifest)} change(s) from manifest to {ts_path.name}...')
    n_ok, n_skip, log, n_parsed, n_merged = apply_manifest(
        ts_path, manifest, out_path, author=args.author)
    for line in log:
        print(line)
    print(f'\nParsed: {n_parsed} body changes (merged to {n_merged} groups) → Applied: {n_ok} Skipped: {n_skip}')
    print(f'Output: {out_path}')


if __name__ == '__main__':
    main()