"""
Speaker embedding extraction using the ECAPA-TDNN architecture via SpeechBrain.
Handles audio preprocessing, feature extraction, and L2-normalized embeddings.
"""
import os
import torch
import torchaudio
import numpy as np
from pathlib import Path
from typing import Union, List, Tuple
from loguru import logger
class EcapaTDNNEmbedder:
    """
    Speaker embedding extractor using ECAPA-TDNN architecture.
    Produces 192-dim L2-normalized speaker embeddings per audio segment.

    The SpeechBrain model is loaded lazily on the first call that needs it,
    so constructing an instance only resolves the device and creates the
    cache directory.
    """

    MODEL_SOURCE = "speechbrain/spkrec-ecapa-voxceleb"
    SAMPLE_RATE = 16000
    EMBEDDING_DIM = 192

    def __init__(self, device: str = "auto", cache_dir: str = "/tmp/model_cache"):
        """
        Args:
            device: Torch device string, or "auto" to pick CUDA when available.
            cache_dir: Directory under which model files are stored; created
                if it does not exist.
        """
        self.device = self._resolve_device(device)
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self._model = None  # populated by _load_model() on first use
        logger.info(f"EcapaTDNNEmbedder initialized on device: {self.device}")

    def _resolve_device(self, device: str) -> str:
        """Map "auto" to "cuda"/"cpu"; return any other value unchanged."""
        if device == "auto":
            return "cuda" if torch.cuda.is_available() else "cpu"
        return device

    def _load_model(self):
        """
        Load the SpeechBrain encoder once and keep it on ``self._model``.

        Raises:
            ImportError: If SpeechBrain itself cannot be imported. Import
                errors raised later, while the model is being fetched or
                built, propagate unchanged instead of being relabelled.
        """
        if self._model is not None:
            return

        import shutil

        try:
            import speechbrain.utils.fetching as fetching

            def _copy_instead_of_link(src, dst, local_strategy):
                # Replacement for SpeechBrain's link helper that always makes
                # a real copy of the fetched file.
                # NOTE(review): presumably a workaround for environments where
                # symlinks into the cache are a problem -- confirm.
                src_path = Path(src)
                dst_path = Path(dst)
                dst_path.parent.mkdir(parents=True, exist_ok=True)
                if dst_path.is_symlink():
                    dst_path.unlink()
                elif dst_path.exists():
                    if dst_path.samefile(src_path):
                        # Unlinking here would delete the source itself.
                        return dst_path
                    dst_path.unlink()
                shutil.copy2(str(src_path), str(dst_path))
                # NOTE(review): the upstream helper appears to return the
                # destination path; returning it keeps callers that use the
                # result working -- confirm against the installed version.
                return dst_path

            # Patch before importing the classifier, as the original code did.
            fetching.link_with_strategy = _copy_instead_of_link
            from speechbrain.inference.classifiers import EncoderClassifier
        except ImportError as err:
            raise ImportError(
                "SpeechBrain not installed. Run: pip install speechbrain"
            ) from err

        logger.info(f"Loading ECAPA-TDNN from {self.MODEL_SOURCE}...")
        savedir = self.cache_dir / "ecapa_tdnn"
        savedir.mkdir(parents=True, exist_ok=True)
        self._model = EncoderClassifier.from_hparams(
            source=self.MODEL_SOURCE,
            savedir=str(savedir),
            run_opts={"device": self.device},
        )
        self._model.eval()
        logger.success("ECAPA-TDNN model loaded successfully.")

    def preprocess_audio(
        self, audio: Union[np.ndarray, torch.Tensor], sample_rate: int
    ) -> torch.Tensor:
        """
        Resample and normalize audio to 16kHz mono float32 tensor.

        Args:
            audio: 1-D samples, or a 2-D array whose first axis is averaged
                away when it has more than one entry (treated as channels).
            sample_rate: Sampling rate of ``audio`` in Hz.

        Returns:
            Peak-normalized float32 tensor with the leading axis squeezed
            out. Empty or all-zero input is returned without scaling.
        """
        if isinstance(audio, np.ndarray):
            audio = torch.from_numpy(audio)
        # Cast tensors too: mean() and the resampler reject integer dtypes.
        audio = audio.float()

        if audio.dim() == 1:
            audio = audio.unsqueeze(0)
        if audio.shape[0] > 1:
            audio = audio.mean(dim=0, keepdim=True)

        if sample_rate != self.SAMPLE_RATE:
            resampler = torchaudio.transforms.Resample(
                orig_freq=sample_rate, new_freq=self.SAMPLE_RATE
            ).to(audio.device)  # keep the resampling kernel with the audio
            audio = resampler(audio)

        # max() raises on an empty tensor, so only normalize real content.
        if audio.numel() > 0:
            peak = audio.abs().max()
            if peak > 0:
                audio = audio / peak
        return audio.squeeze(0)

    def extract_embedding(self, audio: torch.Tensor) -> np.ndarray:
        """
        Extract L2-normalized ECAPA-TDNN embedding from a preprocessed audio tensor.
        Returns L2-normalized embedding of shape (192,)

        Args:
            audio: Output of ``preprocess_audio`` (or a slice of it); a batch
                axis is added here before it is passed to the model.
        """
        self._load_model()
        with torch.no_grad():
            audio_batch = audio.unsqueeze(0).to(self.device)
            # Relative length 1.0: the single batch item is not padded.
            lengths = torch.ones(1, device=self.device)
            embedding = self._model.encode_batch(audio_batch, lengths)
        embedding = embedding.squeeze().cpu().numpy()
        norm = np.linalg.norm(embedding)
        if norm > 0:  # leave an all-zero vector untouched
            embedding = embedding / norm
        return embedding

    def extract_embeddings_from_segments(
        self,
        audio: Union[np.ndarray, torch.Tensor],
        sample_rate: int,
        segments: List[Tuple[float, float]],
        min_duration: float = 0.5,
    ) -> Tuple[np.ndarray, List[Tuple[float, float]]]:
        """
        Extract embeddings for a list of (start, end) time segments.

        Segment bounds are in seconds and are clipped to the audio before
        slicing, so a negative start no longer wraps around to the end of the
        recording. A segment is skipped when the part that lies inside the
        audio is shorter than ``min_duration`` or contains no samples.

        Returns:
            ``(embeddings, valid_segments)``: one embedding row per kept
            segment, and the kept segments exactly as they were passed in.
            With nothing kept, an empty ``(0, EMBEDDING_DIM)`` float32 array
            and an empty list.
        """
        processed = self.preprocess_audio(audio, sample_rate)
        audio_duration = processed.shape[0] / self.SAMPLE_RATE

        embeddings = []
        valid_segments = []
        for start, end in segments:
            clipped_start = max(start, 0.0)
            clipped_end = min(end, audio_duration)
            # For in-bounds segments this is the same test as end - start.
            if clipped_end - clipped_start < min_duration:
                continue
            start_sample = int(clipped_start * self.SAMPLE_RATE)
            end_sample = int(clipped_end * self.SAMPLE_RATE)
            segment_audio = processed[start_sample:end_sample]
            if segment_audio.shape[0] == 0:
                continue
            embeddings.append(self.extract_embedding(segment_audio))
            valid_segments.append((start, end))

        if not embeddings:
            # float32 to match the dtype of embeddings built from float32 audio.
            return np.empty((0, self.EMBEDDING_DIM), dtype=np.float32), []
        return np.stack(embeddings), valid_segments
|