Spaces:
Sleeping
Sleeping
| """ | |
| Audio Embeddings using CLAP (Contrastive Language-Audio Pretraining) | |
| Generates semantic audio embeddings that capture musical characteristics. | |
| These embeddings can be used for similarity search and attribution. | |
| Requires: laion-clap or transformers package | |
| """ | |
| import json | |
| from pathlib import Path | |
| from typing import Optional, List | |
| import numpy as np | |
| # Cache for loaded models | |
| _model_cache = {} | |
def get_best_device() -> str:
    """Auto-detect the best available device for ML processing.

    Returns:
        "mps" on Apple Silicon, "cuda" on NVIDIA GPUs, otherwise "cpu".
        Falls back to "cpu" when torch is not installed.
    """
    try:
        import torch

        # Older torch builds do not expose torch.backends.mps at all, which
        # raises AttributeError rather than ImportError — guard both.
        mps_backend = getattr(torch.backends, "mps", None)
        if mps_backend is not None and mps_backend.is_available():
            return "mps"  # Apple Silicon GPU
        if torch.cuda.is_available():
            return "cuda"  # NVIDIA GPU
    except (ImportError, AttributeError):
        pass
    return "cpu"
def generate_embedding(
    audio_path: str,
    model: str = "laion/larger_clap_music"
) -> dict:
    """
    Generate audio embedding using CLAP model.

    Args:
        audio_path: Path to audio file
        model: CLAP model to use (default: laion/larger_clap_music)

    Returns:
        dict with:
            - success: bool
            - embedding: list[float] (embedding vector)
            - dimension: int (embedding dimension)
            - model: str (model used)
            - error: str (if failed)
    """
    audio_path = Path(audio_path)
    if not audio_path.exists():
        return {
            "success": False,
            "error": f"Audio file not found: {audio_path}"
        }

    # Try laion-clap first (purpose-built for music).
    try:
        return _generate_with_laion_clap(str(audio_path), model)
    except ImportError:
        pass  # laion-clap not installed; fall through to transformers
    except Exception as e:
        return {
            "success": False,
            "error": f"Embedding generation failed: {str(e)}"
        }

    # Fallback to transformers CLAP. NOTE: this must live in its own
    # try-statement — in the original code it sat inside the `except
    # ImportError:` handler, so any non-ImportError it raised escaped the
    # function instead of being converted to an error dict.
    try:
        return _generate_with_transformers(str(audio_path), model)
    except ImportError:
        return {
            "success": False,
            "error": "Neither laion-clap nor transformers[audio] is installed. Install with: pip install laion-clap"
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Embedding generation failed: {str(e)}"
        }
def _generate_with_laion_clap(audio_path: str, model_name: str) -> dict:
    """Generate an embedding with the laion-clap library.

    Constructs (and memoizes in _model_cache) one CLAP_Module per device.
    Raises ImportError when laion_clap is not installed, signalling the
    caller to fall back to the transformers implementation.
    """
    import laion_clap

    device = get_best_device()
    cache_key = f"laion_clap_{device}"

    # Build the model lazily, once per device.
    if cache_key not in _model_cache:
        clap = laion_clap.CLAP_Module(enable_fusion=False, device=device)
        clap.load_ckpt()  # default checkpoint
        _model_cache[cache_key] = {"model": clap, "device": device}

    entry = _model_cache[cache_key]
    clap = entry["model"]
    device = entry["device"]

    # Result is a numpy array of shape (1, dim); take row 0 as the vector.
    vectors = clap.get_audio_embedding_from_filelist(
        [audio_path],
        use_tensor=False
    )
    vector = vectors[0].tolist()

    return {
        "success": True,
        "embedding": vector,
        "dimension": len(vector),
        "model": "laion-clap-music",
        "device": device
    }
