Spaces:
Sleeping
Sleeping
| # /// script | |
| # requires-python = ">=3.12" | |
| # dependencies = [ | |
| # "charset-normalizer==3.4.2", | |
| # "great-tables==0.17.0", | |
| # "marimo", | |
| # "pandas==2.3.0", | |
| # ] | |
| # /// | |
import marimo

# Version of marimo that generated this notebook file.
__generated_with = "0.14.6"

# Notebook app: full-width layout, with a title shown in the browser tab.
app = marimo.App(width="full", app_title="LLM Text Preprocessing Checker")
def _():
    # Notebook cell: import marimo under its conventional alias and expose
    # the module to the other cells via the cell's return tuple.
    import marimo as mo
    return (mo,)
def _(mo):
    # Intro cell: renders the notebook's title and description.
    # BUG FIX: corrected the user-facing typo "Additionaly" -> "Additionally".
    mo.md(
        r"""
    # LLM Text Preprocessing Checker
    Checks two files and provides the diff output as well as metrics on deleted and inserted characters.
    Additionally, provides a breakdown by Unicode character class of deletions and insertions.
    Note that this uses a pure-Python Myers diff algorithm for the comparison and may not be performant for larger diffs.
    """
    )
    return
| def _(): | |
| import unicodedata | |
| from typing import List, Dict, Any | |
| from dataclasses import dataclass | |
| from enum import IntEnum | |
| import html as python_html | |
| from great_tables import GT, loc, style | |
| import pandas as pd | |
class Operation(IntEnum):
    """Kinds of edit in a diff script."""

    DELETE = 0
    INSERT = 1
    EQUAL = 2


@dataclass
class Edit:
    """One atomic (single-character) edit produced by the Myers diff.

    The start/end pairs are half-open index ranges into the old and new
    strings; a DELETE has an empty new range and an INSERT an empty old
    range.

    BUG FIX: `Edit` was a plain class with bare annotations, yet it is
    instantiated positionally throughout (e.g.
    ``Edit(Operation.EQUAL, x, x + 1, y, y + 1, a[x], b[y])``), which raises
    TypeError.  The ``@dataclass`` decorator (already imported in this cell)
    generates the required ``__init__``.
    """

    operation: Operation
    old_start: int
    old_end: int
    new_start: int
    new_end: int
    old_text: str = ""
    new_text: str = ""
# Inline CSS snippets used when rendering diff output as HTML.
DEL_STYLE = "background-color:#ffcccc;color:#880000;text-decoration:line-through;"  # deleted text
INS_STYLE = "background-color:#ccffcc;color:#008800;"  # inserted text
EQUAL_STYLE = "color:#666666;"  # unchanged text
# Wrapper <div> style: monospace font and pre-wrap so whitespace/newlines in
# the diff survive HTML rendering.
CONTAINER_STYLE = (
    "font-family: ui-monospace, monospace; "
    "white-space: pre-wrap; "
    "line-height: 1.6; "
    "padding: 20px; "
    "background-color: #f8f9fa; "
    "border-radius: 8px; "
    "border: 1px solid #dee2e6;"
)
def classify_char(char: str) -> str:
    """Classify a character using Unicode categories."""
    if not char:
        return "empty"
    category = unicodedata.category(char)
    # CJK-related scripts get dedicated labels, overriding the general
    # Unicode category.
    script_ranges = (
        ("\u4e00", "\u9fff", "cjk_ideograph"),
        ("\u3040", "\u309f", "hiragana"),
        ("\u30a0", "\u30ff", "katakana"),
        ("\uac00", "\ud7af", "hangul"),
    )
    for low, high, label in script_ranges:
        if low <= char <= high:
            return label
    # Human-readable names for the two-letter Unicode general categories;
    # anything unmapped falls back to the raw category code.
    readable = {
        "Ll": "lowercase",
        "Lu": "uppercase",
        "Lt": "titlecase",
        "Lm": "modifier_letter",
        "Lo": "other_letter",
        "Nd": "decimal_digit",
        "Nl": "letter_number",
        "No": "other_number",
        "Pc": "connector_punctuation",
        "Pd": "dash_punctuation",
        "Ps": "open_punctuation",
        "Pe": "close_punctuation",
        "Pi": "initial_punctuation",
        "Pf": "final_punctuation",
        "Po": "other_punctuation",
        "Sm": "math_symbol",
        "Sc": "currency_symbol",
        "Sk": "modifier_symbol",
        "So": "other_symbol",
        "Zs": "space",
        "Zl": "line_separator",
        "Zp": "paragraph_separator",
        "Cc": "control",
        "Cf": "format",
        "Co": "private_use",
        "Cn": "unassigned",
    }
    return readable.get(category, category)
