# aimusic-attribution / ml / embeddings.py
# Uploaded via huggingface_hub (commit 6678fa1, user: emresar)
"""
Audio Embeddings using CLAP (Contrastive Language-Audio Pretraining)
Generates semantic audio embeddings that capture musical characteristics.
These embeddings can be used for similarity search and attribution.
Requires: laion-clap or transformers package
"""
import json
from pathlib import Path
from typing import Optional, List
import numpy as np
# Cache for loaded models
_model_cache = {}
def get_best_device() -> str:
    """Auto-detect the best available device for ML processing.

    Returns:
        "mps" on Apple Silicon, "cuda" on NVIDIA GPUs, otherwise "cpu".
        Falls back to "cpu" when torch is not installed, or when the
        installed torch build is too old to expose the `mps` backend.
    """
    try:
        import torch
        # Prefer Apple's Metal backend, then CUDA. Older torch builds may
        # lack `torch.backends.mps` entirely, which raises AttributeError
        # rather than ImportError — catch both so we degrade to CPU.
        if torch.backends.mps.is_available():
            return "mps"  # Apple Silicon GPU
        elif torch.cuda.is_available():
            return "cuda"  # NVIDIA GPU
    except (ImportError, AttributeError):
        pass
    return "cpu"
def generate_embedding(
    audio_path: str,
    model: str = "laion/larger_clap_music"
) -> dict:
    """
    Generate audio embedding using CLAP model.

    Tries the purpose-built laion-clap library first, then falls back to
    the Hugging Face transformers CLAP implementation.

    Args:
        audio_path: Path to audio file
        model: CLAP model to use (default: laion/larger_clap_music)
    Returns:
        dict with:
        - success: bool
        - embedding: list[float] (embedding vector)
        - dimension: int (embedding dimension)
        - model: str (model used)
        - error: str (if failed)
    """
    audio_path = Path(audio_path)
    if not audio_path.exists():
        return {
            "success": False,
            "error": f"Audio file not found: {audio_path}"
        }
    # Outer try guards the WHOLE fallback chain. In the original layout the
    # transformers fallback ran inside the `except ImportError:` handler, so
    # any runtime error it raised escaped the function instead of being
    # converted to an error dict.
    try:
        try:
            # Try laion-clap first (purpose-built for music)
            return _generate_with_laion_clap(str(audio_path), model)
        except ImportError:
            try:
                # Fallback to transformers CLAP
                return _generate_with_transformers(str(audio_path), model)
            except ImportError:
                return {
                    "success": False,
                    "error": "Neither laion-clap nor transformers[audio] is installed. Install with: pip install laion-clap"
                }
    except Exception as e:
        return {
            "success": False,
            "error": f"Embedding generation failed: {str(e)}"
        }
def _generate_with_laion_clap(audio_path: str, model_name: str) -> dict:
    """Generate embedding using the laion-clap library.

    The loaded model is memoized in the module-level `_model_cache`,
    keyed by device, so repeated calls reuse one checkpoint.
    """
    import laion_clap

    device = get_best_device()
    cache_key = f"laion_clap_{device}"

    cached = _model_cache.get(cache_key)
    if cached is None:
        # First use on this device: load the music-specialized model.
        clap = laion_clap.CLAP_Module(enable_fusion=False, device=device)
        clap.load_ckpt()  # Uses default music checkpoint
        cached = {"model": clap, "device": device}
        _model_cache[cache_key] = cached

    # Returns a numpy array of shape (1, dim); take the single row.
    vectors = cached["model"].get_audio_embedding_from_filelist(
        [audio_path],
        use_tensor=False
    )
    vec = vectors[0].tolist()

    return {
        "success": True,
        "embedding": vec,
        "dimension": len(vec),
        "model": "laion-clap-music",
        "device": cached["device"]
    }
