# vaani-cavp-engine/modules/prosodic_profiling.py
# Commit 7d5f092 (Shaankar39) — init: Vaani CAVP engine (CPU, accuracy-first — Whisper large-v3, spaCy trf)
"""PROSODIC PROFILING MODULE
F0 contour analysis, rhythm metrics (PVI, %V, nPVI, rPVI),
stress patterns, intonation contour classification.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
import numpy as np
@dataclass
class RhythmMetrics:
    """Interval-based rhythm measurements for a single utterance.

    Interval durations behind these figures are expressed in milliseconds;
    the percentage and index fields are unitless.
    """

    # %V: share of the measured duration that is vocalic, as a percentage.
    percent_v: float
    # deltaV: standard deviation of the vocalic intervals.
    delta_v: float
    # deltaC: standard deviation of the consonantal intervals.
    delta_c: float
    # nPVI: normalized pairwise variability index over vocalic intervals.
    npvi_v: float
    # rPVI: raw pairwise variability index over consonantal intervals.
    rpvi_c: float
    # VarcoV: variation coefficient (deltaV / mean * 100) of vocalic intervals.
    varco_v: float
    # VarcoC: variation coefficient (deltaC / mean * 100) of consonantal intervals.
    varco_c: float
    # Rhythm label; the classifier in this module emits "stress-timed",
    # "syllable-timed" or "mixed".
    rhythm_class: str
@dataclass
class IntonationContour:
    """Summary of the pitch movement across one utterance."""

    # Overall shape: "falling", "rising", "fall-rise", "rise-fall" or "flat".
    pattern: str
    # ToBI-style boundary tone labels, e.g. H%, L%, H-L%, L-H%.
    boundary_tones: list[str]
    # One dict per detected pitch accent; the analyzer in this module fills
    # in the keys "position", "f0" and "type".
    pitch_accents: list[dict[str, Any]]
    # Baseline declination in Hz per second.
    declination_rate: float
@dataclass
class StressPattern:
    """Stress evidence recorded for a single word."""

    # The word as it appears in the transcript.
    word: str
    # Which syllable of the word carries the stress.
    stressed_syllable: int
    # Stress level: "primary" or "secondary".
    stress_type: str
    # Stressed-to-unstressed duration ratio.
    duration_ratio: float
    # Stressed-to-unstressed intensity ratio.
    intensity_ratio: float
    # Stressed-to-unstressed F0 ratio.
    f0_ratio: float
@dataclass
class ProsodicProfile:
    """Complete prosodic analysis result for one utterance."""

    # Interval-based rhythm measurements.
    rhythm: RhythmMetrics
    # Intonation pattern, boundary tones and pitch accents.
    intonation: IntonationContour
    # Words flagged as stressed.
    stress_patterns: list[StressPattern]
    # Speaking rate in syllables per second.
    speech_rate_syl_per_sec: float
    # Average syllable duration in milliseconds.
    mean_syllable_duration_ms: float
    # Pause time relative to the utterance duration.
    pause_to_speech_ratio: float
    # Overall prosodic nativeness score on a 0-100 scale.
    prosodic_score: float
def _compute_rhythm_metrics(
    word_timestamps: list[dict[str, Any]],
) -> RhythmMetrics:
    """Compute interval-based rhythm metrics from word-level timing.

    No phone-level alignment is used. Each word's duration (in ms) is split
    into one vocalic and one consonantal share, proportional to its count of
    vowel letters (a, e, i, o, u) versus other alphabetic characters, so every
    metric here is an orthographic approximation.

    Args:
        word_timestamps: Word entries carrying "word", "start" and "end".
            The end/start difference is multiplied by 1000 to obtain ms
            (presumably the inputs are seconds — confirm against the caller).
            ``None`` or an empty list yields all-zero metrics.

    Returns:
        RhythmMetrics with every numeric field rounded to two decimals.
    """
    vowel_letters = frozenset("aeiou")
    vocalic_ms: list[float] = []
    consonantal_ms: list[float] = []

    for entry in word_timestamps or []:
        duration_ms = (entry.get("end", 0) - entry.get("start", 0)) * 1000
        if duration_ms <= 0:
            continue
        # Only alphabetic characters count; punctuation and digits are ignored.
        letters = [ch for ch in (entry.get("word") or "").lower() if ch.isalpha()]
        if not letters:
            continue
        vowel_count = sum(1 for ch in letters if ch in vowel_letters)
        consonant_count = len(letters) - vowel_count
        vocalic_share = duration_ms * (vowel_count / len(letters))
        consonantal_share = duration_ms * (consonant_count / len(letters))
        if vocalic_share > 0:
            vocalic_ms.append(vocalic_share)
        if consonantal_share > 0:
            consonantal_ms.append(consonantal_share)

    # A single zero keeps the statistics below defined when nothing was measured.
    v_arr = np.asarray(vocalic_ms or [0.0], dtype=float)
    c_arr = np.asarray(consonantal_ms or [0.0], dtype=float)

    vocalic_total = np.sum(v_arr)
    total_duration = vocalic_total + np.sum(c_arr)
    percent_v = float(vocalic_total / total_duration * 100) if total_duration > 0 else 0.0
    delta_v = float(np.std(v_arr))
    delta_c = float(np.std(c_arr))

    def pairwise_variability(intervals: np.ndarray, *, normalized: bool) -> float:
        """Mean absolute difference of successive intervals (rPVI).

        With ``normalized=True`` each difference is divided by the mean of
        its pair and the result is scaled by 100 (nPVI).
        """
        if intervals.size < 2:
            return 0.0
        gaps = np.abs(np.diff(intervals))
        if not normalized:
            return float(np.mean(gaps))
        pair_means = (intervals[:-1] + intervals[1:]) / 2
        usable = pair_means > 0  # skip pairs that would divide by zero
        if not usable.any():
            return 0.0
        return float(np.mean(gaps[usable] / pair_means[usable]) * 100)

    npvi_v = pairwise_variability(v_arr, normalized=True)
    rpvi_c = pairwise_variability(c_arr, normalized=False)

    mean_v = float(np.mean(v_arr))
    mean_c = float(np.mean(c_arr))
    varco_v = delta_v / mean_v * 100 if mean_v > 0 else 0.0
    varco_c = delta_c / mean_c * 100 if mean_c > 0 else 0.0

    # Rhythm class heuristic — calibrated 2026-04-21 on 50-clip Svarah set.
    # Bangla L1: nPVI mean 56, varco 55 (syllable-timed in Indian English reads);
    # Hindi L1: nPVI 71, varco 86 (stress-timed); Tamil L1: nPVI 60, varco 51.
    # Old thresholds (nPVI>55 stress, <35 syllable) placed Bangla in stress-timed.
    # NOTE: with no usable intervals both indices are 0, so the input lands in
    # the "syllable-timed" branch.
    if npvi_v > 75 or varco_v > 80:
        rhythm_class = "stress-timed"
    elif npvi_v < 60 and varco_v < 70:
        rhythm_class = "syllable-timed"
    else:
        rhythm_class = "mixed"

    return RhythmMetrics(
        percent_v=round(percent_v, 2),
        delta_v=round(delta_v, 2),
        delta_c=round(delta_c, 2),
        npvi_v=round(npvi_v, 2),
        rpvi_c=round(rpvi_c, 2),
        varco_v=round(float(varco_v), 2),
        varco_c=round(float(varco_c), 2),
        rhythm_class=rhythm_class,
    )
def _analyze_intonation(pitch_contour: list[float]) -> IntonationContour:
    """Classify the intonation pattern of a frame-wise pitch contour.

    The contour is split into first quarter, middle half and last quarter;
    the pattern is chosen by comparing the mean F0 of those regions. Local
    maxima are reported as "H*" pitch accents (first 20 only) and the slope
    of a straight-line fit gives the declination rate.

    Non-finite frames (NaN, inf, ``None``) are dropped before analysis, but
    accent positions and the regression still use the original frame indices.
    NOTE(review): frames encoded as 0 Hz are treated as real samples —
    confirm how the upstream pitch tracker marks unvoiced frames.

    Args:
        pitch_contour: F0 values in Hz, one per frame. Lists and numpy arrays
            are both accepted.

    Returns:
        IntonationContour; a "flat" contour with no tones or accents when
        fewer than four usable frames are available.
    """
    flat = IntonationContour(
        pattern="flat",
        boundary_tones=[],
        pitch_accents=[],
        declination_rate=0.0,
    )
    if pitch_contour is None:
        return flat

    # dtype=float maps None entries to NaN so they are filtered out below.
    contour = np.asarray(pitch_contour, dtype=float).ravel()
    frame_idx = np.flatnonzero(np.isfinite(contour))
    arr = contour[frame_idx]
    n_frames = arr.size
    if n_frames < 4:
        return flat

    quarter = n_frames // 4
    three_quarters = 3 * n_frames // 4
    first_quarter = np.mean(arr[:quarter])
    last_quarter = np.mean(arr[three_quarters:])
    mid = np.mean(arr[quarter:three_quarters])

    # Pattern classification: end-vs-start first, then mid-utterance excursions.
    if last_quarter < first_quarter * 0.85:
        pattern, boundary_tones = "falling", ["L%"]
    elif last_quarter > first_quarter * 1.15:
        pattern, boundary_tones = "rising", ["H%"]
    elif mid > first_quarter * 1.1 and last_quarter < mid * 0.9:
        pattern, boundary_tones = "rise-fall", ["L-H%", "H-L%"]
    elif mid < first_quarter * 0.9 and last_quarter > mid * 1.1:
        pattern, boundary_tones = "fall-rise", ["H-L%", "L-H%"]
    else:
        pattern, boundary_tones = "flat", ["L%"]

    # Pitch accents: interior frames strictly higher than both neighbours.
    interior = arr[1:-1]
    peaks = np.flatnonzero((interior > arr[:-2]) & (interior > arr[2:])) + 1
    accents: list[dict[str, Any]] = [
        {
            "position": int(frame_idx[i]),
            "f0": round(float(arr[i]), 1),
            "type": "H*",
        }
        for i in peaks[:20]
    ]

    # Declination rate: regression slope in Hz/frame, converted to Hz/sec
    # (assuming ~10ms per frame).
    slope = float(np.polyfit(frame_idx, arr, 1)[0])
    decl_rate = slope * 100

    return IntonationContour(
        pattern=pattern,
        boundary_tones=boundary_tones,
        pitch_accents=accents,
        declination_rate=round(decl_rate, 2),
    )
def profile_prosody(
    word_timestamps: list[dict[str, Any]],
    pitch_data: dict[str, Any],
    duration_seconds: float,
    total_pause_ms: float,
) -> ProsodicProfile:
    """Build the full prosodic profile for one utterance.

    Combines rhythm metrics, intonation analysis, a duration-based stress
    approximation, speech-rate figures and a heuristic nativeness score.

    Args:
        word_timestamps: Word entries with "word", "start" and "end" keys.
            ``None`` is treated as an empty list.
        pitch_data: Mapping whose "pitch_contour" entry holds the F0 contour.
            ``None`` is treated as an empty mapping.
        duration_seconds: Utterance duration in seconds; non-positive values
            zero out the rate and ratio fields.
        total_pause_ms: Summed pause time in milliseconds.

    Returns:
        ProsodicProfile with at most 30 stress patterns and rounded figures.
    """
    # Normalise once, before anything iterates over the inputs.
    words = word_timestamps or []
    rhythm = _compute_rhythm_metrics(words)
    intonation = _analyze_intonation((pitch_data or {}).get("pitch_contour", []))

    # Stress patterns (approximate from duration): a word lasting more than
    # 1.2x the mean word duration is flagged as carrying primary stress.
    # Intensity and F0 ratios are not measured here and stay at 1.0.
    durations = [(w.get("end", 0) - w.get("start", 0)) * 1000 for w in words]
    mean_dur = float(np.mean(durations)) if durations else 100.0
    stress_patterns: list[StressPattern] = []
    for w, dur in zip(words, durations):
        ratio = dur / mean_dur if mean_dur > 0 else 1.0
        if ratio > 1.2:
            stress_patterns.append(StressPattern(
                word=w.get("word", ""),
                stressed_syllable=1,
                stress_type="primary",
                duration_ratio=round(float(ratio), 2),
                intensity_ratio=1.0,
                f0_ratio=1.0,
            ))

    # Syllables are estimated as the number of vowel letters, minimum one per word.
    syllable_count = sum(
        max(1, sum(1 for c in w.get("word", "") if c.lower() in "aeiou"))
        for w in words
    )
    if duration_seconds > 0:
        syl_rate = syllable_count / duration_seconds
        pause_ratio = (total_pause_ms / 1000) / duration_seconds
    else:
        syl_rate = 0.0
        pause_ratio = 0.0
    mean_syl_dur = (duration_seconds * 1000) / syllable_count if syllable_count > 0 else 0.0

    # Prosodic nativeness score (heuristic): start from 50 and add a bonus
    # for each feature that falls inside its target range.
    score = 50.0
    if 4.0 <= syl_rate <= 6.5:
        score += 15
    if rhythm.rhythm_class == "stress-timed":
        score += 10
    if intonation.pattern in ("falling", "rise-fall"):
        score += 10
    if 0.1 <= pause_ratio <= 0.3:
        score += 15
    score = min(100.0, max(0.0, score))

    return ProsodicProfile(
        rhythm=rhythm,
        intonation=intonation,
        stress_patterns=stress_patterns[:30],
        speech_rate_syl_per_sec=round(syl_rate, 2),
        mean_syllable_duration_ms=round(mean_syl_dur, 2),
        pause_to_speech_ratio=round(pause_ratio, 4),
        prosodic_score=round(score, 2),
    )