"""
Alignment and Visualization Generation
This module provides functions for generating HTML visualizations of phoneme- and word-level
alignment for Quranic recitation evaluation. It aligns predicted and canonical phoneme
sequences, color-codes alignment differences (correct, substitution, insertion, deletion),
renders tables (scrolling or chunked), aggregates results for segmented audio, and produces
interactive UI elements for feedback (including per-segment audio playback and error reports).
Key capabilities:
- Per-segment alignment visualization for segmented audio submissions.
- Word-level and phoneme-level feedback, including error marking and metrics.
- Flexible rendering styles: inline vs. combined tables, collapsible sections, and error
grouping.
- UI elements for reference/user audio, interactive playback, and lazy loading of segments.
- Compatibility with error-analysis rendering (for advanced UI sort/grouping).
- Designed for integration into full-stack Gradio/app interfaces or downstream UI frameworks.
Most functions return HTML or HTML + metrics, designed for direct embedding in frontend UIs.
"""
import sys
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from i8n import t
from config import (
COLORS,
PHONEME_TABLE_STYLE,
PHONEME_TABLE_CHUNK_SIZE,
SEGMENT_PHONEME_TABLE_LOCATION,
SEGMENT_ERROR_TABLE_LOCATION,
SEGMENT_PHONEME_TABLE_COLLAPSED,
)
from .metrics import align_sequences, PER_single
from .word_recovery import recover_words, recover_words_for_segment
from recitation_analysis.ui.tooltip import get_tooltip_css
from recitation_analysis.text_display.error_highlighting import render_combined_error_table
# Note: segment audio clips are provided directly by the pipeline.
def _generate_segment_divider(segment_num, total_segments, time_range, word_range, is_processing=False, status=None):
    """Generate an HTML divider rendered between per-segment feedback blocks.

    Args:
        segment_num: Current segment number (1-based).
        total_segments: Total number of segments.
        time_range: Time range string (e.g., "1.2s - 3.5s") - kept for API
            compatibility but not displayed.
        word_range: Word range string (e.g., "1-4").
        is_processing: If True, render the blinking orange "processing" divider.
        status: 'success', 'error', or 'warning' to pick the status emoji,
            or None for no indicator.

    Returns:
        HTML string for the divider.
    """
    # Map the status keyword to the emoji shown next to the segment label.
    # NOTE(review): status_icon is not referenced in the visible template below,
    # and the non-processing branch has no visible return — the HTML template
    # content appears to have been elided from this copy; confirm against the
    # original file before editing.
    status_icon = ""
    if status == "success":
        status_icon = " ✅"
    elif status == "error":
        status_icon = " ❌"
    elif status == "warning":
        status_icon = " ⚠️"
    if is_processing:
        # Blinking orange divider for processing state
        return f'''
'''
def _generate_scroll_table(expected_row, actual_row, diff_row):
    """Build a single horizontally scrolling 3-row HTML phoneme table.

    Rows are: expected phonemes, the user's recognized phonemes, and a diff row
    of (symbol, color) markers. Row labels come from the i18n helper ``t``.

    NOTE(review): the HTML tag/attribute text inside the string literals below
    appears to have been elided by extraction (several single-quoted f-strings
    are visibly truncated); ``color`` is presumably interpolated into the
    elided per-cell style — confirm against the original file.
    """
    # Row 1: "expected" label, then one cell per expected token.
    html = f'''
{t("phoneme_alignment.expected")}
'''
    for token in expected_row:
        html += f'
{token}
'
    # Row 2: the user's recognized phonemes.
    html += f'''
{t("phoneme_alignment.your_rec")}
'''
    for token in actual_row:
        html += f'
{token}
'
    # Row 3: diff markers (blank for correct positions).
    html += f'''
{t("phoneme_alignment.diff")}
'''
    for symbol, color in diff_row:
        html += f'
{symbol}
'
    # Close the table markup.
    html += '''
'''
    return html
