ground-zero / src /voice /speaker_profiles.py
jefffffff9
Add multi-user speaker profiles, collective voice, and mode toggle
49910a9
Raw
History Blame Contribute Delete
8.97 kB
"""
SpeakerProfileManager — multi-user voice identity.
SpeechBrain ECAPA-TDNN extracts 192-d embeddings for speaker identification.
Each confirmed user gets the following files in data/profiles/:
    user_N_sb.npy    — running-average SpeechBrain embedding (identification)
    user_N_ov.npy    — running-average OpenVoice V2 tone-color SE (cloning)
    user_N_count.txt — number of utterances averaged so far
Speaker matching uses cosine similarity. If similarity ≥ COSINE_THRESHOLD the
new utterance is attributed to that user and their embedding is updated;
otherwise a new profile is created.
get_collective_embedding() (Task 2) returns the mean of all stored OV SEs.
"""
from __future__ import annotations
import logging
import threading
from pathlib import Path
from typing import Optional
import numpy as np
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Default directory for the persisted profile files (a relative path, so it
# resolves against the process's current working directory).
PROFILES_DIR = Path("data/profiles")
# Minimum cosine similarity at which an utterance is attributed to an
# existing profile; anything below this creates a new profile.
COSINE_THRESHOLD = 0.75 # empirical threshold for ECAPA-TDNN
def _cosine(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of two vectors as a Python float.

    When the product of the two norms is not greater than 1e-8 (for example
    one vector is all zeros) the similarity is defined as 0.0 instead of
    dividing by a vanishing denominator.
    """
    norm_product = np.linalg.norm(a) * np.linalg.norm(b)
    # Written as "not >" so a NaN norm product also takes the 0.0 branch.
    if not norm_product > 1e-8:
        return 0.0
    similarity = np.dot(a, b) / norm_product
    return float(similarity)
def _running_avg(old: np.ndarray, new: np.ndarray, count: int) -> np.ndarray:
    """Fold one new observation into a mean that already covers `count` samples.

    The new sample receives weight 1 / (count + 1) and the previous mean the
    remainder, i.e. an equal-weight cumulative mean when `count` is the true
    number of samples already averaged into `old`.
    """
    weight_new = 1.0 / (count + 1)
    weight_old = 1.0 - weight_new
    return weight_old * old + weight_new * new
class SpeakerProfileManager:
    """Multi-user voice profile store backed by files in a profiles directory.

    Each profile is cached in memory and mirrored to disk as:
        user_N_sb.npy       — running-average SpeechBrain embedding
        user_N_ov.npy       — running-average OpenVoice SE (once one is stored)
        user_N_count.txt    — number of utterances averaged into the SB embedding
        user_N_ov_count.txt — number of SEs averaged into the OV embedding

    All reads and writes of the in-memory cache go through one lock; loading
    the SpeechBrain model is serialised by a second, separate lock.
    """

    def __init__(self, profiles_dir: Path = PROFILES_DIR) -> None:
        """Create the profiles directory if needed and load existing profiles.

        Args:
            profiles_dir: Directory holding the per-user profile files.
        """
        self._dir = Path(profiles_dir)
        self._dir.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()
        # Separate lock so a slow model load never blocks profile access, and
        # so preload() and a lazy load cannot both load the model at once.
        self._sb_load_lock = threading.Lock()

        # SpeechBrain state
        self._sb_model = None
        self._sb_ready = False
        self._sb_error: Optional[str] = None

        # In-memory cache:
        # { "user_0": {"sb": ndarray, "ov": ndarray|None,
        #              "count": int, "ov_count": int} }
        self._profiles: dict[str, dict] = {}
        self._load_profiles()

    # ── SpeechBrain loading ───────────────────────────────────────────────────

    def preload(self) -> None:
        """Start loading the SpeechBrain model on a background daemon thread."""
        threading.Thread(target=self._load_sb, daemon=True).start()

    def _load_sb(self) -> None:
        """Load the SpeechBrain ECAPA-TDNN encoder if it is not loaded yet.

        Failures are logged and recorded in ``_sb_error`` rather than raised,
        so a later call can retry. Concurrent callers are serialised; whoever
        arrives second finds the model ready and returns immediately.
        """
        with self._sb_load_lock:
            if self._sb_ready:
                return
            try:
                try:
                    from speechbrain.inference.classifiers import EncoderClassifier
                except ImportError:
                    # Fallback import path, tried when the first one is unavailable.
                    from speechbrain.pretrained import EncoderClassifier
                logger.info("SpeakerProfiles: loading SpeechBrain ECAPA-TDNN …")
                self._sb_model = EncoderClassifier.from_hparams(
                    source="speechbrain/spkrec-ecapa-voxceleb",
                    run_opts={"device": "cpu"},
                    savedir="data/speechbrain_cache",
                )
                # Clear any error left over from an earlier failed attempt.
                self._sb_error = None
                self._sb_ready = True
                logger.info("SpeakerProfiles: SpeechBrain ready")
            except Exception as exc:
                self._sb_error = str(exc)
                logger.error("SpeakerProfiles: SpeechBrain load failed: %s", exc)

    def _extract_sb(self, audio_np: np.ndarray) -> Optional[np.ndarray]:
        """Return the ECAPA embedding for ``audio_np``, or None on failure.

        Loads the model on demand. Returns None when the model cannot be
        loaded or when inference raises.
        """
        if not self._sb_ready:
            self._load_sb()
        if not self._sb_ready:
            return None
        try:
            import torch

            wav = torch.tensor(audio_np, dtype=torch.float32).unsqueeze(0)
            with torch.no_grad():
                emb = self._sb_model.encode_batch(wav)  # expected (1, 1, 192)
            return emb.squeeze().cpu().numpy()
        except Exception as exc:
            logger.error("SpeakerProfiles: SpeechBrain inference error: %s", exc)
            return None

    # ── Profile I/O ───────────────────────────────────────────────────────────

    @staticmethod
    def _read_count(path: Path, default: int) -> int:
        """Read a positive integer from ``path``; fall back to ``default``.

        A missing file silently yields the default; an unreadable or
        non-numeric file yields the default with a warning.
        """
        try:
            value = int(path.read_text().strip())
        except FileNotFoundError:
            return default
        except (OSError, ValueError) as exc:
            logger.warning(
                "SpeakerProfiles: unreadable count file %s (%s); using %d",
                path.name, exc, default,
            )
            return default
        # A count below 1 would make the running-average weight blow up.
        return max(1, value)

    def _load_profiles(self) -> None:
        """Populate the in-memory cache from the profiles directory.

        A profile whose SpeechBrain embedding cannot be read is skipped; an
        unreadable OpenVoice embedding is treated as absent. Both cases are
        logged as warnings instead of aborting construction.
        """
        unreadable = (OSError, ValueError, EOFError)
        profiles: dict[str, dict] = {}
        for sb_path in sorted(self._dir.glob("user_*_sb.npy")):
            uid = sb_path.stem[:-3]  # "user_N_sb" → "user_N"
            try:
                sb_emb = np.load(sb_path)
            except unreadable as exc:
                logger.warning(
                    "SpeakerProfiles: skipping %s, cannot read %s: %s",
                    uid, sb_path.name, exc,
                )
                continue

            ov_emb = None
            ov_path = self._dir / f"{uid}_ov.npy"
            if ov_path.exists():
                try:
                    ov_emb = np.load(ov_path)
                except unreadable as exc:
                    logger.warning(
                        "SpeakerProfiles: ignoring unreadable %s: %s",
                        ov_path.name, exc,
                    )

            count = self._read_count(self._dir / f"{uid}_count.txt", 1)
            # Profiles written before ov_count existed have no such file;
            # the utterance count is the closest available estimate.
            ov_count = self._read_count(self._dir / f"{uid}_ov_count.txt", count)
            profiles[uid] = {
                "sb": sb_emb,
                "ov": ov_emb,
                "count": count,
                "ov_count": ov_count,
            }
        with self._lock:
            self._profiles = profiles
        logger.info("SpeakerProfiles: loaded %d profile(s)", len(profiles))

    def _save_profile(self, uid: str) -> None:
        """Write one cached profile to disk. The caller must hold ``_lock``."""
        p = self._profiles[uid]
        np.save(self._dir / f"{uid}_sb.npy", p["sb"])
        if p["ov"] is not None:
            np.save(self._dir / f"{uid}_ov.npy", p["ov"])
            (self._dir / f"{uid}_ov_count.txt").write_text(str(p["ov_count"]))
        (self._dir / f"{uid}_count.txt").write_text(str(p["count"]))

    def _next_uid(self) -> str:
        """Return an unused ``user_N`` id. The caller must hold ``_lock``.

        Uses one more than the highest cached index, not the number of
        profiles, so gaps in the numbering never cause an existing profile to
        be overwritten. Indices that already have files on disk (for example
        a profile skipped at load time) are skipped as well.
        """
        prefix = "user_"
        used = [
            int(uid[len(prefix):])
            for uid in self._profiles
            if uid.startswith(prefix) and uid[len(prefix):].isdecimal()
        ]
        index = max(used) + 1 if used else 0
        while any(self._dir.glob(f"user_{index}_*")):
            index += 1
        return f"user_{index}"

    # ── Task 1: Speaker identification ────────────────────────────────────────

    def identify_or_create(
        self, audio_np: np.ndarray
    ) -> tuple[Optional[str], Optional[np.ndarray]]:
        """Attribute an utterance to a known speaker or create a new profile.

        The utterance's SpeechBrain embedding is compared with every cached
        profile by cosine similarity. If the best match reaches
        COSINE_THRESHOLD, that profile's embedding is updated with a running
        average; otherwise a new profile is created. The affected profile is
        saved to disk either way.

        Returns:
            (user_id, sb_embedding), or (None, None) if no embedding could be
            extracted.
        """
        sb_emb = self._extract_sb(audio_np)
        if sb_emb is None:
            return None, None

        with self._lock:
            best_uid, best_sim = None, -1.0
            for uid, profile in self._profiles.items():
                sim = _cosine(sb_emb, profile["sb"])
                if sim > best_sim:
                    best_sim, best_uid = sim, uid

            if best_uid is not None and best_sim >= COSINE_THRESHOLD:
                # Known speaker — update running average
                p = self._profiles[best_uid]
                p["sb"] = _running_avg(p["sb"], sb_emb, p["count"])
                p["count"] += 1
                uid = best_uid
                logger.debug(
                    "SpeakerProfiles: recognised %s (sim=%.3f, n=%d)",
                    uid, best_sim, p["count"],
                )
            else:
                # New speaker. Store a copy so the array handed back to the
                # caller is not the same object as the cached one.
                uid = self._next_uid()
                self._profiles[uid] = {
                    "sb": sb_emb.copy(),
                    "ov": None,
                    "count": 1,
                    "ov_count": 0,
                }
                logger.info("SpeakerProfiles: new profile → %s", uid)
            self._save_profile(uid)
        return uid, sb_emb

    # ── OpenVoice SE management ───────────────────────────────────────────────

    def update_ov_embedding(self, uid: str, ov_emb: np.ndarray) -> None:
        """Store or running-average the OpenVoice tone-color SE for a user.

        Unknown ``uid`` values are ignored. The average is weighted by the
        number of SEs stored so far (``ov_count``), which is tracked
        separately from the SpeechBrain utterance count.
        """
        with self._lock:
            p = self._profiles.get(uid)
            if p is None:
                return
            if p["ov"] is None:
                p["ov"] = ov_emb.copy()
                p["ov_count"] = 1
            else:
                p["ov"] = _running_avg(p["ov"], ov_emb, p["ov_count"])
                p["ov_count"] += 1
            self._save_profile(uid)

    def get_openvoice_se(self, uid: str) -> Optional[np.ndarray]:
        """Return a copy of the stored OpenVoice SE for this user, or None."""
        with self._lock:
            p = self._profiles.get(uid)
            return p["ov"].copy() if p and p["ov"] is not None else None

    # ── Task 2: Collective Voice ──────────────────────────────────────────────

    def get_collective_embedding(self) -> Optional[np.ndarray]:
        """Return the element-wise mean of all stored OpenVoice SEs.

        The in-memory cache is used when it holds at least one SE; otherwise
        the directory is scanned for user_*_ov.npy files. Returns None if no
        SE is found in either place.
        """
        # Prefer in-memory cache
        with self._lock:
            ov_list = [p["ov"] for p in self._profiles.values() if p["ov"] is not None]
        if not ov_list:
            # Fall back to disk scan (e.g. after a restart that didn't re-identify)
            ov_list = [np.load(p) for p in sorted(self._dir.glob("user_*_ov.npy"))]
        if not ov_list:
            return None
        stacked = np.stack(ov_list, axis=0)
        return stacked.mean(axis=0)

    # ── Status ────────────────────────────────────────────────────────────────

    def get_status(self) -> str:
        """Return a one-line summary: model state marker plus profile count.

        The marker is green when the model is ready, red when the last load
        attempt failed, and yellow otherwise.
        """
        with self._lock:
            n = len(self._profiles)
        sb = "🟢" if self._sb_ready else ("🔴" if self._sb_error else "🟡")
        return f"{sb} SpeechBrain | {n} speaker profile{'s' if n != 1 else ''}"