import json
import re

from docx import Document
from docx.shared import RGBColor

# Enhanced heading patterns (ADDITIVE - keeps your existing ones)
HEADING_PATTERNS = {
    "main": [
        r"NHVAS\s+Audit\s+Summary\s+Report",
        r"NATIONAL\s+HEAVY\s+VEHICLE\s+ACCREDITATION\s+AUDIT\s+SUMMARY\s+REPORT",
        r"NHVAS\s+AUDIT\s+SUMMARY\s+REPORT",
    ],
    "sub": [
        r"AUDIT\s+OBSERVATIONS\s+AND\s+COMMENTS",
        r"MAINTENANCE\s+MANAGEMENT",
        r"MASS\s+MANAGEMENT",
        r"FATIGUE\s+MANAGEMENT",
        r"Fatigue\s+Management\s+Summary\s+of\s+Audit\s+findings",
        r"MAINTENANCE\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS",
        r"MASS\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS",
        r"Vehicle\s+Registration\s+Numbers\s+of\s+Records\s+Examined",
        r"CORRECTIVE\s+ACTION\s+REQUEST\s+\(CAR\)",
        r"NHVAS\s+APPROVED\s+AUDITOR\s+DECLARATION",
        r"Operator\s+Declaration",
        r"Operator\s+Information",
        r"Driver\s*/\s*Scheduler\s+Records\s+Examined",
    ],
}

def load_json(filepath):
    with open(filepath, 'r') as file:
        return json.load(file)


def flatten_json(y, prefix=''):
    out = {}
    for key, val in y.items():
        new_key = f"{prefix}.{key}" if prefix else key
        if isinstance(val, dict):
            out.update(flatten_json(val, new_key))
        else:
            out[new_key] = val
            out[key] = val
    return out
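
# Illustrative example (hypothetical JSON, shaped like the audit data but not taken from it):
#   flatten_json({"Operator Information": {"Operator name": "Acme Haulage Pty Ltd"}})
# returns
#   {"Operator Information.Operator name": "Acme Haulage Pty Ltd",
#    "Operator name": "Acme Haulage Pty Ltd"}
# Each leaf is stored under both its dotted path and its bare key, so later lookups can match
# either the full path or just the field label as it appears in the document.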

def is_red(run):
    color = run.font.color
    return color and (color.rgb == RGBColor(255, 0, 0) or getattr(color, "theme_color", None) == 1)

def get_value_as_string(value, field_name=""):
    if isinstance(value, list):
        if len(value) == 0:
            return ""
        elif len(value) == 1:
            return str(value[0])
        else:
            if "australian company number" in field_name.lower() or "company number" in field_name.lower():
                return value
            else:
                return " ".join(str(v) for v in value)
    else:
        return str(value)
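
# Illustrative behaviour (hypothetical values): ["John Smith"] becomes "John Smith";
# ["Unit 1", "5 Depot Road"] is joined to "Unit 1 5 Depot Road"; but for a field named
# "Australian Company Number" a digit list such as ["1", "2", "3"] is returned unchanged,
# so handle_australian_company_number() can later place one digit per table cell.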

def find_matching_json_value(field_name, flat_json):
    """Enhanced dynamic matching without manual mappings"""
    field_name = field_name.strip()
    # Try exact match first
    if field_name in flat_json:
        print(f"    ✓ Direct match found for key '{field_name}'")
        return flat_json[field_name]
    # Try case-insensitive exact match
    for key, value in flat_json.items():
        if key.lower() == field_name.lower():
            print(f"    ✓ Case-insensitive match found for key '{field_name}' with JSON key '{key}'")
            return value
    # Try suffix matching (for nested keys like "section.field")
    for key, value in flat_json.items():
        if '.' in key and key.split('.')[-1].lower() == field_name.lower():
            print(f"    ✓ Suffix match found for key '{field_name}' with JSON key '{key}'")
            return value
    # Try partial matching - remove parentheses and special chars
    clean_field = re.sub(r'[^\w\s]', ' ', field_name.lower()).strip()
    clean_field = re.sub(r'\s+', ' ', clean_field)
    for key, value in flat_json.items():
        clean_key = re.sub(r'[^\w\s]', ' ', key.lower()).strip()
        clean_key = re.sub(r'\s+', ' ', clean_key)
        if clean_field == clean_key:
            print(f"    ✓ Clean match found for key '{field_name}' with JSON key '{key}'")
            return value
    # Enhanced fuzzy matching with better scoring
    field_words = set(word.lower() for word in re.findall(r'\b\w+\b', field_name) if len(word) > 2)
    if not field_words:
        return None
    best_match = None
    best_score = 0
    best_key = None
    for key, value in flat_json.items():
        key_words = set(word.lower() for word in re.findall(r'\b\w+\b', key) if len(word) > 2)
        if not key_words:
            continue
        # Calculate similarity score
        common_words = field_words.intersection(key_words)
        if common_words:
            # Use Jaccard similarity: intersection / union
            similarity = len(common_words) / len(field_words.union(key_words))
            # Bonus for high word coverage in field_name
            coverage = len(common_words) / len(field_words)
            final_score = (similarity * 0.6) + (coverage * 0.4)
            if final_score > best_score:
                best_score = final_score
                best_match = value
                best_key = key
    if best_match and best_score >= 0.25:  # Lowered threshold for better coverage
        print(f"    ✓ Fuzzy match found for key '{field_name}' with JSON key '{best_key}' (score: {best_score:.2f})")
        return best_match
    print(f"    ✗ No match found for '{field_name}'")
    return None
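
# Worked example of the fuzzy scoring (hypothetical pair; in practice this pair would already
# be caught by the earlier suffix match): for field "Operator name" vs JSON key
# "Operator Information.Operator name", the >2-letter word sets are {"operator", "name"} and
# {"operator", "information", "name"}; Jaccard = 2/3 ≈ 0.67, coverage = 2/2 = 1.0, so
# final_score = 0.67 * 0.6 + 1.0 * 0.4 ≈ 0.80, well above the 0.25 acceptance threshold.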

def get_clean_text(cell):
    text = ""
    for paragraph in cell.paragraphs:
        for run in paragraph.runs:
            text += run.text
    return text.strip()


def has_red_text(cell):
    for paragraph in cell.paragraphs:
        for run in paragraph.runs:
            if is_red(run) and run.text.strip():
                return True
    return False

def extract_red_text_segments(cell):
    """Enhanced red text extraction with better multi-line handling"""
    red_segments = []
    for para_idx, paragraph in enumerate(cell.paragraphs):
        current_segment = ""
        segment_runs = []
        for run_idx, run in enumerate(paragraph.runs):
            if is_red(run):
                if run.text:
                    current_segment += run.text
                    segment_runs.append((para_idx, run_idx, run))
            else:
                # End of current red segment
                if segment_runs:
                    red_segments.append({
                        'text': current_segment,
                        'runs': segment_runs.copy(),
                        'paragraph_idx': para_idx
                    })
                    current_segment = ""
                    segment_runs = []
        # Handle segment at end of paragraph
        if segment_runs:
            red_segments.append({
                'text': current_segment,
                'runs': segment_runs.copy(),
                'paragraph_idx': para_idx
            })
    return red_segments

def replace_red_text_in_cell(cell, replacement_text):
    """Enhanced cell replacement with improved multi-line handling"""
    red_segments = extract_red_text_segments(cell)
    if not red_segments:
        return 0
    # Whether the cell holds one red segment or several, they are all replaced together
    # with the same replacement text.
    return replace_all_red_segments(red_segments, replacement_text)

def replace_all_red_segments(red_segments, replacement_text):
    """Enhanced replacement with better line handling"""
    if not red_segments:
        return 0
    if '\n' in replacement_text:
        replacement_lines = replacement_text.split('\n')
    else:
        replacement_lines = [replacement_text]
    replacements_made = 0
    if red_segments and replacement_lines:
        first_segment = red_segments[0]
        if first_segment['runs']:
            first_run = first_segment['runs'][0][2]
            first_run.text = replacement_lines[0]
            first_run.font.color.rgb = RGBColor(0, 0, 0)
            replacements_made = 1
            for _, _, run in first_segment['runs'][1:]:
                run.text = ''
    for segment in red_segments[1:]:
        for _, _, run in segment['runs']:
            run.text = ''
    if len(replacement_lines) > 1 and red_segments:
        try:
            from docx.oxml import OxmlElement
            from docx.text.paragraph import Paragraph
            first_run = red_segments[0]['runs'][0][2]
            # Wrap the run's parent <w:p> element so python-docx can append runs to it.
            paragraph = Paragraph(first_run._element.getparent(), None)
            for line in replacement_lines[1:]:
                if line.strip():
                    br = OxmlElement('w:br')
                    first_run._element.append(br)
                    new_run = paragraph.add_run(line.strip())
                    new_run.font.color.rgb = RGBColor(0, 0, 0)
        except Exception:
            # Fall back to a single-line replacement if the XML manipulation fails.
            if red_segments and red_segments[0]['runs']:
                first_run = red_segments[0]['runs'][0][2]
                first_run.text = ' '.join(replacement_lines)
                first_run.font.color.rgb = RGBColor(0, 0, 0)
    return replacements_made
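
# Sketch of the multi-line path above (values are illustrative): for a replacement such as
# "REG123\nREG456", the first red run gets "REG123" and is recoloured black, every other red
# run is blanked, and each remaining non-empty line is appended as a <w:br/> on the first run
# plus a new black run at the end of the same paragraph; if that XML manipulation fails, the
# lines are joined with spaces into the first run instead.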

def analyze_table_structure(table):
    """NEW: Dynamic table structure analysis"""
    structure = {
        'type': 'unknown',
        'orientation': 'unknown',
        'has_headers': False,
        'column_count': 0,
        'row_count': 0,
        'red_text_locations': []
    }
    if not table.rows:
        return structure
    structure['row_count'] = len(table.rows)
    structure['column_count'] = len(table.rows[0].cells) if table.rows else 0
    # Analyze first row for headers
    first_row_text = []
    for cell in table.rows[0].cells:
        cell_text = get_clean_text(cell).strip()
        first_row_text.append(cell_text)
    # Detect table type based on content patterns
    combined_text = " ".join(first_row_text).lower()
    if any(indicator in combined_text for indicator in ["registration", "vehicle", "maintenance", "mass"]):
        structure['type'] = 'vehicle_registration'
    elif any(indicator in combined_text for indicator in ["print name", "position", "auditor", "operator"]):
        structure['type'] = 'declaration'
    elif any(indicator in combined_text for indicator in ["std", "standard", "compliance"]):
        structure['type'] = 'compliance_matrix'
    elif len(table.rows[0].cells) == 2 and not any(indicator in combined_text for indicator in ["no.", "number"]):
        structure['type'] = 'key_value'
    else:
        structure['type'] = 'data_grid'
    # Find red text locations
    for row_idx, row in enumerate(table.rows):
        for cell_idx, cell in enumerate(row.cells):
            if has_red_text(cell):
                structure['red_text_locations'].append((row_idx, cell_idx))
    structure['has_headers'] = len(structure['red_text_locations']) > 0 and (0, 0) not in structure['red_text_locations']
    return structure
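
# Illustrative classification (hypothetical header rows): a first row reading
# "No. | Registration Number | Sub-contractor" contains "registration", so the table is typed
# 'vehicle_registration'; a two-cell row such as "Date of audit | <red placeholder>" matches
# none of the keyword lists and has no "no."/"number" text, so it falls through to 'key_value'.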

def handle_multiple_red_segments_in_cell(cell, flat_json):
    """Enhanced multi-segment handling"""
    red_segments = extract_red_text_segments(cell)
    if not red_segments:
        return 0
    print(f"    Found {len(red_segments)} red text segments in cell")
    replacements_made = 0
    unmatched_segments = []
    for i, segment in enumerate(red_segments):
        segment_text = segment['text'].strip()
        if not segment_text:
            continue
        print(f"      Segment {i+1}: '{segment_text[:50]}...'")
        json_value = find_matching_json_value(segment_text, flat_json)
        if json_value is not None:
            replacement_text = get_value_as_string(json_value, segment_text)
            if isinstance(json_value, list) and len(json_value) > 1:
                replacement_text = "\n".join(str(item) for item in json_value if str(item).strip())
            success = replace_single_segment(segment, replacement_text)
            if success:
                replacements_made += 1
                print(f"      ✓ Replaced segment '{segment_text[:30]}...' with '{replacement_text[:30]}...'")
        else:
            unmatched_segments.append(segment)
            print(f"      - No individual match for segment '{segment_text[:30]}...'")
    if unmatched_segments and replacements_made == 0:
        combined_text = " ".join(seg['text'] for seg in red_segments).strip()
        print(f"    Trying combined text match: '{combined_text[:50]}...'")
        json_value = find_matching_json_value(combined_text, flat_json)
        if json_value is not None:
            replacement_text = get_value_as_string(json_value, combined_text)
            if isinstance(json_value, list) and len(json_value) > 1:
                replacement_text = "\n".join(str(item) for item in json_value if str(item).strip())
            replacements_made = replace_all_red_segments(red_segments, replacement_text)
            print(f"      ✓ Replaced combined text with '{replacement_text[:50]}...'")
    return replacements_made

def replace_single_segment(segment, replacement_text):
    """Enhanced single segment replacement"""
    if not segment['runs']:
        return False
    first_run = segment['runs'][0][2]
    first_run.text = replacement_text
    first_run.font.color.rgb = RGBColor(0, 0, 0)
    for _, _, run in segment['runs'][1:]:
        run.text = ''
    return True


def detect_table_type(table):
    """Enhanced table type detection"""
    structure = analyze_table_structure(table)
    return structure['type']

def try_context_based_replacement(cell, row, table, flat_json):
    """Enhanced context-based replacement"""
    replacements_made = 0
    row_context = ""
    if len(row.cells) > 1:
        first_cell_text = get_clean_text(row.cells[0]).strip()
        if first_cell_text and not has_red_text(row.cells[0]):
            row_context = first_cell_text
    red_segments = extract_red_text_segments(cell)
    for segment in red_segments:
        red_text = segment['text'].strip()
        if not red_text:
            continue
        if row_context:
            context_queries = [
                f"{row_context} {red_text}",
                f"{row_context}",
                red_text
            ]
            for query in context_queries:
                json_value = find_matching_json_value(query, flat_json)
                if json_value is not None:
                    replacement_text = get_value_as_string(json_value, query)
                    success = replace_single_segment(segment, replacement_text)
                    if success:
                        replacements_made += 1
                        print(f"    ✓ Context-based replacement: '{query}' -> '{replacement_text[:30]}...'")
                    break
    return replacements_made
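
# Illustrative run (hypothetical row): if the row's first cell reads "Operator name" and the
# red cell beside it says "enter operator name here", the queries tried in order are
# "Operator name enter operator name here", "Operator name", and "enter operator name here";
# any of the three may resolve, but the bare row label is usually the most reliable key.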

def smart_fallback_processor(element, flat_json):
    """NEW: Smart fallback for missed red text"""
    replacements_made = 0
    # Cells expose .paragraphs; a Paragraph object exposes its runs directly.
    if hasattr(element, 'paragraphs'):
        paragraphs = element.paragraphs
    elif hasattr(element, 'runs'):
        paragraphs = [element]
    else:
        return 0
    for paragraph in paragraphs:
        for run in paragraph.runs:
            if is_red(run) and run.text.strip():
                red_text = run.text.strip()
                # Try semantic matching for red text the earlier passes missed
                json_value = semantic_text_matching(red_text, flat_json)
                if json_value:
                    replacement_text = get_value_as_string(json_value, red_text)
                    run.text = replacement_text
                    run.font.color.rgb = RGBColor(0, 0, 0)
                    replacements_made += 1
                    print(f"    Fallback match: '{red_text}' -> '{replacement_text[:30]}...'")
    return replacements_made

def semantic_text_matching(text, flat_json):
    """NEW: Advanced semantic matching for edge cases"""
    text_lower = text.lower().strip()
    # Common semantic patterns
    semantic_patterns = {
        'name': ['name', 'manager', 'operator', 'auditor', 'driver'],
        'date': ['date', 'expiry', 'conducted', 'completed'],
        'address': ['address', 'location', 'road', 'street'],
        'number': ['number', 'registration', 'phone', 'telephone'],
        'email': ['email', 'mail'],
        'position': ['position', 'title', 'role']
    }
    # Find semantic category
    for category, keywords in semantic_patterns.items():
        if any(keyword in text_lower for keyword in keywords):
            # Look for JSON keys in this semantic category
            for key, value in flat_json.items():
                key_lower = key.lower()
                if any(keyword in key_lower for keyword in keywords):
                    return value
    return None
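
# Illustrative example (hypothetical data): red text "Manager Name" contains the keyword
# "name", so the 'name' category is selected and the first JSON key whose lower-cased form
# contains any of ["name", "manager", "operator", "auditor", "driver"] wins, e.g. a key like
# "Operator name". The match is keyword-based only and returns the first candidate found.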

def handle_australian_company_number(row, company_numbers):
    """Enhanced ACN handling"""
    replacements_made = 0
    for i, digit in enumerate(company_numbers):
        cell_idx = i + 1
        if cell_idx < len(row.cells):
            cell = row.cells[cell_idx]
            if has_red_text(cell):
                cell_replacements = replace_red_text_in_cell(cell, str(digit))
                replacements_made += cell_replacements
                print(f"      -> Placed digit '{digit}' in cell {cell_idx + 1}")
    return replacements_made
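
# Illustrative placement (hypothetical ACN): given company_numbers = ["1", "2", "3", ...],
# digit "1" goes into the row's second cell (index 1), "2" into the third, and so on; the
# first cell is assumed to hold the "Australian Company Number" label itself.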

def handle_vehicle_registration_table(table, flat_json):
    """Enhanced vehicle registration table handling"""
    replacements_made = 0
    # Try to find vehicle registration data
    vehicle_section = None
    for key, value in flat_json.items():
        if "vehicle registration numbers of records examined" in key.lower():
            if isinstance(value, dict):
                vehicle_section = value
                print(f"    ✓ Found vehicle data in key: '{key}'")
                break
    if not vehicle_section:
        potential_columns = {}
        for key, value in flat_json.items():
            if any(col_name in key.lower() for col_name in ["registration number", "sub-contractor", "weight verification", "rfs suspension"]):
                if "." in key:
                    column_name = key.split(".")[-1]
                else:
                    column_name = key
                potential_columns[column_name] = value
        if potential_columns:
            vehicle_section = potential_columns
            print(f"    ✓ Found vehicle data from flattened keys: {list(vehicle_section.keys())}")
        else:
            print(f"    ✗ Vehicle registration data not found in JSON")
            return 0
    print(f"    ✓ Found vehicle registration data with {len(vehicle_section)} columns")
    # Find header row
    header_row_idx = -1
    header_row = None
    for row_idx, row in enumerate(table.rows):
        row_text = "".join(get_clean_text(cell).lower() for cell in row.cells)
        if "registration" in row_text and "number" in row_text:
            header_row_idx = row_idx
            header_row = row
            break
    if header_row_idx == -1:
        print(f"    ✗ Could not find header row in vehicle table")
        return 0
    print(f"    ✓ Found header row at index {header_row_idx}")
    # Enhanced column mapping
    column_mapping = {}
    for col_idx, cell in enumerate(header_row.cells):
        header_text = get_clean_text(cell).strip()
        if not header_text or header_text.lower() == "no.":
            continue
        best_match = None
        best_score = 0
        normalized_header = header_text.lower().replace("(", " (").replace(")", ") ").strip()
        for json_key in vehicle_section.keys():
            normalized_json = json_key.lower().strip()
            if normalized_header == normalized_json:
                best_match = json_key
                best_score = 1.0
                break
            header_words = set(word.lower() for word in normalized_header.split() if len(word) > 2)
            json_words = set(word.lower() for word in normalized_json.split() if len(word) > 2)
            if header_words and json_words:
                common_words = header_words.intersection(json_words)
                score = len(common_words) / max(len(header_words), len(json_words))
                if score > best_score and score >= 0.3:
                    best_score = score
                    best_match = json_key
            header_clean = normalized_header.replace(" ", "").replace("-", "").replace("(", "").replace(")", "")
            json_clean = normalized_json.replace(" ", "").replace("-", "").replace("(", "").replace(")", "")
            if header_clean in json_clean or json_clean in header_clean:
                if len(header_clean) > 5 and len(json_clean) > 5:
                    substring_score = min(len(header_clean), len(json_clean)) / max(len(header_clean), len(json_clean))
                    if substring_score > best_score and substring_score >= 0.6:
                        best_score = substring_score
                        best_match = json_key
        if best_match:
            column_mapping[col_idx] = best_match
            print(f"    Column {col_idx + 1} ('{header_text}') -> '{best_match}' (score: {best_score:.2f})")
    if not column_mapping:
        print(f"    ✗ No column mappings found")
        return 0
    # Determine data rows needed
    max_data_rows = 0
    for json_key, data in vehicle_section.items():
        if isinstance(data, list):
            max_data_rows = max(max_data_rows, len(data))
    print(f"    Need to populate {max_data_rows} data rows")
    # Process data rows
    for data_row_index in range(max_data_rows):
        table_row_idx = header_row_idx + 1 + data_row_index
        if table_row_idx >= len(table.rows):
            print(f"    ⚠ Row {table_row_idx + 1} doesn't exist - table only has {len(table.rows)} rows")
            print(f"    Adding new row for vehicle {data_row_index + 1}")
            table.add_row()
            print(f"    ✓ Successfully added row {len(table.rows)} to the table")
        row = table.rows[table_row_idx]
        print(f"    Processing data row {table_row_idx + 1} (vehicle {data_row_index + 1})")
        for col_idx, json_key in column_mapping.items():
            if col_idx < len(row.cells):
                cell = row.cells[col_idx]
                column_data = vehicle_section.get(json_key, [])
                if isinstance(column_data, list) and data_row_index < len(column_data):
                    replacement_value = str(column_data[data_row_index])
                    cell_text = get_clean_text(cell)
                    if has_red_text(cell) or not cell_text.strip():
                        if not cell_text.strip():
                            cell.text = replacement_value
                            replacements_made += 1
                            print(f"      -> Added '{replacement_value}' to empty cell (column '{json_key}')")
                        else:
                            cell_replacements = replace_red_text_in_cell(cell, replacement_value)
                            replacements_made += cell_replacements
                            if cell_replacements > 0:
                                print(f"      -> Replaced red text with '{replacement_value}' (column '{json_key}')")
    return replacements_made

def handle_print_accreditation_section(table, flat_json):
    """Enhanced print accreditation handling"""
    replacements_made = 0
    print_data = flat_json.get("print accreditation name.print accreditation name", [])
    if not isinstance(print_data, list) or len(print_data) < 2:
        return 0
    name_value = print_data[0]
    position_value = print_data[1]
    print(f"    Print accreditation data: Name='{name_value}', Position='{position_value}'")
    for row_idx, row in enumerate(table.rows):
        if len(row.cells) >= 2:
            cell1_text = get_clean_text(row.cells[0]).lower()
            cell2_text = get_clean_text(row.cells[1]).lower()
            if "print name" in cell1_text and "position title" in cell2_text:
                print(f"    Found header row {row_idx + 1}: '{cell1_text}' | '{cell2_text}'")
                if row_idx + 1 < len(table.rows):
                    data_row = table.rows[row_idx + 1]
                    if len(data_row.cells) >= 2:
                        if has_red_text(data_row.cells[0]):
                            cell_replacements = replace_red_text_in_cell(data_row.cells[0], name_value)
                            replacements_made += cell_replacements
                            if cell_replacements > 0:
                                print(f"      ✓ Replaced Print Name: '{name_value}'")
                        if has_red_text(data_row.cells[1]):
                            cell_replacements = replace_red_text_in_cell(data_row.cells[1], position_value)
                            replacements_made += cell_replacements
                            if cell_replacements > 0:
                                print(f"      ✓ Replaced Position Title: '{position_value}'")
                break
    return replacements_made

def process_single_column_sections(cell, field_name, flat_json):
    """Enhanced single column processing"""
    json_value = find_matching_json_value(field_name, flat_json)
    if json_value is not None:
        replacement_text = get_value_as_string(json_value, field_name)
        if isinstance(json_value, list) and len(json_value) > 1:
            replacement_text = "\n".join(str(item) for item in json_value)
        if has_red_text(cell):
            print(f"    Replacing red text in single-column section: '{field_name}'")
            print(f"    Replacement text:\n{replacement_text}")
            cell_replacements = replace_red_text_in_cell(cell, replacement_text)
            if cell_replacements > 0:
                print(f"      -> Replaced with: '{replacement_text[:100]}...'")
            return cell_replacements
    return 0

def process_tables(document, flat_json):
    """ENHANCED: Your existing function + smart enhancements"""
    replacements_made = 0
    for table_idx, table in enumerate(document.tables):
        print(f"\nProcessing table {table_idx + 1}:")
        # ENHANCED: Dynamic table analysis
        table_structure = analyze_table_structure(table)
        print(f"  Table structure: {table_structure['type']} ({table_structure['row_count']}x{table_structure['column_count']})")
        # Your existing logic with enhancements
        table_text = ""
        for row in table.rows[:3]:
            for cell in row.cells:
                table_text += get_clean_text(cell).lower() + " "
        # Enhanced vehicle registration detection
        vehicle_indicators = ["registration number", "sub-contractor", "weight verification", "rfs suspension"]
        indicator_count = sum(1 for indicator in vehicle_indicators if indicator in table_text)
        if indicator_count >= 2 or table_structure['type'] == 'vehicle_registration':  # Lowered threshold
            print(f"  Detected Vehicle Registration table")
            vehicle_replacements = handle_vehicle_registration_table(table, flat_json)
            replacements_made += vehicle_replacements
            continue
        # Enhanced print accreditation detection
        print_accreditation_indicators = ["print name", "position title"]
        indicator_count = sum(1 for indicator in print_accreditation_indicators if indicator in table_text)
        if indicator_count >= 1 or table_structure['type'] == 'declaration':  # Lowered threshold
            print(f"  Detected Print Accreditation table")
            print_accreditation_replacements = handle_print_accreditation_section(table, flat_json)
            replacements_made += print_accreditation_replacements
            continue
        # Your existing row processing with enhancements
        for row_idx, row in enumerate(table.rows):
            if len(row.cells) < 1:
                continue
            key_cell = row.cells[0]
            key_text = get_clean_text(key_cell)
            if not key_text:
                continue
            print(f"  Row {row_idx + 1}: Key = '{key_text}'")
            json_value = find_matching_json_value(key_text, flat_json)
            if json_value is not None:
                replacement_text = get_value_as_string(json_value, key_text)
                # Enhanced ACN handling
                if ("australian company number" in key_text.lower() or "company number" in key_text.lower()) and isinstance(json_value, list):
                    cell_replacements = handle_australian_company_number(row, json_value)
                    replacements_made += cell_replacements
                # Enhanced section header handling
                elif ("attendance list" in key_text.lower() or "nature of" in key_text.lower()) and row_idx + 1 < len(table.rows):
                    print(f"    Section header detected, checking next row for content...")
                    next_row = table.rows[row_idx + 1]
                    for cell_idx, cell in enumerate(next_row.cells):
                        if has_red_text(cell):
                            print(f"    ✓ Found red text in next row, cell {cell_idx + 1}")
                            if isinstance(json_value, list):
                                replacement_text = "\n".join(str(item) for item in json_value)
                            cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                            replacements_made += cell_replacements
                            if cell_replacements > 0:
                                print(f"      -> Replaced section content with: '{replacement_text[:100]}...'")
                elif len(row.cells) == 1 or (len(row.cells) > 1 and not any(has_red_text(row.cells[i]) for i in range(1, len(row.cells)))):
                    if has_red_text(key_cell):
                        cell_replacements = process_single_column_sections(key_cell, key_text, flat_json)
                        replacements_made += cell_replacements
                else:
                    for cell_idx in range(1, len(row.cells)):
                        value_cell = row.cells[cell_idx]
                        if has_red_text(value_cell):
                            print(f"    ✓ Found red text in column {cell_idx + 1}")
                            cell_replacements = replace_red_text_in_cell(value_cell, replacement_text)
                            replacements_made += cell_replacements
            else:
                # Enhanced fallback processing for unmatched keys
                if len(row.cells) == 1 and has_red_text(key_cell):
                    red_text = ""
                    for paragraph in key_cell.paragraphs:
                        for run in paragraph.runs:
                            if is_red(run):
                                red_text += run.text
                    if red_text.strip():
                        section_value = find_matching_json_value(red_text.strip(), flat_json)
                        if section_value is not None:
                            section_replacement = get_value_as_string(section_value, red_text.strip())
                            cell_replacements = replace_red_text_in_cell(key_cell, section_replacement)
                            replacements_made += cell_replacements
            # Enhanced red text processing for all cells
            for cell_idx in range(len(row.cells)):
                cell = row.cells[cell_idx]
                if has_red_text(cell):
                    cell_replacements = handle_multiple_red_segments_in_cell(cell, flat_json)
                    replacements_made += cell_replacements
                    # ENHANCED: Fallback for still unmatched red text
                    if cell_replacements == 0:
                        context_replacements = try_context_based_replacement(cell, row, table, flat_json)
                        replacements_made += context_replacements
                        # ENHANCED: Smart fallback processor
                        if context_replacements == 0:
                            fallback_replacements = smart_fallback_processor(cell, flat_json)
                            replacements_made += fallback_replacements
    return replacements_made

def process_paragraphs(document, flat_json):
    """ENHANCED: Your existing function + smart fallbacks"""
    replacements_made = 0
    print(f"\nProcessing paragraphs:")
    for para_idx, paragraph in enumerate(document.paragraphs):
        red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()]
        if red_runs:
            red_text_only = "".join(run.text for run in red_runs).strip()
            print(f"  Paragraph {para_idx + 1}: Found red text: '{red_text_only}'")
            # Your existing matching logic
            json_value = find_matching_json_value(red_text_only, flat_json)
            if json_value is None:
                # Enhanced pattern matching for signatures and dates
                if "AUDITOR SIGNATURE" in red_text_only.upper() or "DATE" in red_text_only.upper():
                    json_value = find_matching_json_value("auditor signature", flat_json)
                elif "OPERATOR SIGNATURE" in red_text_only.upper():
                    json_value = find_matching_json_value("operator signature", flat_json)
                else:
                    # ENHANCED: Try semantic matching
                    json_value = semantic_text_matching(red_text_only, flat_json)
            if json_value is not None:
                replacement_text = get_value_as_string(json_value)
                print(f"    ✓ Replacing red text with: '{replacement_text}'")
                red_runs[0].text = replacement_text
                red_runs[0].font.color.rgb = RGBColor(0, 0, 0)
                for run in red_runs[1:]:
                    run.text = ''
                replacements_made += 1
            else:
                # ENHANCED: Try smart fallback
                fallback_replacements = smart_fallback_processor(paragraph, flat_json)
                replacements_made += fallback_replacements
    return replacements_made

def process_headings(document, flat_json):
    """ENHANCED: Your existing function + comprehensive coverage"""
    replacements_made = 0
    print(f"\nProcessing headings:")
    paragraphs = document.paragraphs
    for para_idx, paragraph in enumerate(paragraphs):
        paragraph_text = paragraph.text.strip()
        if not paragraph_text:
            continue
        # Enhanced heading detection
        matched_heading = None
        for category, patterns in HEADING_PATTERNS.items():
            for pattern in patterns:
                if re.search(pattern, paragraph_text, re.IGNORECASE):
                    matched_heading = pattern
                    break
            if matched_heading:
                break
        if matched_heading:
            print(f"  Found heading at paragraph {para_idx + 1}: '{paragraph_text}'")
            # Check current heading paragraph
            if has_red_text_in_paragraph(paragraph):
                print(f"    Found red text in heading itself")
                heading_replacements = process_red_text_in_paragraph(paragraph, paragraph_text, flat_json)
                replacements_made += heading_replacements
            # Enhanced: Look further ahead for related content
            for next_para_offset in range(1, 6):  # Extended range
                next_para_idx = para_idx + next_para_offset
                if next_para_idx >= len(paragraphs):
                    break
                next_paragraph = paragraphs[next_para_idx]
                next_text = next_paragraph.text.strip()
                if not next_text:
                    continue
                # Stop if we hit another heading
                is_another_heading = False
                for category, patterns in HEADING_PATTERNS.items():
                    for pattern in patterns:
                        if re.search(pattern, next_text, re.IGNORECASE):
                            is_another_heading = True
                            break
                    if is_another_heading:
                        break
                if is_another_heading:
                    break
                # Process red text with enhanced context
                if has_red_text_in_paragraph(next_paragraph):
                    print(f"    Found red text in paragraph {next_para_idx + 1} after heading: '{next_text[:50]}...'")
                    context_replacements = process_red_text_in_paragraph(
                        next_paragraph,
                        paragraph_text,
                        flat_json
                    )
                    replacements_made += context_replacements
                    # ENHANCED: Smart fallback if still no match
                    if context_replacements == 0:
                        fallback_replacements = smart_fallback_processor(next_paragraph, flat_json)
                        replacements_made += fallback_replacements
    return replacements_made

def has_red_text_in_paragraph(paragraph):
    """Enhanced paragraph red text detection"""
    for run in paragraph.runs:
        if is_red(run) and run.text.strip():
            return True
    return False

def process_red_text_in_paragraph(paragraph, context_text, flat_json):
    """ENHANCED: Your existing function + smarter matching"""
    replacements_made = 0
    red_text_segments = []
    for run in paragraph.runs:
        if is_red(run) and run.text.strip():
            red_text_segments.append(run.text.strip())
    if not red_text_segments:
        return 0
    combined_red_text = " ".join(red_text_segments).strip()
    print(f"    Red text found: '{combined_red_text}'")
    json_value = None
    # Strategy 1: Direct matching
    json_value = find_matching_json_value(combined_red_text, flat_json)
    # Strategy 2: Enhanced context-based matching
    if json_value is None:
        if "NHVAS APPROVED AUDITOR" in context_text.upper():
            auditor_fields = ["auditor name", "auditor", "nhvas auditor", "approved auditor", "print name"]
            for field in auditor_fields:
                json_value = find_matching_json_value(field, flat_json)
                if json_value is not None:
                    print(f"    ✓ Found auditor match with field: '{field}'")
                    break
        elif "OPERATOR DECLARATION" in context_text.upper():
            operator_fields = ["operator name", "operator", "company name", "organisation name", "print name"]
            for field in operator_fields:
                json_value = find_matching_json_value(field, flat_json)
                if json_value is not None:
                    print(f"    ✓ Found operator match with field: '{field}'")
                    break
    # Strategy 3: Enhanced context combination
    if json_value is None:
        context_queries = [
            f"{context_text} {combined_red_text}",
            combined_red_text,
            context_text
        ]
        for query in context_queries:
            json_value = find_matching_json_value(query, flat_json)
            if json_value is not None:
                print(f"    ✓ Found match with combined query: '{query[:50]}...'")
                break
    # ENHANCED: Strategy 4: Semantic matching
    if json_value is None:
        json_value = semantic_text_matching(combined_red_text, flat_json)
        if json_value:
            print(f"    ✓ Found semantic match for: '{combined_red_text}'")
    # Replace if match found
    if json_value is not None:
        replacement_text = get_value_as_string(json_value, combined_red_text)
        red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()]
        if red_runs:
            red_runs[0].text = replacement_text
            red_runs[0].font.color.rgb = RGBColor(0, 0, 0)
            for run in red_runs[1:]:
                run.text = ''
            replacements_made = 1
            print(f"    ✓ Replaced with: '{replacement_text}'")
    else:
        print(f"    ✗ No match found for red text: '{combined_red_text}'")
    return replacements_made

def comprehensive_document_scan(document, flat_json):
    """NEW: Final comprehensive scan for any missed red text"""
    print(f"\nComprehensive final scan for missed red text:")
    replacements_made = 0
    # Scan all elements in document
    for element in document.element.body:
        # Check tables
        if element.tag.endswith('tbl'):
            table_obj = None
            for table in document.tables:
                if table._element == element:
                    table_obj = table
                    break
            if table_obj:
                for row in table_obj.rows:
                    for cell in row.cells:
                        if has_red_text(cell):
                            # Try one more time with enhanced fallback
                            cell_replacements = smart_fallback_processor(cell, flat_json)
                            replacements_made += cell_replacements
        # Check paragraphs
        elif element.tag.endswith('p'):
            paragraph_obj = None
            for para in document.paragraphs:
                if para._element == element:
                    paragraph_obj = para
                    break
            if paragraph_obj and has_red_text_in_paragraph(paragraph_obj):
                # Try enhanced fallback
                para_replacements = smart_fallback_processor(paragraph_obj, flat_json)
                replacements_made += para_replacements
    if replacements_made > 0:
        print(f"  ✓ Final scan caught {replacements_made} additional replacements!")
    else:
        print(f"  ✓ No additional red text found - document fully processed!")
    return replacements_made

def process_hf(json_file, docx_file, output_file):
    """ENHANCED: Your existing main function + comprehensive processing"""
    try:
        # Load JSON (accepts a path or a file-like object)
        if hasattr(json_file, "read"):
            json_data = json.load(json_file)
        else:
            with open(json_file, 'r', encoding='utf-8') as f:
                json_data = json.load(f)
        flat_json = flatten_json(json_data)
        print("Available JSON keys (sample):")
        for i, (key, value) in enumerate(sorted(flat_json.items())):
            if i < 10:
                print(f"  - {key}: {value}")
        if len(flat_json) > 10:
            print(f"  ... and {len(flat_json) - 10} more keys\n")
        # Load DOCX (Document() accepts either a path or a file-like object)
        doc = Document(docx_file)
        # ENHANCED: Multi-pass processing for 100% coverage
        print("Starting enhanced multi-pass processing...")
        # Pass 1: Your existing processors (enhanced)
        table_replacements = process_tables(doc, flat_json)
        paragraph_replacements = process_paragraphs(doc, flat_json)
        heading_replacements = process_headings(doc, flat_json)
        # Pass 2: NEW - Comprehensive final scan
        final_scan_replacements = comprehensive_document_scan(doc, flat_json)
        total_replacements = table_replacements + paragraph_replacements + heading_replacements + final_scan_replacements
        # Save output (doc.save() accepts either a path or a file-like object)
        doc.save(output_file)
        print(f"\n✓ Document saved as: {output_file}")
        print(f"✓ Total replacements: {total_replacements}")
        print(f"    Tables: {table_replacements}")
        print(f"    Paragraphs: {paragraph_replacements}")
        print(f"    Headings: {heading_replacements}")
        print(f"    Final scan: {final_scan_replacements}")
        print(f"Processing complete with enhanced coverage!")
    except FileNotFoundError as e:
        print(f"✗ File not found: {e}")
    except Exception as e:
        print(f"✗ Error: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    import sys
    if len(sys.argv) != 4:
        print("Usage: python enhanced_pipeline.py <input_docx> <updated_json> <output_docx>")
        sys.exit(1)
    docx_path = sys.argv[1]
    json_path = sys.argv[2]
    output_path = sys.argv[3]
    process_hf(json_path, docx_path, output_path)
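
# Example invocation (file names are placeholders, not from the source):
#   python enhanced_pipeline.py audit_template.docx updated_values.json filled_audit.docx
# The first argument is the DOCX template containing red placeholder text, the second is the
# JSON with replacement values, and the third is where the filled-in document is written.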