# NOTE(review): stripped Hugging Face Spaces banner text ("Spaces: Running on
# CPU Upgrade") that was scraped into this file; it is not part of the program.
#!/usr/bin/env python3
"""
AI Music Attribution - ML Processor

Main entry point for audio processing operations.
Called from Node.js via subprocess.

Usage:
    python processor.py <operation> <json_args>

Operations:
    - separate: Stem separation using Demucs
    - fingerprint: Audio fingerprinting using Chromaprint
    - embed: Generate embeddings using CLAP
    - process_all: Run full pipeline (separate -> fingerprint -> embed)

Output:
    JSON result to stdout
"""
import os

# Fix OpenMP conflicts between torch and faiss.
# These MUST be set before importing any ML libraries, which is why this
# sits above the remaining imports.
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
os.environ['TRANSFORMERS_VERBOSITY'] = 'error'
os.environ['HF_HUB_DISABLE_PROGRESS_BARS'] = '1'

import sys
import json
import warnings

# Suppress library warnings so stdout stays clean JSON for the Node.js caller.
warnings.filterwarnings('ignore')

from pathlib import Path

# Add this ml directory to sys.path so sibling modules
# (stem_separation, fingerprinting, embeddings, ...) import by bare name.
sys.path.insert(0, str(Path(__file__).parent))
def main():
    """CLI entry point: parse ``<operation> <json_args>``, dispatch, print JSON.

    On bad usage, malformed JSON args, or any handler exception, prints a
    JSON error object to stdout and exits with status 1. On success prints
    the handler's JSON result.
    """
    # Guard: both positional arguments are required.
    if len(sys.argv) < 3:
        print(json.dumps({
            "success": False,
            "error": "Usage: python processor.py <operation> <json_args>"
        }))
        sys.exit(1)

    operation = sys.argv[1]

    # Decode the JSON argument blob; report a targeted error on failure.
    try:
        args = json.loads(sys.argv[2])
    except json.JSONDecodeError as e:
        print(json.dumps({
            "success": False,
            "error": f"Invalid JSON args: {str(e)}"
        }))
        sys.exit(1)

    # Dispatch and serialize inside one try so that handler failures AND
    # non-serializable results both surface as a JSON error object.
    try:
        print(json.dumps(dispatch_operation(operation, args)))
    except Exception as e:
        print(json.dumps({
            "success": False,
            "error": str(e),
            "operation": operation
        }))
        sys.exit(1)