def _generate_chunked_table(expected_row, actual_row, diff_row):
    """Build the phoneme alignment as multiple tables split into fixed-size chunks.

    Splits the three parallel rows (expected, actual, diff) into chunks of
    PHONEME_TABLE_CHUNK_SIZE tokens and emits one small table per chunk, with a
    "Part i of n" caption when there is more than one chunk.

    NOTE(review): as with _generate_scroll_table, the HTML markup inside the
    string literals appears to have been elided from this copy; ``color`` from
    the diff tuples is presumably used in the elided cell styling — confirm
    against the original file.
    """
    chunk_size = PHONEME_TABLE_CHUNK_SIZE
    # Ceiling division: number of chunks needed to cover all tokens.
    num_chunks = (len(expected_row) + chunk_size - 1) // chunk_size
    html = '
'
    for chunk_idx in range(num_chunks):
        # Slice the three parallel rows identically for this chunk.
        start_idx = chunk_idx * chunk_size
        end_idx = min(start_idx + chunk_size, len(expected_row))
        expected_chunk = expected_row[start_idx:end_idx]
        actual_chunk = actual_row[start_idx:end_idx]
        diff_chunk = diff_row[start_idx:end_idx]
        # Add chunk number if multiple chunks
        if num_chunks > 1:
            html += f'
Part {chunk_idx + 1} of {num_chunks}
'
        html += '''
'''
        # One column per token in this chunk (elided markup, likely <col>/<th>).
        for _ in expected_chunk:
            html += '
'
        # Row 1: expected phonemes.
        html += f'''
{t("phoneme_alignment.expected")}
'''
        for token in expected_chunk:
            html += f'
{token}
'
        # Row 2: the user's recognized phonemes.
        html += f'''
{t("phoneme_alignment.your_rec")}
'''
        for token in actual_chunk:
            html += f'
{token}
'
        # Row 3: diff markers.
        html += f'''
{t("phoneme_alignment.diff")}
'''
        for symbol, color in diff_chunk:
            html += f'
{symbol}
'
        # Close this chunk's table.
        html += '''
'''
    # Close the outer wrapper.
    html += '
'
    return html
def create_alignment_visualization(expected_phonemes, actual_phonemes, verse_ref=None, audio_data=None):
    """
    Create an HTML visualization of phoneme alignment, color-coding substitutions,
    insertions, and deletions between expected and actual phoneme sequences.

    - If a verse reference is provided, word-level feedback is included above the
      phoneme table.
    - Reuses a pre-computed alignment from the error pipeline when available.
    - Rendering style (scroll or chunked tables) depends on config settings.

    Args:
        expected_phonemes: Space-separated expected phoneme string.
        actual_phonemes: Space-separated actual phoneme string (from user's recitation).
        verse_ref: (Optional) Verse reference for word-level feedback and advanced alignment.
        audio_data: (Optional) Tuple of (sample_rate, audio_array) for duration analysis.

    Returns:
        Tuple of (html_string, accuracy, expected_count, actual_count) where
        accuracy is 100 - PER; on any exception returns (error_html, 0, 0, 0).
    """
    try:
        # Generate word-level feedback if verse_ref is provided
        word_html = ""
        recitation_result = None
        if verse_ref:
            word_feedback, word_error, recitation_result = recover_words(verse_ref, expected_phonemes, actual_phonemes, audio_data)
            if word_feedback:
                word_html = word_feedback
            elif word_error:
                # Show error but continue with phoneme alignment
                word_html = f'
