# SUNO-API-2 / app.py
# MySafeCode's picture
# Update app.py
# a74cea0 verified
# raw
# history blame
# 19.2 kB
import gradio as gr
import requests
import os
import time
import json
import threading
# API credential for the Suno service, read once at import time from the
# "SunoKey" environment variable. An empty string means "not configured".
SUNO_KEY = os.getenv("SunoKey", "")
if SUNO_KEY == "":
    print("⚠️ SunoKey not set!")

# Module-level scratch state: details of the most recently submitted
# separation task (used for auto-fill) and a registry for polling flags.
current_task_info = dict()
polling_active = dict()
def get_audio_files(task_id):
    """Look up a Suno generation task and list its audio tracks.

    Args:
        task_id: ID of the original generation task. ``None`` and
            whitespace-only values are treated as "not provided".

    Returns:
        A ``(message, options)`` tuple. ``message`` is a user-facing status
        line; ``options`` is a list of ``(label, audio_id)`` pairs suitable
        for a dropdown, and is empty whenever the lookup did not succeed.
        This function never raises: every failure is reported via ``message``.
    """
    if not SUNO_KEY:
        return "❌ Error: SunoKey not configured", []

    # Normalise once so a None value cannot crash on .strip().
    task_id = (task_id or "").strip()
    if not task_id:
        return "❌ Please enter a Task ID", []

    try:
        resp = requests.get(
            "https://api.sunoapi.org/api/v1/generate/record-info",
            headers={"Authorization": f"Bearer {SUNO_KEY}"},
            params={"taskId": task_id},
            timeout=30,
        )
        if resp.status_code != 200:
            return f"❌ Request failed: HTTP {resp.status_code}", []

        data = resp.json()
        if data.get("code") != 200:
            return f"❌ API error: {data.get('msg', 'Unknown')}", []

        # "data" / "response" may be present but null; `or {}` covers both
        # the missing-key and the explicit-null case.
        payload = data.get("data") or {}
        status = payload.get("status", "UNKNOWN")
        if status != "SUCCESS":
            return f"⏳ Task status: {status}. Wait for generation to complete.", []

        suno_data = (payload.get("response") or {}).get("sunoData") or []
        if not suno_data:
            return "❌ No audio files found", []

        audio_options = [
            _audio_option(index, audio) for index, audio in enumerate(suno_data)
        ]
        return f"✅ Found {len(audio_options)} audio file(s)", audio_options
    except Exception as e:
        # UI boundary: surface any network/JSON/shape problem as a message
        # instead of letting it propagate into the Gradio callback.
        return f"❌ Error: {str(e)}", []


def _audio_option(index, audio):
    """Build one ``(label, audio_id)`` dropdown choice from a track entry."""
    audio_id = audio.get("id") or f"audio_{index}"
    title = audio.get("title") or f"Track {index + 1}"

    # A null or non-numeric duration must not abort the whole listing.
    try:
        duration = float(audio.get("duration") or 0)
    except (TypeError, ValueError):
        duration = 0.0

    label = f"{index + 1}. {title} ({duration:.1f}s)"

    prompt = str(audio.get("prompt") or "")
    if prompt:
        # Only show an ellipsis when the prompt really was cut short.
        ellipsis = "..." if len(prompt) > 50 else ""
        label += f" - {prompt[:50]}{ellipsis}"

    return label, audio_id
