#!/usr/bin/env python3
"""
AI Music Attribution - ML Processor

Main entry point for audio processing operations.
Called from Node.js via subprocess.

Usage:
    python processor.py <operation> <json_args>

Operations:
    - separate: Stem separation using Demucs
    - fingerprint: Audio fingerprinting using Chromaprint
    - embed: Generate embeddings using CLAP
    - process_all: Run full pipeline (separate -> fingerprint -> embed)

Output: JSON result to stdout
"""

import os

# Fix OpenMP conflicts between torch and faiss.
# Must be set before importing any ML libraries.
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
os.environ['TRANSFORMERS_VERBOSITY'] = 'error'
os.environ['HF_HUB_DISABLE_PROGRESS_BARS'] = '1'

import sys
import json
import warnings

warnings.filterwarnings('ignore')

from pathlib import Path

# Add ml directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))


def main():
    """Parse CLI arguments, run the requested operation, emit JSON to stdout.

    Exit code 0 on success; 1 on bad usage, invalid JSON args, or any
    exception raised by the operation (the error is still reported as JSON
    so the Node.js caller can parse it).
    """
    if len(sys.argv) < 3:
        print(json.dumps({
            "success": False,
            "error": "Usage: python processor.py <operation> <json_args>"
        }))
        sys.exit(1)

    operation = sys.argv[1]

    try:
        args = json.loads(sys.argv[2])
    except json.JSONDecodeError as e:
        print(json.dumps({
            "success": False,
            "error": f"Invalid JSON args: {str(e)}"
        }))
        sys.exit(1)

    try:
        result = dispatch_operation(operation, args)
        print(json.dumps(result))
    except Exception as e:
        # Top-level boundary: report any failure as structured JSON rather
        # than a traceback, since stdout is parsed by the Node.js caller.
        print(json.dumps({
            "success": False,
            "error": str(e),
            "operation": operation
        }))
        sys.exit(1)


