import os import uuid import subprocess import requests import base64 from urllib.parse import urlparse from fastapi import FastAPI, HTTPException from fastapi.responses import JSONResponse, HTMLResponse app = FastAPI() # Base directory to store task files BASE_DIR = "tasks" os.makedirs(BASE_DIR, exist_ok=True) def add_status(log_list, message): log_list.append(message) print(message) # Also log to console @app.post("/process") def process_audio(payload: dict): status_log = [] # Validate input if "url" not in payload: raise HTTPException(status_code=400, detail="Missing 'url' in payload") audio_url = payload["url"] add_status(status_log, "Received URL from payload.") # Create a unique task directory task_id = str(uuid.uuid4()) task_dir = os.path.join(BASE_DIR, task_id) os.makedirs(task_dir, exist_ok=True) add_status(status_log, f"Created task directory: {task_dir}") # Check FFmpeg version try: ffmpeg_ver = subprocess.run( ["ffmpeg", "-version"], capture_output=True, text=True, check=True, ) first_line = ffmpeg_ver.stdout.splitlines()[0] add_status(status_log, f"FFmpeg version: {first_line}") except Exception as e: add_status(status_log, f"Failed to get FFmpeg version: {e}") return JSONResponse(status_code=500, content={"status": status_log}) # Check Spleeter version try: spleeter_ver = subprocess.run( ["spleeter", "--version"], capture_output=True, text=True, check=True, ) add_status(status_log, f"Spleeter version: {spleeter_ver.stdout.strip()}") except Exception as e: add_status(status_log, f"Failed to get Spleeter version: {e}") return JSONResponse(status_code=500, content={"status": status_log}) # Download the audio file try: r = requests.get(audio_url) r.raise_for_status() add_status(status_log, "Successfully downloaded audio file.") except Exception as e: add_status(status_log, f"Error downloading file: {e}") return JSONResponse(status_code=400, content={"status": status_log}) # Clean filename: remove query parameters using urlparse parsed = urlparse(audio_url) path = parsed.path ext = os.path.splitext(path)[1] or ".mp3" input_filename = f"input{ext}" input_filepath = os.path.join(task_dir, input_filename) try: with open(input_filepath, "wb") as f: f.write(r.content) add_status(status_log, f"Saved audio file to: {input_filepath}") except Exception as e: add_status(status_log, f"Error saving file: {e}") return JSONResponse(status_code=500, content={"status": status_log}) if not os.path.exists(input_filepath): add_status(status_log, "Error: Input file does not exist after download.") return JSONResponse(status_code=500, content={"status": status_log}) # Run Spleeter using the updated syntax: # spleeter separate -p spleeter:2stems -o spleeter_cmd = [ "spleeter", "separate", "-p", "spleeter:2stems", "-o", task_dir, input_filepath ] add_status(status_log, "Running Spleeter command: " + " ".join(spleeter_cmd)) try: result = subprocess.run(spleeter_cmd, capture_output=True, text=True, check=True) add_status(status_log, "Spleeter command output: " + result.stdout) except subprocess.CalledProcessError as e: add_status(status_log, "Spleeter processing failed: " + e.stderr) return JSONResponse(status_code=500, content={"status": status_log}) # Spleeter creates an output folder with the base name of the input file. base_name = os.path.splitext(input_filename)[0] output_folder = os.path.join(task_dir, base_name) vocals_file = os.path.join(output_folder, "vocals.wav") accompaniment_file = os.path.join(output_folder, "accompaniment.wav") if not (os.path.exists(vocals_file) and os.path.exists(accompaniment_file)): add_status(status_log, "Error: Output files not found after processing.") return JSONResponse(status_code=500, content={"status": status_log}) add_status(status_log, "Spleeter processing completed successfully.") # Read output files and encode them in base64 try: with open(vocals_file, "rb") as f: vocals_data = f.read() with open(accompaniment_file, "rb") as f: accomp_data = f.read() vocals_b64 = base64.b64encode(vocals_data).decode("utf-8") accomp_b64 = base64.b64encode(accomp_data).decode("utf-8") except Exception as e: add_status(status_log, f"Error reading output files: {e}") return JSONResponse(status_code=500, content={"status": status_log}) return JSONResponse(content={ "task_id": task_id, "vocals": vocals_b64, "accompaniment": accomp_b64, "status": status_log }) @app.get("/", response_class=HTMLResponse) def index(): html_content = """ Spleeter Processing Test

Spleeter Processing Test

Audio URL:

""" return HTMLResponse(content=html_content)