def _myers_backtrack(trace: List[List[int]], a: str, b: str) -> List[Edit]:
    """Back-tracking helper to materialise the edit script.

    Walks the saved frontier arrays in ``trace`` from the final d-layer back
    to d = 0, emitting one single-character Edit per step.  Edits are
    collected in reverse and flipped once at the end.
    """
    edits: List[Edit] = []
    n, m = len(a), len(b)
    x, y = n, m  # start at the bottom-right corner of the edit grid
    offset = len(trace[0]) // 2  # maps diagonal k onto an index into each v
    # Walk the layers backwards
    for d in range(len(trace) - 1, 0, -1):
        v = trace[d]
        k = x - y
        idx = k + offset
        # Determine the predecessor k'.  Reading trace[d] at idx±1 is
        # equivalent to reading trace[d-1]: within a layer only diagonals of
        # the layer's parity are overwritten, and idx±1 have the other parity.
        if k == -d or (k != d and v[idx - 1] < v[idx + 1]):
            k_prev = k + 1  # came from below (insertion)
        else:
            k_prev = k - 1  # came from right (deletion)
        x_prev = trace[d - 1][k_prev + offset]
        y_prev = x_prev - k_prev
        # Emit the matching "snake"
        while x > x_prev and y > y_prev:
            x -= 1
            y -= 1
            edits.append(Edit(Operation.EQUAL, x, x + 1, y, y + 1, a[x], b[y]))
        # Emit the single edit (INSERT or DELETE) that led to the snake
        if x_prev == x:  # insertion
            y -= 1
            edits.append(Edit(Operation.INSERT, x, x, y, y + 1, "", b[y]))
        else:  # deletion
            x -= 1
            edits.append(Edit(Operation.DELETE, x, x + 1, y, y, a[x], ""))
    # Leading snake (d = 0) – everything matched at the start
    while x > 0 and y > 0:
        x -= 1
        y -= 1
        edits.append(Edit(Operation.EQUAL, x, x + 1, y, y + 1, a[x], b[y]))
    # Any remaining leading insertions / deletions
    while x > 0:
        x -= 1
        edits.append(Edit(Operation.DELETE, x, x + 1, y, y, a[x], ""))
    while y > 0:
        y -= 1
        edits.append(Edit(Operation.INSERT, x, x, y, y + 1, "", b[y]))
    edits.reverse()
    return edits
def myers_diff(a: str, b: str) -> List[Edit]:
    """
    Character-level Myers diff.

    O((N+M)·D) time.  Memory is also O((N+M)·D), not O(N+M): a copy of the
    frontier array is stored in ``trace`` for every d-layer so the edit
    script can be backtracked afterwards.
    Returns a list of Edit objects (DELETE / INSERT / EQUAL).
    """
    n, m = len(a), len(b)
    # Trivial cases: one side empty means a single bulk edit (or no edits).
    if n == 0:
        return [Edit(Operation.INSERT, 0, 0, 0, m, "", b)] if m else []
    if m == 0:
        return [Edit(Operation.DELETE, 0, n, 0, 0, a, "")] if n else []
    max_d = n + m
    offset = max_d  # map k ∈ [-max_d .. +max_d] → index
    v = [0] * (2 * max_d + 1)  # current frontier
    trace = []  # keeps a copy of v for every d
    # Forward phase – build the "trace" that will be backtracked
    for d in range(max_d + 1):
        v_next = v[:]  # copy *once* per layer
        for k in range(-d, d + 1, 2):
            idx = k + offset
            # Choosing the predecessor (insertion vs deletion); v still holds
            # the previous layer's values for the neighbouring diagonals.
            if k == -d or (k != d and v[idx - 1] < v[idx + 1]):
                x = v[idx + 1]  # insertion (move down)
            else:
                x = v[idx - 1] + 1  # deletion (move right)
            y = x - k
            # Greedy snake – march diagonally while chars match
            while x < n and y < m and a[x] == b[y]:
                x += 1
                y += 1
            v_next[idx] = x
            # Reached the end – stop early
            if x >= n and y >= m:
                trace.append(v_next)
                return _myers_backtrack(trace, a, b)
        trace.append(v_next)
        v = v_next  # reuse buffer
    # Should never get here: d = n + m edits always suffice.
    raise RuntimeError("diff failed")
def classify_text(text: str) -> Dict[str, int]:
    """Count characters by classification.

    Returns a plain dict mapping classification name -> occurrence count;
    empty input yields an empty dict.
    """
    from collections import Counter

    # Counter replaces the hand-rolled dict.get(...) + 1 tallying loop;
    # Counter("") is empty, so the empty-text case needs no special branch.
    return dict(Counter(classify_char(char) for char in text))
def classify_edits(edits: List[Edit]) -> Dict[Operation, Dict[str, int]]:
    """
    Classify edit operations by character class.
    Returns a nested dictionary: {operation: {char_class: count}}
    """
    # Collect the changed text per operation; EQUAL runs carry no changes
    # and are skipped entirely.
    texts_by_op: Dict[Operation, List[str]] = {}
    for edit in edits:
        if edit.operation == Operation.DELETE:
            texts_by_op.setdefault(edit.operation, []).append(edit.old_text)
        elif edit.operation == Operation.INSERT:
            texts_by_op.setdefault(edit.operation, []).append(edit.new_text)
    # Classify each operation's concatenated text in a single pass.
    return {
        op: classify_text("".join(parts)) for op, parts in texts_by_op.items()
    }