def dispatch_operation(operation: str, args: dict) -> dict:
    """Route to the appropriate handler based on operation type.

    Imports are deliberately done lazily inside each branch so that a given
    operation only pays the startup cost of the ML libraries it actually uses.

    Args:
        operation: Operation name (e.g. "separate", "fingerprint", "health").
        args: Operation-specific arguments decoded from the CLI JSON payload.

    Returns:
        A JSON-serializable dict; always contains a "success" key.

    Raises:
        KeyError: if a required argument is missing from ``args``.
    """
    if operation == "separate":
        from stem_separation import separate_stems
        return separate_stems(
            input_path=args["input_path"],
            output_dir=args["output_dir"],
            model=args.get("model", "htdemucs")
        )

    elif operation == "fingerprint":
        from fingerprinting import generate_fingerprint
        return generate_fingerprint(
            audio_path=args["audio_path"]
        )

    elif operation == "embed":
        from embeddings import generate_embedding
        return generate_embedding(
            audio_path=args["audio_path"],
            model=args.get("model", "laion/larger_clap_music")
        )

    elif operation == "embed_chunks":
        # Generate chunk-based embeddings for attribution
        from embeddings import generate_chunk_embeddings
        return generate_chunk_embeddings(
            audio_path=args["audio_path"],
            chunk_duration=args.get("chunk_duration", 10.0),
            chunk_overlap=args.get("chunk_overlap", 5.0),
            model=args.get("model", "laion/larger_clap_music")
        )

    elif operation == "process_all":
        # Full pipeline: separate -> fingerprint each stem -> embed each stem
        return _run_full_pipeline(args)

    elif operation == "health":
        # Check if all dependencies are available
        return check_health()

    elif operation == "faiss_add":
        # Add embeddings to FAISS index
        from faiss_index import add_embeddings
        return add_embeddings(args.get("embeddings", []))

    elif operation == "faiss_search":
        # Search FAISS index for similar embeddings
        from faiss_index import search_similar
        return search_similar(
            query_embedding=args["embedding"],
            k=args.get("k", 10),
            threshold=args.get("threshold", 0.5)
        )

    elif operation == "faiss_stats":
        # Get FAISS index statistics
        from faiss_index import get_index_stats
        return get_index_stats()

    elif operation == "faiss_clear":
        # Clear FAISS index
        from faiss_index import clear_index
        return clear_index()

    elif operation == "fingerprint_chunks":
        # Generate fingerprints for fixed-length audio chunks
        return _fingerprint_chunks(args)

    elif operation == "fp_search":
        # Search fingerprint index
        from fingerprint_index import search_similar
        return search_similar(
            query_fingerprint=args["fingerprint"],
            k=args.get("k", 10),
            threshold=args.get("threshold", 0.3)
        )

    elif operation == "fp_stats":
        # Get fingerprint index statistics
        from fingerprint_index import get_index_stats
        return get_index_stats()

    elif operation == "style_extract":
        # Extract style features from audio
        from style_similarity import extract_style_features
        return extract_style_features(
            audio_path=args["audio_path"],
            duration=args.get("duration")
        )

    elif operation == "style_chunks":
        # Extract chunk-level style features for granular matching
        from style_similarity import extract_chunk_style_features
        return extract_chunk_style_features(
            audio_path=args["audio_path"],
            chunk_duration=args.get("chunk_duration", 10.0),
            chunk_overlap=args.get("chunk_overlap", 5.0)
        )

    elif operation == "style_search":
        # Search style index for similar tracks
        from style_similarity import search_style_similar
        return search_style_similar(
            query_features=args["features"],
            k=args.get("k", 10),
            threshold=args.get("threshold", 0.85)
        )

    elif operation == "style_add":
        # Add tracks to style index
        from style_similarity import add_to_style_index
        return add_to_style_index(args.get("entries", []))

    elif operation == "style_stats":
        # Get style index statistics
        from style_similarity import get_style_index_stats
        return get_style_index_stats()

    # === MERT (Music-specific embeddings) ===
    elif operation == "mert_extract":
        from mert_embeddings import extract_mert_embedding
        return extract_mert_embedding(
            audio_path=args["audio_path"],
            duration=args.get("duration")
        )

    elif operation == "mert_chunks":
        from mert_embeddings import extract_mert_chunk_embeddings
        return extract_mert_chunk_embeddings(
            audio_path=args["audio_path"],
            chunk_duration=args.get("chunk_duration", 10.0),
            chunk_overlap=args.get("chunk_overlap", 5.0)
        )

    elif operation == "mert_search":
        from mert_embeddings import search_mert_similar
        return search_mert_similar(
            query_embedding=args["embedding"],
            k=args.get("k", 10),
            threshold=args.get("threshold", 0.7),
            percentile=args.get("percentile"),
            min_threshold=args.get("min_threshold", 0.88),
            min_distinctiveness=args.get("min_distinctiveness", 0.05)
        )

    elif operation == "mert_add":
        from mert_embeddings import add_to_mert_index
        return add_to_mert_index(args.get("entries", []))

    elif operation == "mert_stats":
        from mert_embeddings import get_mert_index_stats
        return get_mert_index_stats()

    elif operation == "mert_clear":
        from mert_embeddings import clear_mert_index
        return clear_mert_index()

    # Audio Quality Scoring
    elif operation == "audio_quality":
        from audio_quality import compute_quality_score
        return compute_quality_score(
            audio_input=args["audio_path"],
            duration=args.get("duration", 30.0)
        )

    elif operation == "audio_quality_batch":
        from audio_quality import batch_score_directory
        return batch_score_directory(
            directory=args["directory"],
            extensions=tuple(args.get("extensions", [".mp3", ".wav", ".flac", ".ogg"]))
        )

    else:
        return {
            "success": False,
            "error": f"Unknown operation: {operation}"
        }


def _run_full_pipeline(args: dict) -> dict:
    """Run the full pipeline: separate stems, then fingerprint + embed each.

    Per-stem fingerprint/embedding failures are recorded in the result rather
    than aborting the pipeline; only a failed separation short-circuits.
    """
    from stem_separation import separate_stems
    from fingerprinting import generate_fingerprint
    from embeddings import generate_embedding

    input_path = args["input_path"]
    output_dir = args["output_dir"]

    # Step 1: Separate stems
    separation_result = separate_stems(input_path, output_dir)
    if not separation_result["success"]:
        return separation_result

    stems = separation_result["stems"]
    results = {
        "success": True,
        "stems": []
    }

    # Step 2 & 3: Fingerprint and embed each stem
    for stem in stems:
        stem_path = stem["path"]

        fp_result = generate_fingerprint(stem_path)
        embed_result = generate_embedding(stem_path)

        results["stems"].append({
            "type": stem["type"],
            "path": stem["path"],
            "duration": stem.get("duration"),
            "fingerprint": fp_result.get("fingerprint") if fp_result["success"] else None,
            "fingerprint_error": fp_result.get("error"),
            "embedding": embed_result.get("embedding") if embed_result["success"] else None,
            "embedding_model": embed_result.get("model"),
            "embedding_error": embed_result.get("error")
        })

    return results


