#!/usr/bin/env python3
"""
extract_red_text.py

Collect the red-coloured text of a .docx document.

Tables are matched against the schemas declared in ``master_key.TABLE_SCHEMAS``
and their red text is grouped per schema and label; red text found in body
paragraphs is grouped under the nearest preceding heading.
"""
import re
import json
import sys
from io import BytesIO

from docx import Document
from docx.oxml.ns import qn

# Import schema constants (TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS, GLOBAL_SETTINGS)
# Ensure master_key.py is present in same dir / importable path
from master_key import TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS, GLOBAL_SETTINGS

# Six hex digits, e.g. 'FF0000'.
_HEX_RGB_RE = re.compile(r"[0-9A-Fa-f]{6}")

# First-column labels that identify the two sections of a combined operator table.
_OPERATOR_INFO_LABELS = (
    "Operator name (Legal entity)",
    "NHVAS Accreditation No.",
    "Registered trading name/s",
    "Australian Company Number",
    "NHVAS Manual",
)
_OPERATOR_CONTACT_LABELS = (
    "Operator business address",
    "Operator Postal address",
    "Email address",
    "Operator Telephone Number",
)


def _parse_hex_rgb(value):
    """Return ``(r, g, b)`` ints for a six-digit hex colour string, else ``None``."""
    text = str(value).strip()
    if not _HEX_RGB_RE.fullmatch(text):
        return None
    return int(text[0:2], 16), int(text[2:4], 16), int(text[4:6], 16)


def _is_reddish(rr, gg, bb):
    """Heuristic: a strong red channel that clearly dominates green and blue."""
    return rr > 150 and gg < 120 and bb < 120 and (rr - gg) > 30 and (rr - bb) > 30


def is_red_font(run):
    """
    Robust detection of 'red' font in a run.

    Tries several sources:
    - python-docx run.font.color.rgb (safe-guarded)
    - raw XML rPr/w:color value (hex)

    Args:
        run: a run object exposing ``font`` and ``_element``; ``None`` is accepted.

    Returns:
        True if the colour appears predominantly red, otherwise False.
        Never raises: any failure while inspecting the run counts as "not red".
    """
    if run is None:
        return False

    # 1) Try the high-level colour API.
    try:
        color = getattr(run.font, "color", None)
        rgb_val = getattr(color, "rgb", None) if color is not None else None
        if rgb_val:
            if isinstance(rgb_val, (tuple, list)) and len(rgb_val) == 3:
                components = tuple(rgb_val)
            else:
                # Fall back to the string form, e.g. 'FF0000' or 'ff0000'.
                components = _parse_hex_rgb(rgb_val)
            if components is not None and _is_reddish(*components):
                return True
    except Exception:
        # Deliberately best-effort: fall through to the XML inspection below.
        pass

    # 2) Inspect the raw XML run properties for a w:color element.
    try:
        rPr = getattr(run._element, "rPr", None)
        if rPr is not None:
            clr = rPr.find(qn('w:color'))
            if clr is not None:
                val = clr.get(qn('w:val')) or clr.get('w:val') or clr.get('val')
                if val and isinstance(val, str):
                    components = _parse_hex_rgb(val)
                    if components is not None and _is_reddish(*components):
                        return True
    except Exception:
        # Deliberately best-effort: an uninspectable run is treated as not red.
        pass

    return False


def _cell_red_text(cell):
    """Return the concatenated, stripped text of every red run in a table cell."""
    return "".join(
        run.text
        for para in cell.paragraphs
        for run in para.runs
        if is_red_font(run)
    ).strip()


def _prev_para_text(tbl):
    """Return text of previous paragraph node before a given table element."""
    prev = tbl._tbl.getprevious()
    # Skip siblings that are not paragraphs; non-string tags are ignored
    # rather than crashing on ``.endswith``.
    while prev is not None and not (isinstance(prev.tag, str) and prev.tag.endswith("}p")):
        prev = prev.getprevious()
    if prev is None:
        return ""
    # gather all text nodes under the paragraph element
    return "".join(
        node.text
        for node in prev.iter()
        if isinstance(node.tag, str) and node.tag.endswith("}t") and node.text
    ).strip()


