#!/usr/bin/env python3
"""
Updated pipeline.py
Merged improvements:
- removed duplicate functions
- table processed-marker to avoid multiple handlers clobbering the same table
- stricter detection of print-accreditation/operator-declaration tables
- safer force replacement (avoid short->long mapping)
- prefer exact qualified keys for Print Name / Position Title lookups
- preserved all other logic and prints/logging
- ADDED: header normalization, context-aware vehicle JSON selection, management summary scoping, unmatched-headers logging
"""

import json
import os
import re
from typing import Any

from docx import Document
from docx.shared import RGBColor

# ============================================================================
# Configuration / Heading patterns for document structure detection
# ============================================================================
HEADING_PATTERNS = {
    "main": [
        r"NHVAS\s+Audit\s+Summary\s+Report",
        r"NATIONAL\s+HEAVY\s+VEHICLE\s+ACCREDITATION\s+AUDIT\s+SUMMARY\s+REPORT",
        r"NHVAS\s+AUDIT\s+SUMMARY\s+REPORT",
    ],
    "sub": [
        r"AUDIT\s+OBSERVATIONS\s+AND\s+COMMENTS",
        r"MAINTENANCE\s+MANAGEMENT",
        r"MASS\s+MANAGEMENT",
        r"FATIGUE\s+MANAGEMENT",
        r"Fatigue\s+Management\s+Summary\s+of\s+Audit\s+findings",
        r"MAINTENANCE\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS",
        r"MASS\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS",
        r"Vehicle\s+Registration\s+Numbers\s+of\s+Records\s+Examined",
        r"CORRECTIVE\s+ACTION\s+REQUEST\s+\(CAR\)",
        r"NHVAS\s+APPROVED\s+AUDITOR\s+DECLARATION",
        r"Operator\s+Declaration",
        r"Operator\s+Information",
        r"Driver\s*/\s*Scheduler\s+Records\s+Examined",
    ],
}

# ============================================================================
# State for unmatched headers (for iterative improvement)
# ============================================================================
# Maps a raw header string to the number of times it failed to map.
_unmatched_headers = {}


def record_unmatched_header(header: str):
    """Count one more occurrence of a table header that could not be mapped.

    Empty or ``None`` headers are ignored. Counts accumulate in the
    module-level ``_unmatched_headers`` dict.
    """
    if not header:
        return
    _unmatched_headers[header] = _unmatched_headers.get(header, 0) + 1
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================

def load_json(filepath):
    """Read *filepath* as UTF-8 text and return the parsed JSON value."""
    with open(filepath, 'r', encoding='utf-8') as file:
        return json.load(file)


def flatten_json(y, prefix=''):
    """Flatten nested dicts into a single dict.

    Every non-dict leaf is stored twice: under its dotted path
    (``"Parent.Child"``) and under its bare key (``"Child"``). When two
    leaves share a bare key, the one visited last overwrites the earlier one.
    Lists are stored as-is and are not descended into.
    """
    out = {}
    for key, val in y.items():
        new_key = f"{prefix}.{key}" if prefix else key
        if isinstance(val, dict):
            out.update(flatten_json(val, new_key))
        else:
            out[new_key] = val
            out[key] = val
    return out


def is_red(run):
    """Return True when *run* is coloured pure red (RGB FF0000) or theme colour 1.

    Any exception raised while inspecting the colour is treated as "not red".
    """
    color = run.font.color
    try:
        if not color:
            return False
        rgb = getattr(color, "rgb", None)
        if rgb and color.rgb == RGBColor(255, 0, 0):
            return True
        return getattr(color, "theme_color", None) == 1
    except Exception:
        # Best-effort probe: a run whose colour cannot be read is not red.
        return False


def get_value_as_string(value, field_name=""):
    """Convert a JSON value to the text that should be written into the document.

    Non-list values are passed through ``str``. An empty list gives ``""`` and
    a one-item list gives that item as a string. Longer lists are joined with
    single spaces, except when *field_name* contains "company number": then
    the list itself is returned unchanged, so the caller receives a list, not
    a string.
    """
    if not isinstance(value, list):
        return str(value)
    if not value:
        return ""
    if len(value) == 1:
        return str(value[0])
    if "company number" in field_name.lower():
        return value
    return " ".join(str(v) for v in value)


def get_clean_text(cell):
    """Return the concatenated text of every run in *cell*, stripped."""
    return "".join(
        run.text for paragraph in cell.paragraphs for run in paragraph.runs
    ).strip()


def has_red_text(cell):
    """Return True when any run in *cell* is red and has non-blank text."""
    return any(
        has_red_text_in_paragraph(paragraph) for paragraph in cell.paragraphs
    )


def has_red_text_in_paragraph(paragraph):
    """Return True when any run in *paragraph* is red and has non-blank text."""
    return any(
        bool(is_red(run) and run.text.strip()) for run in paragraph.runs
    )


# New helper: normalize header text (removes parentheticals, punctuation, etc.)
def normalize_header_text(s: str) -> str:
    """Reduce a table header to a canonical lowercase form for comparison.

    Parenthesised content, slashes and punctuation (other than ``#`` and
    ``%``) become spaces, a few known spelling variants are canonicalised,
    and the phrases "date range" and "applicable for entry audit" are
    removed. Whitespace is collapsed again after those removals so that a
    phrase dropped from the middle of a header does not leave a double space.
    Returns ``""`` for empty or ``None`` input.
    """
    if not s:
        return ""
    # remove parenthetical content
    s = re.sub(r'\([^)]*\)', ' ', s)
    # replace slashes
    s = s.replace("/", " ")
    # remove punctuation except # and %
    s = re.sub(r'[^\w\s\#\%]', ' ', s)
    s = re.sub(r'\s+', ' ', s).strip().lower()
    # common canonicalizations
    s = s.replace('registrationno', 'registration number')
    s = s.replace('registrationnumber', 'registration number')
    s = s.replace('sub contracted', 'sub contractor')
    s = s.replace('sub-contractor', 'sub contractor')
    s = s.replace('date range', '')
    s = s.replace('applicable for entry audit', '')
    # The removals above can leave gaps in the middle of the string.
    return re.sub(r'\s+', ' ', s).strip()


# ============================================================================
# JSON MATCHING FUNCTIONS
# ============================================================================

def _clean_key(text):
    """Lowercase *text*, turn punctuation into spaces and collapse whitespace."""
    cleaned = re.sub(r'[^\w\s]', ' ', text.lower()).strip()
    return re.sub(r'\s+', ' ', cleaned)


def _word_tokens(text):
    """Return the set of lowercase words in *text* longer than two characters."""
    return {word.lower() for word in re.findall(r'\b\w+\b', text) if len(word) > 2}


