Spaces:
Sleeping
Sleeping
| """ | |
| SpeakerProfileManager — multi-user voice identity. | |
| SpeechBrain ECAPA-TDNN extracts 192-d embeddings for speaker identification. | |
| Each confirmed user gets two files in data/profiles/: | |
| user_N_sb.npy — running-average SpeechBrain embedding (identification) | |
| user_N_ov.npy — running-average OpenVoice V2 tone-color SE (cloning) | |
| user_N_count.txt — number of utterances averaged so far | |
| Speaker matching uses cosine similarity. If similarity ≥ COSINE_THRESHOLD the | |
| new utterance is attributed to that user and their embedding is updated; | |
| otherwise a new profile is created. | |
| get_collective_embedding() (Task 2) returns the mean of all stored OV SEs. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import threading | |
| from pathlib import Path | |
| from typing import Optional | |
| import numpy as np | |
| logger = logging.getLogger(__name__) | |
| PROFILES_DIR = Path("data/profiles") | |
| COSINE_THRESHOLD = 0.75 # empirical threshold for ECAPA-TDNN | |
| def _cosine(a: np.ndarray, b: np.ndarray) -> float: | |
| denom = np.linalg.norm(a) * np.linalg.norm(b) | |
| return float(np.dot(a, b) / denom) if denom > 1e-8 else 0.0 | |
| def _running_avg(old: np.ndarray, new: np.ndarray, count: int) -> np.ndarray: | |
| """Weighted running average — older observations decay gently.""" | |
| alpha = 1.0 / (count + 1) | |
| return (1.0 - alpha) * old + alpha * new | |
| class SpeakerProfileManager: | |
| """Thread-safe multi-user voice profile store backed by .npy files.""" | |
| def __init__(self, profiles_dir: Path = PROFILES_DIR) -> None: | |
| self._dir = Path(profiles_dir) | |
| self._dir.mkdir(parents=True, exist_ok=True) | |
| self._lock = threading.Lock() | |
| # SpeechBrain state | |
| self._sb_model = None | |
| self._sb_ready = False | |
| self._sb_error: Optional[str] = None | |
| # In-memory cache: { "user_0": {"sb": ndarray, "ov": ndarray|None, "count": int} } | |
| self._profiles: dict[str, dict] = {} | |
| self._load_profiles() | |
| # ── SpeechBrain loading ─────────────────────────────────────────────────── | |
| def preload(self) -> None: | |
| threading.Thread(target=self._load_sb, daemon=True).start() | |
| def _load_sb(self) -> None: | |
| try: | |
| try: | |
| from speechbrain.inference.classifiers import EncoderClassifier | |
| except ImportError: | |
| from speechbrain.pretrained import EncoderClassifier | |
| logger.info("SpeakerProfiles: loading SpeechBrain ECAPA-TDNN …") | |
| self._sb_model = EncoderClassifier.from_hparams( | |
| source="speechbrain/spkrec-ecapa-voxceleb", | |
| run_opts={"device": "cpu"}, | |
| savedir="data/speechbrain_cache", | |
| ) | |
| self._sb_ready = True | |
| logger.info("SpeakerProfiles: SpeechBrain ready") | |
| except Exception as exc: | |
| self._sb_error = str(exc) | |
| logger.error("SpeakerProfiles: SpeechBrain load failed: %s", exc) | |
| def _extract_sb(self, audio_np: np.ndarray) -> Optional[np.ndarray]: | |
| """Return 192-d ECAPA embedding, or None if model not ready.""" | |
| if not self._sb_ready: | |
| self._load_sb() | |
| if not self._sb_ready: | |
| return None | |
| try: | |
| import torch | |
| wav = torch.tensor(audio_np, dtype=torch.float32).unsqueeze(0) | |
| with torch.no_grad(): | |
| emb = self._sb_model.encode_batch(wav) # (1, 1, 192) | |
| return emb.squeeze().cpu().numpy() | |
| except Exception as exc: | |
| logger.error("SpeakerProfiles: SpeechBrain inference error: %s", exc) | |
| return None | |
| # ── Profile I/O ─────────────────────────────────────────────────────────── | |
| def _load_profiles(self) -> None: | |
| profiles = {} | |
| for sb_path in sorted(self._dir.glob("user_*_sb.npy")): | |
| uid = sb_path.stem[:-3] # "user_N_sb" → "user_N" | |
| ov_path = self._dir / f"{uid}_ov.npy" | |
| cnt_path = self._dir / f"{uid}_count.txt" | |
| profiles[uid] = { | |
| "sb": np.load(sb_path), | |
| "ov": np.load(ov_path) if ov_path.exists() else None, | |
| "count": int(cnt_path.read_text()) if cnt_path.exists() else 1, | |
| } | |
| with self._lock: | |
| self._profiles = profiles | |
| logger.info("SpeakerProfiles: loaded %d profile(s)", len(profiles)) | |
| def _save_profile(self, uid: str) -> None: | |
| p = self._profiles[uid] | |
| np.save(self._dir / f"{uid}_sb.npy", p["sb"]) | |
| if p["ov"] is not None: | |
| np.save(self._dir / f"{uid}_ov.npy", p["ov"]) | |
| (self._dir / f"{uid}_count.txt").write_text(str(p["count"])) | |
| # ── Task 1: Speaker identification ──────────────────────────────────────── | |
| def identify_or_create( | |
| self, audio_np: np.ndarray | |
| ) -> tuple[Optional[str], Optional[np.ndarray]]: | |
| """ | |
| Extract a SpeechBrain embedding and match it to an existing profile | |
| (cosine similarity ≥ threshold) or create a new one. | |
| Returns (user_id, sb_embedding). | |
| Returns (None, None) if SpeechBrain is not available. | |
| """ | |
| sb_emb = self._extract_sb(audio_np) | |
| if sb_emb is None: | |
| return None, None | |
| with self._lock: | |
| best_uid, best_sim = None, -1.0 | |
| for uid, profile in self._profiles.items(): | |
| sim = _cosine(sb_emb, profile["sb"]) | |
| if sim > best_sim: | |
| best_sim, best_uid = sim, uid | |
| if best_uid is not None and best_sim >= COSINE_THRESHOLD: | |
| # Known speaker — update running average | |
| p = self._profiles[best_uid] | |
| new_count = p["count"] + 1 | |
| p["sb"] = _running_avg(p["sb"], sb_emb, p["count"]) | |
| p["count"] = new_count | |
| uid = best_uid | |
| logger.debug( | |
| "SpeakerProfiles: recognised %s (sim=%.3f, n=%d)", | |
| uid, best_sim, new_count, | |
| ) | |
| else: | |
| # New speaker | |
| uid = f"user_{len(self._profiles)}" | |
| self._profiles[uid] = {"sb": sb_emb, "ov": None, "count": 1} | |
| logger.info("SpeakerProfiles: new profile → %s", uid) | |
| self._save_profile(uid) | |
| return uid, sb_emb | |
| # ── OpenVoice SE management ─────────────────────────────────────────────── | |
| def update_ov_embedding(self, uid: str, ov_emb: np.ndarray) -> None: | |
| """Store or running-average the OpenVoice tone-color SE for a user.""" | |
| with self._lock: | |
| if uid not in self._profiles: | |
| return | |
| p = self._profiles[uid] | |
| if p["ov"] is None: | |
| p["ov"] = ov_emb.copy() | |
| else: | |
| p["ov"] = _running_avg(p["ov"], ov_emb, p["count"]) | |
| self._save_profile(uid) | |
| def get_openvoice_se(self, uid: str) -> Optional[np.ndarray]: | |
| """Return the stored OpenVoice SE for this user, or None.""" | |
| with self._lock: | |
| p = self._profiles.get(uid) | |
| return p["ov"].copy() if p and p["ov"] is not None else None | |
| # ── Task 2: Collective Voice ────────────────────────────────────────────── | |
| def get_collective_embedding(self) -> Optional[np.ndarray]: | |
| """ | |
| Load all user_N_ov.npy files, return the mean vector. | |
| This is the "Median Embedding" that represents all known speakers. | |
| Returns None if no OpenVoice SEs have been collected yet. | |
| """ | |
| # Prefer in-memory cache | |
| with self._lock: | |
| ov_list = [p["ov"] for p in self._profiles.values() if p["ov"] is not None] | |
| if not ov_list: | |
| # Fall back to disk scan (e.g. after a restart that didn't re-identify) | |
| ov_list = [np.load(p) for p in sorted(self._dir.glob("user_*_ov.npy"))] | |
| if not ov_list: | |
| return None | |
| stacked = np.stack(ov_list, axis=0) | |
| return stacked.mean(axis=0) | |
| # ── Status ──────────────────────────────────────────────────────────────── | |
| def get_status(self) -> str: | |
| n = len(self._profiles) | |
| sb = "🟢" if self._sb_ready else ("🔴" if self._sb_error else "🟡") | |
| return f"{sb} SpeechBrain | {n} speaker profile{'s' if n != 1 else ''}" | |