Spaces:
Build error
Build error
| """PROSODIC PROFILING MODULE | |
| F0 contour analysis, rhythm metrics (PVI, %V, nPVI, rPVI), | |
| stress patterns, intonation contour classification. | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from typing import Any | |
| import numpy as np | |
@dataclass
class RhythmMetrics:
    """Duration-based rhythm metrics for one utterance.

    Declared as a dataclass because the rhythm computation in this module
    builds it with keyword arguments and therefore needs a generated
    ``__init__``.
    """

    percent_v: float  # %V — proportion of vocalic intervals (0-100)
    delta_v: float  # deltaV — variability (std. dev.) of vocalic intervals
    delta_c: float  # deltaC — variability (std. dev.) of consonantal intervals
    npvi_v: float  # nPVI — normalized pairwise variability (vocalic)
    rpvi_c: float  # rPVI — raw pairwise variability (consonantal)
    varco_v: float  # VarcoV — variation coefficient of vocalic intervals
    varco_c: float  # VarcoC — variation coefficient of consonantal intervals
    # "stress-timed", "syllable-timed", or "mixed" (the classifier in this
    # module never emits "mora-timed").
    rhythm_class: str
@dataclass
class IntonationContour:
    """Coarse description of an utterance's pitch movement.

    Declared as a dataclass because the intonation analysis in this module
    builds it with keyword arguments and therefore needs a generated
    ``__init__``.
    """

    pattern: str  # "falling", "rising", "fall-rise", "rise-fall", "flat"
    boundary_tones: list[str]  # ToBI style: H%, L%, H-L%, L-H%
    # One dict per detected accent, with "position", "f0" and "type" keys.
    pitch_accents: list[dict[str, Any]]
    declination_rate: float  # Hz/second baseline declination
@dataclass
class StressPattern:
    """Stress estimate for a single word.

    Declared as a dataclass because the profiling entry point in this module
    builds it with keyword arguments and therefore needs a generated
    ``__init__``.
    """

    word: str
    stressed_syllable: int
    stress_type: str  # "primary", "secondary"
    duration_ratio: float  # stressed/unstressed duration ratio
    intensity_ratio: float  # stressed/unstressed intensity ratio
    f0_ratio: float  # stressed/unstressed F0 ratio
@dataclass
class ProsodicProfile:
    """Aggregate prosodic profile of one utterance.

    Declared as a dataclass because the profiling entry point in this module
    builds it with keyword arguments and therefore needs a generated
    ``__init__``.
    """

    rhythm: RhythmMetrics
    intonation: IntonationContour
    stress_patterns: list[StressPattern]
    speech_rate_syl_per_sec: float
    mean_syllable_duration_ms: float
    pause_to_speech_ratio: float
    prosodic_score: float  # 0-100 overall prosodic nativeness
def _normalized_pvi(intervals: np.ndarray) -> float:
    """Return nPVI: 100 x mean of |a - b| / mean(a, b) over adjacent pairs."""
    if len(intervals) < 2:
        return 0.0
    pair_means = (intervals[:-1] + intervals[1:]) / 2
    # Pairs whose mean is not positive are skipped to avoid dividing by zero.
    usable = pair_means > 0
    if not usable.any():
        return 0.0
    ratios = np.abs(np.diff(intervals))[usable] / pair_means[usable]
    return float(ratios.mean() * 100)


def _raw_pvi(intervals: np.ndarray) -> float:
    """Return rPVI: mean absolute difference between adjacent intervals."""
    if len(intervals) < 2:
        return 0.0
    return float(np.abs(np.diff(intervals)).mean())


def _compute_rhythm_metrics(
    word_timestamps: list[dict[str, Any]],
) -> RhythmMetrics:
    """Compute rhythm metrics from word-level timing.

    Each word's duration is split into one vocalic and one consonantal
    interval in proportion to its vowel ("aeiou") and other alphabetic letter
    counts; this is an orthographic approximation, not a phonetic alignment.

    Args:
        word_timestamps: Dicts read through the "word", "start" and "end"
            keys. ``end - start`` is multiplied by 1000 (presumably seconds
            to milliseconds — confirm against the caller). Entries with a
            non-positive duration, or with no alphabetic characters, are
            skipped.

    Returns:
        A RhythmMetrics whose numeric fields are plain Python floats rounded
        to two decimals. With no usable intervals every metric is 0.0, which
        the thresholds below classify as "syllable-timed".
    """
    vowels = "aeiou"
    vocalic_intervals: list[float] = []
    consonantal_intervals: list[float] = []

    for w in word_timestamps:
        dur = (w.get("end", 0) - w.get("start", 0)) * 1000
        if dur <= 0:
            continue

        # Approximate V/C intervals from orthography.
        letters = [c for c in w.get("word", "").lower() if c.isalpha()]
        if not letters:
            continue
        vowel_count = sum(1 for c in letters if c in vowels)
        consonant_count = len(letters) - vowel_count

        v_dur = dur * (vowel_count / len(letters))
        c_dur = dur * (consonant_count / len(letters))
        if v_dur > 0:
            vocalic_intervals.append(v_dur)
        if c_dur > 0:
            consonantal_intervals.append(c_dur)

    # A single zero keeps the statistics below defined when nothing was found.
    v_arr = np.asarray(vocalic_intervals or [0.0], dtype=float)
    c_arr = np.asarray(consonantal_intervals or [0.0], dtype=float)

    v_total = float(v_arr.sum())
    total_duration = v_total + float(c_arr.sum())
    percent_v = (v_total / total_duration * 100) if total_duration > 0 else 0.0

    delta_v = float(np.std(v_arr))
    delta_c = float(np.std(c_arr))

    npvi_v = _normalized_pvi(v_arr)
    rpvi_c = _raw_pvi(c_arr)

    mean_v = float(v_arr.mean())
    mean_c = float(c_arr.mean())
    varco_v = (delta_v / mean_v * 100) if mean_v > 0 else 0.0
    varco_c = (delta_c / mean_c * 100) if mean_c > 0 else 0.0

    # Rhythm class heuristic — calibrated 2026-04-21 on 50-clip Svarah set.
    # Bangla L1: nPVI mean 56, varco 55 (syllable-timed in Indian English reads);
    # Hindi L1: nPVI 71, varco 86 (stress-timed); Tamil L1: nPVI 60, varco 51.
    # Old thresholds (nPVI>55 stress, <35 syllable) placed Bangla in stress-timed.
    if npvi_v > 75 or varco_v > 80:
        rhythm_class = "stress-timed"
    elif npvi_v < 60 and varco_v < 70:
        rhythm_class = "syllable-timed"
    else:
        rhythm_class = "mixed"

    return RhythmMetrics(
        percent_v=round(percent_v, 2),
        delta_v=round(delta_v, 2),
        delta_c=round(delta_c, 2),
        npvi_v=round(npvi_v, 2),
        rpvi_c=round(rpvi_c, 2),
        varco_v=round(varco_v, 2),
        varco_c=round(varco_c, 2),
        rhythm_class=rhythm_class,
    )
def _analyze_intonation(
    pitch_contour: list[float],
    frame_rate_hz: float = 100.0,
) -> IntonationContour:
    """Classify intonation pattern from pitch contour.

    Args:
        pitch_contour: Per-frame F0 values. A list or a NumPy array is
            accepted; ``None`` or fewer than four frames yields a "flat"
            contour with no tones or accents.
        frame_rate_hz: Frames per second of ``pitch_contour``, used to turn
            the fitted per-frame slope into Hz/second. The default of 100
            corresponds to the ~10 ms hop this code previously assumed.

    Returns:
        An IntonationContour holding the pattern label, boundary tones, at
        most the first 20 local F0 maxima as "H*" accents, and the linear
        declination rate rounded to two decimals.
    """
    # An explicit None/length test is used because truth-testing a
    # multi-element NumPy array raises ValueError.
    if pitch_contour is None or len(pitch_contour) < 4:
        return IntonationContour(
            pattern="flat",
            boundary_tones=[],
            pitch_accents=[],
            declination_rate=0.0,
        )

    # NOTE(review): unvoiced frames encoded as 0 or NaN, if the pitch tracker
    # emits any, are not filtered here — confirm upstream.
    arr = np.asarray(pitch_contour, dtype=float)
    n_frames = len(arr)
    q1 = n_frames // 4
    q3 = 3 * n_frames // 4
    first_quarter = arr[:q1].mean()
    mid = arr[q1:q3].mean()
    last_quarter = arr[q3:].mean()

    # Pattern classification: compare the final quarter against the first,
    # then fall back to the middle section to detect a peak or a dip.
    if last_quarter < first_quarter * 0.85:
        pattern = "falling"
        boundary_tones = ["L%"]
    elif last_quarter > first_quarter * 1.15:
        pattern = "rising"
        boundary_tones = ["H%"]
    elif mid > first_quarter * 1.1 and last_quarter < mid * 0.9:
        pattern = "rise-fall"
        boundary_tones = ["L-H%", "H-L%"]
    elif mid < first_quarter * 0.9 and last_quarter > mid * 1.1:
        pattern = "fall-rise"
        boundary_tones = ["H-L%", "L-H%"]
    else:
        pattern = "flat"
        boundary_tones = ["L%"]

    # Pitch accents: frames strictly higher than both neighbours.
    interior = arr[1:-1]
    peak_positions = np.flatnonzero((interior > arr[:-2]) & (interior > arr[2:])) + 1
    accents: list[dict[str, Any]] = [
        {"position": int(i), "f0": round(float(arr[i]), 1), "type": "H*"}
        for i in peak_positions[:20]
    ]

    # Declination rate: least-squares slope per frame, scaled to Hz/second.
    # At least four frames are guaranteed here, so the fit is always defined.
    slope_per_frame = float(np.polyfit(np.arange(n_frames), arr, 1)[0])
    decl_rate = slope_per_frame * frame_rate_hz

    return IntonationContour(
        pattern=pattern,
        boundary_tones=boundary_tones,
        pitch_accents=accents,
        declination_rate=round(decl_rate, 2),
    )
def profile_prosody(
    word_timestamps: list[dict[str, Any]],
    pitch_data: dict[str, Any],
    duration_seconds: float,
    total_pause_ms: float,
) -> ProsodicProfile:
    """Full prosodic profiling.

    Args:
        word_timestamps: Dicts read through the "word", "start" and "end"
            keys. ``None`` or an empty list is treated as no words.
        pitch_data: Dict whose "pitch_contour" entry is passed to the
            intonation analysis. ``None`` is treated as an empty dict.
        duration_seconds: Utterance length used as the denominator for the
            speech rate and pause ratio; non-positive values give 0 for both.
        total_pause_ms: Total pause time in milliseconds.

    Returns:
        A ProsodicProfile with rhythm and intonation results, at most 30
        stress patterns, rate/pause statistics and a heuristic 0-100 score.
    """
    # Normalise the inputs before anything iterates them, so None is handled
    # for the rhythm and intonation steps as well as the loops below.
    words = word_timestamps or []
    rhythm = _compute_rhythm_metrics(words)
    intonation = _analyze_intonation((pitch_data or {}).get("pitch_contour", []))

    # Stress patterns: a word is flagged as stressed when its duration exceeds
    # 1.2x the mean word duration. Intensity and F0 ratios are not measured
    # here and are reported as the neutral value 1.0.
    durations = [(w.get("end", 0) - w.get("start", 0)) * 1000 for w in words]
    mean_dur = float(np.mean(durations)) if durations else 100.0
    stress_patterns: list[StressPattern] = []
    for w, dur in zip(words, durations):
        ratio = dur / mean_dur if mean_dur > 0 else 1.0
        if ratio > 1.2:
            stress_patterns.append(StressPattern(
                word=w.get("word", ""),
                stressed_syllable=1,
                stress_type="primary",
                duration_ratio=round(ratio, 2),
                intensity_ratio=1.0,
                f0_ratio=1.0,
            ))

    # Syllables are approximated as vowel letters, with at least one per word.
    syllable_count = sum(
        max(1, sum(1 for c in w.get("word", "") if c.lower() in "aeiou"))
        for w in words
    )
    syl_rate = syllable_count / duration_seconds if duration_seconds > 0 else 0
    mean_syl_dur = (duration_seconds * 1000) / syllable_count if syllable_count > 0 else 0
    # Pause time in seconds divided by duration_seconds.
    pause_ratio = (total_pause_ms / 1000) / duration_seconds if duration_seconds > 0 else 0

    # Prosodic nativeness score (heuristic): base 50 plus fixed bonuses.
    score = 50.0
    if 4.0 <= syl_rate <= 6.5:
        score += 15
    if rhythm.rhythm_class == "stress-timed":
        score += 10
    if intonation.pattern in ("falling", "rise-fall"):
        score += 10
    if 0.1 <= pause_ratio <= 0.3:
        score += 15
    score = min(100.0, max(0.0, score))

    return ProsodicProfile(
        rhythm=rhythm,
        intonation=intonation,
        stress_patterns=stress_patterns[:30],
        speech_rate_syl_per_sec=round(syl_rate, 2),
        mean_syllable_duration_ms=round(mean_syl_dur, 2),
        pause_to_speech_ratio=round(pause_ratio, 4),
        prosodic_score=round(score, 2),
    )