def find_matching_json_value(field_name, flat_json):
    """Find matching value in JSON with multiple strategies.

    Strategies are tried in order and the first hit wins: exact key,
    case-insensitive key, the special "Print Name" operator/auditor lookup,
    dotted-key suffix, punctuation-insensitive key, then word-overlap fuzzy
    scoring (accepted at a score of 0.25 or more). Returns ``None`` when
    *field_name* is blank or nothing matches. A fuzzy hit is returned even
    when its value is falsy (``0``, ``""``), in line with the exact-match
    strategies.
    """
    field_name = (field_name or "").strip()
    if not field_name:
        return None
    field_lower = field_name.lower()

    # Try exact match first
    if field_name in flat_json:
        print(f" ✅ Direct match found for key '{field_name}'")
        return flat_json[field_name]

    # Case-insensitive exact match
    for key, value in flat_json.items():
        if key.lower() == field_lower:
            print(f" ✅ Case-insensitive match found for key '{field_name}' with JSON key '{key}'")
            return value

    # Better Print Name detection for operator vs auditor
    if field_lower == "print name":
        operator_keys = [k for k in flat_json if "operator" in k.lower() and "print name" in k.lower()]
        auditor_keys = [k for k in flat_json if "auditor" in k.lower() and "name" in k.lower()]
        if operator_keys:
            print(f" ✅ Operator Print Name match: '{field_name}' -> '{operator_keys[0]}'")
            return flat_json[operator_keys[0]]
        elif auditor_keys:
            print(f" ✅ Auditor Name match: '{field_name}' -> '{auditor_keys[0]}'")
            return flat_json[auditor_keys[0]]

    # Suffix matching for nested keys
    for key, value in flat_json.items():
        if '.' in key and key.split('.')[-1].lower() == field_lower:
            print(f" ✅ Suffix match found for key '{field_name}' with JSON key '{key}'")
            return value

    # Clean & exact match attempt
    clean_field = _clean_key(field_name)
    for key, value in flat_json.items():
        if clean_field == _clean_key(key):
            print(f" ✅ Clean match found for key '{field_name}' with JSON key '{key}'")
            return value

    # Enhanced fuzzy matching with word-token scoring
    field_words = _word_tokens(field_name)
    if not field_words:
        return None

    best_value = None
    best_score = 0
    best_key = None
    for key, value in flat_json.items():
        key_words = _word_tokens(key)
        if not key_words:
            continue
        common_words = field_words & key_words
        if not common_words:
            continue
        similarity = len(common_words) / len(field_words | key_words)
        coverage = len(common_words) / len(field_words)
        final_score = (similarity * 0.6) + (coverage * 0.4)
        if final_score > best_score:
            best_score = final_score
            best_value = value
            best_key = key

    # Decide on the key, not the value, so falsy values are not discarded.
    if best_key is not None and best_score >= 0.25:
        print(f" ✅ Fuzzy match found for key '{field_name}' with JSON key '{best_key}' (score: {best_score:.2f})")
        return best_value

    print(f" ❌ No match found for '{field_name}'")
    return None


# ============================================================================
# RED TEXT PROCESSING FUNCTIONS
# ============================================================================

def extract_red_text_segments(cell):
    """Group consecutive red runs of each paragraph in *cell* into segments.

    Returns a list of dicts with keys ``'text'`` (concatenated run text),
    ``'runs'`` (list of ``(paragraph_idx, run_idx, run)`` tuples) and
    ``'paragraph_idx'``. Red runs with empty text are not recorded; a
    non-red run ends the current segment.
    """
    red_segments = []
    for para_idx, paragraph in enumerate(cell.paragraphs):
        current_segment = ""
        segment_runs = []

        def flush():
            red_segments.append({
                'text': current_segment,
                'runs': segment_runs.copy(),
                'paragraph_idx': para_idx,
            })

        for run_idx, run in enumerate(paragraph.runs):
            if is_red(run):
                if run.text:
                    current_segment += run.text
                    segment_runs.append((para_idx, run_idx, run))
            elif segment_runs:
                flush()
                current_segment = ""
                segment_runs = []
        if segment_runs:
            flush()
    return red_segments


def replace_all_red_segments(red_segments, replacement_text):
    """Write *replacement_text* into the first red run and blank the others.

    The first run of the first segment receives the text and is recoloured
    black; every other run in every segment is emptied. When the replacement
    contains newlines, each further non-blank line is appended to that same
    run after a line break. If appending fails, the lines are joined with
    spaces instead. A list or tuple replacement is joined with spaces, and
    any other non-string is passed through ``str``.

    Returns 1 when a run was written, otherwise 0.
    """
    if not red_segments:
        return 0

    if isinstance(replacement_text, (list, tuple)):
        replacement_text = " ".join(str(part) for part in replacement_text)
    elif not isinstance(replacement_text, str):
        replacement_text = str(replacement_text)
    replacement_lines = replacement_text.split('\n')

    replacements_made = 0
    first_run = None
    first_segment = red_segments[0]
    if first_segment['runs']:
        first_run = first_segment['runs'][0][2]
        first_run.text = replacement_lines[0]
        first_run.font.color.rgb = RGBColor(0, 0, 0)
        replacements_made = 1
        for _, _, run in first_segment['runs'][1:]:
            run.text = ''

    for segment in red_segments[1:]:
        for _, _, run in segment['runs']:
            run.text = ''

    extra_lines = [line.strip() for line in replacement_lines[1:] if line.strip()]
    if first_run is not None and extra_lines:
        try:
            # Keep every line in the same (already black) run so the order is
            # preserved even when the paragraph contains other runs.
            for line in extra_lines:
                first_run.add_break()
                first_run.add_text(line)
        except Exception:
            # Deliberate best-effort fallback: a single space-joined line.
            first_run.text = ' '.join(replacement_lines)
            first_run.font.color.rgb = RGBColor(0, 0, 0)

    return replacements_made


def replace_single_segment(segment, replacement_text):
    """Replace one red segment with *replacement_text*, recoloured black.

    Returns False when the segment has no runs, otherwise True.
    """
    runs = segment['runs']
    if not runs:
        return False
    first_run = runs[0][2]
    first_run.text = replacement_text
    first_run.font.color.rgb = RGBColor(0, 0, 0)
    for _, _, run in runs[1:]:
        run.text = ''
    return True


def replace_red_text_in_cell(cell, replacement_text):
    """Replace all red text in *cell*; returns the count from replace_all_red_segments."""
    red_segments = extract_red_text_segments(cell)
    if not red_segments:
        return 0
    return replace_all_red_segments(red_segments, replacement_text)
# ============================================================================
# SPECIALIZED TABLE HANDLERS
# ============================================================================

# Substrings that mark a piece of text as a job title rather than a name.
_ROLE_INDICATORS = ('manager', 'auditor', 'owner', 'director', 'supervisor',
                    'coordinator', 'driver', 'operator', 'representative', 'chief')

# Flat-key fragments that identify vehicle-table columns.
_VEHICLE_COLUMN_HINTS = ("registration number", "sub-contractor", "weight verification",
                         "rfs suspension", "trip records", "suspension", "daily checks",
                         "fault recording", "fault repair", "roadworthiness")


def _red_text_of(cell):
    """Return the concatenated text of the red runs in *cell* (not stripped)."""
    return "".join(run.text for p in cell.paragraphs for run in p.runs if is_red(run))


def _leading_table_text(table, row_limit=3):
    """Return the lowercased text of the first *row_limit* rows, space separated."""
    return "".join(
        get_clean_text(cell).lower() + " "
        for row in table.rows[:row_limit]
        for cell in row.cells
    )


def _mark_table_processed(table, message):
    """Flag *table* so later handlers skip it; print *message* on success."""
    try:
        table._processed_operator_declaration = True
        print(message)
    except (AttributeError, TypeError):
        # The marker is an optimisation only; a table object that rejects
        # new attributes is simply left unmarked.
        pass


