""" Audio Embeddings using CLAP (Contrastive Language-Audio Pretraining) Generates semantic audio embeddings that capture musical characteristics. These embeddings can be used for similarity search and attribution. Requires: laion-clap or transformers package """ import json from pathlib import Path from typing import Optional, List import numpy as np # Cache for loaded models _model_cache = {} def get_best_device() -> str: """Auto-detect the best available device for ML processing.""" try: import torch if torch.backends.mps.is_available(): return "mps" # Apple Silicon GPU elif torch.cuda.is_available(): return "cuda" # NVIDIA GPU except ImportError: pass return "cpu" def generate_embedding( audio_path: str, model: str = "laion/larger_clap_music" ) -> dict: """ Generate audio embedding using CLAP model. Args: audio_path: Path to audio file model: CLAP model to use (default: laion/larger_clap_music) Returns: dict with: - success: bool - embedding: list[float] (embedding vector) - dimension: int (embedding dimension) - model: str (model used) - error: str (if failed) """ audio_path = Path(audio_path) if not audio_path.exists(): return { "success": False, "error": f"Audio file not found: {audio_path}" } try: # Try laion-clap first (purpose-built for music) return _generate_with_laion_clap(str(audio_path), model) except ImportError: try: # Fallback to transformers CLAP return _generate_with_transformers(str(audio_path), model) except ImportError: return { "success": False, "error": "Neither laion-clap nor transformers[audio] is installed. Install with: pip install laion-clap" } except Exception as e: return { "success": False, "error": f"Embedding generation failed: {str(e)}" } def _generate_with_laion_clap(audio_path: str, model_name: str) -> dict: """Generate embedding using laion-clap library.""" import laion_clap # Get best device device = get_best_device() # Get or create model cache_key = f"laion_clap_{device}" if cache_key not in _model_cache: # Load the music-specialized model with device support model = laion_clap.CLAP_Module(enable_fusion=False, device=device) model.load_ckpt() # Uses default music checkpoint _model_cache[cache_key] = {"model": model, "device": device} model = _model_cache[cache_key]["model"] device = _model_cache[cache_key]["device"] # Generate embedding embedding = model.get_audio_embedding_from_filelist( [audio_path], use_tensor=False ) # embedding is numpy array of shape (1, dim) embedding_list = embedding[0].tolist() return { "success": True, "embedding": embedding_list, "dimension": len(embedding_list), "model": "laion-clap-music", "device": device } def _generate_with_transformers(audio_path: str, model_name: str) -> dict: """Generate embedding using Hugging Face transformers.""" from transformers import ClapModel, ClapProcessor import torch import librosa # Get best device (MPS for Apple Silicon, CUDA for NVIDIA, else CPU) device = get_best_device() # Get or create model cache_key = f"transformers_{model_name}_{device}" if cache_key not in _model_cache: model = ClapModel.from_pretrained(model_name) model = model.to(device) model.eval() _model_cache[cache_key] = { "model": model, "processor": ClapProcessor.from_pretrained(model_name), "device": device } model = _model_cache[cache_key]["model"] processor = _model_cache[cache_key]["processor"] device = _model_cache[cache_key]["device"] # Load audio # CLAP expects 48kHz audio audio, sr = librosa.load(audio_path, sr=48000, mono=True) # Process through CLAP inputs = processor( audio=audio, sampling_rate=48000, 

def compute_similarity(embedding1: List[float], embedding2: List[float]) -> dict:
    """
    Compute cosine similarity between two embeddings.

    Args:
        embedding1: First embedding vector
        embedding2: Second embedding vector

    Returns:
        dict with:
        - success: bool
        - similarity: float (cosine similarity, -1 to 1)
        - error: str (if failed)
    """
    try:
        e1 = np.array(embedding1)
        e2 = np.array(embedding2)

        if e1.shape != e2.shape:
            return {
                "success": False,
                "error": f"Embedding dimension mismatch: {e1.shape} vs {e2.shape}"
            }

        # Cosine similarity: dot(e1, e2) / (||e1|| * ||e2||)
        dot_product = np.dot(e1, e2)
        norm1 = np.linalg.norm(e1)
        norm2 = np.linalg.norm(e2)

        if norm1 == 0 or norm2 == 0:
            return {
                "success": False,
                "error": "Zero-norm embedding detected"
            }

        similarity = dot_product / (norm1 * norm2)

        return {
            "success": True,
            "similarity": float(similarity)
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Similarity computation failed: {str(e)}"
        }


def batch_generate_embeddings(
    audio_paths: List[str],
    model: str = "laion/larger_clap_music"
) -> dict:
    """
    Generate embeddings for multiple audio files.

    The underlying model is loaded once and cached, so batching avoids
    repeated model initialization.

    Args:
        audio_paths: List of paths to audio files
        model: CLAP model to use

    Returns:
        dict with:
        - success: bool (True only if every file succeeded)
        - embeddings: list of {path, embedding, dimension} or {path, error}
    """
    results = []
    for path in audio_paths:
        result = generate_embedding(path, model)
        results.append({
            "path": path,
            **result
        })

    return {
        "success": all(r.get("success", False) for r in results),
        "embeddings": results
    }


def generate_chunk_embeddings(
    audio_path: str,
    chunk_duration: float = 10.0,
    chunk_overlap: float = 5.0,
    model: str = "laion/larger_clap_music"
) -> dict:
    """
    Generate embeddings for audio chunks (sliding window).

    This is the key function for chunk-based attribution: it splits audio
    into overlapping windows and generates an embedding for each, allowing
    us to match specific sections of audio.

    Args:
        audio_path: Path to audio file
        chunk_duration: Duration of each chunk in seconds (default: 10s)
        chunk_overlap: Overlap between consecutive chunks in seconds (default: 5s)
        model: CLAP model to use

    Returns:
        dict with:
        - success: bool
        - chunks: list of {
              start_time: float,
              end_time: float,
              embedding: list[float],
              dimension: int
          }
        - total_duration: float
        - chunk_count: int
        - error: str (if failed)
    """
    import librosa
    import tempfile
    import soundfile as sf

    audio_path = Path(audio_path)
    if not audio_path.exists():
        return {
            "success": False,
            "error": f"Audio file not found: {audio_path}"
        }

    # Guard against a non-positive hop size, which would loop forever
    if chunk_overlap >= chunk_duration:
        return {
            "success": False,
            "error": "chunk_overlap must be smaller than chunk_duration"
        }

    try:
        # Load full audio at CLAP's expected 48 kHz
        audio, sr = librosa.load(str(audio_path), sr=48000, mono=True)
        total_duration = len(audio) / sr

        # Calculate chunk parameters
        chunk_samples = int(chunk_duration * sr)
        hop_samples = int((chunk_duration - chunk_overlap) * sr)

        # If audio is shorter than one chunk, process the entire file
        if len(audio) < chunk_samples:
            result = generate_embedding(str(audio_path), model)
            if result.get("success"):
                return {
                    "success": True,
                    "chunks": [{
                        "start_time": 0.0,
                        "end_time": total_duration,
                        "embedding": result["embedding"],
                        "dimension": result["dimension"]
                    }],
                    "total_duration": total_duration,
                    "chunk_count": 1
                }
            else:
                return result

        chunks = []

        def embed_chunk(chunk_audio, start_time, end_time):
            """Write a chunk to a temporary WAV file and embed it."""
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                tmp_path = tmp.name
            sf.write(tmp_path, chunk_audio, sr)
            try:
                result = generate_embedding(tmp_path, model)
                if result.get("success"):
                    chunks.append({
                        "start_time": round(start_time, 2),
                        "end_time": round(end_time, 2),
                        "embedding": result["embedding"],
                        "dimension": result["dimension"]
                    })
                # A failed chunk is skipped rather than aborting the run
            finally:
                # Clean up the temp file
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)

        # Slide a fixed-size window across the audio
        chunk_idx = 0
        while chunk_idx * hop_samples + chunk_samples <= len(audio):
            start_sample = chunk_idx * hop_samples
            end_sample = start_sample + chunk_samples
            embed_chunk(
                audio[start_sample:end_sample],
                start_sample / sr,
                end_sample / sr
            )
            chunk_idx += 1

        # Handle any remaining audio that is at least one second long
        remaining_start = chunk_idx * hop_samples
        if remaining_start < len(audio) and len(audio) - remaining_start >= sr:
            embed_chunk(
                audio[remaining_start:],
                remaining_start / sr,
                len(audio) / sr
            )

        return {
            "success": True,
            "chunks": chunks,
            "total_duration": round(total_duration, 2),
            "chunk_count": len(chunks)
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Chunk embedding generation failed: {str(e)}"
        }
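
# --- Hypothetical helper (not part of the original module) ---
# A minimal sketch of chunk-based attribution built on
# generate_chunk_embeddings: for each chunk of a query file, find the
# reference chunk with the highest cosine similarity. The function name,
# threshold default, and result layout are assumptions for illustration.
def match_chunks(
    query_chunks: List[dict],
    reference_chunks: List[dict],
    threshold: float = 0.7
) -> List[dict]:
    """Sketch: align query chunks to their closest reference chunks."""
    matches = []
    for q in query_chunks:
        best = None
        for r in reference_chunks:
            result = compute_similarity(q["embedding"], r["embedding"])
            if not result.get("success"):
                continue
            if best is None or result["similarity"] > best["similarity"]:
                best = {
                    "query_start": q["start_time"],
                    "reference_start": r["start_time"],
                    "similarity": result["similarity"]
                }
        # Keep only matches above the similarity threshold
        if best is not None and best["similarity"] >= threshold:
            matches.append(best)
    return matches
# Usage (hypothetical paths):
#   q = generate_chunk_embeddings("query.wav")
#   r = generate_chunk_embeddings("reference.wav")
#   if q["success"] and r["success"]:
#       print(match_chunks(q["chunks"], r["chunks"]))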

if __name__ == "__main__":
    # Test embedding generation
    import sys

    if len(sys.argv) > 1:
        result = generate_embedding(sys.argv[1])
        # Don't print the full embedding (too long), just metadata
        if result["success"]:
            print(json.dumps({
                "success": True,
                "dimension": result["dimension"],
                "model": result["model"],
                "embedding_preview": result["embedding"][:5] + ["..."]
            }, indent=2))
        else:
            print(json.dumps(result, indent=2))
    else:
        print("Usage: python embeddings.py <audio_file>")