Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| import re | |
| import json | |
| import sys | |
| from docx import Document | |
| from docx.oxml.ns import qn | |
| from master_key import TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS | |
def _looks_red(r, g, b):
    """Return True when the RGB triple is dominated by its red channel.

    The thresholds (red above 150, green and blue below 100, red at least
    31 above each of the other channels) are the ones this module has always
    used; they are kept in one place so both colour sources agree.
    """
    return r > 150 and g < 100 and b < 100 and (r - g) > 30 and (r - b) > 30


def is_red_font(run):
    """Return True if *run* is rendered in a red-ish font colour.

    Two sources are consulted, in order:

    1. ``run.font.color.rgb`` -- the colour as exposed by the run's font.
    2. The ``w:val`` attribute of the ``w:color`` child of the run's
       ``rPr`` element, when it is a six-digit hex string.

    Args:
        run: an object exposing ``font.color`` and ``_element`` (a
            python-docx ``Run`` in this module).

    Returns:
        bool: True when either source yields a colour that passes the
        red threshold, False otherwise.
    """
    col = run.font.color
    if col and col.rgb:
        r, g, b = col.rgb
        if _looks_red(r, g, b):
            return True

    # Fall back to the raw run properties.
    rPr = getattr(run._element, "rPr", None)
    if rPr is None:
        return False
    clr = rPr.find(qn('w:color'))
    if clr is None:
        return False
    val = clr.get(qn('w:val'))
    # A w:color element without w:val yields None here; re.fullmatch would
    # raise TypeError on it, so treat a missing value as "not red".
    if not val or not re.fullmatch(r"[0-9A-Fa-f]{6}", val):
        return False
    return _looks_red(int(val[:2], 16), int(val[2:4], 16), int(val[4:], 16))
def _prev_para_text(tbl):
    """Return the stripped text of the nearest paragraph element before *tbl*.

    Walks backwards over the preceding siblings of ``tbl._tbl`` until an
    element whose tag ends in ``}p`` is found, then joins the text of every
    descendant whose tag ends in ``}t``. Returns ``""`` when no such
    paragraph precedes the table.
    """
    node = tbl._tbl.getprevious()
    while True:
        if node is None:
            # Ran off the start of the sibling list without finding a paragraph.
            return ""
        if node.tag.endswith("}p"):
            break
        node = node.getprevious()

    pieces = []
    for child in node.iter():
        if child.tag.endswith("}t") and child.text:
            pieces.append(child.text)
    return "".join(pieces).strip()
def match_table_schema(tbl):
    """Return the name of the ``TABLE_SCHEMAS`` entry that best fits *tbl*.

    Candidates are tried in strict priority order; the first hit wins:

    1. The text of the first header cell is itself a schema name, and that
       schema either declares no headings or declares one equal to the
       paragraph text preceding the table.
    2. Any schema declaring a heading equal to the preceding paragraph text.
    3. Any schema whose ``columns`` all appear in the header row.
    4. Any ``row1`` schema whose ``labels`` all appear in the header row.
    5. Any ``left`` schema whose ``labels`` all appear in the first column.

    Args:
        tbl: a table object exposing ``rows`` (each with ``cells``) and
            ``_tbl``.

    Returns:
        The matching schema name, or None when nothing matches or the
        table has no rows / an empty first row.
    """
    rows = list(tbl.rows)
    if not rows:
        # Nothing to inspect; indexing rows[0] would raise IndexError.
        return None
    header_cells = rows[0].cells
    if not header_cells:
        return None

    # look for explicit heading constraint
    heading = _prev_para_text(tbl)
    headers = [c.text.strip() for c in header_cells]
    col0 = []
    for row in rows:
        cells = row.cells
        if cells:  # skip rows without cells instead of failing on cells[0]
            col0.append(cells[0].text.strip())

    def heading_matches(spec):
        # ``or []`` also covers a schema that stores headings as None.
        return any(h["text"] == heading for h in (spec.get("headings") or []))

    # 1) exact first-cell name
    first = headers[0]
    if first in TABLE_SCHEMAS:
        spec = TABLE_SCHEMAS[first]
        if not spec.get("headings") or heading_matches(spec):
            return first

    # 2) any other schema with explicit headings
    for name, spec in TABLE_SCHEMAS.items():
        if heading_matches(spec):
            return name

    # 3) by two-column 'columns'
    for name, spec in TABLE_SCHEMAS.items():
        cols = spec.get("columns")
        if cols and all(col in headers for col in cols):
            return name

    # 4) row1 tables
    for name, spec in TABLE_SCHEMAS.items():
        if spec["orientation"] == "row1" and all(lbl in headers for lbl in spec["labels"]):
            return name

    # 5) left tables
    for name, spec in TABLE_SCHEMAS.items():
        if spec["orientation"] == "left" and all(lbl in col0 for lbl in spec["labels"]):
            return name

    return None
def _red_text(paragraphs):
    """Join the text of every red run found in *paragraphs*, then strip it."""
    return "".join(
        run.text for p in paragraphs for run in p.runs if is_red_font(run)
    ).strip()