ℹ️ Word-level feedback unavailable: {word_error}
'
        # Tokenize phonemes for alignment and metrics
        expected_tokens = expected_phonemes.split()
        actual_tokens = actual_phonemes.split()
        # Reuse alignment from error pipeline if available (avoids O(n*m) recomputation)
        if recitation_result and recitation_result.phoneme_alignment:
            alignment = list(recitation_result.phoneme_alignment)
        else:
            # Fallback: compute alignment (only when no verse_ref or error occurred)
            alignment = align_sequences(expected_tokens, actual_tokens)
        # Prepare data for 3-row table: alignment ops are C/S/D/I
        # (correct/substitution/deletion/insertion); gaps render as an em dash.
        expected_row = []
        actual_row = []
        diff_row = []
        for ref_tok, hyp_tok, op in alignment:
            if op == "C": # Correct
                expected_row.append(ref_tok)
                actual_row.append(hyp_tok)
                diff_row.append(("", "")) # No symbol, no color
            elif op == "S": # Substitution
                expected_row.append(ref_tok)
                actual_row.append(hyp_tok)
                diff_row.append(("✗", COLORS["substitution"]))
            elif op == "D": # Deletion
                expected_row.append(ref_tok)
                actual_row.append("—")
                diff_row.append(("−", COLORS["deletion"]))
            elif op == "I": # Insertion
                expected_row.append("—")
                actual_row.append(hyp_tok)
                diff_row.append(("+", COLORS["insertion"]))
        # Calculate PER. NOTE(review): PER_single is called as (hypothesis,
        # reference) here — confirm this argument order matches its signature.
        per_score = PER_single(actual_phonemes, expected_phonemes)
        accuracy = 100 - per_score
        # Generate table based on style setting
        if PHONEME_TABLE_STYLE == "scroll":
            table_html = _generate_scroll_table(expected_row, actual_row, diff_row)
        else:
            table_html = _generate_chunked_table(expected_row, actual_row, diff_row)
        # Wrap in collapsible section (respects config setting)
        # Use ltr-preserve class on table only - header should follow RTL direction for Arabic text
        # NOTE(review): details_open is not referenced in the visible template —
        # presumably interpolated into the elided <details> wrapper markup.
        details_open = "" if SEGMENT_PHONEME_TABLE_COLLAPSED else "open"
        phoneme_html = f'''
{t("phoneme_alignment.header")}
{table_html}
'''
        # Prepend word-level feedback before phoneme alignment
        full_html = word_html + phoneme_html
        return full_html, accuracy, len(expected_tokens), len(actual_tokens)
    except Exception as e:
        # Best-effort UI: surface the error as HTML instead of raising.
        error_html = f'
Error creating alignment: {str(e)}
'
        return error_html, 0, 0, 0
def format_metrics_html(accuracy, expected_count, actual_count):
    """
    Format metrics as an HTML snippet for embedding in alignment visualizations:
    expected count, predicted (actual) count, and accuracy percentage.

    Args:
        accuracy: Accuracy percentage (formatted to one decimal place).
        expected_count: Number of expected phonemes.
        actual_count: Number of actual phonemes.

    Returns:
        HTML string with the formatted metrics.
    """
    # NOTE(review): the HTML markup around these labels appears to have been
    # elided from this copy of the template — confirm against the original file.
    html = f'''
Expected
{expected_count}
Predicted
{actual_count}
Accuracy
{accuracy:.1f}%
'''
    return html
def create_segmented_alignment_visualization(
    segments,
    predicted_phonemes_per_segment,
    verse_ref,
    canonical_text,
    coverage_warning=None,
    reference_audio_clips=None,
    user_audio_clips=None,
):
    """
    Create a rich HTML visualization for segmented audio recitation,
    generating per-segment feedback boxes with audio controls,
    word-level and phoneme-level alignments, and error summaries.

    For each segment this function:
    - Renders a status divider (processing / success / error / warning).
    - Shows audio controls (reference and user recordings, with lazy loading).
    - Adds word-level feedback and a phoneme alignment table.
    - Skips alignment (appending None to results) when the segment errored,
      is still pending, or produced no phonemes.

    After all segments, optionally appends combined error and phoneme tables
    depending on SEGMENT_ERROR_TABLE_LOCATION / SEGMENT_PHONEME_TABLE_LOCATION.

    Args:
        segments: List of SegmentInfo objects from segment_processor
            (audio/time/word boundaries).
        predicted_phonemes_per_segment: List of predicted phoneme strings per
            segment (None entries mean "still processing").
        verse_ref: Full verse reference string.
        canonical_text: Full canonical Arabic text for the verse.
            NOTE(review): not referenced in the visible body — presumably used
            in elided template markup; confirm before removing.
        coverage_warning: (Optional) Warning about incomplete coverage (shown at top).
        reference_audio_clips: (Optional) List of data URIs for reference audio per segment.
        user_audio_clips: (Optional) List of data URIs for user's recorded audio per segment.

    Returns:
        Tuple of (html_string, overall_accuracy, total_expected, total_actual,
        all_results) where all_results is a List[RecitationResult or None] kept
        in 1:1 correspondence with ``segments``.
    """
    if not segments:
        # Nothing to render; keep the 5-tuple shape callers expect.
        return '
