Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| from typing import Optional | |
| import io | |
| import warnings | |
| import librosa | |
| import librosa.display | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import pyloudnorm as pyln | |
| from matplotlib.patches import Patch | |
| from PIL import Image | |
| from constants import TARGET_LOUDNESS, TARGET_TP, VAD_OFF, VAD_ON | |
def get_vad_labels(vad_timestamps: list[list[float]], length: float) -> list[dict]:
    """Turn voice-activity segments into a gap-free list of subtitle entries.

    Each entry is a dict with a ``"text"`` label and a ``"timestamp"``
    ``[start, end]`` pair. Every given segment yields a ``VAD_ON`` entry;
    the stretches before, between and after segments (up to *length*)
    yield ``VAD_OFF`` entries.

    Args:
        vad_timestamps: ``[start, end]`` pairs, processed in the given order.
        length: End of the timeline; a trailing ``VAD_OFF`` entry is added
            when the last segment ends before it.

    Returns:
        The subtitle entries in timeline order.
    """

    def _label(state: str, begin: float, finish: float) -> dict:
        # Single place that defines the entry layout.
        return {
            "text": f"Voice Detection: {state}",
            "timestamp": [begin, finish],
        }

    labels = []
    cursor = 0.0  # end of the most recently emitted entry
    for seg_start, seg_end in vad_timestamps:
        # Fill the silent gap in front of this segment, if there is one.
        if cursor < seg_start:
            labels.append(_label(VAD_OFF, cursor, seg_start))
        labels.append(_label(VAD_ON, seg_start, seg_end))
        cursor = seg_end

    # Trailing silence up to the end of the timeline.
    if length > cursor:
        labels.append(_label(VAD_OFF, cursor, length))
    return labels
def to_gradio_audio(x: np.ndarray, sr: int) -> tuple[int, np.ndarray]:
    """Return ``(sample_rate, int16 array)`` for Gradio Audio.

    The input is treated as floating-point audio with a nominal range of
    ``[-1.0, 1.0]``: values outside that range are clipped, NaNs are
    replaced by silence, and the result is scaled to 16-bit PCM.

    Args:
        x: Audio samples. Singleton dimensions are dropped; a
            ``(2, n_samples)`` array is transposed to ``(n_samples, 2)``.
        sr: Sample rate, passed through unchanged.

    Returns:
        ``(sr, samples)`` where ``samples`` is an ``int16`` array with at
        least one dimension.
    """
    # squeeze() drops every singleton axis, so mono input in any layout
    # becomes 1-D; atleast_1d keeps a single-sample clip from collapsing
    # into a 0-d scalar array.
    x = np.atleast_1d(np.squeeze(np.asarray(x)))

    # After squeezing, a 2-D array has no length-1 axis, so the only
    # channels-first layout left to detect is (2, n_samples) stereo.
    if x.ndim == 2 and x.shape[0] == 2 and x.shape[1] > 2:
        x = x.T

    # NaN passes through clip() and has no defined int16 value, so map it
    # to silence first; +/-inf is handled by the clip below.
    x = np.nan_to_num(x.astype(np.float32), nan=0.0)
    x = np.clip(x, -1.0, 1.0)
    return sr, (x * 32767).astype(np.int16)