def _get_management_section(flat_json, mgmt_type):
    """Return the ``{standard: value}`` mapping for *mgmt_type*, or None.

    Uses the value stored under *mgmt_type* when it is a dict. Otherwise the
    mapping is rebuilt from dotted keys of the form ``"<mgmt_type>.<standard>"``,
    which is the shape flatten_json-style input has.
    """
    section = flat_json.get(mgmt_type)
    if isinstance(section, dict):
        return section
    prefix = f"{mgmt_type}.".lower()
    rebuilt = {
        key[len(prefix):]: value
        for key, value in flat_json.items()
        if key.lower().startswith(prefix)
    }
    return rebuilt or None


def handle_australian_company_number(row, company_numbers):
    """Place each item of *company_numbers* into successive cells of *row*.

    Item ``i`` goes to cell ``i + 1`` (the first cell is skipped). Only cells
    that contain red text are written. Returns the number of replacements.
    """
    replacements_made = 0
    for i, digit in enumerate(company_numbers):
        cell_idx = i + 1
        if cell_idx >= len(row.cells):
            continue
        cell = row.cells[cell_idx]
        if has_red_text(cell):
            replacements_made += replace_red_text_in_cell(cell, str(digit))
            print(f" -> Placed digit '{digit}' in cell {cell_idx + 1}")
    return replacements_made


def _select_vehicle_section(flat_json, table_text):
    """Pick the JSON entry that most likely holds this table's vehicle data.

    Returns ``(key, section)``; *key* is None when the section was assembled
    from scattered flat keys, and *section* is None when nothing was found.
    """
    context_tokens = []
    if "mass" in table_text:
        context_tokens.append("mass")
    if "maintenance" in table_text:
        context_tokens.append("maintenance")
    if "fatigue" in table_text or "driver" in table_text or "scheduler" in table_text:
        context_tokens.append("fatigue")

    # candidate keys that mention 'registration' or 'vehicle'; empty
    # containers are skipped because they can never fill a cell
    candidates = []
    for key, value in flat_json.items():
        lowered = key.lower()
        if "registration" not in lowered and "vehicle" not in lowered:
            continue
        if isinstance(value, (list, dict)) and not value:
            continue
        candidates.append((key, value))

    def wrap(key, value):
        return value if isinstance(value, (list, dict)) else {key: value}

    # prefer candidates whose key contains one of the context tokens
    for token in context_tokens:
        for key, value in candidates:
            if token in key.lower():
                print(f" ✅ Found vehicle data by context token '{token}' in key '{key}'")
                return key, wrap(key, value)

    # fallback: choose candidate containing 'registration' explicitly
    for key, value in candidates:
        if "registration" in key.lower():
            print(f" ✅ Fallback vehicle data chosen from '{key}'")
            return key, wrap(key, value)

    # fallback: collect flattened keys that look like vehicle columns
    potential_columns = {}
    for key, value in flat_json.items():
        lowered = key.lower()
        if any(hint in lowered for hint in _VEHICLE_COLUMN_HINTS):
            potential_columns[key.split(".")[-1]] = value
    if potential_columns:
        print(f" ✅ Found vehicle data from flattened keys: {list(potential_columns.keys())}")
        return None, potential_columns
    return None, None


def _coerce_vehicle_columns(flat_json, selected_key, section):
    """Turn the selected section into a ``{column_name: values}`` dict.

    A list of dicts is pivoted into columns. A list of scalars becomes one
    column; when its key is dotted, the sibling keys under the same parent
    are gathered as the other columns (assumes flatten_json-style dotted
    keys -- TODO confirm against the JSON producer).
    """
    if isinstance(section, dict):
        return section
    if isinstance(section, list) and section and isinstance(section[0], dict):
        columns = {}
        for entry in section:
            if isinstance(entry, dict):
                for name, value in entry.items():
                    columns.setdefault(name, []).append(value)
        return columns
    if selected_key is None:
        return {}
    if "." in selected_key:
        prefix = selected_key.rsplit(".", 1)[0] + "."
        siblings = {
            key[len(prefix):]: value
            for key, value in flat_json.items()
            if key.startswith(prefix)
        }
        if siblings:
            return siblings
    return {selected_key.rsplit(".", 1)[-1]: section}


def _find_vehicle_header_row(table):
    """Return ``(index, row)`` of the header row, or ``(-1, None)``."""
    row_texts = [
        " ".join(get_clean_text(cell).lower() for cell in row.cells)
        for row in table.rows
    ]
    # Preferred: a row that contains both 'registration' and 'number'.
    for row_idx, row_text in enumerate(row_texts):
        if "registration" in row_text and "number" in row_text:
            return row_idx, table.rows[row_idx]
    # alternative detection: a row with 'registration' or 'reg no'
    for row_idx, row_text in enumerate(row_texts):
        if "registration" in row_text or "reg no" in row_text or "regno" in row_text:
            return row_idx, table.rows[row_idx]
    return -1, None


def _map_vehicle_columns(header_row, vehicle_section):
    """Map header column indexes to keys of *vehicle_section*.

    Only real JSON columns are candidates: a label without data could never
    fill a cell, and must not outscore a column that has data.
    """
    master_labels = {}
    for orig_key in vehicle_section:
        norm = normalize_header_text(str(orig_key))
        if norm:
            master_labels.setdefault(norm, orig_key)

    column_mapping = {}
    for col_idx, cell in enumerate(header_row.cells):
        header_text = get_clean_text(cell).strip()
        if not header_text:
            continue
        # skip 'No.' column mapping attempts in many templates
        if header_text.lower() in {"no", "no.", "#"}:
            continue

        norm_header = normalize_header_text(header_text)
        best_match = None
        best_score = 0.0
        if norm_header in master_labels:
            # exact normalized match
            best_match = master_labels[norm_header]
            best_score = 1.0
        else:
            # token overlap scoring
            header_tokens = {t for t in norm_header.split() if len(t) > 2}
            for norm_key, orig_label in master_labels.items():
                key_tokens = {t for t in norm_key.split() if len(t) > 2}
                if not key_tokens:
                    continue
                common = header_tokens & key_tokens
                if common:
                    score = len(common) / max(1, len(header_tokens | key_tokens))
                elif norm_header in norm_key or norm_key in norm_header:
                    # substring fallback
                    score = min(len(norm_header), len(norm_key)) / max(len(norm_header), len(norm_key))
                else:
                    score = 0.0
                if score > best_score:
                    best_score = score
                    best_match = orig_label

        if best_match and best_score >= 0.30:
            column_mapping[col_idx] = best_match
            print(f" 📌 Column {col_idx}: '{header_text}' -> '{best_match}' (norm: '{norm_header}', score: {best_score:.2f})")
        else:
            print(f" ⚠️ No mapping found for '{header_text}' (norm: '{norm_header}')")
            record_unmatched_header(header_text)
    return column_mapping


