# aimusic-attribution / ml / processor.py
# Uploaded by emresar using huggingface_hub (commit 6678fa1, verified)
#!/usr/bin/env python3
"""
AI Music Attribution - ML Processor

Main entry point for audio processing operations.
Called from Node.js via subprocess.

Usage:
    python processor.py <operation> <json_args>

Operations:
    - separate: Stem separation using Demucs
    - fingerprint: Audio fingerprinting using Chromaprint
    - embed: Generate embeddings using CLAP
    - embed_chunks: Generate chunk-based embeddings for attribution
    - process_all: Run full pipeline (separate -> fingerprint -> embed)
    - health: Check if all dependencies are available
    - faiss_add / faiss_search / faiss_stats / faiss_clear:
      Maintain and query the FAISS embedding index
    - fingerprint_chunks: Generate fingerprints for audio chunks
    - fp_search / fp_stats: Query the fingerprint index
    - style_extract / style_chunks / style_search / style_add / style_stats:
      Extract style features and maintain/query the style index
    - mert_extract / mert_chunks / mert_search / mert_add / mert_stats /
      mert_clear: MERT (music-specific) embeddings and their index
    - audio_quality / audio_quality_batch: Audio quality scoring

Output:
    JSON result to stdout. The process exits with status 1 when the
    command line is malformed, the JSON arguments cannot be parsed, or the
    operation raises an exception.
"""
import os
# Fix OpenMP conflicts between torch and faiss
# Must be set before importing any ML libraries
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
# NOTE(review): presumably these two keep library logging and progress bars
# from polluting the JSON written to stdout -- confirm against the caller.
os.environ['TRANSFORMERS_VERBOSITY'] = 'error'
os.environ['HF_HUB_DISABLE_PROGRESS_BARS'] = '1'
import sys
import json
import warnings
# Silence every Python warning for the lifetime of the process.
warnings.filterwarnings('ignore')
from pathlib import Path
# Add ml directory to path for imports
# (the operation handlers import sibling modules such as stem_separation
# by bare name, so this file's directory must be importable).
sys.path.insert(0, str(Path(__file__).parent))
def main():
    """Command-line entry point: parse argv, run one operation, print JSON.

    Reads the operation name from ``sys.argv[1]`` and a JSON-encoded
    argument payload from ``sys.argv[2]``, hands both to
    ``dispatch_operation`` and prints the returned value as a single JSON
    document on stdout.

    A JSON failure object is printed and the process exits with status 1
    when fewer than two command-line arguments are supplied, when the
    payload is not valid JSON, or when the operation (or serialising its
    result) raises an exception.
    """

    def bail(payload):
        # Emit the failure description as JSON, then stop with status 1.
        print(json.dumps(payload))
        sys.exit(1)

    if len(sys.argv) < 3:
        bail({
            "success": False,
            "error": "Usage: python processor.py <operation> <json_args>"
        })

    operation, raw_args = sys.argv[1], sys.argv[2]

    try:
        args = json.loads(raw_args)
    except json.JSONDecodeError as exc:
        bail({
            "success": False,
            "error": f"Invalid JSON args: {str(exc)}"
        })

    try:
        # Serialisation stays inside the try so that a non-JSON-serialisable
        # result is reported like any other operation failure.
        outcome = dispatch_operation(operation, args)
        print(json.dumps(outcome))
    except Exception as exc:
        bail({
            "success": False,
            "error": str(exc),
            "operation": operation
        })