No segments detected
', 0, 0, 0, []
    total_segments = len(segments)
    segment_html_parts = [] # Word feedback per segment
    all_errors = [] # Collect errors from all segments
    all_results = [] # Collect RecitationResult objects for unified table
    phoneme_tables = [] # Collect phoneme tables for all segments
    segment_metadata = [] # Per-segment data for re-rendering with sort modes
    # Running totals across segments for the overall accuracy computation.
    total_expected = 0
    total_actual = 0
    total_correct = 0
    # Add coverage warning if present
    if coverage_warning:
        # Ensure tooltip CSS is present (segment mode builds multiple blocks).
        segment_html_parts.append(get_tooltip_css())
        segment_html_parts.append(f'''
{coverage_warning}
''')
    else:
        # Ensure tooltip CSS is present even when no warning.
        segment_html_parts.append(get_tooltip_css())
    # Process each segment
    for seg_idx, (segment, predicted_phonemes) in enumerate(zip(segments, predicted_phonemes_per_segment)):
        seg_num = seg_idx + 1
        # Time range string
        time_range = f"{segment.start_time:.1f}s - {segment.end_time:.1f}s"
        # Word range string (1-based for display)
        word_range = f"{segment.word_start_idx + 1}-{segment.word_end_idx + 1}"
        # Check if this segment is currently being processed (None = pending)
        is_processing = predicted_phonemes is None and not segment.error
        # Handle segment errors
        if segment.error:
            # Add divider with error status
            divider = _generate_segment_divider(seg_num, total_segments, time_range, word_range, is_processing=False, status="error")
            segment_html_parts.append(divider)
            segment_html_parts.append(f'''
⚠️ Segment {seg_num}: {segment.error}
''')
            all_results.append(None) # Maintain 1:1 correspondence with segments
            continue
        # Check for pending status (None)
        if predicted_phonemes is None:
            # Add blinking divider for processing
            divider = _generate_segment_divider(seg_num, total_segments, time_range, word_range, is_processing=True)
            segment_html_parts.append(divider)
            segment_html_parts.append(f'''
Processing segment {seg_num}...
''')
            all_results.append(None) # Maintain 1:1 correspondence with segments
            continue
        # Empty (but not None) phonemes: extraction ran and produced nothing.
        if not predicted_phonemes:
            divider = _generate_segment_divider(seg_num, total_segments, time_range, word_range, is_processing=False, status="warning")
            segment_html_parts.append(divider)
            # Show matched verse info if available, even without phoneme alignment
            # NOTE(review): verse_info is unused in the visible template —
            # presumably interpolated into elided markup; confirm.
            verse_info = f" ({segment.matched_ref})" if segment.matched_ref else ""
            text_info = f" {segment.matched_text}" if segment.matched_text else ""
            segment_html_parts.append(f'''
Audio segment processed but phoneme extraction returned empty. Check logs for details.
{text_info}
''')
            all_results.append(None) # Maintain 1:1 correspondence with segments
            continue
        # Collect segment content first to determine status
        segment_content_parts = []
        segment_has_errors = False
        # Audio section: reference reciter (left) and user audio (right) in 2-column layout
        ref_clip = reference_audio_clips[seg_idx] if reference_audio_clips and seg_idx < len(reference_audio_clips) else None
        user_clip = user_audio_clips[seg_idx] if user_audio_clips and seg_idx < len(user_audio_clips) else None
        # Build segment info JSON for lazy loading (used when ref_clip is None)
        import json
        # NOTE(review): the .replace below is a no-op as written; it looks like
        # an HTML-entity escape (e.g. '&quot;') that was decoded by extraction —
        # confirm against the original file.
        segment_info_json = json.dumps({
            "verse_ref": verse_ref,
            "word_start": segment.word_start_idx,
            "word_end": segment.word_end_idx
        }).replace('"', '"')
        # Always show audio section if we have user clip OR segment info for lazy loading
        has_segment_info = segment.word_start_idx is not None and segment.word_end_idx is not None
        if ref_clip or user_clip or has_segment_info:
            segment_content_parts.append('
')
            # {t("segments.reference_audio")} audio (left column) - either embedded or lazy load
            if ref_clip:
                # Pre-loaded audio clip
                segment_content_parts.append(f'''
''')
            # User's recorded audio (right column)
            if user_clip:
                segment_content_parts.append(f'''
{t("segments.user_audio")}
''')
            segment_content_parts.append('
')
        # Word-level feedback for this segment (segment treated as stopping at end).
        segment_ref = segment.matched_ref if hasattr(segment, 'matched_ref') and segment.matched_ref else None
        word_html, segment_errors, resolved_canonical_phonemes, segment_result = recover_words_for_segment(
            verse_ref,
            predicted_phonemes,
            word_start_idx=segment.word_start_idx,
            word_end_idx=segment.word_end_idx,
            segment_ref=segment_ref,
        )
        if word_html:
            segment_content_parts.append(word_html)
        if segment_errors:
            segment_has_errors = True
            all_errors.extend(segment_errors)
        # Collect RecitationResult for unified table rendering
        # Always append to maintain 1:1 correspondence with segments (can be None)
        all_results.append(segment_result)
        # Render inline error table if configured (no title for inline)
        if segment_result and SEGMENT_ERROR_TABLE_LOCATION == "inline":
            inline_error_table = render_combined_error_table(
                [segment_result], title=""
            )
            if inline_error_table:
                segment_content_parts.append(inline_error_table)
        # Prefer the separately-phonemized canonical phonemes (segment treated as stopping);
        # fall back to the segment processor output if needed.
        segment_canonical_phonemes = resolved_canonical_phonemes or segment.canonical_phonemes
        if not segment_canonical_phonemes:
            segment_content_parts.append(f'''
Segment {seg_num}: No canonical phonemes available
''')
            # Still add divider and wrapped content box
            status = "error" if segment_has_errors else "success"
            divider = _generate_segment_divider(seg_num, total_segments, time_range, word_range, is_processing=False, status=status)
            segment_html_parts.append(divider)
            segment_box = f'''
{''.join(segment_content_parts)}
'''
            segment_html_parts.append(segment_box)
            continue
        # Create phoneme alignment for this segment
        try:
            # Reuse alignment from segment_result if available (avoids O(n*m) recomputation)
            if segment_result and segment_result.phoneme_alignment:
                alignment = list(segment_result.phoneme_alignment)
                expected_tokens = segment_canonical_phonemes.split()
                actual_tokens = predicted_phonemes.split()
            else:
                # Fallback: compute alignment
                expected_tokens = segment_canonical_phonemes.split()
                actual_tokens = predicted_phonemes.split()
                alignment = align_sequences(expected_tokens, actual_tokens)
            # Build the 3 parallel rows, mirroring create_alignment_visualization.
            expected_row = []
            actual_row = []
            diff_row = []
            correct_count = 0
            for ref_tok, hyp_tok, op in alignment:
                if op == "C":
                    expected_row.append(ref_tok)
                    actual_row.append(hyp_tok)
                    diff_row.append(("", ""))
                    correct_count += 1
                elif op == "S":
                    expected_row.append(ref_tok)
                    actual_row.append(hyp_tok)
                    diff_row.append(("✗", COLORS["substitution"]))
                elif op == "D":
                    expected_row.append(ref_tok)
                    actual_row.append("—")
                    diff_row.append(("−", COLORS["deletion"]))
                elif op == "I":
                    expected_row.append("—")
                    actual_row.append(hyp_tok)
                    diff_row.append(("+", COLORS["insertion"]))
            # Accumulate totals for the overall accuracy figure.
            total_expected += len(expected_tokens)
            total_actual += len(actual_tokens)
            total_correct += correct_count
            segment_accuracy = (correct_count / len(expected_tokens) * 100) if expected_tokens else 0
            # Render inline or store for combined display at end
            if SEGMENT_PHONEME_TABLE_LOCATION == "inline":
                # Generate table
                if PHONEME_TABLE_STYLE == "scroll":
                    table_html = _generate_scroll_table(expected_row, actual_row, diff_row)
                else:
                    table_html = _generate_chunked_table(expected_row, actual_row, diff_row)
                # If collapsed mode, wrap in collapsible details; otherwise show directly
                if SEGMENT_PHONEME_TABLE_COLLAPSED:
                    inline_phoneme_html = f'''
{t("phoneme_alignment.header")}
{table_html}
'''
                else:
                    # Show directly without collapsible wrapper
                    inline_phoneme_html = f'''
{table_html}
'''
                # Add to segment content (inside the box)
                segment_content_parts.append(inline_phoneme_html)
            else:
                # Store phoneme table data for combined display at end
                phoneme_tables.append({
                    'seg_num': seg_num,
                    'time_range': time_range,
                    'word_range': word_range,
                    'expected_row': expected_row,
                    'actual_row': actual_row,
                    'diff_row': diff_row,
                    'accuracy': segment_accuracy,
                    'has_errors': segment_has_errors,
                })
        except Exception as e:
            # Best-effort: surface alignment failures inside the segment box.
            segment_content_parts.append(f'''
Error creating alignment for segment {seg_num}: {str(e)}
''')
        # Now add divider with status based on whether segment has errors
        status = "error" if segment_has_errors else "success"
        divider = _generate_segment_divider(seg_num, total_segments, time_range, word_range, is_processing=False, status=status)
        segment_html_parts.append(divider)
        # Wrap all segment content in a single box
        segment_box = f'''
{''.join(segment_content_parts)}
'''
        segment_html_parts.append(segment_box)
        # Count errors by handler type for summary table
        error_counts = {}
        if segment_result and segment_result.errors:
            for error in segment_result.errors:
                handler = getattr(error, 'source_handler', '') or ''
                # Handle multiple handlers (e.g., "Handler1+Handler2")
                for part in handler.split('+'):
                    # Extract handler name (before ":")
                    handler_name = part.split(':')[0].strip() if ':' in part else part.strip()
                    if handler_name:
                        error_counts[handler_name] = error_counts.get(handler_name, 0) + 1
        # Track segment data for re-rendering with sort modes
        segment_metadata.append({
            'segment_idx': seg_idx,
            'divider_html': divider,
            'segment_html': segment_box,
            'has_errors': segment_has_errors,
            'segment_num': seg_num,
            'error_counts': error_counts,
        })
    # Build final HTML: segments feedback -> combined errors (final) -> phoneme tables
    all_html_parts = segment_html_parts.copy()
    # Add combined error table once all segments are processed (skip if only 1 segment)
    # Only render at end if configured for "end" mode (inline mode renders per-segment)
    if SEGMENT_ERROR_TABLE_LOCATION == "end":
        if all(p is not None for p in predicted_phonemes_per_segment) and total_segments > 1:
            combined_table = render_combined_error_table(all_results, title="All Errors")
            if combined_table:
                all_html_parts.append(combined_table)
    # Add combined phoneme alignment tables at the end (only populated in "end" mode)
    if phoneme_tables:
        # NOTE(review): details_open and status_icon are not referenced in the
        # visible templates below — presumably interpolated into elided markup.
        details_open = "" if SEGMENT_PHONEME_TABLE_COLLAPSED else "open"
        phoneme_html = f'''
{t("phoneme_alignment.header")}
'''
        for table_data in phoneme_tables:
            # Add mini divider for each segment's phoneme table with status indicator
            status_icon = " ❌" if table_data.get('has_errors') else " ✅"
            phoneme_html += f'''
'''
        all_html_parts.append(phoneme_html)
    # Calculate overall accuracy
    overall_accuracy = (total_correct / total_expected * 100) if total_expected > 0 else 0
    # Cache segment metadata for re-rendering (only when all segments are processed)
    # Check that we have metadata for all segments (no pending/error segments skipped)
    if len(segment_metadata) == total_segments and all(p is not None for p in predicted_phonemes_per_segment):
        from shared_state import set_last_error_segment_data
        set_last_error_segment_data(segment_metadata)
    full_html = ''.join(all_html_parts)
    return full_html, overall_accuracy, total_expected, total_actual, all_results