def calculate_change_metrics(
    original: str,
    edits: List[Edit],
    classifications: Dict[Operation, Dict[str, int]],
) -> Dict[str, Any]:
    """Calculate detailed change metrics including percentages.

    Args:
        original: The pre-processing text; baseline for all percentages.
        edits: Character-level edit script from ``myers_diff``.
        classifications: Per-operation character-class counts from
            ``classify_edits``.

    Returns:
        Dict with totals, whole-text percentages, and a
        ``char_class_metrics`` sub-dict keyed by character class.  An
        insertion percentage of ``inf`` marks a class that did not occur in
        the original text at all.

    Cleanup: the per-class percentage computation previously tested
    ``original_count > 0`` twice in a row; the branches are merged.
    """
    metrics = {
        "total_original_chars": len(original),
        "total_deleted_chars": 0,
        "total_inserted_chars": 0,
        "deletion_percentage": 0.0,
        "insertion_percentage": 0.0,
        "net_change_percentage": 0.0,
        "char_class_metrics": {},
    }
    # Tally raw deleted/inserted character counts from the edit script.
    for edit in edits:
        if edit.operation == Operation.DELETE:
            metrics["total_deleted_chars"] += len(edit.old_text)
        elif edit.operation == Operation.INSERT:
            metrics["total_inserted_chars"] += len(edit.new_text)
    # Whole-text percentages, relative to the original length (guard the
    # empty-original case to avoid ZeroDivisionError).
    total = metrics["total_original_chars"]
    if total > 0:
        metrics["deletion_percentage"] = (
            metrics["total_deleted_chars"] / total
        ) * 100
        metrics["insertion_percentage"] = (
            metrics["total_inserted_chars"] / total
        ) * 100
        net_change = (
            metrics["total_inserted_chars"] - metrics["total_deleted_chars"]
        )
        metrics["net_change_percentage"] = (net_change / total) * 100
    # Character classification of the original text: per-class denominators.
    original_classifications = classify_text(original)
    # Union of classes seen in the original or in any change.
    all_char_classes = set(original_classifications)
    for op_classes in classifications.values():
        all_char_classes.update(op_classes.keys())
    for char_class in all_char_classes:
        original_count = original_classifications.get(char_class, 0)
        deleted_count = classifications.get(Operation.DELETE, {}).get(char_class, 0)
        inserted_count = classifications.get(Operation.INSERT, {}).get(
            char_class, 0
        )
        class_metrics = {
            "original_count": original_count,
            "deleted_count": deleted_count,
            "inserted_count": inserted_count,
            "deletion_percentage": 0.0,
            "insertion_percentage": 0.0,
        }
        if original_count > 0:
            # Both percentages are relative to this class's original count.
            class_metrics["deletion_percentage"] = (
                deleted_count / original_count
            ) * 100
            class_metrics["insertion_percentage"] = (
                inserted_count / original_count
            ) * 100
        elif inserted_count > 0:
            # Class is brand new: flag with infinity rather than divide by 0.
            class_metrics["insertion_percentage"] = float("inf")
        metrics["char_class_metrics"][char_class] = class_metrics
    return metrics
def escape_html(text: str) -> str:
    """Escape HTML and make whitespace visible."""
    # HTML-escape first, then substitute visible stand-ins for whitespace:
    # space → middle dot, tab → arrow, newline → pilcrow (the newline itself
    # is kept so pre-wrap rendering still breaks lines).
    escaped = python_html.escape(text)
    escaped = escaped.replace(" ", "·")
    escaped = escaped.replace("\t", "→ ")
    return escaped.replace("\n", "¶\n")
