File size: 5,503 Bytes
d7a2919
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c16fd7c
d7a2919
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76debc2
d7a2919
 
 
76debc2
d7a2919
 
 
 
 
76debc2
d7a2919
 
 
 
76debc2
d7a2919
 
76debc2
d7a2919
 
 
 
 
 
76debc2
d7a2919
 
 
76debc2
 
 
 
 
 
 
 
 
 
 
d7a2919
76debc2
 
d7a2919
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""
Speaker Embedding Extraction using ECAPA-TDNN architecture via SpeechBrain.
Handles audio preprocessing, feature extraction, and L2-normalized embeddings.
"""

import os
import torch
import torchaudio
import numpy as np
from pathlib import Path
from typing import Union, List, Tuple
from loguru import logger


class EcapaTDNNEmbedder:
    """
    Speaker embedding extractor using ECAPA-TDNN architecture.
    Produces 192-dim L2-normalized speaker embeddings per audio segment.

    The SpeechBrain model is loaded lazily on the first call to
    ``extract_embedding`` and then reused for the lifetime of the instance.
    """

    MODEL_SOURCE = "speechbrain/spkrec-ecapa-voxceleb"
    SAMPLE_RATE = 16000
    EMBEDDING_DIM = 192

    def __init__(self, device: str = "auto", cache_dir: str = "/tmp/model_cache"):
        """
        Args:
            device: Torch device string, or "auto" to pick CUDA when available.
            cache_dir: Directory under which the model files are stored
                (created if missing).
        """
        self.device = self._resolve_device(device)
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self._model = None  # populated lazily by _load_model()
        logger.info(f"EcapaTDNNEmbedder initialized on device: {self.device}")

    def _resolve_device(self, device: str) -> str:
        """Map "auto" to "cuda"/"cpu"; return any other value unchanged."""
        if device == "auto":
            return "cuda" if torch.cuda.is_available() else "cpu"
        return device

    @staticmethod
    def _copy_instead_of_link(src, dst, local_strategy) -> None:
        """
        Replacement for ``speechbrain.utils.fetching.link_with_strategy``.

        Copies ``src`` to ``dst`` (replacing any existing file or symlink at
        ``dst``) and ignores ``local_strategy``.
        """
        import shutil

        dst = Path(dst)
        src = Path(src)
        dst.parent.mkdir(parents=True, exist_ok=True)
        if dst.exists() or dst.is_symlink():
            dst.unlink()
        shutil.copy2(str(src), str(dst))

    def _load_model(self) -> None:
        """
        Load the SpeechBrain ECAPA-TDNN classifier once and cache it.

        Raises:
            ImportError: If SpeechBrain cannot be imported.
        """
        if self._model is not None:
            return

        try:
            import speechbrain.utils.fetching as _fetching

            # NOTE: this is a process-wide monkey patch of SpeechBrain's
            # fetching module. Presumably symlinks are unreliable in the
            # target environment — TODO confirm.
            _fetching.link_with_strategy = self._copy_instead_of_link

            from speechbrain.inference.classifiers import EncoderClassifier
        except ImportError as exc:
            raise ImportError("SpeechBrain not installed.") from exc

        logger.info(f"Loading ECAPA-TDNN from {self.MODEL_SOURCE}...")

        # Honour the cache_dir given to __init__; with the default value this
        # resolves to the previously hard-coded /tmp/model_cache/ecapa_tdnn.
        savedir = str(self.cache_dir / "ecapa_tdnn")
        os.makedirs(savedir, exist_ok=True)

        load_kwargs = {
            "source": self.MODEL_SOURCE,
            "savedir": savedir,
            "run_opts": {"device": self.device},
        }

        try:
            model = EncoderClassifier.from_hparams(
                huggingface_cache_dir="/tmp/hf_cache",
                **load_kwargs,
            )
            loaded_message = "ECAPA-TDNN model loaded successfully."
        except TypeError:
            # huggingface_cache_dir not supported in this version, try without
            model = EncoderClassifier.from_hparams(**load_kwargs)
            loaded_message = "ECAPA-TDNN model loaded (fallback)."

        model.eval()
        self._model = model
        logger.success(loaded_message)

    def preprocess_audio(
        self, audio: Union[np.ndarray, torch.Tensor], sample_rate: int
    ) -> torch.Tensor:
        """
        Resample and normalize audio to 16kHz mono float32 tensor.

        Args:
            audio: 1-D samples or a 2-D array whose first axis is averaged
                away when it has more than one row (assumed to be the channel
                axis — confirm against callers). Integer dtypes are accepted.
            sample_rate: Sample rate of ``audio`` in Hz.

        Returns:
            Peak-normalized float32 tensor with the leading axis squeezed out.
            Empty input yields an empty 1-D tensor.
        """
        if isinstance(audio, np.ndarray):
            audio = torch.from_numpy(audio)
        # Cast tensors too: integer PCM input cannot be averaged or scaled.
        audio = audio.float()

        # Nothing to mix, resample or normalize; max() would fail on empty input.
        if audio.numel() == 0:
            return audio.new_zeros(0)

        if audio.dim() == 1:
            audio = audio.unsqueeze(0)

        if audio.shape[0] > 1:
            audio = audio.mean(dim=0, keepdim=True)

        if sample_rate != self.SAMPLE_RATE:
            resampler = torchaudio.transforms.Resample(
                orig_freq=sample_rate, new_freq=self.SAMPLE_RATE
            ).to(audio.device)  # keep the resampling kernel on the input's device
            audio = resampler(audio)

        peak = audio.abs().max()
        if peak > 0:  # avoid dividing by zero on all-silent audio
            audio = audio / peak

        return audio.squeeze(0)

    def extract_embedding(self, audio: torch.Tensor) -> np.ndarray:
        """
        Extract L2-normalized ECAPA-TDNN embedding from a preprocessed audio tensor.
        Returns L2-normalized embedding of shape (192,)

        Args:
            audio: 1-D waveform, as produced by ``preprocess_audio``.
        """
        self._load_model()

        with torch.no_grad():
            audio_batch = audio.unsqueeze(0).to(self.device)
            # Relative length 1.0: the single batch item uses its full duration.
            lengths = torch.tensor([1.0]).to(self.device)
            embedding = self._model.encode_batch(audio_batch, lengths)
            embedding = embedding.squeeze().cpu().numpy()

        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding = embedding / norm

        return embedding

    def extract_embeddings_from_segments(
        self,
        audio: torch.Tensor,
        sample_rate: int,
        segments: List[Tuple[float, float]],
        min_duration: float = 0.5,
    ) -> Tuple[np.ndarray, List[Tuple[float, float]]]:
        """
        Extract embeddings for a list of (start, end) time segments.

        Args:
            audio: Raw audio, passed through ``preprocess_audio`` first.
            sample_rate: Sample rate of ``audio`` in Hz.
            segments: (start, end) pairs in seconds.
            min_duration: Segments with ``end - start`` below this are skipped.

        Returns:
            A tuple ``(embeddings, valid_segments)``. ``embeddings`` stacks one
            row per kept segment, or has shape (0, EMBEDDING_DIM) when none
            are kept. ``valid_segments`` holds the original (start, end) pairs
            of the kept segments, in input order.
        """
        processed = self.preprocess_audio(audio, sample_rate)
        total_samples = processed.shape[0]
        embeddings = []
        valid_segments = []

        for start, end in segments:
            if end - start < min_duration:
                continue

            # Clamp to the audio bounds. Without this a negative start time
            # becomes a negative index and slices from the END of the audio.
            start_sample = min(max(int(start * self.SAMPLE_RATE), 0), total_samples)
            end_sample = min(max(int(end * self.SAMPLE_RATE), 0), total_samples)
            if end_sample <= start_sample:
                continue

            segment_audio = processed[start_sample:end_sample]
            embeddings.append(self.extract_embedding(segment_audio))
            valid_segments.append((start, end))

        if not embeddings:
            return np.empty((0, self.EMBEDDING_DIM)), []

        return np.stack(embeddings), valid_segments