"""Audio helper utilities.

Provides VAD subtitle labelling, Gradio audio conversion, mel-spectrogram
rendering with VAD overlay, word-error-rate computation, and EBU-style
loudness normalization with true-peak limiting.
"""

import io
import warnings
from typing import Optional

import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import pyloudnorm as pyln
from matplotlib.patches import Patch
from PIL import Image

from constants import TARGET_LOUDNESS, TARGET_TP, VAD_OFF, VAD_ON


def get_vad_labels(vad_timestamps: list[list[float]], length: float) -> list[dict]:
    """Convert VAD on-segments into an alternating ON/OFF subtitle track.

    Args:
        vad_timestamps: List of ``[start, end]`` pairs (seconds) where voice
            activity was detected. Assumed sorted and non-overlapping —
            NOTE(review): overlapping/unsorted input would produce
            out-of-order subtitles; confirm upstream guarantees.
        length: Total audio duration in seconds; used to close the final
            OFF segment.

    Returns:
        List of ``{"text": ..., "timestamp": [start, end]}`` dicts covering
        the whole timeline, alternating between VAD_OFF gaps and VAD_ON
        segments.
    """
    subtitles = []
    cur = 0.0
    for start, end in vad_timestamps:
        # Fill the silent gap before this voiced segment, if any.
        if start > cur:
            subtitles.append(
                {
                    "text": f"Voice Detection: {VAD_OFF}",
                    "timestamp": [cur, start],
                }
            )
        subtitles.append(
            {
                "text": f"Voice Detection: {VAD_ON}",
                "timestamp": [start, end],
            }
        )
        cur = end
    # Close the trailing silent segment up to the end of the audio.
    if cur < length:
        subtitles.append(
            {
                "text": f"Voice Detection: {VAD_OFF}",
                "timestamp": [cur, length],
            }
        )
    return subtitles


def to_gradio_audio(x: np.ndarray, sr: int) -> tuple[int, np.ndarray]:
    """Return (sample_rate, int16 array) for Gradio Audio."""
    x = np.asarray(x)
    x = np.squeeze(x)
    # Accept (channels, samples) layouts and transpose to (samples, channels).
    if x.ndim == 2 and x.shape[0] in (1, 2) and x.shape[1] > x.shape[0]:
        x = x.T
    # Collapse a trailing singleton channel dimension to mono.
    if x.ndim == 2 and x.shape[1] == 1:
        x = x[:, 0]
    x = x.astype(np.float32)
    x = np.clip(x, -1.0, 1.0)
    # Scale float [-1, 1] to the int16 range expected by Gradio.
    x = (x * 32767).astype(np.int16)
    return sr, x


def _merge_vad_segments(
    vad_timestamps: list[list[float]],
    gap_tolerance: float = 0.05,
) -> list[tuple[float, float]]:
    """Sort VAD segments and merge any separated by <= ``gap_tolerance`` s.

    Returns a list of non-overlapping ``(start, end)`` tuples; empty input
    yields an empty list.
    """
    if not vad_timestamps:
        return []
    segments = sorted((float(start), float(end)) for start, end in vad_timestamps)
    merged: list[tuple[float, float]] = [segments[0]]
    for start, end in segments[1:]:
        last_start, last_end = merged[-1]
        if start <= last_end + gap_tolerance:
            # Close enough to the previous segment: extend it instead of
            # starting a new one.
            merged[-1] = (last_start, max(last_end, end))
        else:
            merged.append((start, end))
    return merged


def spec_image(
    audio_array: np.ndarray,
    sr: int,
    n_fft: int = 2048,
    hop_length: int = 512,
    n_mels: int = 128,
    fmax: Optional[float] = None,
    vad_timestamps: Optional[list[list[float]]] = None,
) -> Image.Image:
    """Render a mel-spectrogram (optionally with a VAD overlay) as a PIL image.

    Args:
        audio_array: Mono audio samples; flattened to 1-D float32.
        sr: Sample rate in Hz.
        n_fft: FFT window size.
        hop_length: Hop length between STFT frames.
        n_mels: Number of mel bands.
        fmax: Upper frequency bound; defaults to Nyquist (sr // 2).
        vad_timestamps: Optional ``[start, end]`` voice segments to draw as a
            green bar along the bottom of the plot.

    Returns:
        RGB PIL image of the rendered figure.
    """
    y = np.asarray(audio_array, dtype=np.float32).flatten()
    S = librosa.feature.melspectrogram(
        y=y,
        sr=sr,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
        fmax=fmax or sr // 2,
    )
    S_db = librosa.power_to_db(S, ref=np.max)

    fig, ax = plt.subplots(figsize=(8, 3), dpi=150)
    img = librosa.display.specshow(
        S_db,
        sr=sr,
        hop_length=hop_length,
        x_axis="time",
        y_axis="mel",
        cmap="magma",
        ax=ax,
    )

    if vad_timestamps:
        vad_color = "#22C55E"  # softer, cleaner green
        merged_segments = _merge_vad_segments(vad_timestamps, gap_tolerance=0.05)
        # Draw the VAD bar as a fixed fraction of the axes height, anchored
        # at the bottom. Using the x-axis transform keeps x in data (time)
        # coordinates and y in axes-fraction coordinates.
        bar_height_axes = 0.05  # 5% of axes height
        bar_bottom_axes = 0.0  # flush with the bottom edge
        for start, end in merged_segments:
            ax.fill_between(
                [start, end],
                [bar_bottom_axes, bar_bottom_axes],
                [bar_bottom_axes + bar_height_axes, bar_bottom_axes + bar_height_axes],
                color=vad_color,
                alpha=0.95,
                linewidth=0,
                zorder=5,
                transform=ax.get_xaxis_transform(),
            )
        vad_patch = Patch(
            facecolor=vad_color,
            edgecolor=vad_color,
            label="Voice Activity",
        )
        ax.legend(
            handles=[vad_patch],
            loc="upper right",
            fontsize=8,
            frameon=True,
            framealpha=0.9,
        )

    cbar = fig.colorbar(img, ax=ax, format="%+2.0f dB")
    cbar.set_label("dB")
    ax.set_title("Mel-spectrogram")
    ax.set_xlabel("Time in s")
    ax.set_ylabel("Frequency in Hz")
    fig.tight_layout(pad=0.2)

    buf = io.BytesIO()
    fig.savefig(buf, format="png", bbox_inches="tight", pad_inches=0)
    plt.close(fig)  # release the figure to avoid leaking memory across calls
    buf.seek(0)
    return Image.open(buf).convert("RGB")


