voice / app.py
Pepguy's picture
Update app.py
05a9187 verified
import os
import uuid
import subprocess
import requests
import base64
from urllib.parse import urlparse
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, HTMLResponse
app = FastAPI()
# Base directory to store task files
BASE_DIR = "tasks"
os.makedirs(BASE_DIR, exist_ok=True)
def add_status(log_list, message):
log_list.append(message)
print(message) # Also log to console
@app.post("/process")
def process_audio(payload: dict):
status_log = []
# Validate input
if "url" not in payload:
raise HTTPException(status_code=400, detail="Missing 'url' in payload")
audio_url = payload["url"]
add_status(status_log, "Received URL from payload.")
# Create a unique task directory
task_id = str(uuid.uuid4())
task_dir = os.path.join(BASE_DIR, task_id)
os.makedirs(task_dir, exist_ok=True)
add_status(status_log, f"Created task directory: {task_dir}")
# Check FFmpeg version
try:
ffmpeg_ver = subprocess.run(
["ffmpeg", "-version"],
capture_output=True,
text=True,
check=True,
)
first_line = ffmpeg_ver.stdout.splitlines()[0]
add_status(status_log, f"FFmpeg version: {first_line}")
except Exception as e:
add_status(status_log, f"Failed to get FFmpeg version: {e}")
return JSONResponse(status_code=500, content={"status": status_log})
# Check Spleeter version
try:
spleeter_ver = subprocess.run(
["spleeter", "--version"],
capture_output=True,
text=True,
check=True,
)
add_status(status_log, f"Spleeter version: {spleeter_ver.stdout.strip()}")
except Exception as e:
add_status(status_log, f"Failed to get Spleeter version: {e}")
return JSONResponse(status_code=500, content={"status": status_log})
# Download the audio file
try:
r = requests.get(audio_url)
r.raise_for_status()
add_status(status_log, "Successfully downloaded audio file.")
except Exception as e:
add_status(status_log, f"Error downloading file: {e}")
return JSONResponse(status_code=400, content={"status": status_log})
# Clean filename: remove query parameters using urlparse
parsed = urlparse(audio_url)
path = parsed.path
ext = os.path.splitext(path)[1] or ".mp3"
input_filename = f"input{ext}"
input_filepath = os.path.join(task_dir, input_filename)
try:
with open(input_filepath, "wb") as f:
f.write(r.content)
add_status(status_log, f"Saved audio file to: {input_filepath}")
except Exception as e:
add_status(status_log, f"Error saving file: {e}")
return JSONResponse(status_code=500, content={"status": status_log})
if not os.path.exists(input_filepath):
add_status(status_log, "Error: Input file does not exist after download.")
return JSONResponse(status_code=500, content={"status": status_log})
# Run Spleeter using the updated syntax:
# spleeter separate -p spleeter:2stems -o <task_dir> <input_filepath>
spleeter_cmd = [
"spleeter", "separate",
"-p", "spleeter:2stems",
"-o", task_dir,
input_filepath
]
add_status(status_log, "Running Spleeter command: " + " ".join(spleeter_cmd))
try:
result = subprocess.run(spleeter_cmd, capture_output=True, text=True, check=True)
add_status(status_log, "Spleeter command output: " + result.stdout)
except subprocess.CalledProcessError as e:
add_status(status_log, "Spleeter processing failed: " + e.stderr)
return JSONResponse(status_code=500, content={"status": status_log})
# Spleeter creates an output folder with the base name of the input file.
base_name = os.path.splitext(input_filename)[0]
output_folder = os.path.join(task_dir, base_name)
vocals_file = os.path.join(output_folder, "vocals.wav")
accompaniment_file = os.path.join(output_folder, "accompaniment.wav")
if not (os.path.exists(vocals_file) and os.path.exists(accompaniment_file)):
add_status(status_log, "Error: Output files not found after processing.")
return JSONResponse(status_code=500, content={"status": status_log})
add_status(status_log, "Spleeter processing completed successfully.")
# Read output files and encode them in base64
try:
with open(vocals_file, "rb") as f:
vocals_data = f.read()
with open(accompaniment_file, "rb") as f:
accomp_data = f.read()
vocals_b64 = base64.b64encode(vocals_data).decode("utf-8")
accomp_b64 = base64.b64encode(accomp_data).decode("utf-8")
except Exception as e:
add_status(status_log, f"Error reading output files: {e}")
return JSONResponse(status_code=500, content={"status": status_log})
return JSONResponse(content={
"task_id": task_id,
"vocals": vocals_b64,
"accompaniment": accomp_b64,
"status": status_log
})
@app.get("/", response_class=HTMLResponse)
def index():
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>Spleeter Processing Test</title>
</head>
<body>
<h1>Spleeter Processing Test</h1>
<form id="spleeterForm">
Audio URL: <input type="text" name="url" size="50"/><br/><br/>
<button type="submit">Process Audio</button>
</form>
<div id="result" style="margin-top:20px;"></div>
<script>
const form = document.getElementById('spleeterForm');
form.onsubmit = async (e) => {
e.preventDefault();
const formData = new FormData(form);
const url = formData.get('url');
const response = await fetch('/process', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ url })
});
const data = await response.json();
let resultHTML = "<h3>Status Log:</h3><ul>";
for (const log of data.status) {
resultHTML += "<li>" + log + "</li>";
}
resultHTML += "</ul>";
if (data.vocals && data.accompaniment) {
const vocalsDataURI = "data:audio/wav;base64," + data.vocals;
const accompDataURI = "data:audio/wav;base64," + data.accompaniment;
resultHTML += "<h3>Vocals</h3>";
resultHTML += "<audio controls src='" + vocalsDataURI + "'></audio>";
resultHTML += "<h3>Accompaniment</h3>";
resultHTML += "<audio controls src='" + accompDataURI + "'></audio>";
}
if(data.task_id) {
resultHTML += `<p>Task ID: ${data.task_id}</p>`;
}
document.getElementById('result').innerHTML = resultHTML;
};
</script>
</body>
</html>
"""
return HTMLResponse(content=html_content)