def handle_vehicle_registration_table(table, flat_json):
    """Handle vehicle registration table data replacement (improved header normalization and context-aware selection).

    Selects the vehicle data from *flat_json*, maps it to the table's header
    columns, and fills the rows below the header: empty cells are set
    directly and cells with red text have that text replaced. Rows are added
    when the table is shorter than the data. Returns the number of
    replacements made.
    """
    replacements_made = 0

    # build a table_text context (used to find mass/maintenance/fatigue)
    table_text = _leading_table_text(table)

    selected_key, vehicle_section = _select_vehicle_section(flat_json, table_text)
    if not vehicle_section:
        print(f" ❌ Vehicle registration data not found in JSON")
        return 0

    # ensure vehicle_section is a dict mapping column_name -> list/value
    vehicle_section = _coerce_vehicle_columns(flat_json, selected_key, vehicle_section)
    print(f" ✅ Found vehicle registration data with {len(vehicle_section)} columns")

    header_row_idx, header_row = _find_vehicle_header_row(table)
    if header_row_idx == -1:
        print(f" ❌ Could not find header row in vehicle table")
        return 0
    print(f" ✅ Found header row at index {header_row_idx}")

    column_mapping = _map_vehicle_columns(header_row, vehicle_section)
    if not column_mapping:
        print(f" ❌ No column mappings found")
        return 0

    # Determine number of rows to populate
    max_data_rows = max(
        (len(data) for data in vehicle_section.values() if isinstance(data, list)),
        default=0,
    )
    print(f" 📌 Need to populate {max_data_rows} data rows")

    # Fill or add rows as needed
    for data_row_index in range(max_data_rows):
        table_row_idx = header_row_idx + 1 + data_row_index
        if table_row_idx >= len(table.rows):
            print(f" ⚠️ Row {table_row_idx + 1} doesn't exist - table only has {len(table.rows)} rows")
            print(f" ➕ Adding new row for vehicle {data_row_index + 1}")
            table.add_row()
            print(f" ✅ Successfully added row {len(table.rows)} to the table")

        row = table.rows[table_row_idx]
        print(f" 📌 Processing data row {table_row_idx + 1} (vehicle {data_row_index + 1})")
        for col_idx, json_key in column_mapping.items():
            if col_idx >= len(row.cells):
                continue
            column_data = vehicle_section.get(json_key, [])
            if not isinstance(column_data, list) or data_row_index >= len(column_data):
                continue
            cell = row.cells[col_idx]
            replacement_value = str(column_data[data_row_index])
            if not get_clean_text(cell).strip():
                cell.text = replacement_value
                replacements_made += 1
                print(f" -> Added '{replacement_value}' to empty cell (column '{json_key}')")
            elif has_red_text(cell):
                cell_replacements = replace_red_text_in_cell(cell, replacement_value)
                replacements_made += cell_replacements
                if cell_replacements > 0:
                    print(f" -> Replaced red text with '{replacement_value}' (column '{json_key}')")

    return replacements_made


def handle_attendance_list_table_enhanced(table, flat_json):
    """Enhanced Attendance List processing with better detection.

    Applies only when one of the first three rows mentions an attendance
    heading. The first red cell whose text contains 'manager' or a dash is
    replaced with the attendance entries from *flat_json*, one per line.
    Returns the number of replacements made.
    """
    replacements_made = 0
    attendance_patterns = ["attendance list", "names and position titles", "attendees"]

    found_attendance_row = None
    for row_idx, row in enumerate(table.rows[:3]):
        for cell_idx, cell in enumerate(row.cells):
            cell_text = get_clean_text(cell).lower()
            if any(pattern in cell_text for pattern in attendance_patterns):
                found_attendance_row = row_idx
                print(f" 🎯 ENHANCED: Found Attendance List in row {row_idx + 1}, cell {cell_idx + 1}")
                break
        if found_attendance_row is not None:
            break
    if found_attendance_row is None:
        return 0

    attendance_value = None
    attendance_search_keys = [
        "Attendance List (Names and Position Titles).Attendance List (Names and Position Titles)",
        "Attendance List (Names and Position Titles)",
        "attendance list",
        "attendees",
    ]
    print(f" 🔍 Searching for attendance data in JSON...")
    for search_key in attendance_search_keys:
        attendance_value = find_matching_json_value(search_key, flat_json)
        if attendance_value is not None:
            print(f" ✅ Found attendance data with key: '{search_key}'")
            print(f" 📊 Raw value: {attendance_value}")
            break
    if attendance_value is None:
        print(f" ❌ No attendance data found in JSON")
        return 0

    target_cell = None
    print(f" 🔍 Scanning ALL cells in attendance table for red text...")
    for row_idx, row in enumerate(table.rows):
        for cell_idx, cell in enumerate(row.cells):
            if not has_red_text(cell):
                continue
            print(f" 🎯 Found red text in row {row_idx + 1}, cell {cell_idx + 1}")
            red_text = _red_text_of(cell)
            print(f" 📋 Red text content: '{red_text[:80]}...'")
            red_text_lower = red_text.lower()
            if any(indicator in red_text_lower for indicator in ['manager', '–', '-']):
                target_cell = cell
                print(f" ✅ This looks like attendance data - using this cell")
                break
        if target_cell is not None:
            break
    if target_cell is None:
        print(f" ⚠️ No red text found that looks like attendance data")
        return 0

    if has_red_text(target_cell):
        print(f" 🔧 Replacing red text with properly formatted attendance list...")
        if isinstance(attendance_value, list):
            attendance_list = [str(item).strip() for item in attendance_value if str(item).strip()]
        else:
            attendance_list = [str(attendance_value).strip()]
        print(f" 📝 Attendance items to add:")
        for i, item in enumerate(attendance_list):
            print(f" {i+1}. {item}")
        replacement_text = "\n".join(attendance_list)
        cell_replacements = replace_red_text_in_cell(target_cell, replacement_text)
        replacements_made += cell_replacements
        print(f" ✅ Added {len(attendance_list)} attendance items")
        print(f" 📊 Replacements made: {cell_replacements}")

    return replacements_made


# (row markers, candidate keys, canonical label, log tag) per standard
_SUMMARY_STANDARDS = (
    (("std 5", "verification"),
     ("Std 5. Verification", "Std 5 Verification", "Std 5", "Verification"),
     "Std 5. Verification", "Std 5/Verification"),
    (("std 6", "internal review"),
     ("Std 6. Internal Review", "Std 6 Internal Review", "Std 6", "Internal Review"),
     "Std 6. Internal Review", "Std 6/Internal Review"),
)


def fix_management_summary_details_column(table, flat_json):
    """Fix the DETAILS column in Management Summary table (multi-management aware).

    The management type(s) are detected from the first three rows. For each
    type, rows whose first cell names Std 5 / Verification or Std 6 /
    Internal Review get the red text of their second cell replaced with the
    matching JSON value. Returns the number of replacements made.
    """
    replacements_made = 0
    print(f" 🎯 FIX: Management Summary DETAILS column processing")

    # Build table text to detect management type(s)
    table_text = _leading_table_text(table)

    # Identify which management types this table likely represents
    mgmt_types = []
    if "mass" in table_text:
        mgmt_types.append("Mass Management Summary")
    if "maintenance" in table_text:
        mgmt_types.append("Maintenance Management Summary")
    if "fatigue" in table_text or "driver" in table_text:
        mgmt_types.append("Fatigue Management Summary")
    if not mgmt_types:
        # fallback: try fuzzy detection through headings or presence of "Std 5." etc.
        if any("std 5" in get_clean_text(c).lower() for r in table.rows for c in r.cells):
            mgmt_types.append("Mass Management Summary")
    if not mgmt_types:
        return 0

    for mgmt_type in mgmt_types:
        print(f" ✅ Confirmed {mgmt_type} table processing")

        # find data dict in flat_json for mgmt_type
        mgmt_data = flat_json.get(mgmt_type)
        if not isinstance(mgmt_data, dict):
            # attempt suffix based keys in flat_json
            first_word = mgmt_type.split()[0].lower()
            for key in flat_json:
                if first_word in key.lower() and "summary" in key.lower():
                    mgmt_data = flat_json.get(key)
                    break
        if not isinstance(mgmt_data, dict):
            # Flattened input holds no dict values; rebuild from dotted keys.
            mgmt_data = _get_management_section(flat_json, mgmt_type)
        if not isinstance(mgmt_data, dict):
            print(f" ⚠️ No JSON management dict found for {mgmt_type}, skipping this type")
            continue

        # Process rows looking for Std 5. and Std 6.
        for row in table.rows:
            if len(row.cells) < 2:
                continue
            details_cell = row.cells[1]
            standard_text = get_clean_text(row.cells[0]).strip().lower()

            for markers, candidates, label, tag in _SUMMARY_STANDARDS:
                if not any(marker in standard_text for marker in markers):
                    continue
                if not has_red_text(details_cell):
                    continue
                print(f" 🔍 Found {tag} with red text")

                # exact key variants first
                std_val = None
                for candidate in candidates:
                    std_val = mgmt_data.get(candidate)
                    if std_val is not None:
                        break
                # fuzzy fallback
                if std_val is None:
                    for k, v in mgmt_data.items():
                        if any(marker in k.lower() for marker in markers):
                            std_val = v
                            break
                if std_val is None:
                    continue

                replacement_text = get_value_as_string(std_val, label)
                cell_replacements = replace_red_text_in_cell(details_cell, replacement_text)
                replacements_made += cell_replacements
                if cell_replacements:
                    print(f" ✅ Replaced {label} details for {mgmt_type}")

    return replacements_made