def normalize_text(text):
    """Normalize text for more reliable matching (collapse whitespace)."""
    if text is None:
        return ""
    return re.sub(r'\s+', ' ', text.strip())


def fuzzy_match_heading(heading, patterns):
    """
    Attempt fuzzy matching of heading against regex patterns.

    Args:
        heading: heading text (``None`` is treated as empty).
        patterns: iterable of regex strings or dicts carrying the regex under
            the ``"text"`` key.

    Returns:
        True as soon as one pattern matches. A pattern that is not a valid
        regex is tried as a plain case-insensitive substring instead. Empty or
        missing patterns are skipped, because an empty regex would match every
        heading.
    """
    heading_norm = normalize_text(heading).upper()
    for p in patterns or []:
        pat = p.get("text", "") if isinstance(p, dict) else p
        if not pat or not isinstance(pat, str):
            continue
        try:
            if re.search(pat, heading_norm, re.IGNORECASE):
                return True
        except re.error:
            # treat as plain substring fallback
            if pat.upper() in heading_norm:
                return True
    return False


def _match_spec_label(raw_label, labels):
    """
    Map a row's label text onto one of the schema ``labels``.

    An exact (case-insensitive, whitespace-normalised) match wins; otherwise the
    first label that contains, or is contained in, the row text is used.

    Returns:
        The matching entry of ``labels``, or ``None``. An empty row label never
        matches: the empty string is a substring of every label, which would
        otherwise attribute unlabeled rows to the first label.
    """
    raw_norm = normalize_text(raw_label).upper()
    if not raw_norm:
        return None
    normalized = [(lbl, normalize_text(lbl).upper()) for lbl in labels]
    for lbl, norm in normalized:
        if norm == raw_norm:
            return lbl
    for lbl, norm in normalized:
        if norm and (norm in raw_norm or raw_norm in norm):
            return lbl
    return None


def _merge_schema_data(out, schema_name, data):
    """Append ``data`` ({label: [values]}) to ``out[schema_name]``; no-op when empty."""
    if not data:
        return
    bucket = out.setdefault(schema_name, {})
    for label, values in data.items():
        bucket.setdefault(label, []).extend(values)


def get_table_context(tbl):
    """
    Return context metadata for a table to aid schema matching.

    Returns:
        dict with keys ``heading`` (text of the paragraph preceding the table),
        ``headers`` (first-row cell texts, in order), ``col0`` (non-empty
        first-column texts), ``first_cell``, ``all_cells`` (every non-empty
        cell text), ``num_rows`` and ``num_cols``. All texts are
        whitespace-normalised.
    """
    heading = normalize_text(_prev_para_text(tbl))

    # Read each row's cells once and reuse them for every derived field.
    grid = [row.cells for row in tbl.rows]
    first_row = grid[0] if grid else ()

    headers = [normalize_text(c.text) for c in first_row]
    col0 = [
        normalize_text(cells[0].text)
        for cells in grid
        if cells and cells[0].text.strip()
    ]
    # Guard against a first row without cells instead of raising IndexError.
    first_cell = normalize_text(first_row[0].text) if first_row else ""

    all_cells = []
    for cells in grid:
        for cell in cells:
            text = normalize_text(cell.text)
            if text:
                all_cells.append(text)

    return {
        'heading': heading,
        'headers': headers,
        'col0': col0,
        'first_cell': first_cell,
        'all_cells': all_cells,
        'num_rows': len(grid),
        'num_cols': len(first_row),
    }