| def _merge_vad_segments( | |
| vad_timestamps: list[list[float]], | |
| gap_tolerance: float = 0.05, | |
| ) -> list[tuple[float, float]]: | |
| if not vad_timestamps: | |
| return [] | |
| segments = sorted((float(start), float(end)) for start, end in vad_timestamps) | |
| merged: list[tuple[float, float]] = [segments[0]] | |
| for start, end in segments[1:]: | |
| last_start, last_end = merged[-1] | |
| if start <= last_end + gap_tolerance: | |
| merged[-1] = (last_start, max(last_end, end)) | |
| else: | |
| merged.append((start, end)) | |
| return merged | |
def _draw_vad_overlay(ax, vad_timestamps: list[list[float]]) -> None:
    """Mark voice-activity segments as a green strip along the bottom of *ax*."""
    vad_color = "#22C55E"  # softer, cleaner green
    merged_segments = _merge_vad_segments(vad_timestamps, gap_tolerance=0.05)

    # The strip spans the bottom 5% of the axes height, independent of the
    # frequency scale.
    bar_bottom_axes = 0.0
    bar_top_axes = bar_bottom_axes + 0.05
    for start, end in merged_segments:
        ax.fill_between(
            [start, end],
            [bar_bottom_axes, bar_bottom_axes],
            [bar_top_axes, bar_top_axes],
            color=vad_color,
            alpha=0.95,
            linewidth=0,
            zorder=5,
            # x is in data units (seconds), y in axes fractions (0..1).
            transform=ax.get_xaxis_transform(),
        )

    vad_patch = Patch(
        facecolor=vad_color,
        edgecolor=vad_color,
        label="Voice Activity",
    )
    ax.legend(
        handles=[vad_patch],
        loc="upper right",
        fontsize=8,
        frameon=True,
        framealpha=0.9,
    )


def spec_image(
    audio_array: np.ndarray,
    sr: int,
    n_fft: int = 2048,
    hop_length: int = 512,
    n_mels: int = 128,
    fmax: Optional[float] = None,
    vad_timestamps: Optional[list[list[float]]] = None,
) -> Image.Image:
    """Render a mel-spectrogram of the audio as an RGB PIL image.

    Args:
        audio_array: Audio samples; converted to float32 and flattened.
        sr: Sample rate of ``audio_array``.
        n_fft: FFT window size passed to ``librosa.feature.melspectrogram``.
        hop_length: Hop length used for both the spectrogram and the time axis.
        n_mels: Number of mel bands.
        fmax: Highest mel frequency; falls back to ``sr // 2`` when falsy.
        vad_timestamps: Optional ``[start, end]`` voice-activity segments.
            When given and non-empty, they are drawn as a green strip at
            the bottom of the plot, with a legend entry.

    Returns:
        The rendered figure as an RGB ``PIL.Image.Image``.
    """
    # NOTE(review): flatten() concatenates channels end to end for
    # multi-channel input - presumably callers pass mono audio; confirm.
    y = np.asarray(audio_array, dtype=np.float32).flatten()
    S = librosa.feature.melspectrogram(
        y=y,
        sr=sr,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
        fmax=fmax or sr // 2,
    )
    # dB relative to the loudest bin, so the colour scale tops out at 0 dB.
    S_db = librosa.power_to_db(S, ref=np.max)

    fig, ax = plt.subplots(figsize=(8, 3), dpi=150)
    try:
        img = librosa.display.specshow(
            S_db,
            sr=sr,
            hop_length=hop_length,
            x_axis="time",
            y_axis="mel",
            cmap="magma",
            ax=ax,
        )
        if vad_timestamps:
            _draw_vad_overlay(ax, vad_timestamps)

        cbar = fig.colorbar(img, ax=ax, format="%+2.0f dB")
        cbar.set_label("dB")
        ax.set_title("Mel-spectrogram")
        ax.set_xlabel("Time in s")
        ax.set_ylabel("Frequency in Hz")
        fig.tight_layout(pad=0.2)

        buf = io.BytesIO()
        fig.savefig(buf, format="png", bbox_inches="tight", pad_inches=0)
    finally:
        # pyplot keeps every open figure in a global registry; close it
        # even when plotting fails so repeated calls do not leak figures.
        plt.close(fig)

    buf.seek(0)
    return Image.open(buf).convert("RGB")