def submit_separation_task(task_id, audio_id, separation_type,
                           callback_url="https://1hit.no/callback.php"):
    """Submit a vocal/stem separation job for one audio track.

    On success the new task's details are also recorded in the module-level
    ``current_task_info`` dict.

    Args:
        task_id: ID of the original generation task.
        audio_id: ID of the track within that task.
        separation_type: Value sent as the request's ``type`` field.
        callback_url: URL sent as ``callBackUrl``; defaults to the value
            this app has always used.

    Returns:
        A ``(message, separation_task_id)`` tuple. ``separation_task_id`` is
        ``None`` whenever the submission failed; ``message`` then explains
        why. This function never raises.
    """
    # Same guard as get_audio_files: fail fast with a clear message instead
    # of sending an empty bearer token.
    if not SUNO_KEY:
        return "❌ Error: SunoKey not configured", None

    try:
        resp = requests.post(
            "https://api.sunoapi.org/api/v1/vocal-removal/generate",
            json={
                "taskId": task_id,
                "audioId": audio_id,
                "type": separation_type,
                "callBackUrl": callback_url,
            },
            headers={
                "Authorization": f"Bearer {SUNO_KEY}",
                "Content-Type": "application/json",
            },
            timeout=30,
        )
        if resp.status_code != 200:
            return f"❌ HTTP Error {resp.status_code}:\n{resp.text}", None

        data = resp.json()

        # The task ID is accepted either at the top level or nested under
        # "data" (the latter only when the API code is 200). "data" can be
        # null, so check its type before looking inside it.
        separation_task_id = None
        if isinstance(data, dict):
            separation_task_id = data.get("taskId")
            if not separation_task_id and data.get("code") == 200:
                nested = data.get("data")
                if isinstance(nested, dict):
                    separation_task_id = nested.get("taskId")

        if not separation_task_id:
            return f"❌ No task ID in response:\n{json.dumps(data, indent=2)}", None

        # Store for auto-fill
        current_task_info.update(
            {
                "separation_task_id": separation_task_id,
                "original_task_id": task_id,
                "audio_id": audio_id,
                "separation_type": separation_type,
                "start_time": time.time(),
                "attempt": 0,
            }
        )
        return (
            f"✅ Task submitted!\n\n**Separation Task ID:** `{separation_task_id}`"
            "\n\n⏳ Starting auto-polling...",
            separation_task_id,
        )
    except Exception as e:
        # UI boundary: report network/JSON failures as a message.
        return f"❌ Error: {str(e)}", None
def poll_separation_status(separation_task_id):
    """Fetch the current state of a separation task.

    Args:
        separation_task_id: ID returned when the separation was submitted.

    Returns:
        A ``(status, results, error)`` tuple:

        * ``status`` - status string from the response, ``"SUCCESS"`` when no
          status field is present but the API code is 200, ``"UNKNOWN"`` when
          neither is available, or ``"ERROR"`` on failure.
        * ``results`` - the ``vocal_removal_info`` mapping if present,
          otherwise an empty dict.
        * ``error`` - ``None`` on success, otherwise a description of the
          failure. This function never raises.
    """
    if not SUNO_KEY:
        return "ERROR", {}, "SunoKey not configured"

    try:
        resp = requests.get(
            "https://api.sunoapi.org/api/v1/vocal-removal/record-info",
            headers={"Authorization": f"Bearer {SUNO_KEY}"},
            params={"taskId": separation_task_id},
            timeout=30,
        )
        if resp.status_code != 200:
            return "ERROR", {}, f"HTTP {resp.status_code}: {resp.text}"

        data = resp.json()
        if not isinstance(data, dict):
            return "ERROR", {}, "Unexpected response format"

        # "data" may be null (e.g. on an API-level error); treat as empty.
        payload = data.get("data")
        if not isinstance(payload, dict):
            payload = {}

        code = data.get("code")
        if "status" in data:
            status = data["status"]
        elif "status" in payload:
            status = payload["status"]
        elif code == 200:
            # NOTE(review): a bare code 200 without any status field is
            # treated as finished. Confirm against the API reference that an
            # in-progress task always carries an explicit status.
            status = "SUCCESS"
        elif code is not None:
            # HTTP 200 but the API itself reported a failure: surface it the
            # same way get_audio_files does rather than as "UNKNOWN".
            return "ERROR", {}, f"API error: {data.get('msg', 'Unknown')}"
        else:
            status = "UNKNOWN"

        results = (
            data.get("vocal_removal_info")
            or payload.get("vocal_removal_info")
            or {}
        )
        return status, results, None
    except Exception as e:
        # UI boundary: report network/JSON failures through the error slot.
        return "ERROR", {}, str(e)