def calculate_schema_match_score(schema_name, spec, context):
    """
    Return (score, reasons[]) for how well a table context matches a schema.

    Heuristic-based scoring; vehicle registration and 'DETAILS' summary boosts
    added.

    Args:
        schema_name: key of the schema being evaluated.
        spec: the schema definition dict (reads ``context_exclusions``,
            ``context_keywords``, ``headings``, ``columns``, ``orientation``
            and ``labels`` when present).
        context: dict as produced by ``get_table_context``.

    Returns:
        Tuple ``(score, reasons)``; ``reasons`` lists every applied adjustment.
    """
    score = 0
    reasons = []

    headers = context.get('headers', [])
    headers_upper = [h.upper() for h in headers]
    heading = context.get('heading', "")
    first_cell = context.get('first_cell') or ""
    all_cells = context.get('all_cells', [])

    table_text = " ".join(headers).lower() + " " + heading.lower()

    # Vehicle Registration specific boost
    if "Vehicle Registration" in schema_name:
        vehicle_keywords = ["registration", "vehicle", "sub-contractor",
                            "weight verification", "rfs suspension", "roadworthiness"]
        keyword_matches = sum(1 for kw in vehicle_keywords if kw in table_text)
        if keyword_matches >= 2:
            score += 150
            reasons.append(f"Vehicle keywords matched: {keyword_matches}")
        elif keyword_matches >= 1:
            score += 75
            reasons.append(f"Some vehicle keywords matched: {keyword_matches}")

    # Summary DETAILS boost / penalty
    has_details = "details" in table_text
    if "Summary" in schema_name and has_details:
        score += 100
        reasons.append("Summary with DETAILS found")
    if "Summary" not in schema_name and has_details:
        score -= 75
        reasons.append("Non-summary schema penalized due to DETAILS column presence")

    # Context exclusions
    for exclusion in spec.get("context_exclusions", []):
        if exclusion.lower() in table_text:
            score -= 50
            reasons.append(f"Context exclusion: {exclusion}")

    # Context keywords positive matches
    kw_count = sum(1 for kw in spec.get("context_keywords", []) if kw.lower() in table_text)
    if kw_count:
        score += kw_count * 15
        reasons.append(f"Context keywords matched: {kw_count}")

    # First-cell exact match
    if first_cell and first_cell.upper() == schema_name.upper():
        score += 100
        reasons.append("Exact first cell match")

    # Heading pattern match (first matching pattern only)
    for h in spec.get("headings", []) or []:
        pat = h.get("text") if isinstance(h, dict) else h
        # Skip entries with no usable pattern text; passing a dict to
        # re.search would raise TypeError and abort scoring for this schema.
        if not pat or not isinstance(pat, str):
            continue
        try:
            if re.search(pat, heading, re.IGNORECASE):
                score += 50
                reasons.append(f"Heading regex matched: {pat}")
                break
        except re.error:
            # Not a valid regex: fall back to a case-insensitive substring test.
            if pat.lower() in heading.lower():
                score += 50
                reasons.append(f"Heading substring matched: {pat}")
                break

    # Column header matching (strict)
    if spec.get("columns"):
        cols = [normalize_text(c) for c in spec["columns"]]
        matches = sum(
            1 for col in cols if any(col.upper() in h for h in headers_upper)
        )
        if matches == len(cols):
            score += 60
            reasons.append("All expected columns matched exactly")
        elif matches > 0:
            score += matches * 20
            reasons.append(f"Partial column matches: {matches}/{len(cols)}")

    orientation = spec.get("orientation")

    # Label matching for left-oriented tables
    if orientation == "left":
        labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
        col0_upper = [c.upper() for c in context.get('col0', [])]
        matches = 0
        for lbl in labels:
            lbl_upper = lbl.upper()
            if any(lbl_upper in c or c in lbl_upper for c in col0_upper):
                matches += 1
        if matches > 0:
            score += (matches / max(1, len(labels))) * 30
            reasons.append(f"Left-orientation label matches: {matches}/{len(labels)}")

    # Row1 (header row) flexible matching
    elif orientation == "row1":
        labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
        matches = 0.0
        header_texts = " ".join(headers).upper()
        for lbl in labels:
            label_upper = lbl.upper()
            # exact in any header
            if any(label_upper in h for h in headers_upper):
                matches += 1.0
            else:
                # partial words from label in header_texts (half credit, once)
                for word in label_upper.split():
                    if len(word) > 3 and word in header_texts:
                        matches += 0.5
                        break
        if matches > 0:
            score += (matches / max(1.0, len(labels))) * 40
            reasons.append(f"Row1 header-like matches: {matches}/{len(labels)}")

    # Special handling for declaration schemas
    starts_with_print = first_cell.upper().startswith("PRINT")
    if schema_name == "Operator Declaration" and starts_with_print:
        # boost if 'print name' first cell and heading indicates operator declaration
        if "OPERATOR DECLARATION" in heading.upper():
            score += 80
            reasons.append("Operator Declaration context & first-cell indicate match")
        elif any("MANAGER" in c.upper() for c in all_cells):
            score += 60
            reasons.append("Manager found in cells for Operator Declaration")

    if schema_name == "NHVAS Approved Auditor Declaration" and starts_with_print:
        # penalize where manager words appear (to reduce false positives)
        if any("MANAGER" in c.upper() for c in all_cells):
            score -= 50
            reasons.append("Penalty: found manager text in auditor declaration table")

    return score, reasons