def _has_role_word(text):
    """Return True when *text* contains any job-title indicator."""
    low = text.lower()
    return any(ind in low for ind in _ROLE_INDICATORS)


def _parse_name_and_position(value):
    """Split a JSON value into ``(name, position)``; either may be None.

    A list of two or more items is read as ``[name, position]`` when both are
    non-blank. A string is split on a spaced dash, comma or pipe. If only the
    left part looks like a job title the two parts are swapped, so
    "Manager - Jane Doe" yields name "Jane Doe". Without a separator, a
    trailing role word is taken as the position.
    """
    if value is None:
        return None, None
    if isinstance(value, list):
        if len(value) == 0:
            return None, None
        if len(value) == 1:
            return str(value[0]).strip(), None
        first = str(value[0]).strip()
        second = str(value[1]).strip()
        if first and second:
            return first, second
        value = " ".join(str(v).strip() for v in value if str(v).strip())

    s = str(value).strip()
    if not s:
        return None, None

    parts = re.split(r'\s+[-–—]\s+|\s*,\s*|\s*\|\s*', s)
    if len(parts) >= 2:
        left = parts[0].strip()
        right = parts[1].strip()
        # Checked first: otherwise any short right-hand part is taken as the
        # position and the swap is never reached.
        if _has_role_word(left) and not _has_role_word(right):
            return right, left
        return left, right

    tokens = s.split()
    if len(tokens) >= 2 and tokens[-1].lower() in _ROLE_INDICATORS:
        return " ".join(tokens[:-1]), tokens[-1]
    return s, None


def _looks_like_person(name_str):
    """Return False for blank, single-character or company-like names."""
    if not name_str:
        return False
    low = name_str.lower()
    if any(bp in low for bp in ("pty ltd", "company", "farming", "p/l", "plc")):
        return False
    return len(name_str) > 1


def _join_if_list(value):
    """Join a list with spaces and strip; strip a string; pass others through."""
    if isinstance(value, list):
        return " ".join(str(x) for x in value).strip()
    if isinstance(value, str):
        return value.strip()
    return value


# Canonical operator declaration fixer (keeps original robust logic)
def fix_operator_declaration_empty_values(table, flat_json):
    """Fill the Print Name / Position Title row of an Operator Declaration table.

    Applies only when the table text contains both "print name" and
    "position title". The row below the header row is filled where its cells
    are empty or contain red text. On success the table is marked as
    processed. Returns the number of cells updated.
    """
    replacements_made = 0
    print(f" 🎯 FIX: Operator Declaration empty values processing")

    table_context = "".join(
        get_clean_text(cell).lower() + " " for row in table.rows for cell in row.cells
    )
    if not ("print name" in table_context and "position title" in table_context):
        return 0
    print(f" ✅ Confirmed Operator Declaration table")

    for row_idx, row in enumerate(table.rows):
        if len(row.cells) < 2:
            continue
        cell1_text = get_clean_text(row.cells[0]).strip().lower()
        cell2_text = get_clean_text(row.cells[1]).strip().lower()
        if not ("print name" in cell1_text and "position" in cell2_text):
            continue

        print(f" 📌 Found header row at {row_idx + 1}")
        if row_idx + 1 < len(table.rows):
            data_row = table.rows[row_idx + 1]
            if len(data_row.cells) >= 2:
                name_cell = data_row.cells[0]
                position_cell = data_row.cells[1]
                name_text = get_clean_text(name_cell).strip()
                position_text = get_clean_text(position_cell).strip()
                print(f" 📋 Current values: Name='{name_text}', Position='{position_text}'")

                # Prefer the qualified key, then the bare one.
                name_value = find_matching_json_value("Operator Declaration.Print Name", flat_json)
                if name_value is None:
                    name_value = find_matching_json_value("Print Name", flat_json)
                position_value = find_matching_json_value("Operator Declaration.Position Title", flat_json)
                if position_value is None:
                    position_value = find_matching_json_value("Position Title", flat_json)

                parsed_name, parsed_pos_from_name = _parse_name_and_position(name_value)
                _, parsed_pos = _parse_name_and_position(position_value)

                final_name = None
                final_pos = None
                if parsed_name:
                    final_name = parsed_name
                elif name_value is not None:
                    final_name = get_value_as_string(name_value)
                if parsed_pos:
                    final_pos = parsed_pos
                elif position_value is not None:
                    final_pos = get_value_as_string(position_value)
                elif parsed_pos_from_name:
                    final_pos = parsed_pos_from_name

                final_name = _join_if_list(final_name)
                final_pos = _join_if_list(final_pos)

                if (not name_text or has_red_text(name_cell)) and final_name and _looks_like_person(final_name):
                    if has_red_text(name_cell):
                        replace_red_text_in_cell(name_cell, final_name)
                    else:
                        name_cell.text = final_name
                    replacements_made += 1
                    print(f" ✅ Updated Print Name -> '{final_name}'")

                if (not position_text or has_red_text(position_cell)) and final_pos:
                    if has_red_text(position_cell):
                        replace_red_text_in_cell(position_cell, final_pos)
                    else:
                        position_cell.text = final_pos
                    replacements_made += 1
                    print(f" ✅ Updated Position Title -> '{final_pos}'")
        # Only the first header row is handled.
        break

    if replacements_made > 0:
        _mark_table_processed(table, " 🔖 Marked table as processed by Operator Declaration handler")
    return replacements_made


def handle_multiple_red_segments_in_cell(cell, flat_json):
    """Replace each red segment in *cell* with the JSON value matching its own text.

    Segments with blank text or no JSON match are left untouched. Returns the
    number of segments replaced.
    """
    replacements_made = 0
    red_segments = extract_red_text_segments(cell)
    if not red_segments:
        return 0
    for i, segment in enumerate(red_segments):
        segment_text = segment['text'].strip()
        if not segment_text:
            continue
        json_value = find_matching_json_value(segment_text, flat_json)
        if json_value is None:
            continue
        replacement_text = get_value_as_string(json_value, segment_text)
        if replace_single_segment(segment, replacement_text):
            replacements_made += 1
            print(f" ✅ Replaced segment {i+1}: '{segment_text}' -> '{replacement_text}'")
    return replacements_made