def format_download_links(results, separation_task_id):
    """Render separation results as a Markdown list of download links.

    Each stem that has a non-empty URL in ``results`` is listed exactly once:
    vocals and instrumental first, followed by the remaining stems.

    Args:
        results: Mapping of ``<stem>_url`` keys to download URLs. May be
            empty or ``None``.
        separation_task_id: Task ID used to build the viewer link.

    Returns:
        Markdown text, or ``"No download links found"`` when ``results`` is
        empty.
    """
    if not results:
        return "No download links found"

    # One table for both 2-stem and multi-stem results. Vocals/instrumental
    # appear only here so they can no longer be emitted twice.
    stem_fields = (
        ("vocal_url", "🎤 Vocals"),
        ("instrumental_url", "🎵 Instrumental"),
        ("backing_vocals_url", "🎤 Backing Vocals"),
        ("bass_url", "🎸 Bass"),
        ("brass_url", "🎺 Brass"),
        ("drums_url", "🥁 Drums"),
        ("fx_url", "🎛️ FX/Other"),
        ("guitar_url", "🎸 Guitar"),
        ("keyboard_url", "🎹 Keyboard"),
        ("percussion_url", "🪘 Percussion"),
        ("strings_url", "🎻 Strings"),
        ("synth_url", "🎹 Synth"),
        ("woodwinds_url", "🎷 Woodwinds"),
    )

    parts = ["## 🎵 Download Links\n\n"]
    parts.extend(
        f"**{name}:** [Download MP3]({results[field]})\n"
        for field, name in stem_fields
        if results.get(field)
    )
    parts.append(
        f"\n**🔗 Viewer:** [Open in Viewer](https://1hit.no/viewer.php?task_id={separation_task_id})"
    )
    return "".join(parts)
# Manual polling function
def manual_poll(task_id):
    """Check a separation task once and describe its state as Markdown.

    Args:
        task_id: Separation task ID typed by the user. ``None``, empty and
            whitespace-only values are rejected without contacting the API.

    Returns:
        A Markdown string: download links on success, a progress note while
        the task is pending, or an error/failure description.
    """
    # Strip before validating so stray whitespace is neither accepted as an
    # ID nor sent to the API.
    task_id = (task_id or "").strip()
    if not task_id:
        return "❌ Enter Task ID"

    status, results, error = poll_separation_status(task_id)
    if error:
        return f"❌ Error: {error}"

    if status == "SUCCESS":
        if results:
            return (
                "✅ **Complete!**\n\n"
                f"**Task ID:** `{task_id}`\n\n"
                + format_download_links(results, task_id)
            )
        # Finished, but the response carried no links; point at the viewer.
        return (
            "✅ **Complete** (no direct links)\n\n"
            f"**Task ID:** `{task_id}`\n\n"
            f"Check: [Viewer](https://1hit.no/viewer.php?task_id={task_id})"
        )

    if status in ("PENDING", "PROCESSING", "RUNNING"):
        return (
            f"⏳ **Status:** {status}\n\n"
            f"**Task ID:** `{task_id}`\n\n"
            "Still processing. Try again in 30 seconds."
        )

    if status == "FAILED":
        return (
            "❌ **Failed**\n\n"
            f"**Task ID:** `{task_id}`\n"
            f"**Status:** {status}"
        )

    # Any other status string is shown verbatim.
    return f"🔄 **Status:** {status}\n\n**Task ID:** `{task_id}`"
# Create the app
#
# Auto-polling cadence: one status check every POLL_INTERVAL_SECONDS, giving
# up after MAX_POLL_ATTEMPTS checks (60 x 5 s = the "5 minutes" quoted in the
# timeout message).
MAX_POLL_ATTEMPTS = 60
POLL_INTERVAL_SECONDS = 5