def match_table_schema(tbl, min_score=20):
    """
    Iterate TABLE_SCHEMAS and pick best match by score threshold.

    Args:
        tbl: the table to classify.
        min_score: minimum score the best schema must reach (default 20) to
            avoid spurious picks.

    Returns:
        Schema name, or None when the best score is below ``min_score``.
    """
    context = get_table_context(tbl)
    best_match = None
    best_score = float("-inf")
    for name, spec in TABLE_SCHEMAS.items():
        try:
            score, _reasons = calculate_schema_match_score(name, spec, context)
        except Exception:
            # A schema whose scoring fails is treated as a zero-score candidate
            # so one malformed spec cannot block the others.
            score = 0
        if score > best_score:
            best_score = score
            best_match = name
    return best_match if best_score >= min_score else None


def check_multi_schema_table(tbl):
    """
    Identify tables that contain multiple logical schemas
    (e.g., Operator Information + Contact Details).

    Returns:
        ``["Operator Information", "Operator contact details"]`` when the first
        column holds at least one label of each section, else None.
    """
    col0_upper = [cell.upper() for cell in get_table_context(tbl).get('col0', [])]

    def _any_label_present(labels):
        wanted = [lbl.upper() for lbl in labels]
        return any(lbl in cell for cell in col0_upper for lbl in wanted)

    if _any_label_present(_OPERATOR_INFO_LABELS) and _any_label_present(_OPERATOR_CONTACT_LABELS):
        return ["Operator Information", "Operator contact details"]
    return None


def extract_multi_schema_table(tbl, schemas):
    """
    For tables that embed multiple schema sections vertically (left
    orientation), split and extract.

    Args:
        tbl: the table to read.
        schemas: schema names to extract; names missing from TABLE_SCHEMAS are
            ignored.

    Returns:
        dict mapping schema_name -> {label: [values,...]}; schemas without any
        red text are omitted.
    """
    # Read the grid once; it is reused for every requested schema.
    grid = [row.cells for row in tbl.rows]

    result = {}
    for schema_name in schemas:
        if schema_name not in TABLE_SCHEMAS:
            continue
        spec_labels = TABLE_SCHEMAS[schema_name].get("labels", [])
        schema_data = {}
        for cells in grid:
            if not cells:
                continue
            # match the left-most cell against the spec labels
            matched_label = _match_spec_label(cells[0].text, spec_labels)
            if matched_label is None:
                continue
            # gather red-text from the row's value cells (all others);
            # python-docx lists a horizontally merged cell once per spanned
            # column, so skip repeats of the same cell object.
            visited = set()
            for cell in cells[1:]:
                if id(cell) in visited:
                    continue
                visited.add(id(cell))
                red_txt = _cell_red_text(cell)
                if red_txt:
                    schema_data.setdefault(matched_label, []).append(red_txt)
        if schema_data:
            result[schema_name] = schema_data
    return result