def compute_wer(reference: str, hypothesis: str) -> float:
    """Compute Word Error Rate (WER) between reference and hypothesis transcripts.

    Uses the standard Levenshtein (edit-distance) dynamic program over
    whitespace-split words, normalized by the reference length.

    Returns:
        Edit distance divided by ``max(len(reference words), 1)``; may
        exceed 1.0 when the hypothesis is much longer than the reference.
    """
    ref_words = reference.split()
    hyp_words = hypothesis.split()
    # int32 instead of uint16: uint16 silently overflows for transcripts
    # longer than 65,535 words.
    d = np.zeros((len(ref_words) + 1, len(hyp_words) + 1), dtype=np.int32)
    for i in range(len(ref_words) + 1):
        d[i][0] = i
    for j in range(len(hyp_words) + 1):
        d[0][j] = j
    for i in range(1, len(ref_words) + 1):
        for j in range(1, len(hyp_words) + 1):
            cost = 0 if ref_words[i - 1] == hyp_words[j - 1] else 1
            d[i][j] = min(
                d[i - 1][j] + 1,  # deletion
                d[i][j - 1] + 1,  # insertion
                d[i - 1][j - 1] + cost,  # substitution / match
            )
    # float() so callers get a plain Python float, as annotated, rather than
    # a NumPy scalar.
    return float(d[len(ref_words)][len(hyp_words)]) / max(len(ref_words), 1)


def measure_loudness(x: np.ndarray, sr: int) -> float:
    """Return the integrated loudness of ``x`` in LUFS (ITU-R BS.1770)."""
    meter = pyln.Meter(sr)
    return float(meter.integrated_loudness(x))


def true_peak_limiter(
    x: np.ndarray,
    sr: int,
    max_true_peak: float = TARGET_TP,
) -> np.ndarray:
    """Apply a static gain so the true peak does not exceed ``max_true_peak`` dBTP.

    True peak is estimated by oversampling to 192 kHz. Note this applies a
    uniform gain reduction to the whole signal (normalization) rather than a
    time-varying limiter.

    Args:
        x: Audio samples.
        sr: Sample rate in Hz.
        max_true_peak: Maximum allowed true peak in dBTP.

    Returns:
        float32 audio of the same length as ``x``.
    """
    upsampled_sr = 192000
    x_upsampled = librosa.resample(x, orig_sr=sr, target_sr=upsampled_sr)
    true_peak = np.max(np.abs(x_upsampled))
    if true_peak > 0:
        true_peak_db = 20 * np.log10(true_peak)
        if true_peak_db > max_true_peak:
            # Attenuate just enough to bring the true peak down to the ceiling.
            gain_db = max_true_peak - true_peak_db
            gain = 10 ** (gain_db / 20)
            x_upsampled = x_upsampled * gain
    x_limited = librosa.resample(x_upsampled, orig_sr=upsampled_sr, target_sr=sr)
    # Resampling can change length by a sample or two; restore the original.
    x_limited = librosa.util.fix_length(x_limited, size=x.shape[-1])
    return x_limited.astype(np.float32)


def normalize_lufs(x: np.ndarray, sr: int) -> np.ndarray:
    """Normalize audio to a fixed integrated loudness target and limit true peak.

    Gains the signal to TARGET_LOUDNESS LUFS, then caps the true peak at
    TARGET_TP dBTP. Best-effort: on any failure (e.g. audio too short for a
    loudness measurement) the input is returned unchanged as float32.
    """
    try:
        current_lufs = measure_loudness(x, sr)
        if not np.isfinite(current_lufs):
            # Silence (or near-silence) yields -inf LUFS; nothing to normalize.
            return x.astype(np.float32)
        gain_db = TARGET_LOUDNESS - current_lufs
        gain = 10 ** (gain_db / 20)
        y = x * gain
        y = true_peak_limiter(y, sr, max_true_peak=TARGET_TP)
        return y.astype(np.float32)
    except Exception as e:
        warnings.warn(f"LUFS normalization failed, returning input unchanged: {e}")
        return x.astype(np.float32)