def dispatch_operation(operation: str, args: dict) -> dict:
    """Route a CLI operation to its handler and return a JSON-serializable dict.

    Imports are deferred into each branch so only the dependencies of the
    requested operation have to be installed and loaded.

    Args:
        operation: Operation name (e.g. "separate", "embed", "faiss_search").
        args: JSON-decoded keyword arguments for the handler.

    Returns:
        A dict with a "success" boolean plus operation-specific fields.

    Raises:
        KeyError: if a required key is missing from ``args``.
        Exception: anything raised by the underlying handler
            (caught and reported by ``main``).
    """
    if operation == "separate":
        from stem_separation import separate_stems
        return separate_stems(
            input_path=args["input_path"],
            output_dir=args["output_dir"],
            model=args.get("model", "htdemucs")
        )

    elif operation == "fingerprint":
        from fingerprinting import generate_fingerprint
        return generate_fingerprint(
            audio_path=args["audio_path"]
        )

    elif operation == "embed":
        from embeddings import generate_embedding
        return generate_embedding(
            audio_path=args["audio_path"],
            model=args.get("model", "laion/larger_clap_music")
        )

    elif operation == "embed_chunks":
        # Generate chunk-based embeddings for attribution.
        from embeddings import generate_chunk_embeddings
        return generate_chunk_embeddings(
            audio_path=args["audio_path"],
            chunk_duration=args.get("chunk_duration", 10.0),
            chunk_overlap=args.get("chunk_overlap", 5.0),
            model=args.get("model", "laion/larger_clap_music")
        )

    elif operation == "process_all":
        # Full pipeline: separate -> fingerprint each stem -> embed each stem.
        from stem_separation import separate_stems
        from fingerprinting import generate_fingerprint
        from embeddings import generate_embedding

        input_path = args["input_path"]
        output_dir = args["output_dir"]

        # Step 1: Separate stems; bail out early on failure.
        separation_result = separate_stems(input_path, output_dir)
        if not separation_result["success"]:
            return separation_result

        stems = separation_result["stems"]
        results = {
            "success": True,
            "stems": []
        }

        # Steps 2 & 3: fingerprint and embed each stem. Per-stem failures are
        # recorded (None value + *_error field) rather than aborting the run.
        for stem in stems:
            stem_path = stem["path"]
            fp_result = generate_fingerprint(stem_path)
            embed_result = generate_embedding(stem_path)
            results["stems"].append({
                "type": stem["type"],
                "path": stem["path"],
                "duration": stem.get("duration"),
                "fingerprint": fp_result.get("fingerprint") if fp_result["success"] else None,
                "fingerprint_error": fp_result.get("error"),
                "embedding": embed_result.get("embedding") if embed_result["success"] else None,
                "embedding_model": embed_result.get("model"),
                "embedding_error": embed_result.get("error")
            })
        return results

    elif operation == "health":
        # Check if all dependencies are available.
        return check_health()

    elif operation == "faiss_add":
        # Add embeddings to FAISS index.
        from faiss_index import add_embeddings
        return add_embeddings(args.get("embeddings", []))

    elif operation == "faiss_search":
        # Search FAISS index for similar embeddings.
        from faiss_index import search_similar
        return search_similar(
            query_embedding=args["embedding"],
            k=args.get("k", 10),
            threshold=args.get("threshold", 0.5)
        )

    elif operation == "faiss_stats":
        from faiss_index import get_index_stats
        return get_index_stats()

    elif operation == "faiss_clear":
        from faiss_index import clear_index
        return clear_index()

    elif operation == "fingerprint_chunks":
        # Generate fingerprints for overlapping audio chunks.
        audio_path = args["audio_path"]
        chunk_duration = args.get("chunk_duration", 10.0)
        chunk_overlap = args.get("chunk_overlap", 5.0)

        # Consecutive chunks overlap by chunk_overlap seconds, so the hop
        # (advance per chunk) is duration - overlap.
        # BUG FIX: previously advanced by chunk_overlap itself, which only
        # produced the intended hop for the default 10s/5s settings.
        hop = chunk_duration - chunk_overlap
        if hop <= 0:
            # Guard: a non-positive hop would loop forever.
            return {
                "success": False,
                "error": "chunk_overlap must be smaller than chunk_duration"
            }

        from fingerprint_index import generate_chunk_fingerprint
        import subprocess

        # Probe total duration via ffprobe's JSON output (module-level `json`
        # is reused; the old `import json as json_mod` alias was redundant).
        result = subprocess.run(
            ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'json', audio_path],
            capture_output=True, text=True
        )
        duration = float(json.loads(result.stdout)['format']['duration'])

        chunks = []
        start = 0.0
        while start + chunk_duration <= duration:
            fp = generate_chunk_fingerprint(audio_path, start, chunk_duration)
            if fp:
                chunks.append({
                    "start_time": start,
                    "end_time": start + chunk_duration,
                    "fingerprint": fp
                })
            start += hop
        return {
            "success": True,
            "total_duration": duration,
            "chunk_count": len(chunks),
            "chunks": chunks
        }

    elif operation == "fp_search":
        # Search fingerprint index.
        from fingerprint_index import search_similar
        return search_similar(
            query_fingerprint=args["fingerprint"],
            k=args.get("k", 10),
            threshold=args.get("threshold", 0.3)
        )

    elif operation == "fp_stats":
        from fingerprint_index import get_index_stats
        return get_index_stats()

    elif operation == "style_extract":
        # Extract style features from audio.
        from style_similarity import extract_style_features
        return extract_style_features(
            audio_path=args["audio_path"],
            duration=args.get("duration")
        )

    elif operation == "style_chunks":
        # Extract chunk-level style features for granular matching.
        from style_similarity import extract_chunk_style_features
        return extract_chunk_style_features(
            audio_path=args["audio_path"],
            chunk_duration=args.get("chunk_duration", 10.0),
            chunk_overlap=args.get("chunk_overlap", 5.0)
        )

    elif operation == "style_search":
        from style_similarity import search_style_similar
        return search_style_similar(
            query_features=args["features"],
            k=args.get("k", 10),
            threshold=args.get("threshold", 0.85)
        )

    elif operation == "style_add":
        from style_similarity import add_to_style_index
        return add_to_style_index(args.get("entries", []))

    elif operation == "style_stats":
        from style_similarity import get_style_index_stats
        return get_style_index_stats()

    # === MERT (Music-specific embeddings) ===
    elif operation == "mert_extract":
        from mert_embeddings import extract_mert_embedding
        return extract_mert_embedding(
            audio_path=args["audio_path"],
            duration=args.get("duration")
        )

    elif operation == "mert_chunks":
        from mert_embeddings import extract_mert_chunk_embeddings
        return extract_mert_chunk_embeddings(
            audio_path=args["audio_path"],
            chunk_duration=args.get("chunk_duration", 10.0),
            chunk_overlap=args.get("chunk_overlap", 5.0)
        )

    elif operation == "mert_search":
        from mert_embeddings import search_mert_similar
        return search_mert_similar(
            query_embedding=args["embedding"],
            k=args.get("k", 10),
            threshold=args.get("threshold", 0.7),
            percentile=args.get("percentile"),
            min_threshold=args.get("min_threshold", 0.88),
            min_distinctiveness=args.get("min_distinctiveness", 0.05)
        )

    elif operation == "mert_add":
        from mert_embeddings import add_to_mert_index
        return add_to_mert_index(args.get("entries", []))

    elif operation == "mert_stats":
        from mert_embeddings import get_mert_index_stats
        return get_mert_index_stats()

    elif operation == "mert_clear":
        from mert_embeddings import clear_mert_index
        return clear_mert_index()

    # Audio Quality Scoring
    elif operation == "audio_quality":
        from audio_quality import compute_quality_score
        return compute_quality_score(
            audio_input=args["audio_path"],
            duration=args.get("duration", 30.0)
        )

    elif operation == "audio_quality_batch":
        from audio_quality import batch_score_directory
        return batch_score_directory(
            directory=args["directory"],
            extensions=tuple(args.get("extensions", [".mp3", ".wav", ".flac", ".ogg"]))
        )

    else:
        return {
            "success": False,
            "error": f"Unknown operation: {operation}"
        }