def handle_nature_business_multiline_fix(cell, flat_json):
    """Replace red text that reads like a business description.

    Acts only when the red text contains a transport-related keyword and the
    JSON has a "Nature of Business" value. Returns the replacement count.
    """
    replacements_made = 0
    red_text = _red_text_of(cell).strip()
    if not red_text:
        return 0

    nature_indicators = ["transport", "logistics", "freight", "delivery", "trucking", "haulage"]
    if any(indicator in red_text.lower() for indicator in nature_indicators):
        nature_value = find_matching_json_value("Nature of Business", flat_json)
        if nature_value is not None:
            replacement_text = get_value_as_string(nature_value, "Nature of Business")
            cell_replacements = replace_red_text_in_cell(cell, replacement_text)
            replacements_made += cell_replacements
            print(f" ✅ Fixed Nature of Business multiline content")
    return replacements_made


def handle_management_summary_fix(cell, flat_json):
    """Replace red text that overlaps an item of a management summary list.

    Red text of ten characters or fewer is ignored. For the first summary
    list containing an item that includes, or is included in, the red text,
    the cell's red text is replaced with the whole list, one item per line.
    Returns the replacement count.
    """
    replacements_made = 0
    red_text = _red_text_of(cell).strip()
    if not red_text or len(red_text) <= 10:
        return 0
    red_lower = red_text.lower()

    management_types = ["Mass Management Summary", "Maintenance Management Summary", "Fatigue Management Summary"]
    for mgmt_type in management_types:
        mgmt_data = _get_management_section(flat_json, mgmt_type)
        if not isinstance(mgmt_data, dict):
            continue
        for std_key, std_value in mgmt_data.items():
            if not (isinstance(std_value, list) and std_value):
                continue
            for item in std_value:
                item_lower = str(item).lower()
                # A blank item is a substring of everything; do not match on it.
                if not item_lower.strip():
                    continue
                if red_lower in item_lower or item_lower in red_lower:
                    replacement_text = "\n".join(str(i) for i in std_value)
                    cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                    replacements_made += cell_replacements
                    print(f" ✅ Fixed {mgmt_type} - {std_key}")
                    return replacements_made
    return replacements_made


# ============================================================================
# SMALL OPERATOR/AUDITOR TABLE HANDLER (skip if already processed)
# ============================================================================

# Month names must not be flanked by letters, so "mar" inside "Marcus" is ignored.
_MONTH_NAMES = (r"(?<![a-z])(jan|feb|mar|apr|may|jun|jul|aug|sep|sept|oct|nov|dec|january|february|march"
                r"|april|may|june|july|august|september|october|november|december)(?![a-z])")


def handle_operator_declaration_fix(table, flat_json):
    """Fill a small (at most four rows) declaration table.

    Tables already marked as processed are skipped. The Operator Declaration
    fixer runs first; if it changes nothing, each red cell is filled from the
    auditor declaration fields, or failing that with a "[Signature]"
    placeholder or an audit date according to what the red text mentions.
    Returns the number of replacements made.
    """
    replacements_made = 0
    if getattr(table, "_processed_operator_declaration", False):
        print(f" ⏭️ Skipping - Operator Declaration table already processed")
        return 0
    if len(table.rows) > 4:
        return 0

    replaced = fix_operator_declaration_empty_values(table, flat_json)
    replacements_made += replaced
    if replaced:
        return replacements_made

    def is_date_like(s: str) -> bool:
        if not s:
            return False
        s = s.strip()
        if re.search(r"\bDate\b", s, re.IGNORECASE):
            return True
        if re.search(r"\b\d{1,2}(?:st|nd|rd|th)?\b\s+" + _MONTH_NAMES, s, re.IGNORECASE):
            return True
        if re.search(_MONTH_NAMES + r".*\b\d{4}\b", s, re.IGNORECASE):
            return True
        if re.search(r"\b\d{1,2}[\/\.\-]\d{1,2}[\/\.\-]\d{2,4}\b", s):
            return True
        if re.search(r"\b\d{4}[\/\.\-]\d{1,2}[\/\.\-]\d{1,2}\b", s):
            return True
        if re.fullmatch(r"\d{4}", s):
            return True
        return False

    def looks_like_person_name(s: str) -> bool:
        if not s:
            return False
        low = s.lower().strip()
        bad_terms = ["pty ltd", "p/l", "plc", "company", "farming", "farm", "trust", "ltd"]
        if any(bt in low for bt in bad_terms):
            return False
        if len(low) < 3:
            return False
        return bool(re.search(r"[a-zA-Z]", low))

    def looks_like_position(s: str) -> bool:
        if not s:
            return False
        return _has_role_word(s)

    print(f" 🎯 Processing other declaration table (fallback small-table behavior)")
    declaration_fields = [
        "NHVAS Approved Auditor Declaration.Print Name",
        "Auditor name",
        "Signature",
        "Date",
    ]

    for row in table.rows:
        for cell in row.cells:
            if not has_red_text(cell):
                continue

            replaced_this_cell = False
            for field in declaration_fields:
                field_value = find_matching_json_value(field, flat_json)
                if field_value is None:
                    continue
                replacement_text = get_value_as_string(field_value, field).strip()
                if not replacement_text:
                    continue

                # Never drop a date into a cell that is not asking for one.
                if is_date_like(replacement_text):
                    red_text = _red_text_of(cell).strip()
                    if "date" not in red_text.lower():
                        print(f" ⚠️ Skipping date-like replacement for field '{field}' -> '{replacement_text[:30]}...'")
                        continue

                if (looks_like_person_name(replacement_text) or looks_like_position(replacement_text)
                        or "signature" in field.lower() or "date" in field.lower()):
                    cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                    if cell_replacements > 0:
                        replacements_made += cell_replacements
                        replaced_this_cell = True
                        print(f" ✅ Fixed declaration field: {field} -> '{replacement_text}'")
                        break
                else:
                    print(f" ⚠️ Replacement for field '{field}' does not look like name/role, skipping: '{replacement_text[:30]}...'")
                    continue

            if replaced_this_cell:
                continue

            red_text = _red_text_of(cell).strip().lower()
            if "signature" in red_text:
                cell_replacements = replace_red_text_in_cell(cell, "[Signature]")
                if cell_replacements > 0:
                    replacements_made += cell_replacements
                    print(f" ✅ Inserted placeholder [Signature]")
            elif "date" in red_text:
                date_value = (find_matching_json_value("Date", flat_json)
                              or find_matching_json_value("Date of Audit", flat_json)
                              or find_matching_json_value("Audit was conducted on", flat_json))
                if date_value is not None:
                    date_text = get_value_as_string(date_value)
                    if not is_date_like(date_text):
                        print(f" ⚠️ Found date-value but not date-like, skipping: '{date_text}'")
                    else:
                        cell_replacements = replace_red_text_in_cell(cell, date_text)
                        if cell_replacements > 0:
                            replacements_made += cell_replacements
                            print(f" ✅ Inserted date value: '{date_text}'")

    if replacements_made > 0:
        _mark_table_processed(table, " 🔖 Marked table as processed by operator declaration fallback")
    return replacements_made


