Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| ts_applicator.py — Apply a CR change manifest to a TS DOCX as tracked changes. | |
| Reads a JSON manifest produced by cr_parser.py and applies every change | |
| to the target TS using docx_helpers tracked-change primitives. | |
| Usage: | |
| python3 ts_applicator.py <ts.docx> <manifest.json> [--author NAME] [--output path] | |
| # or import: from ts_applicator import apply_manifest | |
| """ | |
| import argparse | |
| import json | |
| import re | |
| import sys | |
| from pathlib import Path | |
| import docx | |
| from docx.oxml import OxmlElement | |
| from docx.oxml.ns import qn | |
| sys.path.insert(0, str(Path(__file__).parent)) | |
# Matching guardrails shared by the table/text applicators below.
_MIN_LEN_ALLCOL_FALLBACK = 8  # old text shorter than this is too ambiguous for any-column search
_WARN_CONF = 0.8  # confidence below this emits WARN instead of OK
| from docx_helpers import ( | |
| RevCounter, | |
| tracked_modify_para, | |
| tracked_insert_paras_after, | |
| AUTHOR as DEFAULT_AUTHOR, | |
| DATE as DEFAULT_DATE, | |
| ) | |
| # ββ Text normalisation ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Substitution table used by the normalisers: each pair maps a Unicode
# variant that Word habitually emits onto its plain-ASCII stand-in.
_UNICODE_REPLACEMENTS = (
    ('\xa0', ' '),      # non-breaking space
    ('\u202f', ' '),    # narrow no-break space
    ('\u2007', ' '),    # figure space
    ('\u2060', ''),     # word joiner (invisible)
    ('\u200b', ''),     # zero-width space
    ('\u00ad', ''),     # soft hyphen (invisible)
    ('\u2011', '-'),    # non-breaking hyphen
    ('\u2013', '-'),    # en dash
    ('\u2014', '-'),    # em dash
    ('\u2212', '-'),    # minus sign
    ('\u2018', "'"),    # left single quote
    ('\u2019', "'"),    # right single quote
    ('\u201c', '"'),    # left double quote
    ('\u201d', '"'),    # right double quote
    ('\u2026', '...'),  # horizontal ellipsis -> three dots
)


def _norm(text):
    """Fold common Unicode invisible/whitespace/punctuation variants to ASCII
    and strip surrounding whitespace, so CR and TS text compare reliably."""
    for variant, ascii_form in _UNICODE_REPLACEMENTS:
        text = text.replace(variant, ascii_form)
    return text.strip()
def _norm_ws(text):
    """Normalise Unicode variants, then drop every whitespace character.

    Structured TS paragraphs (references, abbreviations, headings) separate
    the code from the body with a TAB, e.g.:
        '[27]\\tGlobalPlatform: ...'
        'CLT\\tContactLess Tunnelling'
        '8.3\\tRAM implementation over HTTPS'
    whereas the CR text extraction concatenates runs and loses that TAB:
        '[27]GlobalPlatform: ...'
        'CLTContactLess Tunnelling'
        '8.3RAM implementation over HTTPS'
    Stripping all whitespace from both sides before comparing bridges the gap.
    Used as the third-level fallback (confidence 0.8) after exact and
    NBSP-normalised matching.
    """
    for variant, replacement in _UNICODE_REPLACEMENTS:
        text = text.replace(variant, replacement)
    return re.sub(r'\s+', '', text)
def _norm_alnum(text):
    """Lower-case the text and keep only [a-z0-9] — last-resort matching.

    All punctuation, whitespace and Unicode variants are discarded so that
    only the raw word/number content is compared. Serves as the
    confidence-0.6 fallback in _find_row when whitespace-stripped matching
    also fails (different bracket styles, quote variants, or punctuation
    drift between the CR and the TS).
    """
    return re.sub(r'[^a-z0-9]', '', text.lower())
def _clean_prefix(text: str) -> str:
    """Longest leading run of standard printable ASCII (ord 32-126), stripped.

    Unicode characters embedded mid-anchor (e.g. the NBSPs inside spec
    numbers like 'TS\xa0102\xa0226') make the full anchor unmatchable. The
    portion before the first such character is usually still reliable and
    specific enough to locate the intended row.
    """
    kept = []
    for ch in text:
        if not (32 <= ord(ch) <= 126):
            break
        kept.append(ch)
    return ''.join(kept).strip()
| # ββ Document search helpers βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _full_para_text(para):
    """Concatenate every w:t run, then every w:delText run, of the paragraph.

    Captures normal/inserted text plus tracked-deleted text. NOTE: the two
    groups are concatenated one after the other, NOT interleaved in document
    order — use _original_para_text when order matters.
    """
    el = para._element
    parts = [t.text or '' for t in el.findall('.//' + qn('w:t'))]
    parts.extend(t.text or '' for t in el.findall('.//' + qn('w:delText')))
    return ''.join(parts)
def _original_para_text(para):
    """Reconstruct the paragraph text as it read before tracked changes.

    Walks the paragraph XML in document order, keeping:
      - w:t runs that are NOT nested inside a w:ins element (stable text)
      - w:delText runs (deleted-but-original text)
    and skipping w:t runs inside w:ins (newly inserted text).

    This lets anchors that quote the original phrasing (e.g.
    'SCP81Connection') still match after a tracked '1'->'X' replacement has
    been applied to that paragraph — where _full_para_text would return the
    pieces out of order.
    """
    root = para._element
    t_tag = qn('w:t')
    del_tag = qn('w:delText')
    ins_tag = qn('w:ins')
    pieces = []
    for node in root.iter():
        if node.tag == del_tag:
            pieces.append(node.text or '')
        elif node.tag == t_tag:
            # Keep only if no w:ins sits between this run and the paragraph.
            inserted = False
            for ancestor in node.iterancestors():
                if ancestor is root:
                    break
                if ancestor.tag == ins_tag:
                    inserted = True
                    break
            if not inserted:
                pieces.append(node.text or '')
    return ''.join(pieces)
def _match_paragraphs(paragraphs, search_text, prefer_not_in_table=False):
    """Core 5-tier matching logic. Operates on any iterable of Paragraph objects.
    Returns (para, confidence) or (None, 0.0).

    Tiers (first non-empty pool wins):
      1.0 exact substring · 0.9 Unicode-normalised · 0.8 whitespace-stripped
      0.7 pre-tracked-change original text · 0.6 full text incl. deleted runs.
    When prefer_not_in_table is True, a body-text match at ANY tier beats a
    table-cell match; the best table-cell hit is kept only as a fallback."""
    norm_search = _norm(search_text)
    ws_search = _norm_ws(search_text)
    candidates_exact = []
    candidates_norm = []
    candidates_ws = []
    candidates_orig = []
    candidates_del = []
    for para in paragraphs:
        pt = para.text
        if search_text in pt:
            candidates_exact.append(para)
        elif norm_search and norm_search in _norm(pt):
            candidates_norm.append(para)
        elif ws_search and ws_search in _norm_ws(pt):
            candidates_ws.append(para)
        else:
            # para.text missed: try the reconstructed pre-change text first,
            # then the raw concatenation that also includes deleted runs.
            orig_pt = _original_para_text(para)
            if (search_text in orig_pt
                or (norm_search and norm_search in _norm(orig_pt))):
                candidates_orig.append(para)
            elif ws_search and ws_search in _norm_ws(orig_pt):
                candidates_orig.append(para)
            else:
                full_pt = _full_para_text(para)
                if search_text in full_pt:
                    candidates_del.append(para)
                elif ws_search and ws_search in _norm_ws(full_pt):
                    candidates_del.append(para)
    def _in_table(para):
        # A paragraph lives in a table iff a w:tc (table cell) is among its
        # XML ancestors.
        p = para._element
        return any(a.tag == qn('w:tc') for a in p.iterancestors())
    if not prefer_not_in_table:
        for pool, conf in [(candidates_exact, 1.0), (candidates_norm, 0.9),
                           (candidates_ws, 0.8), (candidates_orig, 0.7),
                           (candidates_del, 0.6)]:
            if pool:
                return pool[0], conf
        return None, 0.0
    best_table_match = (None, 0.0)
    for pool, conf in [(candidates_exact, 1.0), (candidates_norm, 0.9),
                       (candidates_ws, 0.8), (candidates_orig, 0.7),
                       (candidates_del, 0.6)]:
        if not pool:
            continue
        body_only = [p for p in pool if not _in_table(p)]
        if body_only:
            return body_only[0], conf
        # Remember the highest-confidence table hit in case no body hit exists.
        if best_table_match[0] is None:
            best_table_match = (pool[0], conf)
    return best_table_match if best_table_match[0] is not None else (None, 0.0)
def _find_para(doc, search_text, prefer_not_in_table=False):
    """Locate the first paragraph anywhere in the document containing
    search_text, via the five-tier matching of _match_paragraphs.

    Returns (para, confidence); (None, 0.0) when nothing matches.
    """
    return _match_paragraphs(doc.paragraphs, search_text,
                             prefer_not_in_table)
| # ββ Section-aware anchor search βββββββββββββββββββββββββββββββββββββββββββββββ | |
# A real heading starts with a dotted number, then whitespace, then text —
# e.g. '8.3\tRAM implementation over HTTPS'.
_HEADING_NUM_RE = re.compile(r'^(\d+(?:\.\d+)*)\s+\S')


def _para_heading_number(para):
    """Dotted section number when `para` is a genuine TS heading, else None.

    Only paragraphs whose style name starts with 'Heading' (case-insensitive)
    qualify — this rejects look-alikes such as TOC entries (style 'toc N'),
    front-matter address lines (style 'FP') and change-history labels
    (style 'B3'). ETSI/3GPP TS documents always style real headings
    'Heading 1'..'Heading N'.
    """
    style = para.style
    style_name = (style.name if style is not None else '') or ''
    if not style_name.lower().startswith('heading'):
        return None
    match = _HEADING_NUM_RE.match(para.text.strip())
    return match.group(1) if match else None
def _is_descendant_section(child, parent):
    """True when `child` equals `parent` or is nested beneath it, judged by
    dotted-number prefix (e.g. '8.3.1' descends from '8.3'; '8.31' does not)."""
    if child == parent:
        return True
    return child.startswith(parent + '.')
def _section_range(doc, target):
    """Return (start_idx, end_idx) into doc.paragraphs spanning section `target`.

    start = index of the heading whose number equals target.
    end   = index of the next heading that is NOT a descendant of target
            (or len(doc.paragraphs) when the section runs to the end).
    Returns (None, None) when the target heading is absent.
    Recomputed on every call — no caching.
    """
    paras = doc.paragraphs
    start = None
    for idx, para in enumerate(paras):
        num = _para_heading_number(para)
        if num is None:
            continue
        if start is None:
            if num == target:
                start = idx
            continue
        if not _is_descendant_section(num, target):
            return (start, idx)
    if start is None:
        return (None, None)
    return (start, len(paras))
def _enclosing_heading(doc, para):
    """Nearest heading paragraph at or before `para` in document order.

    Single forward pass: remember the last heading seen so far and return it
    when `para` is reached (so a heading paragraph encloses itself). Returns
    None when `para` is not among doc.paragraphs or has no preceding heading.
    Used to build HINT log lines.
    """
    wanted_elem = para._element
    last_heading = None
    for candidate in doc.paragraphs:
        if _para_heading_number(candidate) is not None:
            last_heading = candidate
        if candidate._element is wanted_elem:
            return last_heading
    return None
def _find_para_in_section(doc, search_text, section_number, prefer_not_in_table=False):
    """_find_para restricted to one TS section.

    Returns (para, conf, status) with status in {'in_section', 'no_section'}.
    On 'no_section' (missing number, or heading absent from this TS) the
    caller should fall back to a global _find_para and log a WARN.
    """
    if not section_number:
        return (None, 0.0, 'no_section')
    start, end = _section_range(doc, section_number)
    if start is None:
        return (None, 0.0, 'no_section')
    scoped = doc.paragraphs[start:end]
    para, conf = _match_paragraphs(scoped, search_text, prefer_not_in_table)
    return (para, conf, 'in_section')
def _find_para_with_section(doc, search_text, section_number, kind_label, log,
                            prefer_not_in_table=False):
    """Section-aware anchor search with WARN/ERROR logging.
    Behaviour:
      * section_number present + found in TS + anchor in range → return (para, conf).
      * section_number present + not in TS → WARN, fall back to global _find_para.
      * section_number present + anchor NOT in range → ERROR + HINT, return (None, 0).
      * section_number missing → WARN, fall back to global _find_para.
    kind_label names the change type in log lines (e.g. 'text_replace').
    Logs go to `log` (list of str)."""
    if section_number:
        para, conf, status = _find_para_in_section(
            doc, search_text, section_number, prefer_not_in_table)
        if status == 'in_section' and para is not None:
            return para, conf
        if status == 'no_section':
            # Declared section heading does not exist in this TS revision.
            log.append(f" WARN section '{section_number}' not found in TS — falling back to global search")
            return _find_para(doc, search_text, prefer_not_in_table)
        # in_section but anchor absent → check global for HINT
        g_para, _ = _find_para(doc, search_text, prefer_not_in_table)
        if g_para is not None:
            # Anchor exists elsewhere: report which section it actually lives in.
            enc = _enclosing_heading(doc, g_para)
            actual = _para_heading_number(enc) if enc is not None else '?'
            log.append(f" ERROR {kind_label}: anchor {search_text[:60]!r} declared in section "
                       f"{section_number} but found in section {actual}")
            log.append(f" HINT nearest match: {g_para.text[:120]!r}")
        else:
            log.append(f" ERROR {kind_label}: anchor {search_text[:60]!r} not found in section "
                       f"{section_number} (or anywhere)")
        return None, 0.0
    log.append(f" WARN no section_number on change — global anchor search for {search_text[:60]!r}")
    return _find_para(doc, search_text, prefer_not_in_table)
def _find_table_by_section(doc, section_heading):
    """
    Find the table immediately following a paragraph that contains section_heading.
    Checks both w:t (plain/inserted) and w:delText (tracked-deleted) so the match
    survives even after the heading was wrapped in a tracked deletion.
    Empty paragraphs between the heading and the table are tolerated.
    Returns (table, confidence) or (None, 0.0).
    """
    if not section_heading:
        return None, 0.0
    norm_h = _norm(section_heading)
    ws_h = _norm_ws(section_heading)
    heading_seen = False
    # Walk the body children in document order: non-empty paragraphs flip
    # heading_seen; the first table reached while heading_seen is True wins.
    for element in doc.element.body:
        tag = element.tag.split('}')[-1] if '}' in element.tag else element.tag
        if tag == 'p':
            t_text = ''.join(t.text or '' for t in element.findall('.//' + qn('w:t')))
            d_text = ''.join(t.text or '' for t in element.findall('.//' + qn('w:delText')))
            full = (t_text + d_text).strip()
            if not full:
                continue  # skip empty paras, keep heading_seen state
            if (section_heading in full
                or norm_h in _norm(full)
                or ws_h in _norm_ws(full)):
                heading_seen = True
            else:
                heading_seen = False  # non-matching non-empty para resets
        elif tag == 'tbl':
            if heading_seen:
                # Map the raw <w:tbl> element back to its python-docx Table.
                for tbl in doc.tables:
                    if tbl._tbl is element:
                        return tbl, 1.0
            heading_seen = False
    return None, 0.0
def _find_table(doc, header_key):
    """Find a table whose header-row cell texts start with header_key.

    The header is searched in the first three rows of each table because
    some tables carry a caption/banner row above the real header.
    Comparison is Unicode-normalised and positional: every key must be a
    substring of the cell at the same index. Returns (table, confidence)
    or (None, 0.0).
    """
    wanted = [_norm(h) for h in header_key]
    for table in doc.tables:
        if not table.rows:
            continue
        for candidate_row in table.rows[:3]:
            cells = [_norm(c.text) for c in candidate_row.cells]
            hit = all(pos < len(cells) and key in cells[pos]
                      for pos, key in enumerate(wanted))
            if hit:
                return table, 1.0
    return None, 0.0
def _disambiguate_by_context(all_rows, candidates, context_rows_before):
    """Pick the candidate whose preceding rows best match the expected context.

    context_rows_before lists expected col-0 texts, closest row first.
    Ties — and a total miss — resolve to candidates[0], because best_idx
    starts there and only a strictly better score replaces it.
    """
    best_score, best_idx = -1, candidates[0]
    for cand in candidates:
        hits = 0
        for offset, expected in enumerate(context_rows_before, start=1):
            prev_idx = cand - offset
            if prev_idx < 0 or not expected:
                continue
            prev_row = all_rows[prev_idx]
            cell0 = prev_row.cells[0].text if prev_row.cells else ''
            if (_norm(expected) in _norm(cell0)
                    or _norm_ws(expected) in _norm_ws(cell0)):
                hits += 1
        if hits > best_score:
            best_score, best_idx = hits, cand
    return best_idx
def _find_row(tbl, anchor_text, context_rows_before=None):
    """
    Find first row in tbl where col-0 cell text contains anchor_text.
    Returns (row_idx, confidence) or (-1, 0.0).
    When context_rows_before is provided and multiple rows match, uses the
    col-0 texts of the rows preceding each candidate to disambiguate.
    Matching levels, in order of confidence:
      1.0  — exact substring match
      0.9  — Unicode-normalised match (_norm: xa0, dashes, quotes, ...)
      0.8  — whitespace-stripped match (_norm_ws: also removes tabs/newlines)
      0.6  — alphanumeric-only match (_norm_alnum: strips all non a-z0-9)
      0.55 — clean-prefix unique match: extract the leading ASCII-only part of
             the anchor and find the single row that contains it.
      0.5  — clean-prefix + token-overlap: when multiple rows share the prefix,
             pick the one whose col-0 tokens overlap most with the anchor tokens.
    """
    all_rows = list(tbl.rows)
    # Precompute every normalised form of the anchor once, outside the loops.
    norm_anchor = _norm(anchor_text)
    ws_anchor = _norm_ws(anchor_text)
    alnum_anchor = _norm_alnum(anchor_text)
    for match_fn, conf in [
        (lambda c: anchor_text in c, 1.0),
        (lambda c: bool(norm_anchor) and norm_anchor in _norm(c), 0.9),
        (lambda c: bool(ws_anchor) and ws_anchor in _norm_ws(c), 0.8),
        (lambda c: bool(alnum_anchor) and alnum_anchor in _norm_alnum(c), 0.6),
    ]:
        candidates = [
            idx for idx, row in enumerate(all_rows)
            if row.cells and match_fn(row.cells[0].text)
        ]
        if not candidates:
            continue
        if len(candidates) == 1 or not context_rows_before:
            return candidates[0], conf
        # Several rows matched at this tier: let preceding-row context decide.
        return _disambiguate_by_context(all_rows, candidates, context_rows_before), conf
    # ── Prefix-based partial match ───────────────────────────────────────────
    prefix = _clean_prefix(anchor_text)
    if prefix and len(prefix) > 8:
        prefix_low = prefix.lower()
        hits = [
            idx for idx, row in enumerate(all_rows)
            if row.cells and prefix_low in row.cells[0].text.lower()
        ]
        if len(hits) == 1:
            return hits[0], 0.55
        elif len(hits) > 1:
            # Ambiguous prefix: score each hit by shared alphanumeric tokens.
            anchor_tokens = set(re.findall(r'[a-z0-9]+', anchor_text.lower()))
            best_score, best_idx = -1, -1
            for hit_idx in hits:
                cell_tokens = set(re.findall(r'[a-z0-9]+',
                                             all_rows[hit_idx].cells[0].text.lower()))
                score = len(anchor_tokens & cell_tokens)
                if score > best_score:
                    best_score, best_idx = score, hit_idx
            if best_idx >= 0:
                return best_idx, 0.5
    return (-1, 0.0)
| # ββ vMerge row insertion ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _build_new_tr(cells_data, rev, author, date):
    """
    Build and return a new tracked-insert <w:tr> element (does NOT insert it).
    cells_data: list of dicts with keys: text, width, vmerge, style.
    rev/author/date stamp every w:ins element so Word presents the whole row
    as one tracked insertion attributed to `author`.
    """
    def _ins_attr():
        # Fresh dict per call: each w:ins needs its own unique w:id.
        return {qn('w:id'): rev.next(), qn('w:author'): author, qn('w:date'): date}
    def _make_t(text, tag='w:t'):
        t = OxmlElement(tag)
        t.text = text or ''
        # Word drops leading/trailing whitespace unless xml:space='preserve'.
        if text and (text[0] in (' ', '\t') or text[-1] in (' ', '\t')):
            t.set('{http://www.w3.org/XML/1998/namespace}space', 'preserve')
        return t
    def _make_run(text):
        r = OxmlElement('w:r')
        r.append(_make_t(text))
        return r
    new_tr = OxmlElement('w:tr')
    # trPr: tracked row insertion
    trPr = OxmlElement('w:trPr')
    tr_ins = OxmlElement('w:ins')
    for k, v in _ins_attr().items():
        tr_ins.set(k, v)
    trPr.append(tr_ins)
    new_tr.append(trPr)
    for cd in cells_data:
        tc = OxmlElement('w:tc')
        tcPr = OxmlElement('w:tcPr')
        tcW = OxmlElement('w:tcW')
        if cd.get('width'):
            tcW.set(qn('w:w'), str(cd['width']))
            tcW.set(qn('w:type'), 'dxa')
        tcPr.append(tcW)
        if cd.get('vmerge'):
            # Continuation cell of a vertical merge (no w:val ⇒ 'continue').
            vm = OxmlElement('w:vMerge')
            tcPr.append(vm)
        tc.append(tcPr)
        p = OxmlElement('w:p')
        pPr = OxmlElement('w:pPr')
        if cd.get('style'):
            pStyle = OxmlElement('w:pStyle')
            pStyle.set(qn('w:val'), cd['style'])
            pPr.append(pStyle)
        # Mark the paragraph mark itself as inserted (w:pPr/w:rPr/w:ins).
        rPr_para = OxmlElement('w:rPr')
        pm_ins = OxmlElement('w:ins')
        for k, v in _ins_attr().items():
            pm_ins.set(k, v)
        rPr_para.append(pm_ins)
        pPr.append(rPr_para)
        p.append(pPr)
        # vMerge continuation cells stay textually empty by OOXML convention.
        if cd.get('text') and not cd.get('vmerge'):
            ins_el = OxmlElement('w:ins')
            for k, v in _ins_attr().items():
                ins_el.set(k, v)
            ins_el.append(_make_run(cd['text']))
            p.append(ins_el)
        tc.append(p)
        new_tr.append(tc)
    return new_tr
def _insert_vmerge_row(tbl, after_row_idx, cells_data, rev, author, date):
    """Build a tracked-insert row and splice it in directly after
    tbl.rows[after_row_idx].

    cells_data: list of dicts with keys text, width, vmerge, style
    (see _build_new_tr). Returns the inserted <w:tr> element.
    """
    anchor_tr = tbl.rows[after_row_idx]._tr
    new_tr = _build_new_tr(cells_data, rev, author, date)
    anchor_tr.addnext(new_tr)
    return new_tr
| # ββ Section replace (direct XML transplant) βββββββββββββββββββββββββββββββββββ | |
def _apply_section_replace(doc, change, rev, author, date, log):
    """
    Transplant a block of CR elements (del section + ins section) directly into
    the TS, replacing the old heading+table at the matching location.
    This mirrors what Word does on copy-paste: the exact XML from the CR is
    cloned into the TS, with only the tracked-change revision IDs remapped to
    avoid conflicts.

    Parameters:
        doc     python-docx Document, modified in place.
        change  manifest entry; reads change['location'] (del_heading,
                has_del_table, section_number, anchor_text) and
                change['elements_xml'] (raw OOXML strings to splice in).
        rev     RevCounter yielding unique w:id values for the remap.
        author, date  kept for signature parity with the other applicators;
                only revision IDs are remapped here, attribution stays as in
                the CR XML.
        log     list of str; receives OK/WARN/ERROR/HINT lines.

    Returns True when the splice was performed, False otherwise.

    Improvements over the previous revision: removed the unused `n_del`
    local, dropped the redundant second `from lxml import etree as _etree`
    local import (etree is already imported at the top of the function), and
    removed the redundant copy.deepcopy of freshly parsed elements
    (etree.fromstring already yields an unshared tree).
    """
    from lxml import etree
    loc = change['location']
    del_heading = loc.get('del_heading', '')
    has_del_table = loc.get('has_del_table', False)
    section_number = loc.get('section_number', '')
    elements_xml = change.get('elements_xml', [])
    if not elements_xml:
        log.append(' SKIP section_replace: no elements in manifest')
        return False
    # ── Resolve search scope: restrict to declared section if possible ─────
    search_paras = doc.paragraphs
    section_status = 'no_section_required'
    if section_number:
        start, end = _section_range(doc, section_number)
        if start is not None:
            search_paras = doc.paragraphs[start:end]
            section_status = 'in_section'
        else:
            log.append(f" WARN section '{section_number}' not found in TS — falling back to global search")
            section_status = 'section_not_in_ts'
    else:
        log.append(" WARN no section_number on section_replace — global search")
    # ── Find the TS paragraph that matches the deleted heading ─────────────
    ts_para_elem = None
    insert_after_anchor = False  # when True: insert after anchor, don't delete it
    if del_heading:
        for para in search_paras:
            pt = para.text
            if del_heading in pt or _norm(del_heading) in _norm(pt):
                ts_para_elem = para._element
                break
        if ts_para_elem is None:
            # Fallback: include paragraphs whose XML text (inc. del runs) matches
            for para in search_paras:
                if del_heading in _full_para_text(para):
                    ts_para_elem = para._element
                    break
    else:
        # No heading to delete — use anchor_text to find insertion point
        anchor_text = loc.get('anchor_text', '')
        if anchor_text:
            if section_status == 'in_section':
                anchor_para, _, _ = _find_para_in_section(
                    doc, anchor_text, section_number)
            else:
                anchor_para, _ = _find_para(doc, anchor_text)
            if anchor_para is not None:
                ts_para_elem = anchor_para._element
                insert_after_anchor = True
    if ts_para_elem is None:
        # Section mismatch check: if declared section exists, but del_heading
        # is found GLOBALLY in a different section, report that.
        if section_status == 'in_section' and del_heading:
            for para in doc.paragraphs:
                pt = para.text
                if del_heading in pt or del_heading in _full_para_text(para):
                    enc = _enclosing_heading(doc, para)
                    actual = _para_heading_number(enc) if enc is not None else '?'
                    log.append(f' ERROR section_replace: del_heading {del_heading!r} declared in section '
                               f'{section_number} but found in section {actual}')
                    log.append(f" HINT nearest match: {para.text[:120]!r}")
                    return False
        log.append(f' ERROR section_replace: del_heading {del_heading!r} not found in TS')
        tokens = del_heading.split()[:3] if del_heading else []
        if tokens:
            # Rank paragraphs sharing any leading token by word overlap.
            _hints = sorted(
                [p for p in doc.paragraphs if any(tok in p.text for tok in tokens)],
                key=lambda p: -len(set(del_heading.split()) & set(p.text.split()))
            )[:3]
            for _h in _hints:
                log.append(f" HINT nearest match: {_h.text[:120]!r}")
        return False
    ts_body = ts_para_elem.getparent()
    # ── Find the table immediately after the heading (if applicable) ───────
    ts_tbl_elem = None
    if has_del_table:
        found_para = False
        for sib in ts_body:
            if sib is ts_para_elem:
                found_para = True
                continue
            if not found_para:
                continue
            sib_tag = sib.tag.split('}')[-1] if '}' in sib.tag else sib.tag
            if sib_tag == 'p':
                # Allow empty paragraphs between heading and table
                if not (''.join(t.text or '' for t in sib.findall('.//' + qn('w:t')))).strip():
                    continue
                break  # non-empty paragraph before table — no table to remove
            elif sib_tag == 'tbl':
                ts_tbl_elem = sib
                break
            else:
                break
        # Validate the candidate table matches what the CR says should be deleted
        if ts_tbl_elem is not None and elements_xml:
            cr_tbl_xmls = [x for x in elements_xml if '<w:tbl' in x]
            if cr_tbl_xmls:
                # Compare the first few text runs of both tables; a mismatch
                # means position-based lookup found an unrelated table.
                cr_tbl_el = etree.fromstring(cr_tbl_xmls[0].encode())
                cr_hdr = ''.join(t.text or '' for t in
                                 cr_tbl_el.findall('.//' + qn('w:t'))[:10]).lower()
                ts_hdr = ''.join(t.text or '' for t in
                                 ts_tbl_elem.findall('.//' + qn('w:t'))[:10]).lower()
                if cr_hdr and cr_hdr not in ts_hdr and ts_hdr not in cr_hdr:
                    log.append(' WARN section_replace: candidate table header mismatch'
                               ' — skipping table removal')
                    ts_tbl_elem = None
    # ── Clone and remap IDs on the CR elements ─────────────────────────────
    cloned = []
    for xml_str in elements_xml:
        # fromstring yields a fresh, unshared tree — safe to remap in place.
        cloned_elem = etree.fromstring(xml_str)
        # Remap w:id in all tracked-change elements (must be unique per document)
        for el in cloned_elem.iter():
            if el.get(qn('w:id')) is not None:
                el.set(qn('w:id'), rev.next())
        cloned.append(cloned_elem)
    # ── Insert cloned elements before (or after) the anchor paragraph ──────
    insert_idx = list(ts_body).index(ts_para_elem)
    if insert_after_anchor:
        insert_idx += 1  # insert after anchor, not before it
    for i, elem in enumerate(cloned):
        ts_body.insert(insert_idx + i, elem)
    # ── Remove the now-replaced TS elements (only when a heading was deleted)
    if not insert_after_anchor:
        ts_body.remove(ts_para_elem)
        if ts_tbl_elem is not None:
            ts_body.remove(ts_tbl_elem)
    log.append(
        f' OK section_replace: {del_heading!r} → {len(elements_xml)} element(s) spliced in'
        f' (removed heading{"+ table" if has_del_table else ""})'
    )
    return True
| # ββ Per-change-type applicators βββββββββββββββββββββββββββββββββββββββββββββββ | |
def _apply_text_replace(doc, change, rev, author, date, log):
    """Apply one tracked old→new text replacement from the manifest.

    change['location']['kind'] selects the strategy:
      * 'table_cell' — resolve the table (header key, or section heading when
        the row anchor is empty), then the row/column, and modify the first
        matching paragraph in that cell.
      * 'body_para'  — resolve the paragraph section-aware, with a
        para_context fallback for short or unmatched old text, and modify it.
    Appends OK/WARN/ERROR lines to `log`; returns True on success.
    """
    loc = change['location']
    old = change['old']
    new = change['new']
    if loc['kind'] == 'table_cell':
        tbl, t_conf = _find_table(doc, loc['table_header'])
        if tbl is None:
            log.append(f" ERROR text_replace: table not found {loc['table_header'][:2]!r}")
            return False
        col_idx = loc['col_idx']
        row_anchor = loc['row_anchor']
        if row_anchor:
            row_idx, r_conf = _find_row(tbl, row_anchor)
            if row_idx < 0:
                # Primary table doesn't contain this row anchor — the CR may be
                # targeting a different table than the one _find_table resolved.
                # Try every other table in the document before giving up.
                for alt_tbl in doc.tables:
                    if alt_tbl is tbl:
                        continue
                    row_idx, r_conf = _find_row(alt_tbl, row_anchor)
                    if row_idx >= 0:
                        tbl = alt_tbl
                        break
            if row_idx < 0:
                log.append(f" ERROR text_replace: row anchor not found {row_anchor!r}")
                return False
            row = tbl.rows[row_idx]
            if col_idx >= len(row.cells):
                log.append(f" ERROR text_replace: col_idx {col_idx} out of range")
                return False
            cell = row.cells[col_idx]
            for para in cell.paragraphs:
                if old in para.text:
                    tracked_modify_para(para, old, new, rev, author, date)
                    # Overall confidence is the weaker of table and row match.
                    _pfx = 'WARN' if min(t_conf, r_conf) < _WARN_CONF else 'OK '
                    log.append(f" {_pfx} text_replace (table_cell"
                               f" t_conf={t_conf:.1f} r_conf={r_conf:.1f}"
                               f" row={row_idx} col={col_idx}): {old!r} → {new!r}")
                    return True
            log.append(f" ERROR text_replace: old text {old!r} not in cell (row={row_idx} col={col_idx})")
            return False
        else:
            # Empty row anchor: scan all rows in col_idx.
            # Prefer the table that follows the section heading (e.g. "Thirty fifth byte:")
            # because all-empty table headers match any table.
            section_heading = loc.get('section_heading', '')
            tbl_by_section, _ = _find_table_by_section(doc, section_heading)
            if tbl_by_section is not None:
                tables_to_try = [tbl_by_section] + [t for t in doc.tables if t is not tbl_by_section]
            else:
                tables_to_try = [tbl] + [t for t in doc.tables if t is not tbl]
            for search_tbl in tables_to_try:
                for r_idx, row in enumerate(search_tbl.rows):
                    if col_idx >= len(row.cells):
                        continue
                    cell = row.cells[col_idx]
                    for para in cell.paragraphs:
                        if old in para.text:
                            tracked_modify_para(para, old, new, rev, author, date)
                            log.append(f" OK text_replace (table_cell scan row={r_idx} col={col_idx}): {old!r} → {new!r}")
                            return True
            # Final fallback: scan ALL columns of ALL tables (guarded by min length)
            if len(old) < _MIN_LEN_ALLCOL_FALLBACK:
                log.append(f" ERROR text_replace: {old!r} too short for all-column fallback"
                           f" (ambiguous — skipped)")
                return False
            _all_start = tbl_by_section if tbl_by_section is not None else tbl
            for search_tbl in [_all_start] + [t for t in doc.tables if t is not _all_start]:
                for r_idx, row in enumerate(search_tbl.rows):
                    for c_idx, cell in enumerate(row.cells):
                        for para in cell.paragraphs:
                            if old in para.text:
                                tracked_modify_para(para, old, new, rev, author, date)
                                log.append(f" WARN text_replace (table_cell any_col"
                                           f" row={r_idx} col={c_idx} — low confidence):"
                                           f" {old!r} → {new!r}")
                                return True
            log.append(f" ERROR text_replace: old text {old!r} not found in any table column")
            return False
    elif loc['kind'] == 'body_para':
        ctx = loc.get('para_context', '')
        section_number = loc.get('section_number', '')
        if len(old) < 4 and ctx:
            # Short old text matches too broadly (e.g. a single digit would hit
            # the title paragraph). Locate by context first, then verify old
            # text is present in that paragraph.
            para, conf = _find_para_with_section(
                doc, ctx, section_number, 'text_replace', log, prefer_not_in_table=True)
            if para is None or old not in para.text:
                para = None
        else:
            para, conf = _find_para_with_section(
                doc, old, section_number, 'text_replace', log, prefer_not_in_table=True)
            if para is None and ctx:
                # Old text alone not found: retry with the surrounding context.
                para, conf = _find_para_with_section(
                    doc, ctx, section_number, 'text_replace', log, prefer_not_in_table=True)
        if para is None:
            log.append(f" ERROR text_replace: old text {old!r} not found in TS")
            return False
        if old in para.text:
            tracked_modify_para(para, old, new, rev, author, date)
            log.append(f" OK text_replace (body_para conf={conf:.1f}): {old!r} → {new!r}")
            return True
        log.append(f" ERROR text_replace: old text {old!r} not in resolved paragraph")
        return False
    log.append(f" ERROR text_replace: unknown kind {loc['kind']!r}")
    return False
def _apply_para_insert(doc, change, rev, author, date, log):
    """Apply one para_insert change: add new paragraphs after an anchor.

    Locates the anchor paragraph (optionally scoped by section number),
    then inserts all new paragraphs as tracked insertions in one call.
    Returns True on success (or when there is nothing to insert),
    False when the anchor paragraph cannot be found.
    """
    loc = change['location']
    anchor = loc.get('anchor_text', '')
    sect = loc.get('section_number', '')
    new_paras = change.get('paragraphs', [])
    if not new_paras:
        return True
    target, conf = _find_para_with_section(doc, anchor, sect, 'para_insert', log)
    if target is None:
        if not sect:
            # Legacy diagnostics: report the failure and the three closest
            # candidate paragraphs, ranked by shared-word overlap with the anchor.
            log.append(f" ERROR para_insert: anchor not found {anchor[:60]!r}")
            lead_tokens = anchor.split()[:3]
            anchor_words = set(anchor.split())
            candidates = [p for p in doc.paragraphs
                          if any(tok in p.text for tok in lead_tokens)]
            candidates.sort(key=lambda p: -len(anchor_words & set(p.text.split())))
            for cand in candidates[:3]:
                log.append(f" HINT nearest match: {cand.text[:120]!r}")
        return False
    payload = [(p['text'], p['style'] or 'Normal') for p in new_paras]
    tracked_insert_paras_after(target, payload, rev, author, date)
    preview = new_paras[0]['text'][:50]
    log.append(f" OK para_insert ({len(new_paras)} para(s) after anchor conf={conf:.1f}): {preview!r}...")
    return True
def _apply_row_insert(doc, change, rev, author, date, log, last_inserted=None):
    """Apply one row_insert change: add a table row as a tracked insertion.

    last_inserted, when supplied, maps (id of the w:tbl element, anchor row
    index) to the most recently inserted <w:tr> there, so several rows aimed
    at the same anchor stack in manifest order instead of reversing.
    Returns True on success, False when the table or anchor row is missing.
    """
    loc = change['location']
    # A table resolved via its section heading wins over header matching,
    # because all-empty headers make header matching ambiguous.
    heading = loc.get('section_heading', '')
    by_section, _ = _find_table_by_section(doc, heading)
    if by_section is None:
        tbl, t_conf = _find_table(doc, loc['table_header'])
        if tbl is None:
            log.append(f" ERROR row_insert: table not found {loc['table_header'][:2]!r}")
            return False
    else:
        tbl, t_conf = by_section, 1.0
    anchor = loc.get('after_row_anchor', '')
    ctx_rows = loc.get('context_rows_before', [])
    row_idx, r_conf = _find_row(tbl, anchor, ctx_rows)
    if row_idx < 0:
        log.append(f" ERROR row_insert: anchor row not found {anchor!r}")
        return False
    cells = change.get('cells', [])
    anchor_key = (id(tbl._tbl), row_idx)
    if last_inserted is not None and anchor_key in last_inserted:
        # A row was already inserted at this anchor: chain the new one after
        # it so rows come out in the order they appear in the manifest.
        fresh_tr = _build_new_tr(cells, rev, author, date)
        last_inserted[anchor_key].addnext(fresh_tr)
        last_inserted[anchor_key] = fresh_tr
    else:
        fresh_tr = _insert_vmerge_row(tbl, row_idx, cells, rev, author, date)
        if last_inserted is not None:
            last_inserted[anchor_key] = fresh_tr
    desc = cells[1]['text'] if len(cells) > 1 else '?'
    prefix = 'WARN' if min(t_conf, r_conf) < _WARN_CONF else 'OK '
    log.append(f" {prefix} row_insert (t_conf={t_conf:.1f} r_conf={r_conf:.1f})"
               f" after row[{row_idx}] ({anchor!r}): {desc!r}")
    return True
| # ββ Manifest pre-processing βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _merge_para_inserts(manifest): | |
| """ | |
| Merge consecutive para_insert entries that share the same anchor_text. | |
| When the CR parser emits multiple para_insert entries for the same anchor | |
| (because [...] context markers were transparent and kept prev_stable_text | |
| unchanged), each would call tracked_insert_paras_after independently. | |
| Since each call starts from the same anchor element and uses addnext(), | |
| later groups push earlier groups down β producing reversed order. | |
| Merging them into one entry ensures a single tracked_insert_paras_after | |
| call that inserts all paragraphs in the correct forward order. | |
| """ | |
| result = [] | |
| for change in manifest: | |
| if (change.get('type') == 'para_insert' | |
| and result | |
| and result[-1].get('type') == 'para_insert' | |
| and result[-1]['location']['anchor_text'] == change['location']['anchor_text']): | |
| result[-1]['paragraphs'].extend(change['paragraphs']) | |
| else: | |
| merged = dict(change) | |
| if change.get('type') == 'para_insert': | |
| merged['paragraphs'] = list(change['paragraphs']) | |
| result.append(merged) | |
| return result | |
| # ββ Main apply function βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def apply_manifest(ts_path, manifest, out_path, author=DEFAULT_AUTHOR, date=DEFAULT_DATE):
    """
    Apply all changes in manifest to ts_path, save to out_path.
    Returns (n_ok, n_skipped, log_lines, n_parsed, n_merged_groups).
    """
    doc = docx.Document(str(ts_path))
    rev = RevCounter(doc)
    log = []
    applied = 0
    skipped = 0
    total_parsed = len(manifest)
    groups = _merge_para_inserts(manifest)
    # Tracks the last inserted <w:tr> per (tbl_id, anchor_row_idx) so that
    # multiple row_inserts at the same anchor keep forward insertion order.
    pending_rows = {}
    # Simple handlers all share one signature; row_insert needs extra state.
    handlers = {
        'section_replace': _apply_section_replace,
        'text_replace': _apply_text_replace,
        'para_insert': _apply_para_insert,
    }
    for entry in groups:
        kind = entry.get('type')
        if kind == 'row_insert':
            success = _apply_row_insert(doc, entry, rev, author, date, log,
                                        last_inserted=pending_rows)
        elif kind in handlers:
            success = handlers[kind](doc, entry, rev, author, date, log)
        else:
            log.append(f" SKIP unknown change type: {kind!r}")
            success = False
        if success:
            applied += 1
        else:
            skipped += 1
    doc.save(str(out_path))
    return applied, skipped, log, total_parsed, len(groups)
| # ββ CLI βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def main():
    """CLI entry point: parse arguments, load the manifest, apply, report."""
    parser = argparse.ArgumentParser(description='Apply CR manifest to TS DOCX as tracked changes.')
    parser.add_argument('ts_docx', help='Target TS DOCX file')
    parser.add_argument('manifest', help='JSON manifest from cr_parser.py')
    parser.add_argument('--author', default=DEFAULT_AUTHOR, help='Tracked change author')
    parser.add_argument('--output', default=None, help='Output path (default: <ts>_applied.docx)')
    opts = parser.parse_args()
    ts = Path(opts.ts_docx)
    if opts.output:
        dest = Path(opts.output)
    else:
        dest = ts.parent / (ts.stem + '_applied.docx')
    with open(opts.manifest, encoding='utf-8') as fh:
        manifest = json.load(fh)
    print(f'Applying {len(manifest)} change(s) from manifest to {ts.name}...')
    n_ok, n_skip, log, n_parsed, n_merged = apply_manifest(ts, manifest, dest, author=opts.author)
    for line in log:
        print(line)
    print(f'\nParsed: {n_parsed} body changes (merged to {n_merged} groups) β Applied: {n_ok} Skipped: {n_skip}')
    print(f'Output: {dest}')
if __name__ == '__main__':
    main()