def dispatch_operation(operation: str, args: dict) -> dict:
    """Route to appropriate handler based on operation type.

    Handler modules are imported lazily inside each branch so that only the
    libraries needed by the requested operation are loaded.

    Args:
        operation: Name of the operation to run (e.g. ``"separate"``,
            ``"embed"``, ``"faiss_search"``).
        args: Decoded JSON arguments for the operation. Required keys are
            read with ``args[...]``; optional keys fall back to the defaults
            shown in each branch.

    Returns:
        Whatever the selected handler returns. An unknown operation yields
        ``{"success": False, "error": "Unknown operation: <name>"}``.

    Raises:
        KeyError: If a required argument is missing from ``args``.
        ValueError: If ``fingerprint_chunks`` receives an invalid
            ``chunk_duration`` / ``chunk_overlap`` combination.
        RuntimeError: If ``fingerprint_chunks`` cannot determine the audio
            duration with ffprobe.
    """
    if operation == "separate":
        from stem_separation import separate_stems
        return separate_stems(
            input_path=args["input_path"],
            output_dir=args["output_dir"],
            model=args.get("model", "htdemucs")
        )
    elif operation == "fingerprint":
        from fingerprinting import generate_fingerprint
        return generate_fingerprint(
            audio_path=args["audio_path"]
        )
    elif operation == "embed":
        from embeddings import generate_embedding
        return generate_embedding(
            audio_path=args["audio_path"],
            model=args.get("model", "laion/larger_clap_music")
        )
    elif operation == "embed_chunks":
        # Generate chunk-based embeddings for attribution
        from embeddings import generate_chunk_embeddings
        return generate_chunk_embeddings(
            audio_path=args["audio_path"],
            chunk_duration=args.get("chunk_duration", 10.0),
            chunk_overlap=args.get("chunk_overlap", 5.0),
            model=args.get("model", "laion/larger_clap_music")
        )
    elif operation == "process_all":
        # Full pipeline: separate -> fingerprint each stem -> embed each stem
        return _run_full_pipeline(args)
    elif operation == "health":
        # Check if all dependencies are available
        return check_health()
    elif operation == "faiss_add":
        # Add embeddings to FAISS index
        from faiss_index import add_embeddings
        return add_embeddings(args.get("embeddings", []))
    elif operation == "faiss_search":
        # Search FAISS index for similar embeddings
        from faiss_index import search_similar
        return search_similar(
            query_embedding=args["embedding"],
            k=args.get("k", 10),
            threshold=args.get("threshold", 0.5)
        )
    elif operation == "faiss_stats":
        # Get FAISS index statistics
        from faiss_index import get_index_stats
        return get_index_stats()
    elif operation == "faiss_clear":
        # Clear FAISS index
        from faiss_index import clear_index
        return clear_index()
    elif operation == "fingerprint_chunks":
        # Generate fingerprints for audio chunks
        return _fingerprint_chunks(args)
    elif operation == "fp_search":
        # Search fingerprint index
        from fingerprint_index import search_similar
        return search_similar(
            query_fingerprint=args["fingerprint"],
            k=args.get("k", 10),
            threshold=args.get("threshold", 0.3)
        )
    elif operation == "fp_stats":
        # Get fingerprint index statistics
        from fingerprint_index import get_index_stats
        return get_index_stats()
    elif operation == "style_extract":
        # Extract style features from audio
        from style_similarity import extract_style_features
        return extract_style_features(
            audio_path=args["audio_path"],
            duration=args.get("duration")
        )
    elif operation == "style_chunks":
        # Extract chunk-level style features for granular matching
        from style_similarity import extract_chunk_style_features
        return extract_chunk_style_features(
            audio_path=args["audio_path"],
            chunk_duration=args.get("chunk_duration", 10.0),
            chunk_overlap=args.get("chunk_overlap", 5.0)
        )
    elif operation == "style_search":
        # Search style index for similar tracks
        from style_similarity import search_style_similar
        return search_style_similar(
            query_features=args["features"],
            k=args.get("k", 10),
            threshold=args.get("threshold", 0.85)
        )
    elif operation == "style_add":
        # Add tracks to style index
        from style_similarity import add_to_style_index
        return add_to_style_index(args.get("entries", []))
    elif operation == "style_stats":
        # Get style index statistics
        from style_similarity import get_style_index_stats
        return get_style_index_stats()
    # === MERT (Music-specific embeddings) ===
    elif operation == "mert_extract":
        from mert_embeddings import extract_mert_embedding
        return extract_mert_embedding(
            audio_path=args["audio_path"],
            duration=args.get("duration")
        )
    elif operation == "mert_chunks":
        from mert_embeddings import extract_mert_chunk_embeddings
        return extract_mert_chunk_embeddings(
            audio_path=args["audio_path"],
            chunk_duration=args.get("chunk_duration", 10.0),
            chunk_overlap=args.get("chunk_overlap", 5.0)
        )
    elif operation == "mert_search":
        from mert_embeddings import search_mert_similar
        return search_mert_similar(
            query_embedding=args["embedding"],
            k=args.get("k", 10),
            threshold=args.get("threshold", 0.7),
            percentile=args.get("percentile"),
            min_threshold=args.get("min_threshold", 0.88),
            min_distinctiveness=args.get("min_distinctiveness", 0.05)
        )
    elif operation == "mert_add":
        from mert_embeddings import add_to_mert_index
        return add_to_mert_index(args.get("entries", []))
    elif operation == "mert_stats":
        from mert_embeddings import get_mert_index_stats
        return get_mert_index_stats()
    elif operation == "mert_clear":
        from mert_embeddings import clear_mert_index
        return clear_mert_index()
    # Audio Quality Scoring
    elif operation == "audio_quality":
        from audio_quality import compute_quality_score
        return compute_quality_score(
            audio_input=args["audio_path"],
            duration=args.get("duration", 30.0)
        )
    elif operation == "audio_quality_batch":
        from audio_quality import batch_score_directory
        return batch_score_directory(
            directory=args["directory"],
            extensions=tuple(args.get("extensions", [".mp3", ".wav", ".flac", ".ogg"]))
        )
    else:
        return {
            "success": False,
            "error": f"Unknown operation: {operation}"
        }