def handle_print_accreditation_section(table, flat_json):
    """Fill red cells of a print-accreditation table with the accreditation name.

    Tables already marked as processed, or that read like an Operator
    Declaration table, are skipped. Returns the number of replacements made.
    """
    replacements_made = 0
    if getattr(table, "_processed_operator_declaration", False):
        print(f" ⏭️ Skipping Print Accreditation - this is an Operator Declaration table")
        return 0

    table_context = "".join(
        get_clean_text(cell).lower() + " " for row in table.rows for cell in row.cells
    )
    if "operator declaration" in table_context or ("print name" in table_context and "position title" in table_context):
        print(f" ⏭️ Skipping Print Accreditation - this is an Operator Declaration table")
        return 0

    print(f" 📋 Processing Print Accreditation section")
    accreditation_fields = [
        "(print accreditation name)",
        "Operator name (Legal entity)",
        "Print accreditation name",
    ]
    for row in table.rows:
        for cell in row.cells:
            if not has_red_text(cell):
                continue
            for field in accreditation_fields:
                field_value = find_matching_json_value(field, flat_json)
                if field_value is None:
                    continue
                replacement_text = get_value_as_string(field_value, field)
                if not replacement_text.strip():
                    continue
                cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                replacements_made += cell_replacements
                if cell_replacements > 0:
                    print(f" ✅ Fixed accreditation: {field}")
                    break
    return replacements_made


def process_single_column_sections(cell, key_text, flat_json):
    """Replace a cell's red text by looking it up in *flat_json*.

    The lookup uses the red text itself and falls back to *key_text*.
    Returns the number of replacements made.
    """
    replacements_made = 0
    if not has_red_text(cell):
        return replacements_made
    red_text = _red_text_of(cell)
    if not red_text.strip():
        return replacements_made

    section_value = find_matching_json_value(red_text.strip(), flat_json)
    if section_value is None:
        section_value = find_matching_json_value(key_text, flat_json)
    if section_value is not None:
        section_replacement = get_value_as_string(section_value, red_text.strip())
        cell_replacements = replace_red_text_in_cell(cell, section_replacement)
        replacements_made += cell_replacements
        if cell_replacements > 0:
            print(f" ✅ Fixed single column section: '{key_text}'")
    return replacements_made


# ============================================================================
# MAIN TABLE/PARAGRAPH PROCESSING
# ============================================================================

def process_tables(document, flat_json):
    """Process all tables in the document with comprehensive fixes"""
    replacements_made = 0
    for table_idx, table in enumerate(document.tables):
        print(f"\n🔍 Processing table {table_idx + 1}:")
        # collect brief context
        table_text = ""
        for row in table.rows[:3]:
            for cell in row.cells:
                table_text += get_clean_text(cell).lower() + " "
        # detect management summary & details column
