"""Voice-Taxonomy-57: Classify 57 voice dimensions from audio.

Pipeline:
    Audio → 16kHz mono → WhisperFeatureExtractor → mel spectrograms
    → BUD-E-Whisper V1.0 encoder → [B, 1500, 768]
    → BUD-E-Whisper V1.1 encoder → [B, 1500, 768]
    → Duration-aware truncation → split first/second half
    → Mean pool each half → concat 4×768 → [B, 3072]
    → PCA(96) per dimension → 57 MLP classifiers → 57 predictions (0–6)

Usage:
    # CLI
    python inference.py --input audio_folder/ --output results.json --batch-size 16
    python inference.py --input file.wav --output results.json

    # Python API
    from inference import VoiceTaxonomy57
    model = VoiceTaxonomy57.from_pretrained("laion/Voice-Taxonomy-57")
    results = model.predict("audio.wav")
    results = model.predict(["a.wav", "b.mp3", "c.flac"])
"""
|
|
| import argparse |
| import json |
| import os |
| import pickle |
| import subprocess |
| import sys |
| import time |
| import warnings |
| from pathlib import Path |
| from typing import Dict, List, Optional, Union |
|
|
| import numpy as np |
| import torch |
| import torch.nn as nn |
|
|
| warnings.filterwarnings("ignore") |
|
|
|
|
| |
| |
| |
class MLPClassifier(nn.Module):
    """Minimal MLP head: Linear → ReLU → Linear over pooled features.

    The stack is kept inside ``self.net`` as an ``nn.Sequential`` so that
    checkpoint state-dict keys stay ``net.0.*`` / ``net.2.*``.
    """

    def __init__(self, in_dim: int, hidden_dim: int, n_classes: int):
        super().__init__()
        layers = [
            nn.Linear(in_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, n_classes),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        # Delegate straight to the sequential stack; returns raw logits.
        return self.net(x)
|
|
|
|
| |
| |
| |
def load_audio_ffmpeg(path: str, sr: int = 16000) -> Optional[np.ndarray]:
    """Decode an audio file to mono float32 PCM via an ffmpeg subprocess.

    Args:
        path: Path to the audio file.
        sr: Target sample rate in Hz.

    Returns:
        1-D float32 waveform, or None on any failure (missing ffmpeg binary,
        undecodable file, empty output, or a decode taking longer than 30s).
    """
    cmd = [
        "ffmpeg", "-i", str(path),
        "-f", "f32le", "-acodec", "pcm_f32le",
        "-ar", str(sr), "-ac", "1", "pipe:1",
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, timeout=30)
    except Exception:
        # Best-effort loader: caller falls back to librosa on None.
        return None
    if proc.returncode != 0 or not proc.stdout:
        return None
    return np.frombuffer(proc.stdout, dtype=np.float32)
|
|
|
|
def load_audio(path: str, sr: int = 16000) -> Optional[np.ndarray]:
    """Load an audio file as a mono float32 waveform at *sr* Hz.

    Tries the ffmpeg subprocess loader first; if that yields nothing,
    falls back to librosa. Returns None when both loaders fail (including
    when librosa is not installed).
    """
    wf = load_audio_ffmpeg(path, sr)
    if wf is None:
        try:
            import librosa
            wf, _ = librosa.load(path, sr=sr, mono=True)
        except Exception:
            wf = None
    return wf
|
|
|
|
| |
| |
| |
class VoiceTaxonomy57:
    """57-dimension voice taxonomy classifier.

    Uses BUD-E-Whisper V1.0 + V1.1 encoders with per-dimension PCA + MLP.

    Feature layout per clip (3072 dims):
        [v11 first half | v11 second half | v10 first half | v10 second half]
    where each quarter is a 768-dim mean-pooled slice of encoder hidden states.
    """

    def __init__(
        self,
        classifiers: dict,
        tags_short: dict,
        tags_sentences: dict,
        dimensions: dict,
        config: dict,
        device: str = "cuda:0",
        dtype: torch.dtype = torch.float16,
    ):
        """Build a classifier from already-loaded artifacts.

        Args:
            classifiers: Per-dimension dicts with keys "model", "pca_components",
                "pca_mean", "label_to_val", "unique_vals", "n_classes", "acc", "adj1".
            tags_short: dim → {value(str) → short tag} mapping.
            tags_sentences: dim → {value(str) → sentence tag} mapping.
            dimensions: Dimension metadata (kept as-is; not interpreted here).
            config: Expects optional "sample_rate", "max_audio_seconds" and,
                when encoders are loaded, "whisper_models" with "v10"/"v11".
            device: Requested device for encoder inference.
            dtype: Requested encoder dtype.
        """
        self.classifiers = classifiers
        self.tags_short = tags_short
        self.tags_sentences = tags_sentences
        self.dimensions = dimensions
        self.config = config
        # NOTE(review): any requested device falls back to "cpu" whenever CUDA
        # is unavailable — including non-CUDA accelerators like "mps". Confirm
        # this is intended before supporting other backends.
        self.device = device if torch.cuda.is_available() else "cpu"
        # fp16 is not generally usable on CPU; force fp32 there.
        self.dtype = dtype if self.device != "cpu" else torch.float32
        self.sample_rate = config.get("sample_rate", 16000)
        self.max_audio_seconds = config.get("max_audio_seconds", 30)
        # Deterministic dimension ordering used by all batch loops and outputs.
        self.dim_names = sorted(classifiers.keys())

        # Whisper encoders + feature extractor are loaded lazily; see
        # _ensure_encoders() / _load_encoders().
        self._v10_encoder = None
        self._v11_encoder = None
        self._feature_extractor = None

    @classmethod
    def from_pretrained(
        cls,
        path: str,
        device: str = "cuda:0",
        dtype: torch.dtype = torch.float16,
        load_encoders: bool = True,
    ) -> "VoiceTaxonomy57":
        """Load from a local directory or HuggingFace repo.

        Args:
            path: Local directory or HuggingFace model ID (e.g. "laion/Voice-Taxonomy-57")
            device: Device for encoder inference ("cuda:0", "cpu", etc.)
            dtype: Encoder dtype (torch.float16 for GPU, torch.float32 for CPU)
            load_encoders: If True, load Whisper encoders immediately.
                If False, defer loading until first predict() call.
        """
        if os.path.isdir(path):
            model_dir = path
        else:
            from huggingface_hub import snapshot_download
            model_dir = snapshot_download(repo_id=path)

        # SECURITY: pickle.load executes arbitrary code on untrusted data.
        # Only load checkpoints from sources you trust (this mirrors the
        # upstream artifact format; do not point this at unknown repos).
        clf_path = os.path.join(model_dir, "taxonomy_classifiers.pkl")
        with open(clf_path, "rb") as f:
            raw = pickle.load(f)

        # Rebuild one MLP head per dimension and normalize metadata types.
        classifiers = {}
        for dim, data in raw.items():
            dim = str(dim)
            mc = data["model_config"]
            model = MLPClassifier(mc["in_dim"], mc["hidden_dim"], mc["n_classes"])
            model.load_state_dict(data["model_state"])
            model.eval()

            classifiers[dim] = {
                "model": model,
                "pca_components": data["pca_components"],
                "pca_mean": data["pca_mean"],
                "label_to_val": {int(k): int(v) for k, v in data["label_to_val"].items()},
                "unique_vals": [int(v) for v in data["unique_vals"]],
                "n_classes": mc["n_classes"],
                "acc": float(data["acc"]),
                "adj1": float(data["adj1"]),
            }

        # Tag/label metadata shipped alongside the classifiers.
        with open(os.path.join(model_dir, "taxonomy_tags_short.json")) as f:
            tags_short = json.load(f)
        with open(os.path.join(model_dir, "taxonomy_tags_sentences.json")) as f:
            tags_sentences = json.load(f)
        with open(os.path.join(model_dir, "taxonomy_dimensions.json")) as f:
            dimensions = json.load(f)
        with open(os.path.join(model_dir, "config.json")) as f:
            config = json.load(f)

        instance = cls(
            classifiers=classifiers,
            tags_short=tags_short,
            tags_sentences=tags_sentences,
            dimensions=dimensions,
            config=config,
            device=device,
            dtype=dtype,
        )

        if load_encoders:
            instance._load_encoders()

        return instance

    def _load_encoders(self):
        """Load V1.0 and V1.1 Whisper encoders (decoder weights are discarded)."""
        from transformers import WhisperModel, WhisperFeatureExtractor

        v10_name = self.config["whisper_models"]["v10"]
        v11_name = self.config["whisper_models"]["v11"]

        print(f"Loading {v10_name}...")
        m10 = WhisperModel.from_pretrained(v10_name, torch_dtype=self.dtype)
        self._v10_encoder = m10.encoder.to(self.device).eval()
        del m10  # drop the full model so the unused decoder can be freed

        print(f"Loading {v11_name}...")
        m11 = WhisperModel.from_pretrained(v11_name, torch_dtype=self.dtype)
        self._v11_encoder = m11.encoder.to(self.device).eval()
        del m11

        # Both encoders share mel preprocessing; fall back to the stock
        # whisper-small extractor if the repo doesn't ship one.
        try:
            self._feature_extractor = WhisperFeatureExtractor.from_pretrained(v10_name)
        except OSError:
            self._feature_extractor = WhisperFeatureExtractor.from_pretrained(
                "openai/whisper-small"
            )
        # Fix: was a conditional *expression* used as a statement.
        if self.device != "cpu":
            torch.cuda.empty_cache()
        print("Encoders loaded.")

    def _ensure_encoders(self):
        """Lazily load encoders on first use (when from_pretrained deferred it)."""
        if self._v10_encoder is None:
            self._load_encoders()

    def _extract_features_batch(
        self,
        waveforms: List[np.ndarray],
        durations: List[float],
    ) -> np.ndarray:
        """Extract 3072-dim features from a batch of waveforms.

        Args:
            waveforms: 16kHz mono float waveforms (one per clip).
            durations: Original clip durations in seconds (used for
                duration-aware truncation of encoder frames).

        Returns:
            np.ndarray of shape (B, 3072)
        """
        self._ensure_encoders()
        B = len(waveforms)
        max_samples = self.sample_rate * self.max_audio_seconds

        # Pad/trim every clip to exactly max_audio_seconds.
        # NOTE(review): float64 here presumably matches training-time feature
        # extraction — confirm before changing the dtype.
        padded = []
        for wf in waveforms:
            wf64 = wf.astype(np.float64)
            if len(wf64) < max_samples:
                wf64 = np.pad(wf64, (0, max_samples - len(wf64)))
            else:
                wf64 = wf64[:max_samples]
            padded.append(wf64)

        inputs = self._feature_extractor(
            padded, sampling_rate=self.sample_rate, return_tensors="pt"
        )
        mel = inputs.input_features.to(self.device, dtype=self.dtype)

        features = np.zeros((B, 3072), dtype=np.float32)
        with torch.no_grad():
            out_v11 = self._v11_encoder(mel).last_hidden_state
            out_v10 = self._v10_encoder(mel).last_hidden_state

        for j in range(B):
            dur = durations[j]
            # Whisper encoder emits 50 frames/sec; clamp to [2, 1500] so both
            # halves are non-empty and we never index past the 30s window.
            n_frames = max(min(int(dur * 50), 1500), 2)

            h11 = out_v11[j, :n_frames]
            mid11 = n_frames // 2
            v11_first = h11[:mid11].mean(dim=0).cpu().float().numpy()
            v11_second = h11[mid11:].mean(dim=0).cpu().float().numpy()

            h10 = out_v10[j, :n_frames]
            mid10 = n_frames // 2
            v10_first = h10[:mid10].mean(dim=0).cpu().float().numpy()
            v10_second = h10[mid10:].mean(dim=0).cpu().float().numpy()

            # Order matters: v11 halves first, then v10 halves (training layout).
            features[j] = np.concatenate([v11_first, v11_second, v10_first, v10_second])

        return features

    def _classify_batch(self, features: np.ndarray) -> List[Dict]:
        """Run 57 MLP classifiers on extracted features.

        Args:
            features: (B, 3072) feature array

        Returns:
            List of B dicts, each mapping dim_name → {value, confidence, tag_short, tag_sentence}
        """
        B = features.shape[0]
        results = [{} for _ in range(B)]

        for dim in self.dim_names:
            clf = self.classifiers[dim]

            # Per-dimension PCA projection: center then project onto components.
            centered = features - clf["pca_mean"]
            projected = centered @ clf["pca_components"].T

            xt = torch.tensor(projected, dtype=torch.float32)
            with torch.no_grad():
                logits = clf["model"](xt)
                pred_labels = logits.argmax(dim=1)
                probs = torch.softmax(logits, dim=1)

            for j in range(B):
                label = pred_labels[j].item()
                conf = probs[j, label].item()
                # Map the classifier's internal label index back to the
                # taxonomy value (0–6 scale).
                val = clf["label_to_val"][label]

                tag_short = self.tags_short.get(dim, {}).get(str(val), "")
                tag_sent = self.tags_sentences.get(dim, {}).get(str(val), "")

                results[j][dim] = {
                    "value": val,
                    "confidence": round(conf, 3),
                    "tag_short": tag_short,
                    "tag_sentence": tag_sent,
                }

        return results

    def predict(
        self,
        audio: Union[str, List[str], np.ndarray, List[np.ndarray]],
        batch_size: int = 16,
    ) -> Union[Dict, List[Dict]]:
        """Predict 57 voice taxonomy dimensions for audio file(s).

        Args:
            audio: Path(s) to audio file(s) or numpy waveform(s) at 16kHz mono.
            batch_size: Batch size for encoder inference.

        Returns:
            Single dict or list of dicts with predictions per dimension.
            Items that fail to load yield {"error": ...} instead of predictions.
        """
        single = isinstance(audio, (str, Path, np.ndarray))
        if single:
            audio = [audio]

        # Load each item; anything shorter than 0.1s (or unloadable) is
        # marked invalid and gets an error entry instead of predictions.
        waveforms = []
        durations = []
        valid_mask = []
        for item in audio:
            if isinstance(item, (str, Path)):
                wf = load_audio(str(item), sr=self.sample_rate)
            else:
                wf = item
            if wf is not None and len(wf) >= int(self.sample_rate * 0.1):
                waveforms.append(wf)
                durations.append(len(wf) / self.sample_rate)
                valid_mask.append(True)
            else:
                waveforms.append(None)
                durations.append(0.0)
                valid_mask.append(False)

        # Run only the valid items through the encoders, in batches, then
        # scatter results back into their original positions.
        all_results = [None] * len(audio)
        valid_indices = [i for i, ok in enumerate(valid_mask) if ok]
        valid_waveforms = [waveforms[i] for i in valid_indices]
        valid_durations = [durations[i] for i in valid_indices]

        for start in range(0, len(valid_waveforms), batch_size):
            end = min(start + batch_size, len(valid_waveforms))
            batch_wf = valid_waveforms[start:end]
            batch_dur = valid_durations[start:end]

            features = self._extract_features_batch(batch_wf, batch_dur)
            batch_results = self._classify_batch(features)

            for j, res in enumerate(batch_results):
                orig_idx = valid_indices[start + j]
                all_results[orig_idx] = res

        for i in range(len(all_results)):
            if all_results[i] is None:
                all_results[i] = {"error": "Failed to load or process audio"}

        if single:
            return all_results[0]
        return all_results

    def predict_from_encoder_outputs(
        self,
        v10_hidden_states: torch.Tensor,
        v11_hidden_states: torch.Tensor,
        durations: List[float],
    ) -> List[Dict]:
        """Predict from pre-computed encoder outputs (for integration with Empathic-Insight-Voice-Plus).

        This avoids re-running V1.0 encoder when used alongside the emotion pipeline.

        Args:
            v10_hidden_states: (B, 1500, 768) from BUD-E-Whisper V1.0 encoder
            v11_hidden_states: (B, 1500, 768) from BUD-E-Whisper V1.1 encoder
            durations: List of audio durations in seconds

        Returns:
            List of dicts with predictions per dimension.
        """
        B = v10_hidden_states.shape[0]
        features = np.zeros((B, 3072), dtype=np.float32)

        for j in range(B):
            dur = durations[j]
            # Same duration-aware truncation as _extract_features_batch.
            n_frames = max(min(int(dur * 50), 1500), 2)

            h11 = v11_hidden_states[j, :n_frames]
            mid = n_frames // 2
            v11_first = h11[:mid].mean(dim=0).cpu().float().numpy()
            v11_second = h11[mid:].mean(dim=0).cpu().float().numpy()

            h10 = v10_hidden_states[j, :n_frames]
            v10_first = h10[:mid].mean(dim=0).cpu().float().numpy()
            v10_second = h10[mid:].mean(dim=0).cpu().float().numpy()

            features[j] = np.concatenate([v11_first, v11_second, v10_first, v10_second])

        return self._classify_batch(features)

    def format_tags(
        self, result: Dict, format: str = "short"
    ) -> str:
        """Format prediction results as a comma-separated tag string.

        Args:
            result: Single prediction dict from predict().
            format: "short" for 2-3 word tags, "sentences" for descriptive sentences.

        Returns:
            Joined tag string ("[error]" for failed items).
        """
        if "error" in result:
            return "[error]"
        parts = []
        key = "tag_short" if format == "short" else "tag_sentence"
        sep = ", " if format == "short" else ". "
        for dim in self.dim_names:
            if dim in result and result[dim].get(key):
                parts.append(result[dim][key])
        return sep.join(parts)

    def get_accuracy_table(self) -> List[Dict]:
        """Return accuracy stats for all 57 dimensions.

        Tiers are assigned from adjacent-1 accuracy:
        A ≥ 0.85, B ≥ 0.7, C ≥ 0.55, else D.
        """
        rows = []
        for dim in self.dim_names:
            clf = self.classifiers[dim]
            adj1 = clf["adj1"]
            if adj1 >= 0.85:
                tier = "A"
            elif adj1 >= 0.7:
                tier = "B"
            elif adj1 >= 0.55:
                tier = "C"
            else:
                tier = "D"
            rows.append({
                "dim": dim,
                "n_classes": clf["n_classes"],
                "exact_acc": round(clf["acc"], 3),
                "adj1_acc": round(clf["adj1"], 3),
                "tier": tier,
            })
        return rows
|
|
|
|
| |
| |
| |
def main():
    """CLI entry point: parse args, gather audio files, run inference, emit output."""
    parser = argparse.ArgumentParser(
        description="Voice-Taxonomy-57: Classify 57 voice dimensions from audio"
    )
    parser.add_argument(
        "--input", "-i", required=True,
        help="Audio file or directory of audio files"
    )
    parser.add_argument(
        "--output", "-o", default="results.json",
        help="Output JSON file (default: results.json)"
    )
    parser.add_argument(
        "--model-path", "-m", default=".",
        help="Path to model directory or HuggingFace repo ID (default: current dir)"
    )
    parser.add_argument(
        "--batch-size", "-b", type=int, default=16,
        help="Batch size for encoder inference (default: 16)"
    )
    parser.add_argument(
        "--device", "-d", default="cuda:0",
        help="Device (default: cuda:0)"
    )
    parser.add_argument(
        "--fp32", action="store_true",
        help="Use fp32 instead of fp16 for inference"
    )
    parser.add_argument(
        "--format", choices=["json", "tags-short", "tags-sentences"], default="json",
        help="Output format (default: json)"
    )
    args = parser.parse_args()

    # Load the model (encoders included) and report wall-clock time.
    dtype = torch.float32 if args.fp32 else torch.float16
    print(f"Loading model from {args.model_path}...")
    load_start = time.time()
    model = VoiceTaxonomy57.from_pretrained(args.model_path, device=args.device, dtype=dtype)
    print(f"Model loaded in {time.time() - load_start:.1f}s")

    # Resolve the input into a sorted list of audio file paths.
    input_path = Path(args.input)
    audio_extensions = {".wav", ".mp3", ".flac", ".ogg", ".m4a", ".opus", ".wma", ".aac"}
    if input_path.is_file():
        files = [str(input_path)]
    elif input_path.is_dir():
        candidates = input_path.rglob("*")
        files = sorted(
            str(p) for p in candidates if p.suffix.lower() in audio_extensions
        )
    else:
        print(f"Error: {args.input} not found")
        sys.exit(1)

    if not files:
        print("No audio files found.")
        sys.exit(1)

    print(f"Processing {len(files)} audio file(s) with batch_size={args.batch_size}...")
    infer_start = time.time()
    results = model.predict(files, batch_size=args.batch_size)
    elapsed = time.time() - infer_start

    # predict() returns a bare dict for a single input; normalize to a list.
    if isinstance(results, dict):
        results = [results]

    print(f"Inference complete in {elapsed:.1f}s ({elapsed / len(files):.2f}s/file)")

    if args.format == "json":
        output = [
            {
                "file": os.path.basename(path),
                "path": path,
                "predictions": res,
            }
            for path, res in zip(files, results)
        ]
        with open(args.output, "w") as f:
            json.dump(output, f, indent=2)
        print(f"Results saved to {args.output}")
    else:
        fmt = "short" if args.format == "tags-short" else "sentences"
        for path, res in zip(files, results):
            tags = model.format_tags(res, format=fmt)
            print(f"\n--- {os.path.basename(path)} ---")
            print(tags)


if __name__ == "__main__":
    main()
|
|