File size: 21,937 Bytes
7eedaf8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b96123
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7eedaf8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b96123
7eedaf8
 
 
 
 
 
 
 
 
2b96123
 
7eedaf8
 
 
 
2b96123
 
 
 
 
 
7eedaf8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b96123
7eedaf8
2b96123
7eedaf8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b96123
7eedaf8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b96123
7eedaf8
 
 
 
 
 
 
 
 
b5fc740
7eedaf8
 
 
 
 
 
 
 
 
 
 
 
 
b5fc740
 
 
 
 
 
 
 
 
 
 
7eedaf8
 
 
 
 
 
 
 
 
 
 
 
 
 
2b96123
7eedaf8
 
 
b5fc740
 
 
 
 
 
 
 
7eedaf8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b96123
7eedaf8
 
 
 
 
 
 
 
 
2b96123
 
 
 
7eedaf8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b96123
7eedaf8
 
 
 
 
 
2b96123
7eedaf8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b96123
 
7eedaf8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
#!/usr/bin/env python3
"""
cr_parser.py β€” Parse a CR DOCX's tracked changes into a JSON manifest.

Each entry in the manifest is one of:
  {"type": "text_replace",  "location": {...}, "old": "...", "new": "..."}
  {"type": "para_insert",   "location": {...}, "paragraphs": [...]}
  {"type": "row_insert",    "location": {...}, "cells": [...]}

Usage:
    python3 cr_parser.py <cr.docx> [--output manifest.json]
    # or import: from cr_parser import parse_cr
"""

import argparse
import json
import re
import sys
from pathlib import Path

import docx
from docx.oxml.ns import qn


# ── Low-level text helpers ────────────────────────────────────────────────────

def _del_text(elem):
    """Concatenated text of every w:delText descendant of *elem*."""
    fragments = []
    for node in elem.findall('.//' + qn('w:delText')):
        if node.text:
            fragments.append(node.text)
    return ''.join(fragments)

def _ins_text(elem):
    """Concatenated text of every w:t descendant (used on w:ins subtrees)."""
    fragments = []
    for node in elem.findall('.//' + qn('w:t')):
        if node.text:
            fragments.append(node.text)
    return ''.join(fragments)

def _para_new_text(p_elem):
    """Paragraph text after accepting tracked changes (ins kept, del dropped)."""
    fragments = []
    for node in p_elem.findall('.//' + qn('w:t')):
        if node.text:
            fragments.append(node.text)
    return ''.join(fragments)

def _para_orig_text(p_elem):
    """Paragraph text as it exists in the TS (del kept, ins dropped)."""
    t_tag = qn('w:t')
    del_tag = qn('w:delText')
    ins_tag = qn('w:ins')
    pieces = []
    for node in p_elem.iter():
        if node.tag == del_tag and node.text:
            pieces.append(node.text)
        elif node.tag == t_tag and node.text:
            # A w:t nested under w:ins is inserted text -- not original.
            inside_ins = any(anc.tag == ins_tag for anc in node.iterancestors())
            if not inside_ins:
                pieces.append(node.text)
    return ''.join(pieces)

def _style_val(p_elem):
    """Return the w:val of the paragraph's w:pStyle, or None when absent."""
    pPr = p_elem.find(qn('w:pPr'))
    pStyle = pPr.find(qn('w:pStyle')) if pPr is not None else None
    if pStyle is None:
        return None
    return pStyle.get(qn('w:val'))


# Dotted section number at paragraph start followed by whitespace and a
# non-space char, e.g. "5", "5.2", "5.2.1" in "5.2.1\tTitle".
_HEADING_NUM_RE = re.compile(r'^(\d+(?:\.\d+)*)\s+\S')
# Ellipsis "skip" marker paragraph: three dots or an ellipsis char, optionally
# wrapped in square brackets or parentheses (mismatched wrappers also match).
_SKIP_MARKER_RE = re.compile(r'^[\[\(]?\s*(?:\.{3}|…)\s*[\]\)]?$')