def _generate_with_transformers(audio_path: str, model_name: str) -> dict:
    """Generate embedding using the Hugging Face transformers CLAP model.

    Model and processor are memoized in `_model_cache`, keyed by model
    name and device, so repeated calls avoid re-downloading/re-loading.
    """
    import librosa
    import torch
    from transformers import ClapModel, ClapProcessor

    # MPS on Apple Silicon, CUDA on NVIDIA, otherwise CPU.
    device = get_best_device()
    cache_key = f"transformers_{model_name}_{device}"

    if cache_key not in _model_cache:
        clap = ClapModel.from_pretrained(model_name).to(device)
        clap.eval()
        _model_cache[cache_key] = {
            "model": clap,
            "processor": ClapProcessor.from_pretrained(model_name),
            "device": device
        }

    entry = _model_cache[cache_key]
    clap = entry["model"]
    processor = entry["processor"]
    device = entry["device"]

    # CLAP expects 48 kHz mono audio.
    waveform, _sr = librosa.load(audio_path, sr=48000, mono=True)

    inputs = processor(
        audio=waveform,
        sampling_rate=48000,
        return_tensors="pt"
    )
    # Move every input tensor onto the model's device.
    inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

    with torch.no_grad():
        features = clap.get_audio_features(**inputs)

    vec = features[0].cpu().numpy().tolist()
    return {
        "success": True,
        "embedding": vec,
        "dimension": len(vec),
        "model": model_name,
        "device": device
    }
def compute_similarity(embedding1: List[float], embedding2: List[float]) -> dict:
    """
    Compute cosine similarity between two embeddings.

    Args:
        embedding1: First embedding vector
        embedding2: Second embedding vector
    Returns:
        dict with:
        - success: bool
        - similarity: float (cosine similarity, -1 to 1)
        - error: str (if failed)
    """
    try:
        v1 = np.asarray(embedding1)
        v2 = np.asarray(embedding2)

        if v1.shape != v2.shape:
            return {
                "success": False,
                "error": f"Embedding dimension mismatch: {v1.shape} vs {v2.shape}"
            }

        n1 = np.linalg.norm(v1)
        n2 = np.linalg.norm(v2)
        # Cosine similarity is undefined for a zero vector.
        if n1 == 0 or n2 == 0:
            return {
                "success": False,
                "error": "Zero-norm embedding detected"
            }

        return {
            "success": True,
            "similarity": float(np.dot(v1, v2) / (n1 * n2))
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Similarity computation failed: {str(e)}"
        }
def batch_generate_embeddings(
    audio_paths: List[str],
    model: str = "laion/larger_clap_music"
) -> dict:
    """
    Generate embeddings for multiple audio files.

    Args:
        audio_paths: List of paths to audio files
        model: CLAP model to use
    Returns:
        dict with:
        - success: bool (True only if every file succeeded)
        - embeddings: list of {path, embedding, dimension} or {path, error}
    """
    # Each entry carries its source path merged with the per-file result.
    results = [
        {"path": path, **generate_embedding(path, model)}
        for path in audio_paths
    ]
    overall_ok = all(entry.get("success", False) for entry in results)
    return {
        "success": overall_ok,
        "embeddings": results
    }
def _embed_segment_via_tempfile(segment, sr: int, model: str) -> dict:
    """Write an audio segment to a temporary WAV file, embed it, clean up.

    Args:
        segment: 1-D audio sample array (as produced by librosa.load)
        sr: sample rate of `segment`
        model: CLAP model name passed through to generate_embedding
    Returns:
        The generate_embedding() result dict for the segment.
    """
    import os
    import tempfile
    import soundfile as sf

    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        tmp_path = tmp.name
        sf.write(tmp_path, segment, sr)
    try:
        return generate_embedding(tmp_path, model)
    finally:
        # Always remove the temp file, even if embedding raised.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


