from flask import Flask, request, jsonify, send_file, Response, send_from_directory from flask_cors import CORS import os import subprocess import tempfile import shutil from werkzeug.utils import secure_filename import uuid from pathlib import Path import time import io app = Flask(__name__, static_folder='dist', static_url_path='') CORS(app) # Configuration ALLOWED_EXTENSIONS = {'mp3', 'wav', 'flac', 'ogg', 'm4a', 'aac'} # Store separated tracks in memory temporarily active_sessions = {} def allowed_file(filename): return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def cleanup_old_sessions(): """Clean up sessions older than 1 hour""" current_time = time.time() expired_sessions = [] for session_id, session_data in active_sessions.items(): if session_data.get('created_at', 0) < current_time - 3600: # 1 hour expired_sessions.append(session_id) for session_id in expired_sessions: if session_id in active_sessions: # Clean up temporary directory if it exists temp_dir = active_sessions[session_id].get('temp_dir') if temp_dir and os.path.exists(temp_dir): try: shutil.rmtree(temp_dir) except: pass del active_sessions[session_id] @app.route('/api/separate', methods=['POST']) def separate_audio(): try: # Clean up old sessions cleanup_old_sessions() if 'audio' not in request.files: return jsonify({'error': 'No audio file provided'}), 400 file = request.files['audio'] if file.filename == '': return jsonify({'error': 'No file selected'}), 400 if not allowed_file(file.filename): return jsonify({'error': 'Invalid file format'}), 400 # Generate unique ID for this processing session session_id = str(uuid.uuid4()) # Create temporary directory for this session temp_dir = tempfile.mkdtemp(prefix=f"demucs_{session_id}_") try: # Save uploaded file to temp directory with clean filename original_filename = secure_filename(file.filename) # Remove any problematic characters and ensure proper extension base_name = os.path.splitext(original_filename)[0] extension = os.path.splitext(original_filename)[1].lower() # Clean the base name to avoid issues import re clean_base = re.sub(r'[^\w\-_\.]', '_', base_name) filename = f"{clean_base}{extension}" file_path = os.path.join(temp_dir, filename) file.save(file_path) # Add FFmpeg to PATH if it exists in the project directory env = os.environ.copy() ffmpeg_path = os.path.join(os.getcwd(), 'ffmpeg-7.1.1-essentials_build', 'bin') if os.path.exists(ffmpeg_path): env['PATH'] = ffmpeg_path + os.pathsep + env.get('PATH', '') # Run demucs command with output to temp directory output_dir = os.path.join(temp_dir, 'separated') cmd = ['demucs', '--device', 'cpu', '--out', output_dir, file_path] result = subprocess.run(cmd, capture_output=True, text=True, env=env) if result.returncode != 0: # Combine both stdout and stderr for complete error info error_output = "" if result.stdout: error_output += f"STDOUT: {result.stdout}\n" if result.stderr: error_output += f"STDERR: {result.stderr}" # Also log the full error for debugging print(f"Demucs failed with return code {result.returncode}") print(f"Full error output: {error_output}") return jsonify({'error': f'Demucs processing failed with return code {result.returncode}: {error_output}'}), 500 # Find the output directory (Demucs creates subdirectories) base_name = os.path.splitext(filename)[0] track_dir = None # Look for the output directory for model_dir in os.listdir(output_dir): model_path = os.path.join(output_dir, model_dir) if os.path.isdir(model_path): for potential_track_dir in os.listdir(model_path): if potential_track_dir == base_name: track_dir = os.path.join(model_path, potential_track_dir) break if track_dir: break if not track_dir or not os.path.exists(track_dir): return jsonify({'error': 'Output files not found'}), 500 # Read separated tracks into memory tracks = {} track_files = ['vocals.wav', 'drums.wav', 'bass.wav', 'other.wav'] track_data = {} for track_file in track_files: track_path = os.path.join(track_dir, track_file) if os.path.exists(track_path): track_name = track_file.replace('.wav', '') # Read file into memory with open(track_path, 'rb') as f: track_data[track_name] = f.read() # Create a URL that the frontend can use to download the file tracks[track_name] = f'/api/download/{session_id}/{track_name}' if not tracks: return jsonify({'error': 'No separated tracks found'}), 500 # Store session data in memory active_sessions[session_id] = { 'tracks': track_data, 'original_filename': original_filename, # Use original filename for display 'created_at': time.time(), 'temp_dir': temp_dir } return jsonify({ 'success': True, 'tracks': tracks, 'session_id': session_id }) except Exception as e: # Clean up temp directory on error if os.path.exists(temp_dir): shutil.rmtree(temp_dir) return jsonify({'error': f'Processing error: {str(e)}'}), 500 except Exception as e: return jsonify({'error': f'Server error: {str(e)}'}), 500 @app.route('/api/download//') def download_track(session_id, track_name): try: # Check if session exists if session_id not in active_sessions: return jsonify({'error': 'Session not found or expired'}), 404 session_data = active_sessions[session_id] # Check if track exists if track_name not in session_data['tracks']: return jsonify({'error': 'Track not found'}), 404 # Get track data from memory track_data = session_data['tracks'][track_name] original_filename = session_data['original_filename'] # Create a file-like object from the binary data track_io = io.BytesIO(track_data) # Generate download filename base_name = os.path.splitext(original_filename)[0] download_filename = f"{base_name}_{track_name}.wav" return send_file( track_io, as_attachment=True, download_name=download_filename, mimetype='audio/wav' ) except Exception as e: return jsonify({'error': f'Download error: {str(e)}'}), 500 @app.route('/api/cleanup/', methods=['POST']) def cleanup_session(session_id): """Manually clean up a session when user is done""" try: if session_id in active_sessions: # Clean up temporary directory temp_dir = active_sessions[session_id].get('temp_dir') if temp_dir and os.path.exists(temp_dir): shutil.rmtree(temp_dir) # Remove from active sessions del active_sessions[session_id] return jsonify({'success': True, 'message': 'Session cleaned up'}) else: return jsonify({'error': 'Session not found'}), 404 except Exception as e: return jsonify({'error': f'Cleanup error: {str(e)}'}), 500 @app.route('/api/health') def health_check(): active_count = len(active_sessions) return jsonify({ 'status': 'healthy', 'message': 'Demucs API is running', 'active_sessions': active_count }) # Serve React App @app.route('/') def serve_react_app(): return send_from_directory(app.static_folder, 'index.html') @app.route('/') def serve_static_files(path): if path != "" and os.path.exists(os.path.join(app.static_folder, path)): return send_from_directory(app.static_folder, path) else: return send_from_directory(app.static_folder, 'index.html') if __name__ == '__main__': import os port = int(os.environ.get('PORT', 7860)) # Hugging Face Spaces uses port 7860 print(f"Starting Demucs API server on port {port}...") print("Make sure you have Demucs installed: pip install demucs") app.run(debug=False, host='0.0.0.0', port=port)