VoiceFocus / utils.py
mariesig's picture
vad (#3)
32dcdfe
from typing import Optional
import io
import warnings
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import pyloudnorm as pyln
from matplotlib.patches import Patch
from PIL import Image
from constants import TARGET_LOUDNESS, TARGET_TP, VAD_OFF, VAD_ON
def get_vad_labels(vad_timestamps: list[list[float]], length: float) -> list[dict]:
subtitles = []
cur = 0.0
for start, end in vad_timestamps:
if start > cur:
subtitles.append(
{
"text": f"Voice Detection: {VAD_OFF}",
"timestamp": [cur, start],
}
)
subtitles.append(
{
"text": f"Voice Detection: {VAD_ON}",
"timestamp": [start, end],
}
)
cur = end
if cur < length:
subtitles.append(
{
"text": f"Voice Detection: {VAD_OFF}",
"timestamp": [cur, length],
}
)
return subtitles
def to_gradio_audio(x: np.ndarray, sr: int) -> tuple[int, np.ndarray]:
"""Return (sample_rate, int16 array) for Gradio Audio."""
x = np.asarray(x)
x = np.squeeze(x)
if x.ndim == 2 and x.shape[0] in (1, 2) and x.shape[1] > x.shape[0]:
x = x.T
if x.ndim == 2 and x.shape[1] == 1:
x = x[:, 0]
x = x.astype(np.float32)
x = np.clip(x, -1.0, 1.0)
x = (x * 32767).astype(np.int16)
return sr, x
def _merge_vad_segments(
vad_timestamps: list[list[float]],
gap_tolerance: float = 0.05,
) -> list[tuple[float, float]]:
if not vad_timestamps:
return []
segments = sorted((float(start), float(end)) for start, end in vad_timestamps)
merged: list[tuple[float, float]] = [segments[0]]
for start, end in segments[1:]:
last_start, last_end = merged[-1]
if start <= last_end + gap_tolerance:
merged[-1] = (last_start, max(last_end, end))
else:
merged.append((start, end))
return merged
def spec_image(
audio_array: np.ndarray,
sr: int,
n_fft: int = 2048,
hop_length: int = 512,
n_mels: int = 128,
fmax: Optional[float] = None,
vad_timestamps: Optional[list[list[float]]] = None,
) -> Image.Image:
y = np.asarray(audio_array, dtype=np.float32).flatten()
S = librosa.feature.melspectrogram(
y=y,
sr=sr,
n_fft=n_fft,
hop_length=hop_length,
n_mels=n_mels,
fmax=fmax or sr // 2,
)
S_db = librosa.power_to_db(S, ref=np.max)
fig, ax = plt.subplots(figsize=(8, 3), dpi=150)
img = librosa.display.specshow(
S_db,
sr=sr,
hop_length=hop_length,
x_axis="time",
y_axis="mel",
cmap="magma",
ax=ax,
)
if vad_timestamps:
vad_color = "#22C55E" # softer, cleaner green
merged_segments = _merge_vad_segments(vad_timestamps, gap_tolerance=0.05)
# Draw VAD bar as a fixed portion of the figure height (e.g., 4% of axes height)
bar_height_axes = 0.05 # 2% of axes height
bar_bottom_axes = 0.0 # 0% above the bottom
for start, end in merged_segments:
ax.fill_between(
[start, end],
[bar_bottom_axes, bar_bottom_axes],
[bar_bottom_axes + bar_height_axes, bar_bottom_axes + bar_height_axes],
color=vad_color,
alpha=0.95,
linewidth=0,
zorder=5,
transform=ax.get_xaxis_transform(),
)
vad_patch = Patch(
facecolor=vad_color,
edgecolor=vad_color,
label="Voice Activity",
)
ax.legend(
handles=[vad_patch],
loc="upper right",
fontsize=8,
frameon=True,
framealpha=0.9,
)
cbar = fig.colorbar(img, ax=ax, format="%+2.0f dB")
cbar.set_label("dB")
ax.set_title("Mel-spectrogram")
ax.set_xlabel("Time in s")
ax.set_ylabel("Frequency in Hz")
fig.tight_layout(pad=0.2)
buf = io.BytesIO()
fig.savefig(buf, format="png", bbox_inches="tight", pad_inches=0)
plt.close(fig)
buf.seek(0)
return Image.open(buf).convert("RGB")
def compute_wer(reference: str, hypothesis: str) -> float:
"""
Compute Word Error Rate (WER) between reference and hypothesis transcripts.
"""
ref_words = reference.split()
hyp_words = hypothesis.split()
d = np.zeros((len(ref_words) + 1, len(hyp_words) + 1), dtype=np.uint16)
for i in range(len(ref_words) + 1):
d[i][0] = i
for j in range(len(hyp_words) + 1):
d[0][j] = j
for i in range(1, len(ref_words) + 1):
for j in range(1, len(hyp_words) + 1):
cost = 0 if ref_words[i - 1] == hyp_words[j - 1] else 1
d[i][j] = min(
d[i - 1][j] + 1,
d[i][j - 1] + 1,
d[i - 1][j - 1] + cost,
)
return d[len(ref_words)][len(hyp_words)] / max(len(ref_words), 1)
def measure_loudness(x: np.ndarray, sr: int) -> float:
meter = pyln.Meter(sr)
return float(meter.integrated_loudness(x))
def true_peak_limiter(
x: np.ndarray,
sr: int,
max_true_peak: float = TARGET_TP,
) -> np.ndarray:
upsampled_sr = 192000
x_upsampled = librosa.resample(x, orig_sr=sr, target_sr=upsampled_sr)
true_peak = np.max(np.abs(x_upsampled))
if true_peak > 0:
true_peak_db = 20 * np.log10(true_peak)
if true_peak_db > max_true_peak:
gain_db = max_true_peak - true_peak_db
gain = 10 ** (gain_db / 20)
x_upsampled = x_upsampled * gain
x_limited = librosa.resample(x_upsampled, orig_sr=upsampled_sr, target_sr=sr)
x_limited = librosa.util.fix_length(x_limited, size=x.shape[-1])
return x_limited.astype(np.float32)
def normalize_lufs(x: np.ndarray, sr: int) -> np.ndarray:
"""
Normalize audio to a fixed integrated loudness target and limit true peak.
"""
try:
current_lufs = measure_loudness(x, sr)
if not np.isfinite(current_lufs):
return x.astype(np.float32)
gain_db = TARGET_LOUDNESS - current_lufs
gain = 10 ** (gain_db / 20)
y = x * gain
y = true_peak_limiter(y, sr, max_true_peak=TARGET_TP)
return y.astype(np.float32)
except Exception as e:
warnings.warn(f"LUFS normalization failed, returning input unchanged: {e}")
return x.astype(np.float32)