def _para_text_with_tabs(p_elem):
    """Paragraph text with each w:tab rendered as '\\t'.

    Heading detection needs this: ETSI headings keep the section number and
    the title in separate runs joined by <w:tab/>, which _para_orig_text
    would drop. Inserted (w:ins) runs are excluded; deleted runs are kept.
    """
    t_tag = qn('w:t')
    del_tag = qn('w:delText')
    tab_tag = qn('w:tab')
    ins_tag = qn('w:ins')
    chunks = []
    for node in p_elem.iter():
        if node.tag == tab_tag:
            chunks.append('\t')
        elif node.tag == del_tag and node.text:
            chunks.append(node.text)
        elif node.tag == t_tag and node.text:
            inside_ins = any(anc.tag == ins_tag for anc in node.iterancestors())
            if not inside_ins:
                chunks.append(node.text)
    return ''.join(chunks)


def _heading_number(p_elem):
    """Return the dotted section number of a numbered heading, else None.

    The paragraph style must start with 'Heading' (case-insensitive); this
    guards against body paragraphs whose text merely starts with a digit,
    e.g. bit-description lines like "1 = alphabet set." (style B30) found in
    Terminal Profile sections.
    """
    style_name = _style_val(p_elem) or ''
    if not style_name.lower().startswith('heading'):
        return None
    match = _HEADING_NUM_RE.match(_para_text_with_tabs(p_elem).strip())
    if match is None:
        return None
    return match.group(1)


def _is_skip_marker(text):
    """True when the stripped text is an ellipsis marker: [...] / (...) / ... etc."""
    return _SKIP_MARKER_RE.match(text.strip()) is not None


def _is_rpr_ins(ins_elem):
    """True when this w:ins sits inside w:rPr -- a formatting-only change."""
    parent = ins_elem.getparent()
    if parent is None:
        return False
    return parent.tag == qn('w:rPr')

def _is_inserted_para(p_elem):
    """True when the paragraph mark is tracked as inserted (whole new para)."""
    pPr = p_elem.find(qn('w:pPr'))
    rPr = pPr.find(qn('w:rPr')) if pPr is not None else None
    if rPr is None:
        return False
    return rPr.find(qn('w:ins')) is not None

def _is_deleted_para(p_elem):
    """True when the paragraph mark is tracked as deleted (whole para deleted)."""
    pPr = p_elem.find(qn('w:pPr'))
    rPr = pPr.find(qn('w:rPr')) if pPr is not None else None
    if rPr is None:
        return False
    return rPr.find(qn('w:del')) is not None

def _is_fully_deleted_tbl(tbl_elem):
    """True if every row in the table is tracked as a row-level deletion.

    A row counts as deleted when its w:trPr contains a w:del child.
    Returns False for a table with no rows.
    """
    rows = tbl_elem.findall(qn('w:tr'))
    if not rows:
        return False
    for tr in rows:
        # Single lookup per row (original searched for w:trPr twice per row).
        trPr = tr.find(qn('w:trPr'))
        if trPr is None or trPr.find(qn('w:del')) is None:
            return False
    return True

def _is_fully_inserted_tbl(tbl_elem):
    """True if every row in the table is tracked as a row-level insertion.

    A row counts as inserted when its w:trPr contains a w:ins child.
    Returns False for a table with no rows.
    """
    rows = tbl_elem.findall(qn('w:tr'))
    if not rows:
        return False
    for tr in rows:
        # Single lookup per row (original searched for w:trPr twice per row).
        trPr = tr.find(qn('w:trPr'))
        if trPr is None or trPr.find(qn('w:ins')) is None:
            return False
    return True


# ── Table helpers ─────────────────────────────────────────────────────────────

def _table_header(tbl_elem):
    """Cell texts of the first row -- used as the table's identifier."""
    first_tr = tbl_elem.find(qn('w:tr'))
    if first_tr is None:
        return []
    texts = []
    for tc in first_tr.findall(qn('w:tc')):
        p = tc.find('.//' + qn('w:p'))
        if p is None:
            texts.append('')
        else:
            texts.append(_para_new_text(p).strip())
    return texts