def _split_label_values(spec, cell_txt):
    """Yield ``(label, value)`` pairs parsed out of a ``split_labels`` cell.

    Text before the first split label is yielded under ``spec["labels"][0]``;
    after that each split label yields the text running up to the next split
    label (or to the end of the cell for the last one).
    """
    split_labels = spec["split_labels"]
    narrative = cell_txt.partition(split_labels[0])[0].strip()
    if narrative:
        yield spec["labels"][0], narrative
    for i, lbl in enumerate(split_labels):
        nxt = split_labels[i + 1] if i + 1 < len(split_labels) else None
        if nxt:
            pattern = rf"{re.escape(lbl)}\s*(.+?)(?={re.escape(nxt)})"
        else:
            pattern = rf"{re.escape(lbl)}\s*(.+)$"
        m = re.search(pattern, cell_txt, flags=re.DOTALL)
        if m:
            yield lbl, m.group(1).strip()


def _table_red_text(tbl, schema, spec):
    """Collect de-duplicated red text from the body rows of a regular table.

    Returns a ``{label: [text, ...]}`` dict containing only labels that
    received at least one value. Text that cannot be attributed to one of
    ``spec["labels"]`` is filed under the schema name itself.
    """
    labels = spec["labels"] + [schema]
    collected = {lbl: [] for lbl in labels}
    seen = {lbl: set() for lbl in labels}
    by_col = spec["orientation"] == "row1"
    for row in tbl.rows[1:]:  # row 0 is the header row
        cells = row.cells
        for ci, cell in enumerate(cells):
            red_txt = _red_text(cell.paragraphs)
            if not red_txt:
                continue
            if by_col:
                # column position -> the label defined for that position
                lbl = spec["labels"][ci] if ci < len(spec["labels"]) else schema
            else:
                # first cell in this row -> must be one of the defined labels
                raw = cells[0].text.strip()
                lbl = raw if raw in spec["labels"] else schema
            if red_txt not in seen[lbl]:
                seen[lbl].add(red_txt)
                collected[lbl].append(red_txt)
    # keep only non-empty
    return {k: v for k, v in collected.items() if v}


def _paragraph_red_text(paragraphs):
    """Group red paragraph text under the nearest heading above it.

    A paragraph counts as a heading when its stripped text matches any
    pattern in ``HEADING_PATTERNS["main"]`` or ``HEADING_PATTERNS["sub"]``.
    Red text with no heading above it is filed under ``"Date"`` if it fully
    matches ``PARAGRAPH_PATTERNS["date_line"]``, otherwise under ``"(para)"``.
    """
    heading_patterns = HEADING_PATTERNS["main"] + HEADING_PATTERNS["sub"]
    grouped = {}
    last_heading = None
    for para in paragraphs:
        red_txt = _red_text([para])
        if red_txt:
            context = last_heading
            # fallback for date line
            if not context and re.fullmatch(PARAGRAPH_PATTERNS["date_line"], red_txt):
                context = "Date"
            grouped.setdefault(context or "(para)", []).append(red_txt)
        # Update the heading only after handling this paragraph, so a red
        # heading is still filed under the heading that precedes it.
        txt = para.text.strip()
        if txt and any(re.search(p, txt) for p in heading_patterns):
            last_heading = txt
    return grouped


def extract_red_text(path):
    """Extract all red-coloured text from the .docx file at *path*.

    Args:
        path: passed straight to ``Document`` to open the file.

    Returns:
        dict: one entry per matched table schema, each mapping
        ``label -> [text, ...]``, plus an optional ``"paragraphs"`` entry
        mapping ``heading context -> [text, ...]`` for red text found in
        body paragraphs. Tables that match no schema are ignored.
    """
    doc = Document(path)
    out = {}

    # --- TABLES ---
    for tbl in doc.tables:
        schema = match_table_schema(tbl)
        if not schema:
            continue
        spec = TABLE_SCHEMAS[schema]

        # handle the special split_labels (row1 only)
        if spec.get("split_labels") and spec["orientation"] == "row1":
            rows = tbl.rows
            # A header-only table has no data row to parse; skip it rather
            # than fail on rows[1].
            if len(rows) < 2:
                continue
            data_cells = rows[1].cells
            if not data_cells:
                continue
            cell_txt = data_cells[0].text.strip()
            for lbl, val in _split_label_values(spec, cell_txt):
                out.setdefault(schema, {}).setdefault(lbl, []).append(val)
            continue

        # normal tables
        data = _table_red_text(tbl, schema, spec)
        if data:
            # NOTE(review): a later table matching the same schema replaces
            # the earlier one's data here, whereas the split_labels branch
            # accumulates -- confirm this is intended.
            out[schema] = data

    # --- PARAGRAPHS ---
    # doc.paragraphs is read once and walked in a single forward pass.
    paras = _paragraph_red_text(doc.paragraphs)
    if paras:
        out["paragraphs"] = paras
    return out
if __name__ == "__main__":
    # Command-line entry: the first argument is the .docx path; without one,
    # fall back to "test.docx". The result is printed as pretty JSON.
    docx_path = "test.docx" if len(sys.argv) <= 1 else sys.argv[1]
    extracted = extract_red_text(docx_path)
    print(json.dumps(extracted, indent=2, ensure_ascii=False))