# Voice-Taxonomy-57 / inference.py
# Author: Christoph Schuhmann
# Initial release: 57-dimension voice taxonomy classifier
# Revision: 74753b8 (verified)
#!/usr/bin/env python3
"""Voice-Taxonomy-57: Classify 57 voice dimensions from audio.
Pipeline:
Audio → 16kHz mono → WhisperFeatureExtractor → mel spectrograms
→ BUD-E-Whisper V1.0 encoder → [B, 1500, 768]
→ BUD-E-Whisper V1.1 encoder → [B, 1500, 768]
→ Duration-aware truncation → split first/second half
→ Mean pool each half → concat 4×768 → [B, 3072]
→ PCA(96) per dimension → 57 MLP classifiers → 57 predictions (0–6)
Usage:
# CLI
python inference.py --input audio_folder/ --output results.json --batch-size 16
python inference.py --input file.wav --output results.json
# Python API
from inference import VoiceTaxonomy57
model = VoiceTaxonomy57.from_pretrained("laion/Voice-Taxonomy-57")
results = model.predict("audio.wav")
results = model.predict(["a.wav", "b.mp3", "c.flac"])
"""
import argparse
import json
import os
import pickle
import subprocess
import sys
import time
import warnings
from pathlib import Path
from typing import Dict, List, Optional, Union
import numpy as np
import torch
import torch.nn as nn
warnings.filterwarnings("ignore")
# ============================================================================
# MLP classifier (must match training architecture exactly)
# ============================================================================
class MLPClassifier(nn.Module):
    """Two-layer MLP head: Linear -> ReLU -> Linear.

    The layer layout (and therefore the ``net.0`` / ``net.2`` state-dict
    keys) must match the training architecture exactly so saved weights
    load without remapping.
    """

    def __init__(self, in_dim: int, hidden_dim: int, n_classes: int):
        super().__init__()
        layers = [
            nn.Linear(in_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, n_classes),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        # Returns raw logits over n_classes; softmax is applied by callers.
        return self.net(x)
# ============================================================================
# Audio loading utilities
# ============================================================================
def load_audio_ffmpeg(path: str, sr: int = 16000) -> Optional[np.ndarray]:
    """Decode an audio file to mono float32 PCM at ``sr`` Hz via the ffmpeg CLI.

    Returns a 1-D float32 waveform, or None on any failure (missing
    binary, decode error, 30 s timeout). Best-effort by design: callers
    fall back to librosa when this returns None.
    """
    cmd = [
        "ffmpeg", "-i", str(path),
        "-f", "f32le", "-acodec", "pcm_f32le",
        "-ar", str(sr), "-ac", "1", "pipe:1",
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, timeout=30)
    except Exception:
        return None
    if proc.returncode != 0 or not proc.stdout:
        return None
    # stdout is raw little-endian float32 samples; reinterpret directly.
    return np.frombuffer(proc.stdout, dtype=np.float32)
def load_audio(path: str, sr: int = 16000) -> Optional[np.ndarray]:
    """Load an audio file as mono float32 at ``sr`` Hz.

    Tries the ffmpeg CLI first; if that fails, falls back to librosa.
    Returns None when both decoders fail (or librosa is not installed).
    """
    decoded = load_audio_ffmpeg(path, sr)
    if decoded is None:
        try:
            import librosa  # optional fallback dependency
            decoded, _ = librosa.load(path, sr=sr, mono=True)
        except Exception:
            return None
    return decoded
# ============================================================================
# Main pipeline class
# ============================================================================
class VoiceTaxonomy57:
    """57-dimension voice taxonomy classifier.

    Uses BUD-E-Whisper V1.0 + V1.1 encoders with per-dimension PCA + MLP.

    Per clip, the 3072-dim feature vector is the concatenation
    [v11_first_half, v11_second_half, v10_first_half, v10_second_half],
    each a 768-dim mean-pooled slice of the encoder hidden states.
    """

    def __init__(
        self,
        classifiers: dict,
        tags_short: dict,
        tags_sentences: dict,
        dimensions: dict,
        config: dict,
        device: str = "cuda:0",
        dtype: torch.dtype = torch.float16,
    ):
        """Store model parts and runtime settings.

        Args:
            classifiers: dim_name -> {model, pca_components, pca_mean,
                label_to_val, unique_vals, n_classes, acc, adj1}
                (reconstructed in from_pretrained).
            tags_short: dim_name -> {str(value) -> short tag}.
            tags_sentences: dim_name -> {str(value) -> descriptive sentence}.
            dimensions: taxonomy dimension metadata (loaded JSON).
            config: keys used here: sample_rate, max_audio_seconds,
                whisper_models.
            device: requested device; silently downgraded to "cpu" when
                CUDA is unavailable.
            dtype: encoder dtype; forced to float32 on CPU.
        """
        self.classifiers = classifiers
        self.tags_short = tags_short
        self.tags_sentences = tags_sentences
        self.dimensions = dimensions
        self.config = config
        # Fall back to CPU (and fp32, below) when CUDA is not available.
        self.device = device if torch.cuda.is_available() else "cpu"
        self.dtype = dtype if self.device != "cpu" else torch.float32
        self.sample_rate = config.get("sample_rate", 16000)
        self.max_audio_seconds = config.get("max_audio_seconds", 30)
        # Sorted so iteration/output order over dimensions is deterministic.
        self.dim_names = sorted(classifiers.keys())
        # Lazy-loaded encoders: populated by _load_encoders() on first use.
        self._v10_encoder = None
        self._v11_encoder = None
        self._feature_extractor = None

    @classmethod
    def from_pretrained(
        cls,
        path: str,
        device: str = "cuda:0",
        dtype: torch.dtype = torch.float16,
        load_encoders: bool = True,
    ) -> "VoiceTaxonomy57":
        """Load from a local directory or HuggingFace repo.

        Args:
            path: Local directory or HuggingFace model ID (e.g. "laion/Voice-Taxonomy-57")
            device: Device for encoder inference ("cuda:0", "cpu", etc.)
            dtype: Encoder dtype (torch.float16 for GPU, torch.float32 for CPU)
            load_encoders: If True, load Whisper encoders immediately.
                If False, defer loading until first predict() call.

        Returns:
            A ready-to-use VoiceTaxonomy57 instance.
        """
        # Resolve path: local dir or HF download
        if os.path.isdir(path):
            model_dir = path
        else:
            from huggingface_hub import snapshot_download
            model_dir = snapshot_download(repo_id=path)
        # Load classifier weights
        # NOTE(review): pickle.load executes arbitrary code from the file;
        # only load checkpoints from a source you trust.
        clf_path = os.path.join(model_dir, "taxonomy_classifiers.pkl")
        with open(clf_path, "rb") as f:
            raw = pickle.load(f)
        # Reconstruct MLPs from the pickled weights + configs.
        classifiers = {}
        for dim, data in raw.items():
            dim = str(dim)  # numpy strings → Python str
            mc = data["model_config"]
            model = MLPClassifier(mc["in_dim"], mc["hidden_dim"], mc["n_classes"])
            model.load_state_dict(data["model_state"])
            model.eval()
            classifiers[dim] = {
                "model": model,
                "pca_components": data["pca_components"],  # (96, 3072)
                "pca_mean": data["pca_mean"],  # (3072,)
                # Maps MLP class index -> taxonomy value.
                "label_to_val": {int(k): int(v) for k, v in data["label_to_val"].items()},
                "unique_vals": [int(v) for v in data["unique_vals"]],
                "n_classes": mc["n_classes"],
                "acc": float(data["acc"]),    # exact-match accuracy
                "adj1": float(data["adj1"]),  # within-±1 accuracy
            }
        # Load metadata (tag strings + dimension definitions + config)
        with open(os.path.join(model_dir, "taxonomy_tags_short.json")) as f:
            tags_short = json.load(f)
        with open(os.path.join(model_dir, "taxonomy_tags_sentences.json")) as f:
            tags_sentences = json.load(f)
        with open(os.path.join(model_dir, "taxonomy_dimensions.json")) as f:
            dimensions = json.load(f)
        with open(os.path.join(model_dir, "config.json")) as f:
            config = json.load(f)
        instance = cls(
            classifiers=classifiers,
            tags_short=tags_short,
            tags_sentences=tags_sentences,
            dimensions=dimensions,
            config=config,
            device=device,
            dtype=dtype,
        )
        if load_encoders:
            instance._load_encoders()
        return instance

    def _load_encoders(self):
        """Load V1.0 and V1.1 Whisper encoders."""
        from transformers import WhisperModel, WhisperFeatureExtractor
        v10_name = self.config["whisper_models"]["v10"]
        v11_name = self.config["whisper_models"]["v11"]
        print(f"Loading {v10_name}...")
        m10 = WhisperModel.from_pretrained(v10_name, torch_dtype=self.dtype)
        # Keep only the encoder; delete the full model so the decoder
        # weights can be freed.
        self._v10_encoder = m10.encoder.to(self.device).eval()
        del m10
        print(f"Loading {v11_name}...")
        m11 = WhisperModel.from_pretrained(v11_name, torch_dtype=self.dtype)
        self._v11_encoder = m11.encoder.to(self.device).eval()
        del m11
        # Feature extractor from the V1.0 repo; fall back to the stock
        # whisper-small preprocessor if that repo ships none.
        try:
            self._feature_extractor = WhisperFeatureExtractor.from_pretrained(v10_name)
        except OSError:
            self._feature_extractor = WhisperFeatureExtractor.from_pretrained(
                "openai/whisper-small"
            )
        # Release cached GPU memory left over from loading (no-op on CPU).
        torch.cuda.empty_cache() if self.device != "cpu" else None
        print("Encoders loaded.")

    def _ensure_encoders(self):
        # Lazy initialization: load encoders on first prediction call.
        if self._v10_encoder is None:
            self._load_encoders()

    def _extract_features_batch(
        self,
        waveforms: List[np.ndarray],
        durations: List[float],
    ) -> np.ndarray:
        """Extract 3072-dim features from a batch of waveforms.

        Args:
            waveforms: Mono float waveforms at self.sample_rate.
            durations: Original (pre-padding) durations in seconds, used
                for duration-aware truncation of encoder frames.

        Returns:
            np.ndarray of shape (B, 3072)
        """
        self._ensure_encoders()
        B = len(waveforms)
        max_samples = self.sample_rate * self.max_audio_seconds  # 480000
        # Pad/truncate to max_audio_seconds
        padded = []
        for wf in waveforms:
            wf64 = wf.astype(np.float64)
            if len(wf64) < max_samples:
                wf64 = np.pad(wf64, (0, max_samples - len(wf64)))
            else:
                wf64 = wf64[:max_samples]
            padded.append(wf64)
        # Compute mel spectrograms
        inputs = self._feature_extractor(
            padded, sampling_rate=self.sample_rate, return_tensors="pt"
        )
        mel = inputs.input_features.to(self.device, dtype=self.dtype)
        # Run both encoders
        features = np.zeros((B, 3072), dtype=np.float32)
        with torch.no_grad():
            # V1.1 encoder (index 0 in original code = M_V11)
            out_v11 = self._v11_encoder(mel).last_hidden_state
            # V1.0 encoder (index 3 in original code = M_V10)
            out_v10 = self._v10_encoder(mel).last_hidden_state
        for j in range(B):
            dur = durations[j]
            # Encoder emits 1500 frames for 30 s → 50 frames/s; clamp to
            # [2, 1500] so each half below has at least one frame.
            n_frames = max(min(int(dur * 50), 1500), 2)
            # V1.1: mean-pool first and second half of the valid frames.
            h11 = out_v11[j, :n_frames]
            mid11 = n_frames // 2
            v11_first = h11[:mid11].mean(dim=0).cpu().float().numpy()  # (768,)
            v11_second = h11[mid11:].mean(dim=0).cpu().float().numpy()  # (768,)
            # V1.0: same pooling on the other encoder's states.
            h10 = out_v10[j, :n_frames]
            mid10 = n_frames // 2
            v10_first = h10[:mid10].mean(dim=0).cpu().float().numpy()  # (768,)
            v10_second = h10[mid10:].mean(dim=0).cpu().float().numpy()  # (768,)
            # Concat: [v11_first, v11_second, v10_first, v10_second]
            features[j] = np.concatenate([v11_first, v11_second, v10_first, v10_second])
        return features

    def _classify_batch(self, features: np.ndarray) -> List[Dict]:
        """Run 57 MLP classifiers on extracted features.

        Args:
            features: (B, 3072) feature array

        Returns:
            List of B dicts, each mapping dim_name → {value, confidence, tag_short, tag_sentence}
        """
        B = features.shape[0]
        results = [{} for _ in range(B)]
        for dim in self.dim_names:
            clf = self.classifiers[dim]
            # PCA projection: center, then project onto the 96 components.
            centered = features - clf["pca_mean"]
            projected = centered @ clf["pca_components"].T  # (B, 96)
            # MLP forward pass (on CPU, fp32 — classifiers are tiny).
            xt = torch.tensor(projected, dtype=torch.float32)
            with torch.no_grad():
                logits = clf["model"](xt)
                pred_labels = logits.argmax(dim=1)
                probs = torch.softmax(logits, dim=1)
            for j in range(B):
                label = pred_labels[j].item()
                conf = probs[j, label].item()
                # Translate MLP class index back to the taxonomy value,
                # then look up its human-readable tags (empty if missing).
                val = clf["label_to_val"][label]
                tag_short = self.tags_short.get(dim, {}).get(str(val), "")
                tag_sent = self.tags_sentences.get(dim, {}).get(str(val), "")
                results[j][dim] = {
                    "value": val,
                    "confidence": round(conf, 3),
                    "tag_short": tag_short,
                    "tag_sentence": tag_sent,
                }
        return results

    def predict(
        self,
        audio: Union[str, List[str], np.ndarray, List[np.ndarray]],
        batch_size: int = 16,
    ) -> Union[Dict, List[Dict]]:
        """Predict 57 voice taxonomy dimensions for audio file(s).

        Args:
            audio: Path(s) to audio file(s) or numpy waveform(s) at 16kHz mono.
            batch_size: Batch size for encoder inference.

        Returns:
            Single dict or list of dicts with predictions per dimension.
            Entries that fail to load are {"error": ...} instead of
            predictions; output order matches input order.
        """
        # Remember whether the caller passed a single item so we can
        # unwrap the result at the end.
        single = isinstance(audio, (str, Path, np.ndarray))
        if single:
            audio = [audio]
        # Load audio files; raw arrays are assumed to already be 16kHz mono.
        waveforms = []
        durations = []
        valid_mask = []
        for item in audio:
            if isinstance(item, (str, Path)):
                wf = load_audio(str(item), sr=self.sample_rate)
            else:
                wf = item
            # Reject failed loads and clips shorter than 0.1 s.
            if wf is not None and len(wf) >= int(self.sample_rate * 0.1):
                waveforms.append(wf)
                durations.append(len(wf) / self.sample_rate)
                valid_mask.append(True)
            else:
                waveforms.append(None)
                durations.append(0.0)
                valid_mask.append(False)
        # Process only valid items in batches, scattering results back to
        # their original positions.
        all_results = [None] * len(audio)
        valid_indices = [i for i, ok in enumerate(valid_mask) if ok]
        valid_waveforms = [waveforms[i] for i in valid_indices]
        valid_durations = [durations[i] for i in valid_indices]
        for start in range(0, len(valid_waveforms), batch_size):
            end = min(start + batch_size, len(valid_waveforms))
            batch_wf = valid_waveforms[start:end]
            batch_dur = valid_durations[start:end]
            features = self._extract_features_batch(batch_wf, batch_dur)
            batch_results = self._classify_batch(features)
            for j, res in enumerate(batch_results):
                orig_idx = valid_indices[start + j]
                all_results[orig_idx] = res
        # Fill failed entries
        for i in range(len(all_results)):
            if all_results[i] is None:
                all_results[i] = {"error": "Failed to load or process audio"}
        if single:
            return all_results[0]
        return all_results

    def predict_from_encoder_outputs(
        self,
        v10_hidden_states: torch.Tensor,
        v11_hidden_states: torch.Tensor,
        durations: List[float],
    ) -> List[Dict]:
        """Predict from pre-computed encoder outputs (for integration with Empathic-Insight-Voice-Plus).

        This avoids re-running V1.0 encoder when used alongside the emotion pipeline.

        Args:
            v10_hidden_states: (B, 1500, 768) from BUD-E-Whisper V1.0 encoder
            v11_hidden_states: (B, 1500, 768) from BUD-E-Whisper V1.1 encoder
            durations: List of audio durations in seconds

        Returns:
            List of dicts with predictions per dimension.
        """
        B = v10_hidden_states.shape[0]
        features = np.zeros((B, 3072), dtype=np.float32)
        # Same duration-aware pooling as _extract_features_batch.
        for j in range(B):
            dur = durations[j]
            n_frames = max(min(int(dur * 50), 1500), 2)
            h11 = v11_hidden_states[j, :n_frames]
            mid = n_frames // 2
            v11_first = h11[:mid].mean(dim=0).cpu().float().numpy()
            v11_second = h11[mid:].mean(dim=0).cpu().float().numpy()
            h10 = v10_hidden_states[j, :n_frames]
            v10_first = h10[:mid].mean(dim=0).cpu().float().numpy()
            v10_second = h10[mid:].mean(dim=0).cpu().float().numpy()
            features[j] = np.concatenate([v11_first, v11_second, v10_first, v10_second])
        return self._classify_batch(features)

    def format_tags(
        self, result: Dict, format: str = "short"
    ) -> str:
        """Format prediction results as a comma-separated tag string.

        Args:
            result: Single prediction dict from predict().
            format: "short" for 2-3 word tags, "sentences" for descriptive sentences.

        Returns:
            Tags joined with ", " (short) or ". " (sentences); "[error]"
            for a failed prediction. Dimensions with empty tags are skipped.
        """
        if "error" in result:
            return "[error]"
        parts = []
        key = "tag_short" if format == "short" else "tag_sentence"
        sep = ", " if format == "short" else ". "
        for dim in self.dim_names:
            if dim in result and result[dim].get(key):
                parts.append(result[dim][key])
        return sep.join(parts)

    def get_accuracy_table(self) -> List[Dict]:
        """Return accuracy stats for all 57 dimensions.

        Tier is derived from within-±1 accuracy (adj1):
        A ≥ 0.85, B ≥ 0.7, C ≥ 0.55, D otherwise.
        """
        rows = []
        for dim in self.dim_names:
            clf = self.classifiers[dim]
            tier = "A" if clf["adj1"] >= 0.85 else ("B" if clf["adj1"] >= 0.7 else ("C" if clf["adj1"] >= 0.55 else "D"))
            rows.append({
                "dim": dim,
                "n_classes": clf["n_classes"],
                "exact_acc": round(clf["acc"], 3),
                "adj1_acc": round(clf["adj1"], 3),
                "tier": tier,
            })
        return rows
# ============================================================================
# CLI
# ============================================================================
def main():
    """CLI entry point: classify a file or a folder of audio files.

    Writes a JSON report (--format json) or prints per-file tag strings
    (--format tags-short / tags-sentences).
    """
    parser = argparse.ArgumentParser(
        description="Voice-Taxonomy-57: Classify 57 voice dimensions from audio"
    )
    parser.add_argument("--input", "-i", required=True,
                        help="Audio file or directory of audio files")
    parser.add_argument("--output", "-o", default="results.json",
                        help="Output JSON file (default: results.json)")
    parser.add_argument("--model-path", "-m", default=".",
                        help="Path to model directory or HuggingFace repo ID (default: current dir)")
    parser.add_argument("--batch-size", "-b", type=int, default=16,
                        help="Batch size for encoder inference (default: 16)")
    parser.add_argument("--device", "-d", default="cuda:0",
                        help="Device (default: cuda:0)")
    parser.add_argument("--fp32", action="store_true",
                        help="Use fp32 instead of fp16 for inference")
    parser.add_argument("--format", choices=["json", "tags-short", "tags-sentences"],
                        default="json", help="Output format (default: json)")
    args = parser.parse_args()

    # fp16 by default; --fp32 opts into full precision.
    dtype = torch.float16 if not args.fp32 else torch.float32
    print(f"Loading model from {args.model_path}...")
    t_load = time.time()
    model = VoiceTaxonomy57.from_pretrained(args.model_path, device=args.device, dtype=dtype)
    print(f"Model loaded in {time.time() - t_load:.1f}s")

    # Resolve the input into a sorted list of audio file paths.
    input_path = Path(args.input)
    audio_extensions = {".wav", ".mp3", ".flac", ".ogg", ".m4a", ".opus", ".wma", ".aac"}
    if input_path.is_file():
        files = [str(input_path)]
    elif input_path.is_dir():
        files = sorted(
            str(p) for p in input_path.rglob("*") if p.suffix.lower() in audio_extensions
        )
    else:
        print(f"Error: {args.input} not found")
        sys.exit(1)
    if not files:
        print("No audio files found.")
        sys.exit(1)

    print(f"Processing {len(files)} audio file(s) with batch_size={args.batch_size}...")
    t_infer = time.time()
    results = model.predict(files, batch_size=args.batch_size)
    elapsed = time.time() - t_infer
    # predict() returns a bare dict for a single input; normalize to a list.
    if isinstance(results, dict):
        results = [results]
    print(f"Inference complete in {elapsed:.1f}s ({elapsed / len(files):.2f}s/file)")

    if args.format == "json":
        output = [
            {"file": os.path.basename(path), "path": path, "predictions": res}
            for path, res in zip(files, results)
        ]
        with open(args.output, "w") as f:
            json.dump(output, f, indent=2)
        print(f"Results saved to {args.output}")
    else:
        fmt = "short" if args.format == "tags-short" else "sentences"
        for path, res in zip(files, results):
            print(f"\n--- {os.path.basename(path)} ---")
            print(model.format_tags(res, format=fmt))
# Run the CLI only when executed as a script (not on import).
if __name__ == "__main__":
    main()