def generate_chunk_embeddings(
    audio_path: str,
    chunk_duration: float = 10.0,
    chunk_overlap: float = 5.0,
    model: str = "laion/larger_clap_music"
) -> dict:
    """
    Generate embeddings for audio chunks (sliding window).

    This is the key function for chunk-based attribution - it splits audio
    into overlapping windows and generates an embedding for each, allowing
    us to match specific sections of audio.

    Args:
        audio_path: Path to audio file
        chunk_duration: Duration of each chunk in seconds (default: 10s)
        chunk_overlap: Overlap between chunks in seconds; must be >= 0 and
            strictly less than chunk_duration (default: 5s)
        model: CLAP model to use
    Returns:
        dict with:
        - success: bool
        - chunks: list of {start_time, end_time, embedding, dimension}
        - total_duration: float
        - chunk_count: int
        - error: str (if failed)
    """
    # Validate parameters BEFORE importing heavyweight audio libraries.
    # A non-positive hop (overlap >= duration) previously made the window
    # loop below spin forever: hop_samples == 0 means chunk_idx * 0 never
    # advances past the audio length.
    if chunk_duration <= 0:
        return {
            "success": False,
            "error": f"chunk_duration must be positive, got {chunk_duration}"
        }
    if chunk_overlap < 0 or chunk_overlap >= chunk_duration:
        return {
            "success": False,
            "error": f"chunk_overlap must be in [0, chunk_duration), got {chunk_overlap}"
        }

    audio_path = Path(audio_path)
    if not audio_path.exists():
        return {
            "success": False,
            "error": f"Audio file not found: {audio_path}"
        }

    import librosa

    try:
        # CLAP expects 48 kHz; load the whole file mono at that rate.
        audio, sr = librosa.load(str(audio_path), sr=48000, mono=True)
        total_duration = len(audio) / sr

        chunk_samples = int(chunk_duration * sr)
        hop_samples = int((chunk_duration - chunk_overlap) * sr)

        # Audio shorter than one window: embed the entire file as one chunk.
        if len(audio) < chunk_samples:
            result = generate_embedding(str(audio_path), model)
            if not result.get("success"):
                return result
            return {
                "success": True,
                "chunks": [{
                    "start_time": 0.0,
                    "end_time": total_duration,
                    "embedding": result["embedding"],
                    "dimension": result["dimension"]
                }],
                "total_duration": total_duration,
                "chunk_count": 1
            }

        # Slide a full-size window across the audio.
        chunks = []
        chunk_idx = 0
        while chunk_idx * hop_samples + chunk_samples <= len(audio):
            start_sample = chunk_idx * hop_samples
            end_sample = start_sample + chunk_samples
            result = _embed_segment_via_tempfile(
                audio[start_sample:end_sample], sr, model
            )
            if result.get("success"):
                chunks.append({
                    "start_time": round(start_sample / sr, 2),
                    "end_time": round(end_sample / sr, 2),
                    "embedding": result["embedding"],
                    "dimension": result["dimension"]
                })
            chunk_idx += 1

        # Trailing audio shorter than a full window but at least 1 second.
        remaining_start = chunk_idx * hop_samples
        if remaining_start < len(audio) and len(audio) - remaining_start >= sr:
            result = _embed_segment_via_tempfile(
                audio[remaining_start:], sr, model
            )
            if result.get("success"):
                chunks.append({
                    "start_time": round(remaining_start / sr, 2),
                    "end_time": round(len(audio) / sr, 2),
                    "embedding": result["embedding"],
                    "dimension": result["dimension"]
                })

        return {
            "success": True,
            "chunks": chunks,
            "total_duration": round(total_duration, 2),
            "chunk_count": len(chunks)
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Chunk embedding generation failed: {str(e)}"
        }
if __name__ == "__main__":
# Test embedding generation
import sys
if len(sys.argv) > 1:
result = generate_embedding(sys.argv[1])
# Don't print full embedding (too long), just metadata
if result["success"]:
print(json.dumps({
"success": True,
"dimension": result["dimension"],
"model": result["model"],
"embedding_preview": result["embedding"][:5] + ["..."]
}, indent=2))
else:
print(json.dumps(result, indent=2))
else:
print("Usage: python embeddings.py <audio_file>")