""" Alignment and Visualization Generation This module provides functions for generating HTML visualizations of phoneme- and word-level alignment for Quranic recitation evaluation. It aligns predicted and canonical phoneme sequences, color-codes alignment differences (correct, substitution, insertion, deletion), renders tables (scrolling or chunked), aggregates results for segmented audio, and produces interactive UI elements for feedback (including per-segment audio playback and error reports). Key capabilities: - Per-segment alignment visualization for segmented audio submissions. - Word-level and phoneme-level feedback, including error marking and metrics. - Flexible rendering styles: inline vs. combined tables, collapsible sections, and error grouping. - UI elements for reference/user audio, interactive playback, and lazy loading of segments. - Compatibility with error-analysis rendering (for advanced UI sort/grouping). - Designed for integration into full-stack Gradio/app interfaces or downstream UI frameworks. Most functions return HTML or HTML + metrics, designed for direct embedding in frontend UIs. """ import sys from pathlib import Path # Add parent directory to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) from i8n import t from config import ( COLORS, PHONEME_TABLE_STYLE, PHONEME_TABLE_CHUNK_SIZE, SEGMENT_PHONEME_TABLE_LOCATION, SEGMENT_ERROR_TABLE_LOCATION, SEGMENT_PHONEME_TABLE_COLLAPSED, ) from .metrics import align_sequences, PER_single from .word_recovery import recover_words, recover_words_for_segment from recitation_analysis.ui.tooltip import get_tooltip_css from recitation_analysis.text_display.error_highlighting import render_combined_error_table # Note: segment audio clips are provided directly by the pipeline. def _generate_segment_divider(segment_num, total_segments, time_range, word_range, is_processing=False, status=None): """Generate a divider between segments. 
Args: segment_num: Current segment number (1-based) total_segments: Total number of segments time_range: Time range string (e.g., "1.2s - 3.5s") - kept for API compatibility but not displayed word_range: Word range string (e.g., "1-4") is_processing: If True, show blinking orange animation status: 'success' for ✓, 'error' for ✗, 'warning' for ⚠️, None for no indicator """ # Determine status indicator status_icon = "" if status == "success": status_icon = " ✅" elif status == "error": status_icon = " ❌" elif status == "warning": status_icon = " ⚠️" if is_processing: # Blinking orange divider for processing state return f'''
{t("segments.processing_template", n=segment_num, total=total_segments, range=word_range)}
''' else: # Normal blue divider for completed segments with status indicator return f'''
{t("segments.header_template", n=segment_num, total=total_segments, range=word_range)}{status_icon}
''' def _generate_scroll_table(expected_row, actual_row, diff_row): """Generate a single horizontally scrolling table.""" html = f'''
''' for token in expected_row: html += f'' html += f''' ''' for token in actual_row: html += f'' html += f''' ''' for symbol, color in diff_row: html += f'' html += '''
{t("phoneme_alignment.expected")}{token}
{t("phoneme_alignment.your_rec")}{token}
{t("phoneme_alignment.diff")}{symbol}
def _generate_chunked_table(expected_row, actual_row, diff_row):
    """Generate multiple alignment tables split into fixed-size chunks.

    Splits the three parallel rows into chunks of PHONEME_TABLE_CHUNK_SIZE
    tokens and renders one table per chunk, labeling chunks when there is
    more than one.

    NOTE(review): markup in these strings was stripped by an HTML
    sanitizer; only the text content survives. Restore from version
    control.
    """
    chunk_size = PHONEME_TABLE_CHUNK_SIZE
    # Ceiling division: number of chunks needed to cover all tokens.
    num_chunks = (len(expected_row) + chunk_size - 1) // chunk_size

    html = ''
    for chunk_idx in range(num_chunks):
        start_idx = chunk_idx * chunk_size
        end_idx = min(start_idx + chunk_size, len(expected_row))
        expected_chunk = expected_row[start_idx:end_idx]
        actual_chunk = actual_row[start_idx:end_idx]
        diff_chunk = diff_row[start_idx:end_idx]

        # Label each part only when the table is actually split.
        if num_chunks > 1:
            html += f'''
Part {chunk_idx + 1} of {num_chunks}
'''
        # One (stripped) column declaration per token in the chunk.
        for _ in expected_chunk:
            html += ''
        # Row 1: expected phonemes.
        html += f'''
{t("phoneme_alignment.expected")}'''
        for token in expected_chunk:
            html += f'{token}'
        # Row 2: the user's recognized phonemes.
        html += f'''
{t("phoneme_alignment.your_rec")}'''
        for token in actual_chunk:
            html += f'{token}'
        # Row 3: diff symbols; `color` fed the stripped style attribute.
        html += f'''
{t("phoneme_alignment.diff")}'''
        for symbol, color in diff_chunk:
            html += f'{symbol}'
        html += '''
'''
    return html
def create_alignment_visualization(expected_phonemes, actual_phonemes, verse_ref=None, audio_data=None):
    """Create an HTML visualization of phoneme alignment.

    Color-codes substitutions, insertions, and deletions between the
    expected and actual phoneme sequences. If a verse reference is
    provided, word-level feedback is prepended above the phoneme table.
    Rendering style (scroll vs. chunked) follows config settings.

    Args:
        expected_phonemes: Space-separated expected phoneme string
        actual_phonemes: Space-separated actual phoneme string (from the
            user's recitation)
        verse_ref: Optional verse reference for word-level feedback and
            advanced alignment
        audio_data: Optional (sample_rate, audio_array) tuple for
            duration analysis

    Returns:
        Tuple of (html_string, accuracy, expected_count, actual_count);
        on any exception, (error_html, 0, 0, 0).

    NOTE(review): markup in the f-strings was stripped by an HTML
    sanitizer; only text content survives. Restore from version control.
    """
    try:
        # Word-level feedback when a verse reference is available.
        word_html = ""
        recitation_result = None
        if verse_ref:
            word_feedback, word_error, recitation_result = recover_words(
                verse_ref, expected_phonemes, actual_phonemes, audio_data)
            if word_feedback:
                word_html = word_feedback
            elif word_error:
                # Show the error but continue with phoneme alignment.
                word_html = f'''
ℹ️ Word-level feedback unavailable: {word_error}
'''

        # Tokenize once; used for alignment fallback and for the counts
        # in the return value.
        expected_tokens = expected_phonemes.split()
        actual_tokens = actual_phonemes.split()

        # Reuse the alignment from the error pipeline when available
        # (avoids an O(n*m) recomputation).
        if recitation_result and recitation_result.phoneme_alignment:
            alignment = list(recitation_result.phoneme_alignment)
        else:
            alignment = align_sequences(expected_tokens, actual_tokens)

        # Build the 3-row table data: expected, actual, and diff symbol
        # (symbol, color) per position.
        expected_row = []
        actual_row = []
        diff_row = []
        for ref_tok, hyp_tok, op in alignment:
            if op == "C":       # correct: no symbol, no color
                expected_row.append(ref_tok)
                actual_row.append(hyp_tok)
                diff_row.append(("", ""))
            elif op == "S":     # substitution
                expected_row.append(ref_tok)
                actual_row.append(hyp_tok)
                diff_row.append(("✗", COLORS["substitution"]))
            elif op == "D":     # deletion: expected token not produced
                expected_row.append(ref_tok)
                actual_row.append("—")
                diff_row.append(("−", COLORS["deletion"]))
            elif op == "I":     # insertion: extra token produced
                expected_row.append("—")
                actual_row.append(hyp_tok)
                diff_row.append(("+", COLORS["insertion"]))

        # Accuracy is 100 minus the phoneme error rate.
        per_score = PER_single(actual_phonemes, expected_phonemes)
        accuracy = 100 - per_score

        if PHONEME_TABLE_STYLE == "scroll":
            table_html = _generate_scroll_table(expected_row, actual_row, diff_row)
        else:
            table_html = _generate_chunked_table(expected_row, actual_row, diff_row)

        # Collapsible-wrapper state; this was interpolated into the
        # stripped <details> markup -- restore with the markup.
        details_open = "" if SEGMENT_PHONEME_TABLE_COLLAPSED else "open"
        phoneme_html = f'''
{t("phoneme_alignment.header")}
{table_html}
'''
        # Word-level feedback goes before the phoneme alignment.
        full_html = word_html + phoneme_html
        return full_html, accuracy, len(expected_tokens), len(actual_tokens)
    except Exception as e:
        error_html = f'''
Error creating alignment: {str(e)}
'''
        return error_html, 0, 0, 0
def format_metrics_html(accuracy, expected_count, actual_count):
    """Format metrics as HTML for embedding in alignment visualizations.

    Shows the expected phoneme count, the predicted (actual) count, and
    the accuracy percentage (one decimal place).

    Args:
        accuracy: Accuracy percentage
        expected_count: Number of expected phonemes
        actual_count: Number of actual phonemes

    Returns:
        HTML string with formatted metrics

    NOTE(review): the surrounding markup was stripped by an HTML
    sanitizer; only the label/value text survives. Restore from version
    control.
    """
    return f'''
Expected
{expected_count}
Predicted
{actual_count}
Accuracy
{accuracy:.1f}%
'''
def create_segmented_alignment_visualization(
    segments,
    predicted_phonemes_per_segment,
    verse_ref,
    canonical_text,
    coverage_warning=None,
    reference_audio_clips=None,
    user_audio_clips=None,
):
    """Create a rich HTML visualization for segmented audio recitation.

    For each segment this renders audio controls (reference and user
    recordings), word-level feedback, a phoneme alignment table, and
    error summaries. Pending (None) and errored segments get placeholder
    boxes instead of alignments. Depending on configuration, error and
    phoneme tables are rendered inline per segment or combined after all
    segments.

    Args:
        segments: List of SegmentInfo objects from segment_processor
        predicted_phonemes_per_segment: Predicted phoneme string per
            segment; None marks a still-pending segment
        verse_ref: Full verse reference string
        canonical_text: Full canonical Arabic text for the verse
            (currently unread here -- kept for API compatibility)
        coverage_warning: Optional warning shown at the top
        reference_audio_clips: Optional per-segment reference audio URIs
        user_audio_clips: Optional per-segment user audio URIs

    Returns:
        Tuple of (html_string, overall_accuracy, total_expected,
        total_actual, all_results) where all_results holds one entry
        (RecitationResult or None) per segment.

    NOTE(review): markup in the f-strings was stripped by an HTML
    sanitizer; only text content survives. Restore from version control.
    """
    import json  # used to embed lazy-load segment info in markup

    if not segments:
        return '''
No segments detected
''', 0, 0, 0, []

    total_segments = len(segments)
    segment_html_parts = []
    all_errors = []        # errors across segments (collected but not returned)
    all_results = []       # one RecitationResult (or None) per segment
    phoneme_tables = []    # deferred phoneme tables for "end" mode
    segment_metadata = []  # per-segment data cached for re-rendering
    total_expected = 0
    total_actual = 0
    total_correct = 0

    # Tooltip CSS is required whether or not a warning is shown
    # (segment mode builds multiple blocks).
    segment_html_parts.append(get_tooltip_css())
    if coverage_warning:
        segment_html_parts.append(f'''
{coverage_warning}
''')

    for seg_idx, (segment, predicted_phonemes) in enumerate(
            zip(segments, predicted_phonemes_per_segment)):
        seg_num = seg_idx + 1
        time_range = f"{segment.start_time:.1f}s - {segment.end_time:.1f}s"
        # Word range is 1-based for display.
        word_range = f"{segment.word_start_idx + 1}-{segment.word_end_idx + 1}"
        is_processing = predicted_phonemes is None and not segment.error

        # --- Errored segment: divider + error box, no alignment. ---
        if segment.error:
            segment_html_parts.append(_generate_segment_divider(
                seg_num, total_segments, time_range, word_range,
                is_processing=False, status="error"))
            segment_html_parts.append(f'''
⚠️ Segment {seg_num}: {segment.error}
''')
            all_results.append(None)  # keep 1:1 correspondence with segments
            continue

        # --- Pending segment: blinking divider + placeholder. ---
        if predicted_phonemes is None:
            segment_html_parts.append(_generate_segment_divider(
                seg_num, total_segments, time_range, word_range,
                is_processing=True))
            segment_html_parts.append(f'''
Processing segment {seg_num}...
''')
            all_results.append(None)
            continue

        # --- Empty transcription: warn, show matched verse if known. ---
        if not predicted_phonemes:
            segment_html_parts.append(_generate_segment_divider(
                seg_num, total_segments, time_range, word_range,
                is_processing=False, status="warning"))
            verse_info = f" ({segment.matched_ref})" if segment.matched_ref else ""
            text_info = f"\n{segment.matched_text}" if segment.matched_text else ""
            segment_html_parts.append(f'''
⚠️ Segment {seg_num}{verse_info}: Phoneme transcription failed
Audio segment processed but phoneme extraction returned empty. Check logs for details.
{text_info}
''')
            all_results.append(None)
            continue

        # Collect segment content first so the divider's status icon can
        # reflect whether errors were found.
        segment_content_parts = []
        segment_has_errors = False

        ref_clip = (reference_audio_clips[seg_idx]
                    if reference_audio_clips and seg_idx < len(reference_audio_clips)
                    else None)
        user_clip = (user_audio_clips[seg_idx]
                     if user_audio_clips and seg_idx < len(user_audio_clips)
                     else None)

        # JSON payload for lazy loading of reference audio; quotes are
        # entity-escaped for embedding in an HTML attribute. (The source
        # showed a no-op replace('"', '"') -- the entity was decoded by
        # the same sanitizer that stripped the markup.)
        segment_info_json = json.dumps({
            "verse_ref": verse_ref,
            "word_start": segment.word_start_idx,
            "word_end": segment.word_end_idx,
        }).replace('"', '&quot;')

        # Show the audio section if we have any clip OR enough info to
        # lazy-load the reference audio.
        has_segment_info = (segment.word_start_idx is not None
                            and segment.word_end_idx is not None)
        if ref_clip or user_clip or has_segment_info:
            # Two-column layout: reference audio (left), user audio (right).
            segment_content_parts.append('''
''')
            if ref_clip:
                # Pre-loaded reference audio clip.
                segment_content_parts.append(f'''
{t("segments.reference_audio")}
''')
            elif has_segment_info:
                # Lazy-load placeholder (click to load); segment_info_json
                # belongs in the stripped data attribute of this markup.
                segment_content_parts.append(f'''
{t("segments.reference_audio")}
Click to load reference
''')
            if user_clip:
                segment_content_parts.append(f'''
{t("segments.user_audio")}
''')
            segment_content_parts.append('''
''')

        # Word-level feedback (segment treated as stopping at its end).
        segment_ref = (segment.matched_ref
                       if hasattr(segment, 'matched_ref') and segment.matched_ref
                       else None)
        word_html, segment_errors, resolved_canonical_phonemes, segment_result = \
            recover_words_for_segment(
                verse_ref,
                predicted_phonemes,
                word_start_idx=segment.word_start_idx,
                word_end_idx=segment.word_end_idx,
                segment_ref=segment_ref,
            )
        if word_html:
            segment_content_parts.append(word_html)
        if segment_errors:
            segment_has_errors = True
            all_errors.extend(segment_errors)
        # Always append (possibly None) to keep 1:1 with segments.
        all_results.append(segment_result)

        # Inline error table (no title) when configured.
        if segment_result and SEGMENT_ERROR_TABLE_LOCATION == "inline":
            inline_error_table = render_combined_error_table([segment_result], title="")
            if inline_error_table:
                segment_content_parts.append(inline_error_table)

        # Prefer the separately-phonemized canonical phonemes; fall back
        # to the segment processor's output.
        segment_canonical_phonemes = resolved_canonical_phonemes or segment.canonical_phonemes
        if not segment_canonical_phonemes:
            segment_content_parts.append(f'''
Segment {seg_num}: No canonical phonemes available
''')
            # Still emit the divider and the wrapped content box.
            status = "error" if segment_has_errors else "success"
            segment_html_parts.append(_generate_segment_divider(
                seg_num, total_segments, time_range, word_range,
                is_processing=False, status=status))
            segment_html_parts.append(f'''
{''.join(segment_content_parts)}
''')
            continue

        # --- Phoneme alignment for this segment. ---
        try:
            expected_tokens = segment_canonical_phonemes.split()
            actual_tokens = predicted_phonemes.split()
            # Reuse the pipeline's alignment when available (avoids an
            # O(n*m) recomputation).
            if segment_result and segment_result.phoneme_alignment:
                alignment = list(segment_result.phoneme_alignment)
            else:
                alignment = align_sequences(expected_tokens, actual_tokens)

            expected_row, actual_row, diff_row = [], [], []
            correct_count = 0
            for ref_tok, hyp_tok, op in alignment:
                if op == "C":       # correct
                    expected_row.append(ref_tok)
                    actual_row.append(hyp_tok)
                    diff_row.append(("", ""))
                    correct_count += 1
                elif op == "S":     # substitution
                    expected_row.append(ref_tok)
                    actual_row.append(hyp_tok)
                    diff_row.append(("✗", COLORS["substitution"]))
                elif op == "D":     # deletion
                    expected_row.append(ref_tok)
                    actual_row.append("—")
                    diff_row.append(("−", COLORS["deletion"]))
                elif op == "I":     # insertion
                    expected_row.append("—")
                    actual_row.append(hyp_tok)
                    diff_row.append(("+", COLORS["insertion"]))

            total_expected += len(expected_tokens)
            total_actual += len(actual_tokens)
            total_correct += correct_count
            segment_accuracy = ((correct_count / len(expected_tokens) * 100)
                                if expected_tokens else 0)

            if SEGMENT_PHONEME_TABLE_LOCATION == "inline":
                if PHONEME_TABLE_STYLE == "scroll":
                    table_html = _generate_scroll_table(expected_row, actual_row, diff_row)
                else:
                    table_html = _generate_chunked_table(expected_row, actual_row, diff_row)
                if SEGMENT_PHONEME_TABLE_COLLAPSED:
                    # Collapsible wrapper (markup stripped).
                    inline_phoneme_html = f'''
{t("phoneme_alignment.header")}
{table_html}
'''
                else:
                    # Shown directly, no collapsible wrapper.
                    inline_phoneme_html = f'''
{table_html}
'''
                segment_content_parts.append(inline_phoneme_html)
            else:
                # Defer for the combined display after all segments.
                phoneme_tables.append({
                    'seg_num': seg_num,
                    'time_range': time_range,
                    'word_range': word_range,
                    'expected_row': expected_row,
                    'actual_row': actual_row,
                    'diff_row': diff_row,
                    'accuracy': segment_accuracy,
                    'has_errors': segment_has_errors,
                })
        except Exception as e:
            segment_content_parts.append(f'''
Error creating alignment for segment {seg_num}: {str(e)}
''')

        # Divider status reflects whether the segment had errors.
        status = "error" if segment_has_errors else "success"
        divider = _generate_segment_divider(
            seg_num, total_segments, time_range, word_range,
            is_processing=False, status=status)
        segment_html_parts.append(divider)
        # Wrap all segment content in a single box.
        segment_box = f'''
{''.join(segment_content_parts)}
'''
        segment_html_parts.append(segment_box)

        # Count errors per handler for the summary table. Handles multi-
        # handler tags ("A+B"); each part is "Name: detail" or "Name".
        error_counts = {}
        if segment_result and segment_result.errors:
            for error in segment_result.errors:
                handler = getattr(error, 'source_handler', '') or ''
                for part in handler.split('+'):
                    handler_name = part.split(':')[0].strip() if ':' in part else part.strip()
                    if handler_name:
                        error_counts[handler_name] = error_counts.get(handler_name, 0) + 1

        # Track segment data for re-rendering with sort modes.
        segment_metadata.append({
            'segment_idx': seg_idx,
            'divider_html': divider,
            'segment_html': segment_box,
            'has_errors': segment_has_errors,
            'segment_num': seg_num,
            'error_counts': error_counts,
        })

    # Final HTML: segments -> combined errors -> combined phoneme tables.
    all_html_parts = segment_html_parts.copy()

    # Combined error table: only in "end" mode, only once every segment
    # has a result, and only when there is more than one segment.
    if SEGMENT_ERROR_TABLE_LOCATION == "end":
        if all(p is not None for p in predicted_phonemes_per_segment) and total_segments > 1:
            combined_table = render_combined_error_table(all_results, title="All Errors")
            if combined_table:
                all_html_parts.append(combined_table)

    # Combined phoneme tables (populated only in "end" mode).
    if phoneme_tables:
        # Collapsible-wrapper state for the stripped <details> markup.
        details_open = "" if SEGMENT_PHONEME_TABLE_COLLAPSED else "open"
        phoneme_html = f'''
{t("phoneme_alignment.header")}
'''
        for table_data in phoneme_tables:
            # Mini divider per segment with its status indicator.
            status_icon = " ❌" if table_data.get('has_errors') else " ✅"
            phoneme_html += f'''
Segment {table_data['seg_num']}{status_icon}
'''
            if PHONEME_TABLE_STYLE == "scroll":
                table_html = _generate_scroll_table(
                    table_data['expected_row'],
                    table_data['actual_row'],
                    table_data['diff_row'])
            else:
                table_html = _generate_chunked_table(
                    table_data['expected_row'],
                    table_data['actual_row'],
                    table_data['diff_row'])
            phoneme_html += table_html
        phoneme_html += '''
'''
        all_html_parts.append(phoneme_html)

    overall_accuracy = (total_correct / total_expected * 100) if total_expected > 0 else 0

    # Cache segment metadata for re-rendering only when every segment
    # completed (errored/pending segments skip the metadata append, so
    # the length check must match too).
    if (len(segment_metadata) == total_segments
            and all(p is not None for p in predicted_phonemes_per_segment)):
        from shared_state import set_last_error_segment_data
        set_last_error_segment_data(segment_metadata)

    full_html = ''.join(all_html_parts)
    return full_html, overall_accuracy, total_expected, total_actual, all_results