def _best_header_label(header_text, labels):
    """
    Pick the label that best matches a header cell.

    An exact case-insensitive match scores 1.0. Otherwise the score is the
    share of common words (longer than two characters) relative to the larger
    word set, accepted from 0.35 upwards (relaxed threshold for OCR noise).

    Returns:
        ``(label, score)``, or ``(None, 0.0)`` when nothing qualifies.
    """
    header_upper = header_text.upper()
    header_words = {word.upper() for word in header_text.split() if len(word) > 2}
    best_match, best_score = None, 0.0
    for label in labels:
        if header_upper == label.upper():
            return label, 1.0
        label_words = {word.upper() for word in label.split() if len(word) > 2}
        common = header_words & label_words
        if common:
            score = len(common) / max(len(header_words), len(label_words))
            if score > best_score and score >= 0.35:
                best_match, best_score = label, score
    return best_match, best_score


def _extract_vehicle_registration(tbl, labels):
    """Extract red text column-wise from a table whose first row holds the headers."""
    print(" 🚗 EXTRACTION FIX: Processing Vehicle Registration table")
    collected = {lbl: [] for lbl in labels}
    seen = {lbl: set() for lbl in labels}

    rows = list(tbl.rows)
    if len(rows) < 2:
        print(" ❌ Vehicle table has less than 2 rows; skipping")
        return {}

    header_cells = rows[0].cells
    column_mapping = {}
    print(f" 📋 Mapping {len(header_cells)} header cells to labels")

    for col_idx, cell in enumerate(header_cells):
        header_text = normalize_text(cell.text)
        if not header_text:
            continue
        print(f" Column {col_idx}: '{header_text}'")

        best_match, best_score = _best_header_label(header_text, labels)
        if best_match:
            column_mapping[col_idx] = best_match
            print(f" ✅ Mapped to: '{best_match}' (score: {best_score:.2f})")
            continue

        # additional heuristics: simple substring matches
        header_lower = header_text.lower()
        for label in labels:
            label_lower = label.lower()
            if label_lower in header_lower or header_lower in label_lower:
                column_mapping[col_idx] = label
                print(f" ✅ Mapped by substring to: '{label}'")
                break
        else:
            print(f" ⚠️ No mapping found for '{header_text}'")

    print(f" 📊 Total column mappings: {len(column_mapping)}")

    # Extract data rows
    for row_idx, row in enumerate(rows[1:], start=1):
        print(f" 📌 Processing data row {row_idx}")
        for col_idx, cell in enumerate(row.cells):
            label = column_mapping.get(col_idx)
            if label is None:
                continue
            red_txt = _cell_red_text(cell)
            if not red_txt:
                continue
            print(f" 🔴 Found red text in '{label}': '{red_txt}'")
            if red_txt not in seen[label]:
                seen[label].add(red_txt)
                collected[label].append(red_txt)

    result = {k: v for k, v in collected.items() if v}
    print(f" ✅ Vehicle Registration extracted: {len(result)} columns with data")
    return result


def extract_table_data(tbl, schema_name, spec):
    """
    Extract red text from a table for a given schema.

    Special handling for Vehicle Registration (row1 header orientation).

    Args:
        tbl: the table to read.
        schema_name: name of the matched schema; also used as the fallback
            label for red text that maps to no schema label.
        spec: the schema definition (reads ``labels`` and ``orientation``).

    Returns:
        dict {label: [unique red texts in encounter order]}; labels without
        data are omitted.
    """
    spec_labels = list(spec.get("labels", []))

    # Vehicle Registration special-case (headers in first row)
    if "Vehicle Registration" in schema_name:
        return _extract_vehicle_registration(tbl, spec_labels)

    # Generic extraction for other table types
    labels = spec_labels + [schema_name]
    collected = {lbl: [] for lbl in labels}
    seen = {lbl: set() for lbl in labels}
    by_col = spec.get("orientation") == "row1"
    start_row = 1 if by_col else 0

    for row in list(tbl.rows)[start_row:]:
        cells = row.cells
        for ci, cell in enumerate(cells):
            red_txt = _cell_red_text(cell)
            if not red_txt:
                continue
            if by_col:
                # column-wise mapping (header labels)
                lbl = spec_labels[ci] if ci < len(spec_labels) else schema_name
            else:
                # left-oriented: match the row's left label; rows with an
                # empty or unknown label fall back to the schema name.
                lbl = _match_spec_label(cells[0].text, spec_labels) or schema_name
            if red_txt not in seen[lbl]:
                seen[lbl].add(red_txt)
                collected[lbl].append(red_txt)

    return {k: v for k, v in collected.items() if v}