def _run_full_pipeline(args: dict) -> dict:
    """Separate the input into stems, then fingerprint and embed each stem.

    Returns the separation result unchanged when separation reports failure.
    Otherwise returns ``{"success": True, "stems": [...]}`` with one entry
    per stem; a failed fingerprint or embedding is recorded as ``None``
    alongside the handler's ``error`` value rather than aborting the run.
    """
    from stem_separation import separate_stems
    from fingerprinting import generate_fingerprint
    from embeddings import generate_embedding

    input_path = args["input_path"]
    output_dir = args["output_dir"]

    # Step 1: Separate stems
    separation_result = separate_stems(input_path, output_dir)
    if not separation_result["success"]:
        return separation_result

    results = {
        "success": True,
        "stems": []
    }

    # Step 2 & 3: Process each stem
    for stem in separation_result["stems"]:
        stem_path = stem["path"]
        fp_result = generate_fingerprint(stem_path)
        embed_result = generate_embedding(stem_path)
        results["stems"].append({
            "type": stem["type"],
            "path": stem["path"],
            "duration": stem.get("duration"),
            "fingerprint": fp_result.get("fingerprint") if fp_result["success"] else None,
            "fingerprint_error": fp_result.get("error"),
            "embedding": embed_result.get("embedding") if embed_result["success"] else None,
            "embedding_model": embed_result.get("model"),
            "embedding_error": embed_result.get("error")
        })
    return results


