"""
SpeakerProfileManager — multi-user voice identity.

SpeechBrain ECAPA-TDNN extracts 192-d embeddings for speaker identification.
Each confirmed user gets these files in data/profiles/:

    user_N_sb.npy        — running-average SpeechBrain embedding (identification)
    user_N_ov.npy        — running-average OpenVoice V2 tone-color SE (cloning)
    user_N_count.txt     — number of utterances averaged into user_N_sb.npy
    user_N_ov_count.txt  — number of SEs averaged into user_N_ov.npy

Speaker matching uses cosine similarity. If similarity ≥ COSINE_THRESHOLD the
new utterance is attributed to that user and their embedding is updated;
otherwise a new profile is created.

get_collective_embedding() (Task 2) returns the mean of all stored OV SEs.
"""

from __future__ import annotations

import logging
import threading
from pathlib import Path
from typing import Optional

import numpy as np

logger = logging.getLogger(__name__)

PROFILES_DIR = Path("data/profiles")
COSINE_THRESHOLD = 0.75  # empirical threshold for ECAPA-TDNN


def _cosine(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of *a* and *b* (0.0 if either is ~zero)."""
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    return float(np.dot(a, b) / denom) if denom > 1e-8 else 0.0


def _running_avg(old: np.ndarray, new: np.ndarray, count: int) -> np.ndarray:
    """Fold *new* into *old*, the mean of *count* previous samples.

    With alpha = 1 / (count + 1) this is the exact cumulative mean: every
    observation ends up with equal weight (nothing decays over time).
    """
    alpha = 1.0 / (count + 1)
    return (1.0 - alpha) * old + alpha * new


class SpeakerProfileManager:
    """Thread-safe multi-user voice profile store backed by .npy files."""

    def __init__(self, profiles_dir: Path = PROFILES_DIR) -> None:
        self._dir = Path(profiles_dir)
        self._dir.mkdir(parents=True, exist_ok=True)
        # Guards self._profiles and the profile files written from it.
        self._lock = threading.Lock()

        # SpeechBrain state
        self._sb_model = None
        self._sb_ready = False
        self._sb_error: Optional[str] = None
        # Serialises model loading so preload()'s thread and a concurrent
        # _extract_sb() call cannot both instantiate the model.
        self._sb_load_lock = threading.Lock()

        # In-memory cache:
        # { "user_0": {"sb": ndarray, "ov": ndarray|None,
        #              "count": int, "ov_count": int} }
        # "count" = utterances averaged into "sb";
        # "ov_count" = SEs averaged into "ov" (0 while "ov" is None).
        self._profiles: dict[str, dict] = {}
        self._load_profiles()

    # ── SpeechBrain loading ───────────────────────────────────────────────────

    def preload(self) -> None:
        """Start loading the SpeechBrain model on a background daemon thread."""
        threading.Thread(target=self._load_sb, daemon=True).start()

    def _load_sb(self) -> None:
        """Load the ECAPA-TDNN encoder once; record the error message on failure."""
        with self._sb_load_lock:
            if self._sb_ready:
                return  # another thread finished loading while we waited
            try:
                try:
                    from speechbrain.inference.classifiers import EncoderClassifier
                except ImportError:  # fallback import path for older SpeechBrain
                    from speechbrain.pretrained import EncoderClassifier

                logger.info("SpeakerProfiles: loading SpeechBrain ECAPA-TDNN …")
                self._sb_model = EncoderClassifier.from_hparams(
                    source="speechbrain/spkrec-ecapa-voxceleb",
                    run_opts={"device": "cpu"},
                    savedir="data/speechbrain_cache",
                )
                self._sb_error = None
                self._sb_ready = True
                logger.info("SpeakerProfiles: SpeechBrain ready")
            except Exception as exc:
                # Best-effort: identification simply stays unavailable and
                # get_status() reports the failure.
                self._sb_error = str(exc)
                logger.error("SpeakerProfiles: SpeechBrain load failed: %s", exc)

    def _extract_sb(self, audio_np: np.ndarray) -> Optional[np.ndarray]:
        """Return the ECAPA embedding of *audio_np*, or None if unavailable.

        If the model is not ready yet it is loaded synchronously; this waits
        for an in-flight preload() to finish instead of loading twice. A failed
        load is attempted again on the next call.

        NOTE(review): audio_np is assumed to be a 1-D mono float waveform at
        the sample rate the ECAPA model expects — confirm with callers.
        """
        if not self._sb_ready:
            self._load_sb()
            if not self._sb_ready:
                return None
        try:
            import torch

            wav = torch.tensor(audio_np, dtype=torch.float32).unsqueeze(0)
            with torch.no_grad():
                emb = self._sb_model.encode_batch(wav)  # (1, 1, 192)
            return emb.squeeze().cpu().numpy()
        except Exception as exc:
            logger.error("SpeakerProfiles: SpeechBrain inference error: %s", exc)
            return None

    # ── Profile I/O ───────────────────────────────────────────────────────────

    def _load_profiles(self) -> None:
        """Populate the in-memory cache from the files in the profiles dir.

        A profile whose files cannot be read is skipped with a warning rather
        than aborting construction; its files are left untouched on disk (and
        _next_uid() will not reuse its id).
        """
        profiles: dict[str, dict] = {}
        for sb_path in sorted(self._dir.glob("user_*_sb.npy")):
            uid = sb_path.stem[:-3]  # "user_N_sb" → "user_N"
            try:
                profiles[uid] = self._read_profile(uid, sb_path)
            except (OSError, ValueError, EOFError) as exc:
                logger.warning(
                    "SpeakerProfiles: skipping unreadable profile %s: %s", uid, exc
                )
        with self._lock:
            self._profiles = profiles
        logger.info("SpeakerProfiles: loaded %d profile(s)", len(profiles))

    def _read_profile(self, uid: str, sb_path: Path) -> dict:
        """Read one profile's embeddings and counters from disk."""
        ov_path = self._dir / f"{uid}_ov.npy"
        cnt_path = self._dir / f"{uid}_count.txt"
        ov_cnt_path = self._dir / f"{uid}_ov_count.txt"

        sb = np.load(sb_path)
        ov = np.load(ov_path) if ov_path.exists() else None
        count = int(cnt_path.read_text()) if cnt_path.exists() else 1
        if ov is None:
            ov_count = 0
        elif ov_cnt_path.exists():
            ov_count = int(ov_cnt_path.read_text())
        else:
            # Profile saved before OV counts were stored separately; the
            # utterance count is the best available estimate.
            ov_count = count
        return {"sb": sb, "ov": ov, "count": count, "ov_count": ov_count}

    def _save_profile(self, uid: str) -> None:
        """Write one cached profile to disk. Caller must hold self._lock."""
        p = self._profiles[uid]
        np.save(self._dir / f"{uid}_sb.npy", p["sb"])
        (self._dir / f"{uid}_count.txt").write_text(str(p["count"]))
        if p["ov"] is not None:
            np.save(self._dir / f"{uid}_ov.npy", p["ov"])
            (self._dir / f"{uid}_ov_count.txt").write_text(str(p["ov_count"]))

    def _next_uid(self) -> str:
        """Return an unused "user_N" id. Caller must hold self._lock.

        Starts at len(profiles) (the original naming scheme) and skips any id
        that is already cached or has an sb file on disk, so gaps left by
        deleted or unreadable profiles never cause an overwrite.
        """
        n = len(self._profiles)
        while True:
            uid = f"user_{n}"
            if uid not in self._profiles and not (self._dir / f"{uid}_sb.npy").exists():
                return uid
            n += 1

    # ── Task 1: Speaker identification ────────────────────────────────────────

    def identify_or_create(
        self, audio_np: np.ndarray
    ) -> tuple[Optional[str], Optional[np.ndarray]]:
        """
        Extract a SpeechBrain embedding and match it to an existing profile
        (cosine similarity ≥ COSINE_THRESHOLD) or create a new one.

        A match folds the embedding into that user's running average. Either
        way the affected profile is saved to disk.

        Returns (user_id, sb_embedding), or (None, None) if SpeechBrain is not
        available.
        """
        sb_emb = self._extract_sb(audio_np)
        if sb_emb is None:
            return None, None

        with self._lock:
            best_uid, best_sim = None, -1.0
            for uid, profile in self._profiles.items():
                sim = _cosine(sb_emb, profile["sb"])
                if sim > best_sim:
                    best_sim, best_uid = sim, uid

            if best_uid is not None and best_sim >= COSINE_THRESHOLD:
                # Known speaker — update running average
                p = self._profiles[best_uid]
                p["sb"] = _running_avg(p["sb"], sb_emb, p["count"])
                p["count"] += 1
                uid = best_uid
                logger.debug(
                    "SpeakerProfiles: recognised %s (sim=%.3f, n=%d)",
                    uid, best_sim, p["count"],
                )
            else:
                # New speaker
                uid = self._next_uid()
                self._profiles[uid] = {
                    "sb": sb_emb, "ov": None, "count": 1, "ov_count": 0,
                }
                logger.info("SpeakerProfiles: new profile → %s", uid)

            self._save_profile(uid)

        return uid, sb_emb

    # ── OpenVoice SE management ───────────────────────────────────────────────

    def update_ov_embedding(self, uid: str, ov_emb: np.ndarray) -> None:
        """Store or running-average the OpenVoice tone-color SE for a user.

        The OV average has its own sample counter ("ov_count"), separate from
        the utterance count used for the SpeechBrain embedding. Each SE
        therefore gets equal weight, regardless of how many times
        identify_or_create() ran. Unknown user ids are ignored.
        """
        with self._lock:
            p = self._profiles.get(uid)
            if p is None:
                return
            if p["ov"] is None:
                p["ov"] = ov_emb.copy()
                p["ov_count"] = 1
            else:
                p["ov"] = _running_avg(p["ov"], ov_emb, p["ov_count"])
                p["ov_count"] += 1
            self._save_profile(uid)

    def get_openvoice_se(self, uid: str) -> Optional[np.ndarray]:
        """Return a copy of the stored OpenVoice SE for this user, or None."""
        with self._lock:
            p = self._profiles.get(uid)
            return p["ov"].copy() if p and p["ov"] is not None else None

    # ── Task 2: Collective Voice ──────────────────────────────────────────────

    def get_collective_embedding(self) -> Optional[np.ndarray]:
        """
        Return the element-wise mean of all stored OpenVoice SEs.

        This "collective voice" weights every known speaker equally (it is a
        mean, not a median). The in-memory cache is used first. If it holds no
        SEs, user_*_ov.npy files are read from disk instead, which covers files
        that appeared after this manager loaded its cache.

        Returns None if no OpenVoice SEs have been collected yet.
        """
        with self._lock:
            ov_list = [p["ov"] for p in self._profiles.values() if p["ov"] is not None]

        if not ov_list:
            ov_list = [np.load(p) for p in sorted(self._dir.glob("user_*_ov.npy"))]

        if not ov_list:
            return None

        return np.stack(ov_list, axis=0).mean(axis=0)

    # ── Status ────────────────────────────────────────────────────────────────

    def get_status(self) -> str:
        """Return a one-line status: model load state and profile count."""
        with self._lock:
            n = len(self._profiles)
        sb = "🟢" if self._sb_ready else ("🔴" if self._sb_error else "🟡")
        return f"{sb} SpeechBrain | {n} speaker profile{'s' if n != 1 else ''}"