with gr.Blocks() as app:
    gr.Markdown("# 🎵 Suno Stem Separator")

    with gr.Row():
        # Left column: Input and control
        with gr.Column(scale=1):
            # Step 1: Get audio files
            with gr.Group():
                gr.Markdown("### 1. Get Audio Files")
                original_task_id = gr.Textbox(
                    label="Original Task ID",
                    placeholder="Enter Suno generation task ID",
                    info="From your Suno history"
                )
                get_audio_btn = gr.Button("📥 Get Audio Files", variant="secondary")
                audio_status = gr.Markdown("Enter Task ID above")

            # Step 2: Select audio file
            with gr.Group():
                gr.Markdown("### 2. Select Audio File")
                audio_dropdown = gr.Dropdown(
                    label="Select Audio",
                    choices=[],
                    interactive=True,
                    visible=False
                )

            # Step 3: Start separation
            with gr.Group():
                gr.Markdown("### 3. Start Separation")
                separation_type = gr.Radio(
                    label="Separation Type",
                    choices=[
                        ("🎤 Vocals Only (1 credit)", "separate_vocal"),
                        ("🎛️ Full Stems (5 credits)", "split_stem")
                    ],
                    value="separate_vocal",
                    visible=False
                )
                submit_btn = gr.Button("🚀 Start Separation", variant="primary", visible=False)
                submission_output = gr.Markdown("Select audio file first", visible=False)

        # Right column: Results and polling
        with gr.Column(scale=2):
            # Auto-polling section
            with gr.Group():
                gr.Markdown("### 4. Auto-Polling Status")
                auto_poll_status = gr.Markdown("Waiting for task submission...")
                auto_poll_progress = gr.Slider(
                    minimum=0,
                    maximum=MAX_POLL_ATTEMPTS,
                    value=0,
                    label="Polling attempts",
                    interactive=False,
                    visible=False
                )
                stop_poll_btn = gr.Button("⏹️ Stop Auto-Polling", variant="stop", visible=False)

            # Manual polling section
            with gr.Group():
                gr.Markdown("### 5. Manual Polling")
                poll_task_id = gr.Textbox(
                    label="Separation Task ID",
                    placeholder="Will auto-fill from current task"
                )
                poll_btn = gr.Button("🔍 Check Status", variant="secondary")
                poll_output = gr.Markdown("Enter Task ID to check")

            # Results section
            with gr.Group():
                gr.Markdown("### 6. Download Links")
                download_output = gr.Markdown("Results will appear here")

            # Viewer link
            with gr.Group():
                gr.Markdown("### 7. Viewer")
                viewer_link = gr.Markdown(
                    "[Open Viewer](https://1hit.no/viewer.php)",
                    elem_id="viewer_link"
                )

    # Per-session state: the separation task being tracked and whether
    # auto-polling is currently switched on.
    current_separation_task_id = gr.State(value="")
    polling_active_flag = gr.State(value=False)
    polling_thread = None  # never assigned elsewhere; kept so the name stays defined

    # Drives auto-polling. Starts inactive; switched on/off by
    # sync_polling_controls whenever polling_active_flag changes.
    poll_timer = gr.Timer(value=POLL_INTERVAL_SECONDS, active=False)

    # Manual "poll now" button, only shown while auto-polling is active.
    refresh_btn = gr.Button("🔄 Refresh Status", variant="secondary", visible=False)

    # Step 1: Get audio files
    def on_get_audio(task_id):
        """Fetch the task's tracks and reveal steps 2-3 when any were found.

        Returns updates for (audio_status, audio_dropdown, separation_type,
        submit_btn, submission_output).
        """
        hidden_controls = (
            gr.Dropdown(choices=[], visible=False),
            gr.Radio(visible=False),
            gr.Button(visible=False),
            gr.Markdown(visible=False),
        )
        if not task_id:
            return ("❌ Enter Task ID", *hidden_controls)

        status, options = get_audio_files(task_id)
        if not options:
            return (status, *hidden_controls)

        return (
            status,
            # Pre-select the first track; options holds (label, audio_id) pairs.
            gr.Dropdown(choices=options, value=options[0][1], visible=True),
            gr.Radio(visible=True),
            gr.Button(visible=True),
            gr.Markdown("Ready to separate!", visible=True)
        )

    # Step 2-3: Submit separation task
    def on_submit(task_id, audio_id, sep_type, active_flag):
        """Submit the separation job and, on success, switch auto-polling on.

        Returns updates for (submission_output, current_separation_task_id,
        auto_poll_status, auto_poll_progress, stop_poll_btn,
        polling_active_flag).
        """
        if not task_id or not audio_id:
            return "❌ Missing Task ID or Audio ID", "", "⏳ Waiting...", gr.Slider(visible=False), gr.Button(visible=False), active_flag

        status, separation_task_id = submit_separation_task(task_id, audio_id, sep_type)
        if not separation_task_id:
            return status, "", "⏳ Waiting...", gr.Slider(visible=False), gr.Button(visible=False), active_flag

        # Setting the flag to True triggers sync_polling_controls, which
        # activates poll_timer.
        return (
            status,
            separation_task_id,
            f"⏳ Polling started... (attempt 0/{MAX_POLL_ATTEMPTS})",
            gr.Slider(visible=True, value=0),
            gr.Button(visible=True),
            True
        )

    def _poll_timeout_result(separation_task_id, attempt):
        """Build the update_auto_poll result used once attempts run out."""
        status_text = "⏰ **Timeout after 5 minutes**\n\n"
        status_text += f"**Task ID:** `{separation_task_id}`\n"
        status_text += "Check manually or wait for callback."
        return status_text, attempt, "", False

    # Auto-polling update function
    def update_auto_poll(separation_task_id, attempt, active_flag):
        """Run one polling attempt.

        Returns (status_markdown, attempt, download_markdown, keep_polling).
        ``keep_polling`` is False once the task succeeded, failed, errored or
        the attempt budget is exhausted.
        """
        if not active_flag or not separation_task_id:
            return "⏳ Polling stopped", attempt, "", False

        # The counter arrives from a Slider; coerce so the text shows an int.
        attempt = int(attempt or 0) + 1

        status, results, error = poll_separation_status(separation_task_id)
        if error:
            return f"❌ Poll error: {error}", attempt, "", False

        if status == "SUCCESS":
            if results:
                output = "✅ **Separation Complete!**\n\n"
                output += f"**Task ID:** `{separation_task_id}`\n\n"
                output += format_download_links(results, separation_task_id)
            else:
                output = "✅ **Processing Complete**\n\n"
                output += f"**Task ID:** `{separation_task_id}`\n\n"
                output += "Check callback results: "
                output += f"[Viewer](https://1hit.no/viewer.php?task_id={separation_task_id})"
            return f"✅ Complete! (attempt {attempt})", attempt, output, False

        if status == "FAILED":
            status_text = "❌ **Separation Failed**\n\n"
            status_text += f"**Task ID:** `{separation_task_id}`\n"
            status_text += f"**Status:** {status}"
            return status_text, attempt, "", False

        # Still in progress (or an unrecognised status): stop at the budget.
        if attempt >= MAX_POLL_ATTEMPTS:
            return _poll_timeout_result(separation_task_id, attempt)

        if status in ("PENDING", "PROCESSING", "RUNNING"):
            status_text = f"⏳ **Status:** {status}\n\n"
            status_text += f"**Task ID:** `{separation_task_id}`\n"
            status_text += f"**Attempt:** {attempt}/{MAX_POLL_ATTEMPTS}\n"
            status_text += f"**Elapsed:** {attempt * POLL_INTERVAL_SECONDS} seconds\n\n"
            status_text += f"Polling every {POLL_INTERVAL_SECONDS} seconds..."
        else:
            status_text = f"🔄 **Status:** {status}\n\n"
            status_text += f"**Task ID:** `{separation_task_id}`\n"
            status_text += f"**Attempt:** {attempt}/{MAX_POLL_ATTEMPTS}"
        return status_text, attempt, "", True

    def auto_poll_tick(separation_task_id, attempt, active_flag):
        """Timer callback: poll once while auto-polling is active.

        A tick that arrives after polling was switched off must not touch the
        UI (it would otherwise wipe the finished download links), so in that
        case every visible output is left unchanged.
        """
        if not active_flag or not separation_task_id:
            return gr.update(), gr.update(), gr.update(), False
        return update_auto_poll(separation_task_id, attempt, active_flag)

    # Stop polling function
    def stop_polling(active_flag):
        """Switch auto-polling off and reset the progress display."""
        return "⏹️ Polling stopped", 0, "", False, gr.Button(visible=False)

    # Blocking poll loop; not wired to any UI event. `active_flag` is a plain
    # argument, so the loop cannot observe a later "stop" request.
    def start_polling_thread(separation_task_id, attempt, active_flag):
        """Poll until the task finishes, errors, or attempts run out.

        Returns the number of attempts made.
        """
        while active_flag and attempt < MAX_POLL_ATTEMPTS:
            time.sleep(POLL_INTERVAL_SECONDS)
            attempt += 1
            status, results, error = poll_separation_status(separation_task_id)
            if status == "SUCCESS" or status == "FAILED" or error:
                break
        return attempt

    # Auto-fill poll field when separation task is submitted
    def update_poll_field(separation_task_id):
        """Mirror the tracked task ID into the manual-poll textbox."""
        return separation_task_id if separation_task_id else ""

    # Simple polling (manual refresh)
    def manual_refresh(separation_task_id, attempt, active_flag):
        """Refresh-button callback: run one polling attempt on demand."""
        if active_flag and separation_task_id:
            return update_auto_poll(separation_task_id, attempt, active_flag)
        return "⏳ Waiting for task...", 0, "", active_flag

    # Show/hide refresh button based on polling state
    def toggle_refresh_button(active_flag):
        """Return a visibility update for the refresh button."""
        return gr.Button(visible=active_flag)

    def sync_polling_controls(active_flag):
        """Keep refresh button, stop button and timer in step with the flag."""
        active = bool(active_flag)
        return (
            toggle_refresh_button(active),
            gr.Button(visible=active),
            gr.Timer(active=active),
        )

    # Connect events
    get_audio_btn.click(
        fn=on_get_audio,
        inputs=[original_task_id],
        outputs=[audio_status, audio_dropdown, separation_type, submit_btn, submission_output]
    )

    # Submit button starts polling
    submit_btn.click(
        fn=on_submit,
        inputs=[original_task_id, audio_dropdown, separation_type, polling_active_flag],
        outputs=[submission_output, current_separation_task_id, auto_poll_status, auto_poll_progress, stop_poll_btn, polling_active_flag]
    )

    # Manual polling
    poll_btn.click(
        fn=manual_poll,
        inputs=[poll_task_id],
        outputs=[poll_output]
    )

    # Stop polling button
    stop_poll_btn.click(
        fn=stop_polling,
        inputs=[polling_active_flag],
        outputs=[auto_poll_status, auto_poll_progress, download_output, polling_active_flag, stop_poll_btn]
    )

    current_separation_task_id.change(
        fn=update_poll_field,
        inputs=[current_separation_task_id],
        outputs=[poll_task_id]
    )

    # Whenever polling is switched on or off, start/stop the timer and
    # show/hide the related buttons.
    polling_active_flag.change(
        fn=sync_polling_controls,
        inputs=[polling_active_flag],
        outputs=[refresh_btn, stop_poll_btn, poll_timer]
    )

    # The actual auto-polling: one attempt per timer tick while active.
    poll_timer.tick(
        fn=auto_poll_tick,
        inputs=[current_separation_task_id, auto_poll_progress, polling_active_flag],
        outputs=[auto_poll_status, auto_poll_progress, download_output, polling_active_flag]
    )

    refresh_btn.click(
        fn=manual_refresh,
        inputs=[current_separation_task_id, auto_poll_progress, polling_active_flag],
        outputs=[auto_poll_status, auto_poll_progress, download_output, polling_active_flag]
    )
# Script entry point: print a short startup banner, then serve the UI.
if __name__ == "__main__":
    key_state = "✅ Set" if SUNO_KEY else "❌ Not set"
    print("🚀 Starting Suno Stem Separator")
    print(f"🔑 SunoKey: {key_state}")
    print("🌐 Open your browser to: http://localhost:7860")

    # NOTE(review): `title` and `theme` are passed to launch() here; whether
    # launch() accepts them depends on the installed Gradio version - confirm
    # against the version this app is deployed with.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "title": "Suno Stem Separator",
        "theme": "soft",
    }
    app.launch(**launch_options)