def generate_html_diff(
    edits: List[Edit], show_equal: bool = True, max_equal_length: int = 100
) -> str:
    """Generate HTML visualization of the diff with performance optimizations.

    Consecutive edits with the same operation are merged into one <span>
    (batched at 100 edits each) to keep the output small; long EQUAL
    stretches are truncated in the middle.
    """
    # Pre-allocate list for better performance
    html_parts = []
    # Group consecutive edits of the same type to reduce HTML tags
    grouped_edits = []
    current_group = []
    current_op = None
    for edit in edits:
        if (
            edit.operation == current_op and len(current_group) < 100
        ):  # Batch up to 100
            current_group.append(edit)
        else:
            # Operation changed (or batch full): flush and start a new group.
            if current_group:
                grouped_edits.append((current_op, current_group))
            current_group = [edit]
            current_op = edit.operation
    if current_group:
        grouped_edits.append((current_op, current_group))
    # Process grouped edits
    for op, group in grouped_edits:
        if op == Operation.DELETE:
            combined_text = "".join(e.old_text for e in group)
            escaped = escape_html(combined_text)
            html_parts.append(
                f'<span style="{DEL_STYLE}" title="Deleted">{escaped}</span>'
            )
        elif op == Operation.INSERT:
            combined_text = "".join(e.new_text for e in group)
            escaped = escape_html(combined_text)
            html_parts.append(
                f'<span style="{INS_STYLE}" title="Added">{escaped}</span>'
            )
        elif op == Operation.EQUAL and show_equal:
            combined_text = "".join(e.old_text for e in group)
            # Truncate very long equal sections: keep head and tail halves,
            # replace the middle with an "omitted" marker.
            if len(combined_text) > max_equal_length:
                start = escape_html(combined_text[: max_equal_length // 2])
                end = escape_html(combined_text[-max_equal_length // 2 :])
                omitted = len(combined_text) - max_equal_length
                html_parts.append(
                    f'<span style="{EQUAL_STYLE}">{start}'
                    f"<em>...{omitted} chars omitted...</em>"
                    f"{end}</span>"
                )
            else:
                escaped = escape_html(combined_text)
                html_parts.append(f'<span style="{EQUAL_STYLE}">{escaped}</span>')
    return f'<div style="{CONTAINER_STYLE}">{"".join(html_parts)}</div>'
def generate_side_by_side_html(edits: List[Edit]) -> str:
    """Generate side-by-side HTML diff view."""
    left: List[str] = []   # "Original" column
    right: List[str] = []  # "Processed" column
    for edit in edits:
        if edit.operation == Operation.EQUAL:
            # Unchanged text appears in both columns, unstyled.
            span = f"<span>{escape_html(edit.old_text)}</span>"
            left.append(span)
            right.append(span)
        elif edit.operation == Operation.DELETE:
            left.append(
                f'<span style="{DEL_STYLE}">{escape_html(edit.old_text)}</span>'
            )
        elif edit.operation == Operation.INSERT:
            right.append(
                f'<span style="{INS_STYLE}">{escape_html(edit.new_text)}</span>'
            )
    return f'''
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 20px;">
    <div>
        <h4 style="margin: 0 0 10px 0;">Original</h4>
        <div style="{CONTAINER_STYLE}">{"".join(left)}</div>
    </div>
    <div>
        <h4 style="margin: 0 0 10px 0;">Processed</h4>
        <div style="{CONTAINER_STYLE}">{"".join(right)}</div>
    </div>
</div>
'''
def generate_html_diff_fast(edits: List[Edit], context_chars: int = 5) -> str:
    """
    Ultra-fast HTML diff generation showing only changes with context.

    Only edits within ``context_chars`` positions of an actual change are
    rendered; separated ranges are joined with an ellipsis divider.
    """
    html_parts = []
    # Filter to only show changes and surrounding context
    change_indices = [
        i for i, e in enumerate(edits) if e.operation != Operation.EQUAL
    ]
    if not change_indices:
        # BUG FIX: this literal was missing its f-prefix, so the raw text
        # "{CONTAINER_STYLE}" was emitted instead of the CSS.
        return f'<div style="{CONTAINER_STYLE}">No changes found.</div>'
    # Build ranges to show (change + context)
    ranges_to_show = []
    start = max(0, change_indices[0] - context_chars)
    end = min(len(edits), change_indices[0] + context_chars + 1)
    for idx in change_indices[1:]:
        if idx - end <= context_chars * 2:
            # Close enough to the current range: extend it.
            end = min(len(edits), idx + context_chars + 1)
        else:
            # Gap too large: flush the current range and start a new one.
            ranges_to_show.append((start, end))
            start = max(0, idx - context_chars)
            end = min(len(edits), idx + context_chars + 1)
    ranges_to_show.append((start, end))
    # Generate HTML for ranges, separated by an ellipsis divider.
    for i, (start, end) in enumerate(ranges_to_show):
        if i > 0:
            html_parts.append(
                '<div style="color:#999;text-align:center;margin:10px 0;">...</div>'
            )
        for j in range(start, end):
            edit = edits[j]
            if edit.operation == Operation.DELETE:
                escaped = escape_html(edit.old_text)
                html_parts.append(f'<span style="{DEL_STYLE}">{escaped}</span>')
            elif edit.operation == Operation.INSERT:
                escaped = escape_html(edit.new_text)
                html_parts.append(f'<span style="{INS_STYLE}">{escaped}</span>')
            else:  # EQUAL
                escaped = escape_html(edit.old_text)
                html_parts.append(f'<span style="{EQUAL_STYLE}">{escaped}</span>')
    return f'<div style="{CONTAINER_STYLE}">{"".join(html_parts)}</div>'
def generate_side_by_side_html_fast(
    edits: List[Edit], context_chars: int = 5
) -> str:
    """
    Fast side-by-side HTML diff generation showing only changes with context.

    Mirrors ``generate_html_diff_fast`` but renders two columns
    (Original / Processed) instead of one inline stream.
    """
    # Filter to only show changes and surrounding context
    change_indices = [
        i for i, e in enumerate(edits) if e.operation != Operation.EQUAL
    ]
    if not change_indices:
        # BUG FIX: this template was a plain string, so "{CONTAINER_STYLE}"
        # appeared literally in the output; it must be an f-string.
        return f"""
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 20px;">
    <div>
        <h4 style="margin: 0 0 10px 0;">Original</h4>
        <div style="{CONTAINER_STYLE}">No changes found.</div>
    </div>
    <div>
        <h4 style="margin: 0 0 10px 0;">Processed</h4>
        <div style="{CONTAINER_STYLE}">No changes found.</div>
    </div>
</div>
"""
    # Build ranges to show (change + context)
    ranges_to_show = []
    start = max(0, change_indices[0] - context_chars)
    end = min(len(edits), change_indices[0] + context_chars + 1)
    for idx in change_indices[1:]:
        if idx - end <= context_chars * 2:
            # Close enough to the current range: extend it.
            end = min(len(edits), idx + context_chars + 1)
        else:
            # Gap too large: flush the current range and start a new one.
            ranges_to_show.append((start, end))
            start = max(0, idx - context_chars)
            end = min(len(edits), idx + context_chars + 1)
    ranges_to_show.append((start, end))
    # Generate HTML for ranges
    old_parts = []
    new_parts = []
    for i, (start, end) in enumerate(ranges_to_show):
        if i > 0:
            # Visual gap marker between non-contiguous ranges.
            separator = (
                '<div style="color:#999;text-align:center;margin:10px 0;">...</div>'
            )
            old_parts.append(separator)
            new_parts.append(separator)
        for j in range(start, end):
            edit = edits[j]
            if edit.operation == Operation.DELETE:
                escaped = escape_html(edit.old_text)
                old_parts.append(f'<span style="{DEL_STYLE}">{escaped}</span>')
            elif edit.operation == Operation.INSERT:
                escaped = escape_html(edit.new_text)
                new_parts.append(f'<span style="{INS_STYLE}">{escaped}</span>')
            else:  # EQUAL — shown in both columns
                escaped = escape_html(edit.old_text)
                old_parts.append(f'<span style="{EQUAL_STYLE}">{escaped}</span>')
                new_parts.append(f'<span style="{EQUAL_STYLE}">{escaped}</span>')
    return f'''
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 20px;">
    <div>
        <h4 style="margin: 0 0 10px 0;">Original</h4>
        <div style="{CONTAINER_STYLE}">{"".join(old_parts)}</div>
    </div>
    <div>
        <h4 style="margin: 0 0 10px 0;">Processed</h4>
        <div style="{CONTAINER_STYLE}">{"".join(new_parts)}</div>
    </div>
</div>
'''
def operation_to_past(op: Operation) -> str:
    """Return the past-tense English word for a diff operation.

    BUG FIX: the previous fallback was ``str(op) + "d"``.  Since Python 3.11
    ``str()`` of an IntEnum member is its numeric value (this script requires
    >= 3.12), so DELETE rendered as "0d" instead of "deleted" in the summary.
    """
    if op == Operation.INSERT:
        return "inserted"
    if op == Operation.DELETE:
        return "deleted"
    # Fallback for any future members: derive from the member name.
    return op.name.lower() + "d"
def format_diff_summary(
    edits: List[Edit],
    classifications: Dict[Operation, Dict[str, int]],
    metrics: Dict[str, Any],
) -> str:
    """Create a human-readable summary of the diff.

    Builds a Markdown document: overall statistics, per-operation
    character-class counts, and per-class change percentages.
    """
    lines = ["## Diff Summary\n"]
    # Overall statistics
    lines.append("### Overall Statistics")
    lines.append(
        f"- **Original text**: {metrics['total_original_chars']:,} characters"
    )
    # Format deletions
    del_pct = format_percentage(metrics["deletion_percentage"])
    lines.append(
        f"- **Deletions**: {metrics['total_deleted_chars']:,} characters ({del_pct})"
    )
    # Format insertions
    ins_pct = format_percentage(metrics["insertion_percentage"])
    lines.append(
        f"- **Insertions**: {metrics['total_inserted_chars']:,} characters ({ins_pct})"
    )
    # Format net change — extra precision when the change is under 0.01%
    net_pct = metrics["net_change_percentage"]
    if abs(net_pct) < 0.01:
        net_pct_str = f"{net_pct:+.3f}%"
    else:
        net_pct_str = f"{net_pct:+.1f}%"
    lines.append(
        f"- **Net change**: {net_pct_str} "
        f"({'increase' if metrics['net_change_percentage'] > 0 else 'decrease' if metrics['net_change_percentage'] < 0 else 'no change'})"
    )
    # Character classifications
    if classifications:
        lines.append("\n### Character Classifications")
        # Show changes by character class
        for op in [Operation.DELETE, Operation.INSERT]:
            if op in classifications and classifications[op]:
                lines.append(f"\n**{operation_to_past(op).title()} Characters:**")
                for char_class, count in sorted(
                    classifications[op].items(), key=lambda x: -x[1]
                ):
                    lines.append(
                        f"- {char_class.replace('_', ' ').title()}: {count}"
                    )
        # Show percentage changes by character class
        lines.append("\n### Change Percentages by Character Class")
        # Sort by most changed (highest deletion or insertion percentage);
        # infinite insertion percentages (brand-new classes) sort as 0 here.
        sorted_classes = sorted(
            metrics["char_class_metrics"].items(),
            key=lambda x: max(
                x[1]["deletion_percentage"],
                0
                if x[1]["insertion_percentage"] == float("inf")
                else x[1]["insertion_percentage"],
            ),
            reverse=True,
        )
        for char_class, class_metrics in sorted_classes:
            if (
                class_metrics["deleted_count"] > 0
                or class_metrics["inserted_count"] > 0
            ):
                class_name = char_class.replace("_", " ").title()
                # Format the line; parts are joined with " | " below.
                line_parts = [f"- **{class_name}**:"]
                if class_metrics["original_count"] > 0:
                    line_parts.append(
                        f"Original: {class_metrics['original_count']}"
                    )
                if class_metrics["deleted_count"] > 0:
                    line_parts.append(
                        f"Deleted: {class_metrics['deleted_count']} "
                        f"({class_metrics['deletion_percentage']:.1f}%)"
                    )
                if class_metrics["inserted_count"] > 0:
                    if class_metrics["insertion_percentage"] == float("inf"):
                        # Class absent from the original: show "(new)".
                        line_parts.append(
                            f"Inserted: {class_metrics['inserted_count']} (new)"
                        )
                    else:
                        line_parts.append(
                            f"Inserted: {class_metrics['inserted_count']} "
                            f"({class_metrics['insertion_percentage']:.1f}%)"
                        )
                lines.append(" | ".join(line_parts))
    return "\n".join(lines)
def format_percentage(value: float, min_decimals: int = 1) -> str:
    """Format percentage with adaptive decimal places.

    Smaller values get more decimals so tiny-but-nonzero percentages stay
    visible.  ``min_decimals`` is retained for interface compatibility but
    is not consulted.
    """
    if value == 0:
        return "0%"
    # (upper bound, decimal places), checked smallest bound first.
    for bound, decimals in ((0.01, 3), (0.1, 2), (1, 1)):
        if value < bound:
            return f"{value:.{decimals}f}%"
    return f"{value:.0f}%"
def classify_edits_with_chars(
    edits: List[Edit],
) -> Dict[Operation, Dict[str, Dict[str, int]]]:
    """
    Classify edit operations by character class and track character frequencies.
    Returns: {operation: {char_class: {char: count}}}
    """
    from collections import defaultdict, Counter

    # operation -> char_class -> Counter of individual characters
    tallies = defaultdict(lambda: defaultdict(Counter))
    for edit in edits:
        if edit.operation == Operation.EQUAL:
            continue  # unchanged text carries no change information
        text = (
            edit.old_text if edit.operation == Operation.DELETE else edit.new_text
        )
        for char in text:
            tallies[edit.operation][classify_char(char)][char] += 1
    return dict(tallies)
def get_top_chars(char_counter: Dict[str, int], n: int = 5) -> str:
    """Get top n characters by frequency, formatted for display."""
    if not char_counter:
        return "-"
    # Visible stand-ins for common whitespace characters.
    glyphs = {" ": "·", "\n": "¶", "\t": "→"}
    # Highest frequency first (stable for ties), truncated to n entries.
    ranked = sorted(char_counter.items(), key=lambda item: -item[1])[:n]
    rendered = []
    for char, _count in ranked:
        if char in glyphs:
            rendered.append(glyphs[char])
        elif ord(char) < 32 or ord(char) == 127:
            # Remaining control characters shown as hex escapes.
            rendered.append(f"\\x{ord(char):02x}")
        else:
            rendered.append(char)
    return " ".join(rendered)
def create_summary_tables(
    edits: List[Edit],
    classifications: Dict[Operation, Dict[str, int]],
    metrics: Dict[str, Any],
) -> Dict[str, GT]:
    """Create great_tables tables for the diff summary.

    Returns three tables keyed "overall", "char_class" (None when no
    per-class changes exist), and "compact".
    """
    # Get detailed character data (per-character frequencies within classes)
    detailed_classifications = classify_edits_with_chars(edits)
    # Table 1: Overall Statistics (unchanged)
    overall_data = pd.DataFrame(
        {
            "Metric": [
                "Original Length",
                "Characters Deleted",
                "Characters Inserted",
                "Net Change",
            ],
            "Count": [
                metrics["total_original_chars"],
                metrics["total_deleted_chars"],
                metrics["total_inserted_chars"],
                metrics["total_inserted_chars"] - metrics["total_deleted_chars"],
            ],
            "Percentage": [
                "-",
                format_percentage(metrics["deletion_percentage"]),
                format_percentage(metrics["insertion_percentage"]),
                # Extra precision when the net change is under 0.01%.
                f"{metrics['net_change_percentage']:+.3f}%"
                if abs(metrics["net_change_percentage"]) < 0.01
                else f"{metrics['net_change_percentage']:+.1f}%",
            ],
        }
    )
    overall_table = (
        GT(overall_data)
        .tab_header(
            title="Text Change Summary",
            subtitle=f"Total edits: {len([e for e in edits if e.operation != Operation.EQUAL])}",
        )
        .fmt_number(columns="Count", decimals=0, use_seps=True)
        .tab_style(
            # Highlight the "Net Change" row (index 3): grey fill + bold.
            style=[style.fill(color="#f0f0f0"), style.text(weight="bold")],
            locations=loc.body(rows=[3]),
        )
        .cols_align(align="center", columns=["Count", "Percentage"])
        .opt_stylize(style=1, color="blue")
    )
    # Table 2: Character Class Changes with top characters
    char_class_data = []
    # Get all character classes
    all_classes = set()
    for op_classes in classifications.values():
        all_classes.update(op_classes.keys())
    all_classes.update(metrics["char_class_metrics"].keys())
    # Build rows
    for char_class in sorted(all_classes):
        class_metrics = metrics["char_class_metrics"].get(char_class, {})
        # Get top characters for this class
        del_chars = detailed_classifications.get(Operation.DELETE, {}).get(
            char_class, {}
        )
        ins_chars = detailed_classifications.get(Operation.INSERT, {}).get(
            char_class, {}
        )
        row = {
            "Character Class": char_class.replace("_", " ").title(),
            "Original": class_metrics.get("original_count", 0),
            "Deleted": class_metrics.get("deleted_count", 0),
            "Top Deleted": get_top_chars(del_chars, 5),
            "Inserted": class_metrics.get("inserted_count", 0),
            "Top Inserted": get_top_chars(ins_chars, 5),
            # "-" when nothing of this class was deleted/inserted; "new"
            # when the class did not exist in the original (inf sentinel).
            "Del %": format_percentage(class_metrics.get("deletion_percentage", 0))
            if class_metrics.get("deletion_percentage", 0) > 0
            else "-",
            "Ins %": (
                "new"
                if class_metrics.get("insertion_percentage", 0) == float("inf")
                else format_percentage(class_metrics.get("insertion_percentage", 0))
                if class_metrics.get("insertion_percentage", 0) > 0
                else "-"
            ),
        }
        # Only include rows with changes
        if row["Deleted"] > 0 or row["Inserted"] > 0:
            char_class_data.append(row)
    if char_class_data:
        char_class_df = pd.DataFrame(char_class_data)
        char_class_table = (
            GT(char_class_df)
            .tab_header(title="Changes by Character Classification")
            .fmt_number(
                columns=["Original", "Deleted", "Inserted"],
                decimals=0,
                use_seps=True,
            )
            .tab_style(
                style=style.fill(color="#ffcccc"),
                locations=loc.body(columns=["Deleted", "Top Deleted"]),
            )
            .tab_style(
                style=style.fill(color="#ccffcc"),
                locations=loc.body(columns=["Inserted", "Top Inserted"]),
            )
            .tab_style(
                # Monospace keeps the character samples legible.
                style=style.text(font="monospace"),
                locations=loc.body(columns=["Top Deleted", "Top Inserted"]),
            )
            .cols_align(
                align="center",
                columns=["Original", "Deleted", "Inserted", "Del %", "Ins %"],
            )
            .cols_align(align="left", columns=["Top Deleted", "Top Inserted"])
            .tab_spanner(
                label="Counts", columns=["Original", "Deleted", "Inserted"]
            )
            .tab_spanner(
                label="Characters", columns=["Top Deleted", "Top Inserted"]
            )
            .tab_spanner(label="Percentages", columns=["Del %", "Ins %"])
            .cols_width(
                {
                    "Character Class": "20%",
                    "Original": "10%",
                    "Deleted": "10%",
                    "Top Deleted": "15%",
                    "Inserted": "10%",
                    "Top Inserted": "15%",
                    "Del %": "10%",
                    "Ins %": "10%",
                }
            )
            .opt_stylize(style=1, color="blue")
        )
    else:
        # No per-class changes: the caller receives None for this table.
        char_class_table = None
    # Table 3: Compact Combined View (unchanged except for percentage formatting)
    compact_data = []
    # Add summary row
    compact_data.append(
        {
            "Type": "Total",
            "Deleted": metrics["total_deleted_chars"],
            "Inserted": metrics["total_inserted_chars"],
            "Net": metrics["total_inserted_chars"] - metrics["total_deleted_chars"],
            "Change": f"{metrics['net_change_percentage']:+.3f}%"
            if abs(metrics["net_change_percentage"]) < 0.01
            else f"{metrics['net_change_percentage']:+.0f}%",
        }
    )
    # Add top character classes (sorted by total change)
    class_changes = []
    for char_class, class_metrics in metrics["char_class_metrics"].items():
        if (
            class_metrics["deleted_count"] > 0
            or class_metrics["inserted_count"] > 0
        ):
            class_changes.append(
                {
                    "Type": char_class.replace("_", " ").title(),
                    "Deleted": class_metrics["deleted_count"],
                    "Inserted": class_metrics["inserted_count"],
                    "Net": class_metrics["inserted_count"]
                    - class_metrics["deleted_count"],
                    # "Change" temporarily holds the total churn used as the
                    # sort key; it is overwritten with a display string below.
                    "Change": class_metrics["deleted_count"]
                    + class_metrics["inserted_count"],
                }
            )
    # Sort by total change and take top 5
    class_changes.sort(key=lambda x: x["Change"], reverse=True)
    for item in class_changes[:5]:
        item["Change"] = f"{item['Net']:+d}" if item["Net"] != 0 else "±0"
        compact_data.append(item)
    compact_df = pd.DataFrame(compact_data)
    compact_table = (
        GT(compact_df)
        .tab_header(title="Edit Summary - Compact View")
        .fmt_number(
            columns=["Deleted", "Inserted", "Net"], decimals=0, use_seps=True
        )
        .tab_style(
            # Emphasise the "Total" summary row (index 0).
            style=[
                style.fill(color="#e8e8e8"),
                style.text(weight="bold"),
                style.borders(sides=["top", "bottom"], color="#666", weight="2px"),
            ],
            locations=loc.body(rows=[0]),
        )
        .tab_style(
            style=style.text(color="#880000"),
            locations=loc.body(columns=["Deleted"]),
        )
        .tab_style(
            style=style.text(color="#008800"),
            locations=loc.body(columns=["Inserted"]),
        )
        .cols_align(
            align="center", columns=["Deleted", "Inserted", "Net", "Change"]
        )
        .cols_width(
            {
                "Type": "40%",
                "Deleted": "15%",
                "Inserted": "15%",
                "Net": "15%",
                "Change": "15%",
            }
        )
        .opt_stylize(style=1, color="cyan")
    )
    return {
        "overall": overall_table,
        "char_class": char_class_table,
        "compact": compact_table,
    }
def create_operation_matrix_table(
    edits: List[Edit], classifications: Dict[Operation, Dict[str, int]]
) -> GT:
    """Create a matrix view of operations by character class.

    Each row is one character class with its deletion count, insertion
    count, and balance (insertions minus deletions), colour-scaled.
    """
    # Get all character classes
    all_classes = set()
    for op_classes in classifications.values():
        all_classes.update(op_classes.keys())
    # Build matrix data
    matrix_data = []
    for char_class in sorted(all_classes):
        row = {
            "Character Type": char_class.replace("_", " ").title(),
            "Deletions": classifications.get(Operation.DELETE, {}).get(
                char_class, 0
            ),
            "Insertions": classifications.get(Operation.INSERT, {}).get(
                char_class, 0
            ),
            # Positive balance = net growth of this class.
            "Balance": (
                classifications.get(Operation.INSERT, {}).get(char_class, 0)
                - classifications.get(Operation.DELETE, {}).get(char_class, 0)
            ),
        }
        matrix_data.append(row)
    # Sort by total changes
    matrix_data.sort(key=lambda x: x["Deletions"] + x["Insertions"], reverse=True)
    # Convert to DataFrame
    matrix_df = pd.DataFrame(matrix_data)
    # Calculate max values for the colour-scale domains; default=1 avoids a
    # degenerate [0, 0] domain when there is no data.
    max_del = max((r["Deletions"] for r in matrix_data), default=1)
    max_ins = max((r["Insertions"] for r in matrix_data), default=1)
    max_balance = max((abs(r["Balance"]) for r in matrix_data), default=1)
    matrix_table = (
        GT(matrix_df)
        .tab_header(title="Operation Matrix by Character Type")
        .fmt_number(columns=["Deletions", "Insertions", "Balance"], decimals=0)
        .data_color(
            columns=["Deletions"],
            palette=["white", "#ffcccc"],
            domain=[0, max_del],
        )
        .data_color(
            columns=["Insertions"],
            palette=["white", "#ccffcc"],
            domain=[0, max_ins],
        )
        .data_color(
            # Symmetric domain so zero balance maps to the white midpoint.
            columns=["Balance"],
            palette=["#ffcccc", "white", "#ccffcc"],
            domain=[-max_balance, max_balance],
        )
        .cols_align(align="center", columns=["Deletions", "Insertions", "Balance"])
        .opt_stylize(style=2, color="gray")
    )
    return matrix_table
def is_long_diff(edits: List[Edit], original: str) -> bool:
    """Return True when the diff is big enough to warrant fast rendering.

    Thresholds: more than 1000 edit operations, or an original text longer
    than 10,000 characters.
    """
    too_many_edits = len(edits) > 1000
    too_long_input = len(original) > 10000
    return too_many_edits or too_long_input
def analyze_text_changes(
    original: str,
    processed: str,
) -> Dict[str, Any]:
    """Run the complete diff pipeline between two texts.

    Computes the Myers edit script, classifies every edit by Unicode
    character class, derives change metrics, and packages all artifacts
    (including pre-built display tables) into a single dict.
    """
    edit_script = myers_diff(original, processed)
    per_op_classes = classify_edits(edit_script)
    change_metrics = calculate_change_metrics(original, edit_script, per_op_classes)
    return {
        "edits": edit_script,
        "classifications": per_op_classes,
        "metrics": change_metrics,
        "summary": format_diff_summary(edit_script, per_op_classes, change_metrics),
        "tables": create_summary_tables(edit_script, per_op_classes, change_metrics),
        "matrix_table": create_operation_matrix_table(edit_script, per_op_classes),
    }
def render_html_diff(
    edits: List[Edit],
    original: str,
    context_chars: int = 5,
    side_by_side: bool = False,
    use_fast_html: bool | None = None,
) -> str:
    """
    Render an edit script as an HTML diff, choosing fast or full rendering.

    Args:
        edits: List of Edit operations to render.
        original: Original text, used only by the auto length heuristic.
        context_chars: Context size forwarded to the fast renderers.
        side_by_side: Render two columns instead of a combined view.
        use_fast_html: Force fast mode on/off; None auto-detects via
            is_long_diff().

    Returns:
        HTML string of the diff.
    """
    fast = is_long_diff(edits, original) if use_fast_html is None else use_fast_html
    if fast and side_by_side:
        return generate_side_by_side_html_fast(edits, context_chars=context_chars)
    if fast:
        return generate_html_diff_fast(edits, context_chars=context_chars)
    if side_by_side:
        # Even outside fast mode, large edit scripts fall back to the
        # fast side-by-side renderer with an output-length cap.
        if len(edits) > 500:
            return generate_side_by_side_html_fast(edits, max_length=50000)
        return generate_side_by_side_html(edits)
    return generate_html_diff(edits, show_equal=True, max_equal_length=200)
| return analyze_text_changes, render_html_diff | |
def _(mo):
    # File-upload widgets for the two inputs, laid out side by side.
    # The returned names wire these widgets into the downstream cells.
    o_file_upload = mo.ui.file(label="Original text", kind="area")
    p_file_upload = mo.ui.file(label="Preprocessed text", kind="area")
    file_stack = mo.hstack([o_file_upload, p_file_upload], widths="equal")
    return file_stack, o_file_upload, p_file_upload
def _(mo):
    # Free-text input areas as an alternative to file upload.
    # The returned names wire these widgets into the downstream cells.
    o_textbox = mo.ui.text_area(label="Original text", full_width=True)
    p_textbox = mo.ui.text_area(label="Preprocessed text", full_width=True)
    text_stack = mo.hstack([o_textbox, p_textbox], widths="equal")
    return o_textbox, p_textbox, text_stack
def _(file_stack, mo, text_stack):
    # Let the user pick between pasting text and uploading files;
    # the tabs widget is this cell's displayed output.
    mo.ui.tabs({"Text": text_stack, "File": file_stack})
    return
def check_text_similarity(text1: str, text2: str, threshold: float = 0.1) -> bool:
    """Heuristic gate: do the texts look similar enough to be worth diffing?

    Requires (a) Jaccard overlap of the character sets >= threshold and
    (b) relative length difference <= 1 - threshold. Empty inputs never pass.
    """
    if not text1 or not text2:
        return False
    chars1, chars2 = set(text1), set(text2)
    jaccard = len(chars1 & chars2) / len(chars1 | chars2)
    length_gap = abs(len(text1) - len(text2)) / max(len(text1), len(text2))
    return jaccard >= threshold and length_gap <= 1 - threshold
def _(mo, o_file_upload, o_textbox, p_file_upload, p_textbox):
    # Resolve the two input texts (file upload beats textbox beats demo text),
    # decode uploads with a detected encoding, and abort the notebook run if
    # the texts are too dissimilar to diff.
    from charset_normalizer import detect

    def detect_encoding(b: bytes) -> str:
        # charset_normalizer can report encoding=None when detection fails;
        # fall back to UTF-8 so .decode() never receives None.
        result = detect(b)
        return result["encoding"] or "utf-8"

    # Demo texts used when the user supplies nothing.
    o_text, p_text = (
        "Example text will be used if none provided!",
        "Example Text will be used, if none provided.",
    )
    try:
        if o_file_upload.contents():
            encoding = detect_encoding(o_file_upload.contents())
            try:
                o_text = o_file_upload.contents().decode(encoding)
            except UnicodeDecodeError:
                o_text = o_file_upload.contents().decode("utf-8")
        elif o_textbox.value:
            o_text = o_textbox.value
        if p_file_upload.contents():
            # Bug fix: detect the encoding of the preprocessed upload itself
            # (previously this inspected the *original* upload's bytes).
            encoding = detect_encoding(p_file_upload.contents())
            try:
                p_text = p_file_upload.contents().decode(encoding)
            except UnicodeDecodeError:
                p_text = p_file_upload.contents().decode("utf-8")
        elif p_textbox.value:
            p_text = p_textbox.value
    except UnicodeDecodeError:
        mo.stop(
            True,
            mo.md("Error decoding files. Please try UTF-8.").callout(kind="danger"),
        )
    # Refuse to compare wildly different texts (guards the pure-Python diff).
    mo.stop(
        not check_text_similarity(o_text, p_text),
        mo.md(
            f"Texts are too dissimilar! Aborting comparison.\n\n{o_text[:50]}\n\n{p_text[:50]}"
        ).callout(kind="danger"),
    )
    return o_text, p_text
def _(analyze_text_changes, o_text, p_text):
    # Run the full diff analysis once; downstream cells consume `results`.
    results = analyze_text_changes(o_text, p_text)
    return (results,)
def _(mo, results):
    # Stack the three pre-built summary tables vertically for display.
    tables = results["tables"]
    results_tables = mo.vstack(
        [
            tables["overall"],
            tables["char_class"],
            tables["compact"],
        ]
    )
    return (results_tables,)
def _(mo, o_text, render_html_diff, results, results_tables):
    # Render the edit script twice (combined and side-by-side) and let the
    # user switch between the two views with tabs.
    combined_html = mo.Html(
        render_html_diff(
            results["edits"],
            o_text,
        )
    )
    side_by_side_html = mo.Html(
        render_html_diff(
            results["edits"],
            o_text,
            side_by_side=True,
        )
    )
    diff_view = mo.ui.tabs(
        {
            "Combined diff": combined_html,
            "Side-by-side diff": side_by_side_html,
        }
    )
    # Markdown output combining the metric tables and the diff tabs.
    mo.md(f"""
    # Results
    {results_tables}
    {diff_view}
    """)
    return
def _():
    # Intentionally empty scratch cell.
    return
| if __name__ == "__main__": | |
| app.run() | |