def _row_col0(tr_elem):
    """Column-0 text of a table row -- used as the row anchor."""
    tc = tr_elem.find(qn('w:tc'))
    if tc is None:
        return ''
    p = tc.find('.//' + qn('w:p'))
    if p is None:
        return ''
    return _para_new_text(p).strip()


# ── Inline del+ins extraction (from a single paragraph) ──────────────────────

def _extract_inline_replacements(p_elem):
    """
    Return a list of (old_text, new_text) pairs from del+ins sibling pairs
    among the direct children of the paragraph.

    Handles three sibling layouts: del-then-ins, ins-then-del, and runs of
    consecutive w:del siblings (multi-fragment deletions merged into one pair).

    Filtering rules:
      * empty dels (paragraph-mark / line-break deletions) are discarded;
      * whitespace-only dels are kept only when an adjacent w:ins supplies
        replacement text;
      * w:ins nested in w:rPr (formatting-only change) never counts as a
        replacement source.

    A deletion with no adjacent insertion is emitted as (old_text, '').
    """
    children = list(p_elem)
    pairs = []
    skip = set()  # indices of children already consumed by an earlier pair

    for i, child in enumerate(children):
        if i in skip:
            continue

        if child.tag != qn('w:del'):
            continue

        old_text = _del_text(child)

        # Empty del (paragraph-mark or line-break deletion) -- discard.
        if not old_text:
            skip.add(i)
            continue

        # Merge consecutive del siblings (multi-fragment deletion); j ends up
        # pointing at the first child AFTER the del run.
        j = i + 1
        while j < len(children) and children[j].tag == qn('w:del'):
            old_text += _del_text(children[j])
            skip.add(j)
            j += 1

        # Candidate replacement sources: prefer the ins just after the del
        # run; otherwise fall back to an ins just before it.
        next_sib = children[j] if j < len(children) else None
        prev_sib = children[i - 1] if i > 0 else None

        new_text = None
        if next_sib is not None and next_sib.tag == qn('w:ins') and not _is_rpr_ins(next_sib):
            new_text = _ins_text(next_sib)
            skip.add(j)
        elif prev_sib is not None and prev_sib.tag == qn('w:ins') and not _is_rpr_ins(prev_sib):
            new_text = _ins_text(prev_sib)

        if new_text is None:
            if not old_text.strip():
                skip.add(i)
                continue  # whitespace artefact with no counterpart
            # Pure deletion (no replacement) -- record with empty new.
            pairs.append((old_text, ''))
        else:
            pairs.append((old_text, new_text))

    return pairs


# ── Table change extraction ───────────────────────────────────────────────────