def _generate_with_transformers(audio_path: str, model_name: str) -> dict:
    """Generate an embedding with the Hugging Face transformers CLAP model.

    Caches one (model, processor) pair per (model_name, device) in
    _model_cache. Raises ImportError when transformers/torch/librosa are
    missing so the caller can report a helpful message.
    """
    from transformers import ClapModel, ClapProcessor
    import torch
    import librosa

    # MPS for Apple Silicon, CUDA for NVIDIA, else CPU.
    device = get_best_device()
    cache_key = f"transformers_{model_name}_{device}"

    if cache_key not in _model_cache:
        clap = ClapModel.from_pretrained(model_name).to(device)
        clap.eval()
        _model_cache[cache_key] = {
            "model": clap,
            "processor": ClapProcessor.from_pretrained(model_name),
            "device": device
        }

    entry = _model_cache[cache_key]
    clap = entry["model"]
    processor = entry["processor"]
    device = entry["device"]

    # CLAP expects 48 kHz mono audio.
    waveform, _ = librosa.load(audio_path, sr=48000, mono=True)

    batch = processor(
        audio=waveform,
        sampling_rate=48000,
        return_tensors="pt"
    )
    # Move every input tensor onto the model's device.
    batch = {key: tensor.to(device) for key, tensor in batch.items()}

    with torch.no_grad():
        features = clap.get_audio_features(**batch)

    vector = features[0].cpu().numpy().tolist()
    return {
        "success": True,
        "embedding": vector,
        "dimension": len(vector),
        "model": model_name,
        "device": device
    }
def compute_similarity(embedding1: List[float], embedding2: List[float]) -> dict:
    """
    Compute cosine similarity between two embeddings.

    Args:
        embedding1: First embedding vector
        embedding2: Second embedding vector

    Returns:
        dict with:
            - success: bool
            - similarity: float (cosine similarity, -1 to 1)
            - error: str (if failed)
    """
    try:
        v1 = np.asarray(embedding1)
        v2 = np.asarray(embedding2)

        if v1.shape != v2.shape:
            return {
                "success": False,
                "error": f"Embedding dimension mismatch: {v1.shape} vs {v2.shape}"
            }

        n1 = np.linalg.norm(v1)
        n2 = np.linalg.norm(v2)
        if n1 == 0 or n2 == 0:
            return {
                "success": False,
                "error": "Zero-norm embedding detected"
            }

        # cos(theta) = (v1 . v2) / (|v1| * |v2|)
        cosine = np.dot(v1, v2) / (n1 * n2)
        return {
            "success": True,
            "similarity": float(cosine)
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Similarity computation failed: {str(e)}"
        }
def batch_generate_embeddings(
    audio_paths: List[str],
    model: str = "laion/larger_clap_music"
) -> dict:
    """
    Generate embeddings for multiple audio files.

    Args:
        audio_paths: List of paths to audio files
        model: CLAP model to use

    Returns:
        dict with:
            - success: bool (True only if every file succeeded)
            - embeddings: list of {path, embedding, dimension} or {path, error}
    """
    # Each entry carries its source path alongside the per-file result dict.
    results = [
        {"path": path, **generate_embedding(path, model)}
        for path in audio_paths
    ]
    return {
        "success": all(entry.get("success", False) for entry in results),
        "embeddings": results
    }