management_summary_indicators = ["mass management", "maintenance management", "fatigue management"] has_management = any(indicator in table_text for indicator in management_summary_indicators) has_details = "details" in table_text if has_management and has_details: print(f" 📋 Detected Management Summary table") summary_fixes = fix_management_summary_details_column(table, flat_json) replacements_made += summary_fixes # Process remaining red text in management summary summary_replacements = 0 for row_idx, row in enumerate(table.rows): for cell_idx, cell in enumerate(row.cells): if has_red_text(cell): # Try direct matching with new schema names first for mgmt_type in ["Mass Management Summary", "Maintenance Management Summary", "Fatigue Management Summary"]: if mgmt_type.lower().replace(" summary", "") in table_text: if mgmt_type in flat_json: mgmt_data = flat_json[mgmt_type] if isinstance(mgmt_data, dict): for std_key, std_value in mgmt_data.items(): if isinstance(std_value, list) and len(std_value) > 0: red_text = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red(run)).strip() for item in std_value: if len(red_text) > 15 and red_text.lower() in str(item).lower(): replacement_text = "\n".join(str(i) for i in std_value) cell_replacements = replace_red_text_in_cell(cell, replacement_text) summary_replacements += cell_replacements print(f" ✅ Updated {std_key} with summary data") break break if summary_replacements == 0: cell_replacements = handle_management_summary_fix(cell, flat_json) summary_replacements += cell_replacements replacements_made += summary_replacements continue # Detect Vehicle Registration tables vehicle_indicators = ["registration number", "sub-contractor", "weight verification", "rfs suspension", "registration"] indicator_count = sum(1 for indicator in vehicle_indicators if indicator in table_text) if indicator_count >= 2: print(f" 🚗 Detected Vehicle Registration table") vehicle_replacements = 
handle_vehicle_registration_table(table, flat_json) replacements_made += vehicle_replacements continue # Detect Attendance List tables if "attendance list" in table_text and "names and position titles" in table_text: print(f" 👥 Detected Attendance List table") attendance_replacements = handle_attendance_list_table_enhanced(table, flat_json) replacements_made += attendance_replacements continue # Detect Print Accreditation / Operator Declaration tables print_accreditation_indicators = ["print name", "position title"] indicator_count = sum(1 for indicator in print_accreditation_indicators if indicator in table_text) if indicator_count >= 2 or ("print name" in table_text and "position title" in table_text): print(f" 📋 Detected Print Accreditation/Operator Declaration table") declaration_fixes = fix_operator_declaration_empty_values(table, flat_json) replacements_made += declaration_fixes if not getattr(table, "_processed_operator_declaration", False): print_accreditation_replacements = handle_print_accreditation_section(table, flat_json) replacements_made += print_accreditation_replacements continue # Process regular table rows (original logic preserved) for row_idx, row in enumerate(table.rows): if len(row.cells) < 1: continue key_cell = row.cells[0] key_text = get_clean_text(key_cell) if not key_text: continue print(f" 📌 Row {row_idx + 1}: Key = '{key_text}'") json_value = find_matching_json_value(key_text, flat_json) if json_value is not None: replacement_text = get_value_as_string(json_value, key_text) if ("australian company number" in key_text.lower() or "company number" in key_text.lower()) and isinstance(json_value, list): cell_replacements = handle_australian_company_number(row, json_value) replacements_made += cell_replacements elif ("attendance list" in key_text.lower() or "nature of" in key_text.lower()) and row_idx + 1 < len(table.rows): print(f" ✅ Section header detected, checking next row...") next_row = table.rows[row_idx + 1] for cell_idx, cell in 
enumerate(next_row.cells): if has_red_text(cell): print(f" ✅ Found red text in next row, cell {cell_idx + 1}") if isinstance(json_value, list): replacement_text = "\n".join(str(item) for item in json_value) cell_replacements = replace_red_text_in_cell(cell, replacement_text) replacements_made += cell_replacements if cell_replacements > 0: print(f" -> Replaced section content") elif len(row.cells) == 1 or (len(row.cells) > 1 and not any(has_red_text(row.cells[i]) for i in range(1, len(row.cells)))): if has_red_text(key_cell): cell_replacements = process_single_column_sections(key_cell, key_text, flat_json) replacements_made += cell_replacements else: for cell_idx in range(1, len(row.cells)): value_cell = row.cells[cell_idx] if has_red_text(value_cell): print(f" ✅ Found red text in column {cell_idx + 1}") cell_replacements = replace_red_text_in_cell(value_cell, replacement_text) replacements_made += cell_replacements else: if len(row.cells) == 1 and has_red_text(key_cell): red_text = "" for paragraph in key_cell.paragraphs: for run in paragraph.runs: if is_red(run): red_text += run.text if red_text.strip(): section_value = find_matching_json_value(red_text.strip(), flat_json) if section_value is not None: section_replacement = get_value_as_string(section_value, red_text.strip()) cell_replacements = replace_red_text_in_cell(key_cell, section_replacement) replacements_made += cell_replacements for cell_idx in range(len(row.cells)): cell = row.cells[cell_idx] if has_red_text(cell): cell_replacements = handle_multiple_red_segments_in_cell(cell, flat_json) replacements_made += cell_replacements if cell_replacements == 0: surgical_fix = handle_nature_business_multiline_fix(cell, flat_json) replacements_made += surgical_fix if cell_replacements == 0: management_summary_fix = handle_management_summary_fix(cell, flat_json) replacements_made += management_summary_fix # Final declaration checks on last few tables print(f"\n🎯 Final check for Declaration tables...") for table in 
document.tables[-3:]: if len(table.rows) <= 4: if getattr(table, "_processed_operator_declaration", False): print(f" ⏭️ Skipping - already processed by operator declaration handler") continue declaration_fix = handle_operator_declaration_fix(table, flat_json) replacements_made += declaration_fix return replacements_made def process_paragraphs(document, flat_json): """Process all paragraphs in the document""" replacements_made = 0 print(f"\n🔍 Processing paragraphs:") for para_idx, paragraph in enumerate(document.paragraphs): red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()] if red_runs: red_text_only = "".join(run.text for run in red_runs).strip() print(f" 📌 Paragraph {para_idx + 1}: Found red text: '{red_text_only}'") json_value = find_matching_json_value(red_text_only, flat_json) if json_value is None: if "AUDITOR SIGNATURE" in red_text_only.upper() or "DATE" in red_text_only.upper(): json_value = find_matching_json_value("auditor signature", flat_json) elif "OPERATOR SIGNATURE" in red_text_only.upper(): json_value = find_matching_json_value("operator signature", flat_json) if json_value is not None: replacement_text = get_value_as_string(json_value) print(f" ✅ Replacing red text with: '{replacement_text}'") red_runs[0].text = replacement_text red_runs[0].font.color.rgb = RGBColor(0, 0, 0) for run in red_runs[1:]: run.text = '' replacements_made += 1 return replacements_made def process_headings(document, flat_json): """Process headings and their related content""" replacements_made = 0 print(f"\n🔍 Processing headings:") paragraphs = document.paragraphs for para_idx, paragraph in enumerate(paragraphs): paragraph_text = paragraph.text.strip() if not paragraph_text: continue matched_heading = None for category, patterns in HEADING_PATTERNS.items(): for pattern in patterns: if re.search(pattern, paragraph_text, re.IGNORECASE): matched_heading = pattern break if matched_heading: break if matched_heading: print(f" 📌 Found heading at 
paragraph {para_idx + 1}: '{paragraph_text}'") if has_red_text_in_paragraph(paragraph): print(f" 🔴 Found red text in heading itself") heading_replacements = process_red_text_in_paragraph(paragraph, paragraph_text, flat_json) replacements_made += heading_replacements for next_para_offset in range(1, 6): next_para_idx = para_idx + next_para_offset if next_para_idx >= len(paragraphs): break next_paragraph = paragraphs[next_para_idx] next_text = next_paragraph.text.strip() if not next_text: continue is_another_heading = False for category, patterns in HEADING_PATTERNS.items(): for pattern in patterns: if re.search(pattern, next_text, re.IGNORECASE): is_another_heading = True break if is_another_heading: break if is_another_heading: break if has_red_text_in_paragraph(next_paragraph): print(f" 🔴 Found red text in paragraph {next_para_idx + 1} after heading") context_replacements = process_red_text_in_paragraph( next_paragraph, paragraph_text, flat_json ) replacements_made += context_replacements return replacements_made def process_red_text_in_paragraph(paragraph, context_text, flat_json): """Process red text within a paragraph using context""" replacements_made = 0 red_text_segments = [] for run in paragraph.runs: if is_red(run) and run.text.strip(): red_text_segments.append(run.text.strip()) if not red_text_segments: return 0 combined_red_text = " ".join(red_text_segments).strip() print(f" 🔍 Red text found: '{combined_red_text}'") json_value = None json_value = find_matching_json_value(combined_red_text, flat_json) if json_value is None: if "NHVAS APPROVED AUDITOR" in context_text.upper(): auditor_fields = ["auditor name", "auditor", "nhvas auditor", "approved auditor", "print name"] for field in auditor_fields: json_value = find_matching_json_value(field, flat_json) if json_value is not None: print(f" ✅ Found auditor match with field: '{field}'") break elif "OPERATOR DECLARATION" in context_text.upper(): operator_fields = ["operator name", "operator", "company name", 
"organisation name", "print name"] for field in operator_fields: json_value = find_matching_json_value(field, flat_json) if json_value is not None: print(f" ✅ Found operator match with field: '{field}'") break if json_value is None: context_queries = [ f"{context_text} {combined_red_text}", combined_red_text, context_text ] for query in context_queries: json_value = find_matching_json_value(query, flat_json) if json_value is not None: print(f" ✅ Found match with combined query") break if json_value is not None: replacement_text = get_value_as_string(json_value, combined_red_text) red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()] if red_runs: red_runs[0].text = replacement_text red_runs[0].font.color.rgb = RGBColor(0, 0, 0) for run in red_runs[1:]: run.text = '' replacements_made = 1 print(f" ✅ Replaced with: '{replacement_text}'") else: print(f" ❌ No match found for red text: '{combined_red_text}'") return replacements_made # ============================================================================ # Main process function # ============================================================================ def process_hf(json_file, docx_file, output_file): """Main processing function with comprehensive error handling""" try: # Load JSON if hasattr(json_file, "read"): json_data = json.load(json_file) else: with open(json_file, 'r', encoding='utf-8') as f: json_data = json.load(f) flat_json = flatten_json(json_data) print("📄 Available JSON keys (sample):") for i, (key, value) in enumerate(sorted(flat_json.items())): if i < 10: print(f" - {key}: {value}") print(f" ... 
and {len(flat_json) - 10} more keys\n") # Load DOCX if hasattr(docx_file, "read"): doc = Document(docx_file) else: doc = Document(docx_file) # Process document with all fixes print("🚀 Starting comprehensive document processing...") table_replacements = process_tables(doc, flat_json) paragraph_replacements = process_paragraphs(doc, flat_json) heading_replacements = process_headings(doc, flat_json) total_replacements = table_replacements + paragraph_replacements + heading_replacements # Save unmatched headers for iterative improvement if _unmatched_headers: try: tmp_path = "/tmp/unmatched_headers.json" with open(tmp_path, 'w', encoding='utf-8') as f: json.dump(_unmatched_headers, f, indent=2, ensure_ascii=False) print(f"✅ Unmatched headers saved to {tmp_path}") except Exception as e: print(f"⚠️ Could not save unmatched headers: {e}") # Save output docx if hasattr(output_file, "write"): doc.save(output_file) else: # If output path is a file path string doc.save(output_file) print(f"\n✅ Document saved as: {output_file}") print(f"✅ Total replacements: {total_replacements}") print(f" 📊 Tables: {table_replacements}") print(f" 📝 Paragraphs: {paragraph_replacements}") print(f" 📋 Headings: {heading_replacements}") print(f"🎉 Processing complete!") except FileNotFoundError as e: print(f"❌ File not found: {e}") except Exception as e: print(f"❌ Error: {e}") import traceback traceback.print_exc() # ============================================================================ # CLI entrypoint # ============================================================================ if __name__ == "__main__": import sys if len(sys.argv) != 4: print("Usage: python pipeline.py ") exit(1) docx_path = sys.argv[1] json_path = sys.argv[2] output_path = sys.argv[3] process_hf(json_path, docx_path, output_path)