def _parse_table(tbl_elem, changes, section_heading='', section_number=''):
    """Extract tracked changes from one table element into ``changes``.

    Emits:
      * ``row_insert``   -- for rows whose w:trPr carries w:ins (tracked
        row-level insertion), capturing cell text, width, vMerge flag, style,
        plus up-to-3 preceding stable col-0 texts for anchoring;
      * ``text_replace`` -- for inline del+ins pairs inside cells of the
        remaining rows.

    ``section_heading`` / ``section_number`` are copied into each change's
    location so the applicator can disambiguate identical tables.
    """
    header = _table_header(tbl_elem)
    header_key = header[:3]  # first 3 columns enough for matching
    rows = tbl_elem.findall(qn('w:tr'))

    for tr_idx, tr in enumerate(rows):
        trPr = tr.find(qn('w:trPr'))

        # ── Tracked row insertion ─────────────────────────────────────────
        if trPr is not None and trPr.find(qn('w:ins')) is not None:
            # Walk backwards, skipping other inserted rows, collecting up to
            # three stable col-0 texts: anchor + context disambiguation.
            stable_before = []
            for prev_idx in range(tr_idx - 1, -1, -1):
                prev_tr = rows[prev_idx]
                prev_trPr = prev_tr.find(qn('w:trPr'))
                if prev_trPr is None or prev_trPr.find(qn('w:ins')) is None:
                    stable_before.append(_row_col0(prev_tr))
                    if len(stable_before) >= 3:
                        break

            after_anchor = stable_before[0] if stable_before else ''
            context_rows_before = stable_before[1:]

            cells = []
            for tc in tr.findall(qn('w:tc')):
                tcPr = tc.find(qn('w:tcPr'))

                # Width from w:tcW; None when absent or unparsable.
                width = None
                if tcPr is not None:
                    tcW = tcPr.find(qn('w:tcW'))
                    if tcW is not None:
                        try:
                            width = int(tcW.get(qn('w:w'), 0))
                        except (ValueError, TypeError):
                            width = None

                # vMerge (no w:val attribute = continuation cell)
                is_vmerge = False
                if tcPr is not None:
                    vm = tcPr.find(qn('w:vMerge'))
                    if vm is not None and vm.get(qn('w:val')) is None:
                        is_vmerge = True

                # Text -- prefer ins text, fall back to all text.
                # BUGFIX: compare with `is not None`, not truthiness -- an
                # lxml element with no children is falsy, so `if p` wrongly
                # treated an existing empty paragraph as missing (and lxml
                # deprecates element truth-testing). Matches the `is not
                # None` checks used everywhere else in this file.
                cell_ins_text = _ins_text(tc)
                p = tc.find('.//' + qn('w:p'))
                cell_text = cell_ins_text if cell_ins_text else (
                    _para_new_text(p) if p is not None else '')
                style = _style_val(p) if p is not None else None

                cells.append({
                    'text': cell_text.strip(),
                    'width': width,
                    'vmerge': is_vmerge,
                    'style': style,
                })

            changes.append({
                'type': 'row_insert',
                'location': {
                    'kind': 'table_row',
                    'table_header': header_key,
                    'after_row_anchor': after_anchor,
                    'context_rows_before': context_rows_before,
                    'section_heading': section_heading,
                    'section_number': section_number,
                },
                'cells': cells,
            })
            continue

        # ── Cell-level text_replace ───────────────────────────────────────
        row_anchor = _row_col0(tr)
        tcs = tr.findall(qn('w:tc'))
        for col_idx, tc in enumerate(tcs):
            for p in tc.findall('.//' + qn('w:p')):
                for old_text, new_text in _extract_inline_replacements(p):
                    if not old_text:
                        continue
                    changes.append({
                        'type': 'text_replace',
                        'location': {
                            'kind': 'table_cell',
                            'table_header': header_key,
                            'row_anchor': row_anchor,
                            'col_idx': col_idx,
                            'section_heading': section_heading,
                            'section_number': section_number,
                        },
                        'old': old_text,
                        'new': new_text,
                    })


# ── Body paragraph extraction ─────────────────────────────────────────────────