def _fingerprint_chunks(args: dict) -> dict:
    """Fingerprint an audio file in overlapping fixed-length windows.

    The total duration is probed with ffprobe; windows of ``chunk_duration``
    seconds are fingerprinted every ``chunk_duration - chunk_overlap`` seconds
    so consecutive chunks overlap by ``chunk_overlap`` seconds. A trailing
    partial window shorter than ``chunk_duration`` is skipped.
    """
    from fingerprint_index import generate_chunk_fingerprint
    import subprocess

    audio_path = args["audio_path"]
    chunk_duration = args.get("chunk_duration", 10.0)
    chunk_overlap = args.get("chunk_overlap", 5.0)

    # Probe total duration (shell=False list form; path is passed verbatim).
    result = subprocess.run(
        ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
         '-of', 'json', audio_path],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        # Fail explicitly instead of crashing on a missing JSON key below.
        return {
            "success": False,
            "error": f"ffprobe failed: {result.stderr.strip()}"
        }
    duration = float(json.loads(result.stdout)['format']['duration'])

    # Hop size such that consecutive chunks overlap by chunk_overlap seconds.
    # (Previously the window advanced by chunk_overlap itself, which only
    # matches the named overlap when chunk_duration == 2 * chunk_overlap —
    # true for the 10s/5s defaults, so default behavior is unchanged.)
    step = chunk_duration - chunk_overlap
    if step <= 0:
        return {
            "success": False,
            "error": "chunk_overlap must be smaller than chunk_duration"
        }

    chunks = []
    start = 0.0
    while start + chunk_duration <= duration:
        fp = generate_chunk_fingerprint(audio_path, start, chunk_duration)
        if fp:
            chunks.append({
                "start_time": start,
                "end_time": start + chunk_duration,
                "fingerprint": fp
            })
        start += step

    return {
        "success": True,
        "total_duration": duration,
        "chunk_count": len(chunks),
        "chunks": chunks
    }


def check_health() -> dict:
    """Check availability of all ML dependencies.

    Each dependency is probed independently; failures are collected in
    ``errors`` and overall ``success`` is True only if every probe passed.
    """
    status = {
        "success": True,
        "demucs": False,
        "chromaprint": False,
        "clap": False,
        "faiss": False,
        "errors": []
    }

    # Check Demucs
    try:
        import demucs
        status["demucs"] = True
        status["demucs_version"] = getattr(demucs, "__version__", "unknown")
    except ImportError as e:
        status["errors"].append(f"Demucs not available: {e}")

    # Check Chromaprint (fpcalc CLI)
    try:
        import subprocess
        result = subprocess.run(["fpcalc", "-version"],
                                capture_output=True, text=True)
        if result.returncode == 0:
            status["chromaprint"] = True
            status["chromaprint_version"] = result.stdout.strip()
        else:
            status["errors"].append("fpcalc CLI not found or not working")
    except FileNotFoundError:
        status["errors"].append("fpcalc CLI not installed (install chromaprint)")
    except Exception as e:
        status["errors"].append(f"Chromaprint check failed: {e}")

    # Check CLAP: prefer the native laion_clap package, fall back to the
    # transformers implementation.
    try:
        import laion_clap
        status["clap"] = True
    except ImportError:
        try:
            from transformers import ClapModel
            status["clap"] = True
            status["clap_source"] = "transformers"
        except ImportError as e:
            status["errors"].append(f"CLAP not available: {e}")

    # Check FAISS
    try:
        import faiss
        status["faiss"] = True
        # getattr for consistency with the Demucs probe above.
        status["faiss_version"] = getattr(faiss, "__version__", "unknown")
    except ImportError as e:
        status["errors"].append(f"FAISS not available: {e}")

    if status["errors"]:
        status["success"] = False

    return status


if __name__ == "__main__":
    main()