def check_health() -> dict:
    """Probe each ML dependency and report availability.

    Returns a dict with one boolean per dependency (plus optional version
    strings), an ``errors`` list of human-readable failure messages, and
    ``success`` True only when every check passed.
    """
    report = {
        "success": True,
        "demucs": False,
        "chromaprint": False,
        "clap": False,
        "faiss": False,
        "errors": []
    }
    problems = report["errors"]

    # Demucs (stem separation).
    try:
        import demucs
    except ImportError as exc:
        problems.append(f"Demucs not available: {exc}")
    else:
        report["demucs"] = True
        report["demucs_version"] = getattr(demucs, "__version__", "unknown")

    # Chromaprint, via the fpcalc CLI.
    try:
        import subprocess
        proc = subprocess.run(["fpcalc", "-version"], capture_output=True, text=True)
        if proc.returncode == 0:
            report["chromaprint"] = True
            report["chromaprint_version"] = proc.stdout.strip()
        else:
            problems.append("fpcalc CLI not found or not working")
    except FileNotFoundError:
        problems.append("fpcalc CLI not installed (install chromaprint)")
    except Exception as exc:
        problems.append(f"Chromaprint check failed: {exc}")

    # CLAP: prefer the laion_clap package, fall back to transformers.
    try:
        import laion_clap  # noqa: F401
        report["clap"] = True
    except ImportError:
        try:
            from transformers import ClapModel  # noqa: F401
        except ImportError as exc:
            problems.append(f"CLAP not available: {exc}")
        else:
            report["clap"] = True
            report["clap_source"] = "transformers"

    # FAISS (vector index).
    try:
        import faiss
    except ImportError as exc:
        problems.append(f"FAISS not available: {exc}")
    else:
        report["faiss"] = True
        report["faiss_version"] = faiss.__version__ if hasattr(faiss, "__version__") else "unknown"

    # Healthy only if no check recorded a problem.
    report["success"] = not problems
    return report
# Run the CLI dispatcher only when executed as a script (not on import).
if __name__ == "__main__":
    main()