def _parse_body(body, changes):
    """
    Walk direct children of w:body, emitting changes.

    A four-state machine (sec_state: 'stable' | 'del' | 'sep' | 'ins') groups
    consecutive fully-deleted and fully-inserted elements into one change.

    Change types emitted:
      section_replace -- a contiguous block of fully-deleted elements (para
                         and/or table, tracked at the paragraph-mark / row
                         level) followed immediately by a contiguous block of
                         fully-inserted elements. The raw XML of ALL those CR
                         elements is stored verbatim so the applicator can
                         transplant them directly into the TS -- exactly what
                         Word does on a copy-paste.
      text_replace    -- an inline del+ins pair inside an otherwise-stable
                         paragraph.
      para_insert     -- one or more wholly-new paragraphs with no
                         corresponding deletion (rare; kept for backward
                         compatibility).
    """
    from lxml import etree

    prev_stable_text = ''   # last non-empty stable paragraph text (anchor)
    current_section = ''    # dotted number of the most recent heading seen

    # ── Section-replace accumulator ───────────────────────────────────────────
    sec_del = []    # fully-deleted elements (CR del block)
    sec_sep = []    # empty/separator paragraphs between del and ins blocks
    sec_ins = []    # fully-inserted elements (CR ins block)
    sec_state = 'stable'   # 'stable' | 'del' | 'sep' | 'ins'
    sec_anchor = ''

    def flush_section():
        # Emit one section_replace from the accumulated blocks, then reset.
        # No-op (just resets) when nothing was accumulated.
        nonlocal sec_state, sec_anchor, prev_stable_text
        if not sec_del and not sec_ins:
            sec_del.clear(); sec_sep.clear(); sec_ins.clear()
            sec_state = 'stable'
            return
        # The del_heading is the text content of the first deleted paragraph
        del_heading = ''
        for e in sec_del:
            tag = e.tag.split('}')[-1] if '}' in e.tag else e.tag
            if tag == 'p':
                t = _del_text(e).strip() or _para_orig_text(e).strip()
                if t:
                    del_heading = t
                    break
        # Fallback: if first deleted element was a table, use its first cell text
        if not del_heading:
            for e in sec_del:
                tag = e.tag.split('}')[-1] if '}' in e.tag else e.tag
                if tag == 'tbl':
                    first_tc = e.find('.//' + qn('w:tc'))
                    if first_tc is not None:
                        p = first_tc.find('.//' + qn('w:p'))
                        del_heading = (_para_new_text(p) if p is not None
                                       else _para_new_text(first_tc)).strip()
                    break
        # Serialize all elements for the manifest (del + sep + ins), in
        # document order, as verbatim XML strings.
        all_elems = sec_del + sec_sep + sec_ins
        elements_xml = [etree.tostring(e, encoding='unicode') for e in all_elems]
        has_del_table = any(
            (e.tag.split('}')[-1] if '}' in e.tag else e.tag) == 'tbl'
            for e in sec_del
        )
        changes.append({
            'type': 'section_replace',
            'location': {
                'kind': 'body',
                'del_heading': del_heading,
                'has_del_table': has_del_table,
                'anchor_text': sec_anchor,
                'section_number': current_section,
            },
            'elements_xml': elements_xml,
        })
        # Refresh anchor so subsequent para_insert targets the new text, not
        # the deleted one: take the last inserted paragraph with text.
        if sec_ins:
            last_p = next((e for e in reversed(sec_ins)
                           if (e.tag.split('}')[-1] if '}' in e.tag else e.tag) == 'p'), None)
            if last_p is not None:
                candidate = _para_new_text(last_p).strip()
                if candidate:
                    prev_stable_text = candidate
        sec_del.clear(); sec_sep.clear(); sec_ins.clear()
        sec_state = 'stable'

    # ── Para-insert accumulator (for standalone new paragraphs) ───────────────
    insert_group = []

    def flush_group():
        # Emit one para_insert for the accumulated standalone ins paragraphs.
        if not insert_group:
            return
        paras = [
            {'text': _para_new_text(p).strip(), 'style': _style_val(p)}
            for p in insert_group
        ]
        # Drop paragraphs that carry neither text nor a style.
        paras = [p for p in paras if p['text'] or p['style']]
        if paras:
            changes.append({
                'type': 'para_insert',
                'location': {
                    'kind': 'body',
                    'anchor_text': prev_stable_text,
                    'section_number': current_section,
                },
                'paragraphs': paras,
            })
        insert_group.clear()

    for elem in body:
        tag = elem.tag.split('}')[-1] if '}' in elem.tag else elem.tag

        if tag == 'p':
            hn = _heading_number(elem)
            if hn:
                current_section = hn

            is_del = _is_deleted_para(elem)
            is_ins = _is_inserted_para(elem)
            # Empty both before AND after accepting changes.
            is_empty = not _para_orig_text(elem).strip() and not _para_new_text(elem).strip()

            if is_del:
                # Start or continue the del block
                if sec_state == 'ins':
                    flush_section()   # ins before del = two separate section_replaces
                if sec_state == 'stable':
                    flush_group()
                    # Anchor the section on the last stable text seen.
                    sec_anchor = prev_stable_text
                sec_state = 'del'
                sec_del.append(elem)

            elif is_ins:
                if sec_state in ('del', 'sep'):
                    # ins block follows a del block -> part of section_replace
                    sec_state = 'ins'
                    sec_ins.append(elem)
                elif sec_state == 'ins':
                    sec_ins.append(elem)
                else:
                    # Standalone ins paragraph (no preceding del block)
                    flush_group()   # (should already be empty)
                    insert_group.append(elem)

            elif is_empty:
                if sec_state == 'del':
                    # Separator between del and ins blocks
                    sec_state = 'sep'
                    sec_sep.append(elem)
                elif sec_state in ('sep', 'ins'):
                    sec_ins.append(elem)
                else:
                    # Empty para in stable region -- ignore for anchoring
                    pass

            else:
                # Stable (or inline-changed) paragraph: close any open
                # accumulators, then mine it for inline del+ins pairs.
                flush_section()
                flush_group()

                for old_text, new_text in _extract_inline_replacements(elem):
                    if not old_text:
                        continue
                    changes.append({
                        'type': 'text_replace',
                        'location': {
                            'kind': 'body_para',
                            'para_context': _para_orig_text(elem).strip(),
                            'section_number': current_section,
                        },
                        'old': old_text,
                        'new': new_text,
                    })

                # Skip-markers ([...] etc.) make poor anchors -- keep previous.
                orig = _para_orig_text(elem).strip()
                if orig and not _is_skip_marker(orig):
                    prev_stable_text = orig

        elif tag == 'tbl':
            if _is_fully_deleted_tbl(elem):
                if sec_state == 'ins':
                    flush_section()
                if sec_state == 'stable':
                    flush_group()
                    sec_anchor = prev_stable_text
                sec_state = 'del'
                sec_del.append(elem)

            elif _is_fully_inserted_tbl(elem):
                if sec_state in ('del', 'sep', 'ins'):
                    sec_state = 'ins'
                    sec_ins.append(elem)
                else:
                    # Standalone fully-inserted table (no del block) -- treat as section_replace
                    flush_group()
                    sec_anchor = prev_stable_text
                    sec_state = 'ins'
                    sec_ins.append(elem)

            else:
                # Table with inline cell changes
                flush_section()
                flush_group()
                _parse_table(elem, changes, section_heading=prev_stable_text,
                             section_number=current_section)

    # Close any accumulator still open at end-of-body.
    flush_section()
    flush_group()