def extract_red_text(input_doc):
    """
    Main extraction function.

    Args:
        input_doc: a docx.Document object or a path string (filename).

    Returns:
        Dictionary of extracted red text organized by schema, plus a
        ``"paragraphs"`` entry ({context: [texts]}) when body paragraphs
        contain red text.
    """
    doc = Document(input_doc) if isinstance(input_doc, str) else input_doc

    out = {}
    for tbl in doc.tables:
        # Check for multi-schema tables first
        multi_schemas = check_multi_schema_table(tbl)
        if multi_schemas:
            for schema_name, schema_data in extract_multi_schema_table(tbl, multi_schemas).items():
                _merge_schema_data(out, schema_name, schema_data)
            continue

        # match a single schema
        schema = match_table_schema(tbl)
        if not schema:
            # no confident schema match
            continue
        spec = TABLE_SCHEMAS.get(schema, {})
        _merge_schema_data(out, schema, extract_table_data(tbl, schema, spec))

    # Paragraph-level red-text extraction (with contextual heading resolution).
    # Fetch the paragraph list and build the pattern list once, not per lookup.
    paragraphs = doc.paragraphs
    heading_patterns = list(HEADING_PATTERNS.get("main", [])) + list(HEADING_PATTERNS.get("sub", []))
    date_line_pattern = PARAGRAPH_PATTERNS.get(
        "date_line",
        r"^\s*\d{1,2}(?:st|nd|rd|th)?\s+[A-Za-z]+\s+\d{4}\s*$|^Date$",
    )

    paras = {}
    for idx, para in enumerate(paragraphs):
        red_txt = "".join(r.text for r in para.runs if is_red_font(r)).strip()
        if not red_txt:
            continue

        # attempt to find nearest preceding heading paragraph (using HEADING_PATTERNS)
        context = None
        for earlier in reversed(paragraphs[:idx]):
            txt = normalize_text(earlier.text)
            if txt and fuzzy_match_heading(txt, heading_patterns):
                context = txt
                break

        # fallback: date-line mapping for 'Date' single-line red texts
        if not context and re.fullmatch(date_line_pattern, red_txt):
            context = "Date"
        if not context:
            context = "(para)"
        paras.setdefault(context, []).append(red_txt)

    if paras:
        out["paragraphs"] = paras
    return out


def extract_red_text_filelike(input_file, output_file):
    """
    Extract red text and write it as JSON.

    Args:
        input_file: file-like object (BytesIO/File), path, or the raw bytes of
            a .docx file.
        output_file: file-like object (opened for writing text) or path.

    Returns:
        The parsed dictionary. The JSON is also written to ``output_file``.
    """
    if isinstance(input_file, (bytes, bytearray)):
        # Raw document content: wrap it so Document() receives a stream.
        input_file = BytesIO(input_file)
    elif hasattr(input_file, "seek"):
        # Reset file-like if necessary; unseekable streams are read as-is.
        try:
            input_file.seek(0)
        except Exception:
            pass

    result = extract_red_text(Document(input_file))

    # Write result out
    if hasattr(output_file, "write"):
        json.dump(result, output_file, indent=2, ensure_ascii=False)
        try:
            output_file.flush()
        except Exception:
            # flush is a courtesy; a stream that cannot flush still holds the data
            pass
    else:
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2, ensure_ascii=False)
    return result


def _main(argv):
    """Script entry point: ``extract_red_text.py <input.docx> <output.json>``."""
    if len(argv) != 3:
        print("To use as a module: extract_red_text_filelike(input_file, output_file)")
        return
    input_docx, output_json = argv[1], argv[2]
    try:
        word_data = extract_red_text_filelike(input_docx, output_json)
        print(json.dumps(word_data, indent=2, ensure_ascii=False))
    except Exception as e:
        print("Error during extraction:", e)
        raise


if __name__ == "__main__":
    # Backwards-compatible script entry point
    _main(sys.argv)