def compute_wer(reference: str, hypothesis: str) -> float:
    """Compute Word Error Rate (WER) between reference and hypothesis transcripts.

    WER is the word-level Levenshtein distance (substitutions, insertions
    and deletions all cost 1) divided by the number of reference words.
    Both transcripts are split on whitespace and compared case-sensitively.

    Args:
        reference: Ground-truth transcript.
        hypothesis: Transcript to evaluate.

    Returns:
        The error rate as a Python ``float``. An empty reference is treated
        as having one word, so the result is then the hypothesis word count.
    """
    ref_words = reference.split()
    hyp_words = hypothesis.split()

    # Rolling two-row edit-distance DP on plain Python ints. A fixed-width
    # integer table would overflow on very long transcripts, and only the
    # previous row is ever read, so the full matrix is unnecessary.
    previous = list(range(len(hyp_words) + 1))
    for i, ref_word in enumerate(ref_words, start=1):
        current = [i]  # distance from i reference words to an empty hypothesis
        for j, hyp_word in enumerate(hyp_words, start=1):
            substitution = previous[j - 1] + (ref_word != hyp_word)
            deletion = previous[j] + 1
            insertion = current[j - 1] + 1
            current.append(min(deletion, insertion, substitution))
        previous = current

    return previous[-1] / max(len(ref_words), 1)
def measure_loudness(x: np.ndarray, sr: int) -> float:
    """Return the integrated loudness of *x* as measured by pyloudnorm.

    Args:
        x: Audio samples handed to ``pyln.Meter.integrated_loudness``.
        sr: Sample rate used to construct the meter.

    Returns:
        The meter reading converted to a plain ``float``.
    """
    loudness = pyln.Meter(sr).integrated_loudness(x)
    return float(loudness)
def true_peak_limiter(
    x: np.ndarray,
    sr: int,
    max_true_peak: float = TARGET_TP,
) -> np.ndarray:
    """Scale *x* down so its estimated true peak does not exceed a ceiling.

    The inter-sample ("true") peak is estimated by resampling to 192 kHz
    and taking the largest absolute value. If that peak, expressed in dB
    relative to an amplitude of 1.0, is above ``max_true_peak``, one
    constant gain is applied to the whole signal. This is a static gain
    reduction, not a dynamic limiter.

    Args:
        x: Audio samples (time on the last axis, as librosa expects).
        sr: Sample rate of ``x``.
        max_true_peak: Peak ceiling in dB.

    Returns:
        A float32 array with the same shape as ``x``; the samples are
        unchanged apart from the dtype when no reduction is needed.
    """
    x = np.asarray(x)
    if x.size == 0:
        # Nothing to measure; np.max would raise on an empty array.
        return x.astype(np.float32)

    upsampled_sr = 192000
    x_upsampled = librosa.resample(x, orig_sr=sr, target_sr=upsampled_sr)
    true_peak = np.max(np.abs(x_upsampled))

    if true_peak > 0:
        true_peak_db = 20 * np.log10(true_peak)
        if true_peak_db > max_true_peak:
            gain_db = max_true_peak - true_peak_db
            gain = 10 ** (gain_db / 20)
            # A scalar gain commutes with resampling, so apply it to the
            # original samples. The upsampled copy is only needed for the
            # measurement; resampling it back would add filter error and
            # require a length fix-up for no benefit.
            return (x * gain).astype(np.float32)

    return x.astype(np.float32)
def normalize_lufs(x: np.ndarray, sr: int) -> np.ndarray:
    """
    Normalize audio to a fixed integrated loudness target and limit true peak.

    The measured loudness is moved to ``TARGET_LOUDNESS`` with a single
    gain, then ``true_peak_limiter`` caps the peak at ``TARGET_TP``. This is
    best-effort: if the loudness is not finite, or any step raises, the
    input is returned unchanged (as float32), with a warning in the latter
    case.

    Args:
        x: Audio samples.
        sr: Sample rate of ``x``.

    Returns:
        The normalized (or, on failure, original) audio as float32.
    """
    try:
        measured = measure_loudness(x, sr)
        if np.isfinite(measured):
            # dB difference to the target, converted to a linear factor.
            linear_gain = 10 ** ((TARGET_LOUDNESS - measured) / 20)
            limited = true_peak_limiter(
                x * linear_gain, sr, max_true_peak=TARGET_TP
            )
            return limited.astype(np.float32)
        # A non-finite reading gives no usable gain; leave the audio as is.
        return x.astype(np.float32)
    except Exception as err:
        warnings.warn(f"LUFS normalization failed, returning input unchanged: {err}")
        return x.astype(np.float32)