# ── Public API ────────────────────────────────────────────────────────────────

def parse_cr(cr_path, output_json=None):
    """
    Parse all tracked changes in a CR DOCX.

    Returns the list of change dicts; when output_json is given, also writes
    the manifest there as pretty-printed UTF-8 JSON.
    """
    document = docx.Document(str(cr_path))
    changes = []
    _parse_body(document.element.body, changes)

    if output_json:
        payload = json.dumps(changes, indent=2, ensure_ascii=False)
        Path(output_json).write_text(payload, encoding='utf-8')
    return changes


# ── CLI ───────────────────────────────────────────────────────────────────────

def main():
    """CLI entry point: parse a CR DOCX and emit the manifest."""
    parser = argparse.ArgumentParser(
        description='Parse CR DOCX tracked changes into JSON manifest.')
    parser.add_argument('cr_docx', help='CR DOCX file path')
    parser.add_argument('--output', default=None,
                        help='Output JSON path (default: print to stdout)')
    args = parser.parse_args()

    changes = parse_cr(args.cr_docx, output_json=args.output)

    if args.output:
        print(f'Wrote {len(changes)} change(s) β†’ {args.output}')
    else:
        print(json.dumps(changes, indent=2, ensure_ascii=False))


if __name__ == '__main__':
    main()