def generate_chunk_embeddings(
    audio_path: str,
    chunk_duration: float = 10.0,
    chunk_overlap: float = 5.0,
    model: str = "laion/larger_clap_music"
) -> dict:
    """
    Generate embeddings for audio chunks (sliding window).

    This is the key function for chunk-based attribution - it splits audio
    into overlapping windows and generates an embedding for each, allowing
    us to match specific sections of audio.

    Args:
        audio_path: Path to audio file
        chunk_duration: Duration of each chunk in seconds (default: 10s).
            Must be positive and strictly greater than chunk_overlap.
        chunk_overlap: Overlap between chunks in seconds (default: 5s)
        model: CLAP model to use

    Returns:
        dict with:
            - success: bool
            - chunks: list of {
                start_time: float,
                end_time: float,
                embedding: list[float],
                dimension: int
              }
            - total_duration: float
            - chunk_count: int
            - error: str (if failed)
    """
    # Validate parameters first: chunk_overlap >= chunk_duration would make
    # the hop size <= 0 and the windowing loop below would never terminate.
    if chunk_duration <= 0 or chunk_overlap < 0 or chunk_overlap >= chunk_duration:
        return {
            "success": False,
            "error": (
                f"Invalid chunk parameters: chunk_duration={chunk_duration}, "
                f"chunk_overlap={chunk_overlap} (need 0 <= overlap < duration)"
            )
        }

    audio_path = Path(audio_path)
    if not audio_path.exists():
        return {
            "success": False,
            "error": f"Audio file not found: {audio_path}"
        }

    try:
        # Import heavy deps only after the cheap checks have passed.
        import librosa

        # Load full audio at CLAP's expected 48 kHz, mono.
        audio, sr = librosa.load(str(audio_path), sr=48000, mono=True)
        total_duration = len(audio) / sr

        chunk_samples = int(chunk_duration * sr)
        hop_samples = int((chunk_duration - chunk_overlap) * sr)

        # If audio is shorter than one chunk, embed the entire file as a
        # single chunk.
        if len(audio) < chunk_samples:
            result = generate_embedding(str(audio_path), model)
            if not result.get("success"):
                return result
            return {
                "success": True,
                "chunks": [{
                    "start_time": 0.0,
                    "end_time": total_duration,
                    "embedding": result["embedding"],
                    "dimension": result["dimension"]
                }],
                "total_duration": total_duration,
                "chunk_count": 1
            }

        # Slide a full-size window across the audio.
        chunks = []
        chunk_idx = 0
        while chunk_idx * hop_samples + chunk_samples <= len(audio):
            start_sample = chunk_idx * hop_samples
            end_sample = start_sample + chunk_samples

            result = _embed_chunk(audio[start_sample:end_sample], sr, model)
            if result.get("success"):
                chunks.append({
                    "start_time": round(start_sample / sr, 2),
                    "end_time": round(end_sample / sr, 2),
                    "embedding": result["embedding"],
                    "dimension": result["dimension"]
                })
            chunk_idx += 1

        # Handle the tail: shorter than a full chunk but at least 1 second.
        remaining_start = chunk_idx * hop_samples
        if remaining_start < len(audio) and len(audio) - remaining_start >= sr:
            result = _embed_chunk(audio[remaining_start:], sr, model)
            if result.get("success"):
                chunks.append({
                    "start_time": round(remaining_start / sr, 2),
                    "end_time": round(len(audio) / sr, 2),
                    "embedding": result["embedding"],
                    "dimension": result["dimension"]
                })

        return {
            "success": True,
            "chunks": chunks,
            "total_duration": round(total_duration, 2),
            "chunk_count": len(chunks)
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Chunk embedding generation failed: {str(e)}"
        }


def _embed_chunk(chunk_audio, sr: int, model: str) -> dict:
    """Write one audio chunk to a temp WAV, embed it, and clean up.

    Returns the result dict from generate_embedding. The temp file is
    written after the NamedTemporaryFile handle is closed so this also
    works on Windows, where an open file cannot be reopened by name.
    """
    import os
    import tempfile
    import soundfile as sf

    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        tmp_path = tmp.name
    try:
        sf.write(tmp_path, chunk_audio, sr)
        return generate_embedding(tmp_path, model)
    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
if __name__ == "__main__":
    # Command-line smoke test: embed one file and print a short summary.
    import sys

    if len(sys.argv) <= 1:
        print("Usage: python embeddings.py <audio_file>")
    else:
        result = generate_embedding(sys.argv[1])
        if not result["success"]:
            print(json.dumps(result, indent=2))
        else:
            # The full vector is too long to print; show metadata and the
            # first few components only.
            summary = {
                "success": True,
                "dimension": result["dimension"],
                "model": result["model"],
                "embedding_preview": result["embedding"][:5] + ["..."]
            }
            print(json.dumps(summary, indent=2))