def _fingerprint_chunks(args: dict) -> dict:
    """Fingerprint fixed-length, overlapping windows of one audio file.

    Consecutive windows start ``chunk_duration - chunk_overlap`` seconds
    apart, so they overlap by ``chunk_overlap`` seconds. With the defaults
    (10 s windows, 5 s overlap) the step is 5 s. Only windows that fit
    entirely inside the file are produced, and windows for which the
    fingerprinter returns a falsy value are skipped.

    Raises:
        ValueError: If ``chunk_duration`` is not positive or
            ``chunk_overlap`` is outside ``[0, chunk_duration)``; a
            non-positive step would never terminate.
        RuntimeError: If ffprobe fails or reports no usable duration.
    """
    import json
    import subprocess

    audio_path = args["audio_path"]
    chunk_duration = args.get("chunk_duration", 10.0)
    chunk_overlap = args.get("chunk_overlap", 5.0)

    # Validate before doing any work: the step below must be strictly
    # positive or the windowing loop would spin forever.
    if chunk_duration <= 0 or not 0 <= chunk_overlap < chunk_duration:
        raise ValueError(
            "chunk_duration must be > 0 and chunk_overlap must satisfy "
            "0 <= chunk_overlap < chunk_duration "
            f"(got chunk_duration={chunk_duration}, chunk_overlap={chunk_overlap})"
        )
    step = float(chunk_duration - chunk_overlap)

    # Get duration
    probe = subprocess.run(
        ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'json', audio_path],
        capture_output=True, text=True
    )
    if probe.returncode != 0:
        raise RuntimeError(
            f"ffprobe failed for {audio_path}: {probe.stderr.strip()}"
        )
    try:
        duration = float(json.loads(probe.stdout)['format']['duration'])
    except (ValueError, KeyError, TypeError) as exc:
        raise RuntimeError(
            f"Could not read duration of {audio_path} from ffprobe output: {exc}"
        ) from exc

    # Imported only once the inputs are known to be usable.
    from fingerprint_index import generate_chunk_fingerprint

    chunks = []
    index = 0
    start = 0.0
    while start + chunk_duration <= duration:
        fp = generate_chunk_fingerprint(audio_path, start, chunk_duration)
        if fp:
            chunks.append({
                "start_time": start,
                "end_time": start + chunk_duration,
                "fingerprint": fp
            })
        index += 1
        # Multiply instead of accumulating so float error does not build up.
        start = index * step

    return {
        "success": True,
        "total_duration": duration,
        "chunk_count": len(chunks),
        "chunks": chunks
    }
def check_health() -> dict:
    """Check availability of all ML dependencies.

    Probes Demucs, Chromaprint (through the ``fpcalc`` command-line tool),
    CLAP (``laion_clap`` first, then ``transformers.ClapModel``) and FAISS.
    Every probe is isolated: a failure is recorded in ``errors`` and the
    remaining probes still run.

    Returns:
        A dict with boolean flags ``demucs``, ``chromaprint``, ``clap`` and
        ``faiss``, a list of human-readable ``errors``, and ``success``,
        which is ``True`` only when ``errors`` is empty. Depending on what
        is found, ``demucs_version``, ``chromaprint_version``,
        ``clap_source`` and ``faiss_version`` are added.
    """
    status = {
        "success": True,
        "demucs": False,
        "chromaprint": False,
        "clap": False,
        "faiss": False,
        "errors": []
    }

    # Check Demucs
    # Catch Exception rather than ImportError: a broken native dependency can
    # fail with e.g. OSError, and that should be reported, not abort the check.
    try:
        import demucs
        status["demucs"] = True
        status["demucs_version"] = getattr(demucs, "__version__", "unknown")
    except Exception as e:
        status["errors"].append(f"Demucs not available: {e}")

    # Check Chromaprint (fpcalc CLI)
    try:
        import subprocess
        result = subprocess.run(["fpcalc", "-version"], capture_output=True, text=True)
        if result.returncode == 0:
            status["chromaprint"] = True
            status["chromaprint_version"] = result.stdout.strip()
        else:
            status["errors"].append("fpcalc CLI not found or not working")
    except FileNotFoundError:
        status["errors"].append("fpcalc CLI not installed (install chromaprint)")
    except Exception as e:
        status["errors"].append(f"Chromaprint check failed: {e}")

    # Check CLAP
    try:
        import laion_clap  # noqa: F401 -- imported only to test availability
        status["clap"] = True
    except Exception:
        # Fall back to the transformers implementation before giving up.
        try:
            from transformers import ClapModel  # noqa: F401
            status["clap"] = True
            status["clap_source"] = "transformers"
        except Exception as e:
            status["errors"].append(f"CLAP not available: {e}")

    # Check FAISS
    try:
        import faiss
        status["faiss"] = True
        status["faiss_version"] = getattr(faiss, "__version__", "unknown")
    except Exception as e:
        status["errors"].append(f"FAISS not available: {e}")

    if status["errors"]:
        status["success"] = False
    return status
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()