SUNO-API-2 / app.py
MySafeCode's picture
Update app.py
cc45c3f verified
raw
history blame
19.9 kB
import gradio as gr
import requests
import os
import time
import json
import uuid
from datetime import datetime
# Suno API key, read once at import time from the "SunoKey" environment variable.
SUNO_KEY = os.getenv("SunoKey", "")
if not SUNO_KEY:
    print("⚠️ SunoKey not set!")

# Registry of submitted separation tasks, keyed by separation task ID.
# Kept in process memory for the demo; in production use a database or Redis.
separation_tasks: dict = {}
def _truncate(text, limit):
    """Return ``text`` cut to ``limit`` characters, adding "..." only if it was cut."""
    return text[:limit] + "..." if len(text) > limit else text


def get_audio_info(task_id):
    """Look up a Suno generation task and list the audio tracks it produced.

    Sends a GET request to the ``generate/record-info`` endpoint with the
    given task ID and turns the ``sunoData`` entries of the response into
    dropdown options.

    Args:
        task_id: Generation task ID as entered by the user. Surrounding
            whitespace is ignored and ``None`` is treated as empty.

    Returns:
        A ``(message, audio_options)`` tuple. ``message`` is a Markdown
        string describing the outcome. ``audio_options`` is a list of
        ``(display_text, audio_id, audio)`` tuples, one per usable track,
        and is empty whenever no selectable track was found (missing key,
        empty input, HTTP/API error, unfinished task, or any exception).
    """
    if not SUNO_KEY:
        return "❌ Error: SunoKey not configured in environment variables", []

    # Normalise once so the request and the report use the same value;
    # also keeps a None input from raising on .strip().
    task_id = (task_id or "").strip()
    if not task_id:
        return "❌ Error: Please enter Task ID", []

    try:
        resp = requests.get(
            "https://api.sunoapi.org/api/v1/generate/record-info",
            headers={"Authorization": f"Bearer {SUNO_KEY}"},
            params={"taskId": task_id},
            timeout=30,
        )
        if resp.status_code != 200:
            return f"❌ Request failed: HTTP {resp.status_code}", []

        data = resp.json()
        if data.get("code") != 200:
            return f"❌ API error: {data.get('msg', 'Unknown')}", []

        # "data" and "response" may be absent or null; treat both as empty.
        task_data = data.get("data") or {}
        status = task_data.get("status") or "PENDING"
        if status != "SUCCESS":
            return f"⏳ Task status: {status}. Please wait for generation to complete.", []

        suno_data = (task_data.get("response") or {}).get("sunoData") or []
        if not suno_data:
            return "❌ No audio data found in response", []

        audio_options = []
        track_sections = []
        for audio in suno_data:
            # A track without an ID cannot be submitted for separation,
            # so it is not offered as a choice.
            if not isinstance(audio, dict) or not audio.get("id"):
                continue

            audio_id = audio["id"]
            # `or` (rather than a .get default) also covers explicit nulls.
            prompt = str(audio.get("prompt") or "No prompt")
            title = audio.get("title") or "Untitled"
            duration = float(audio.get("duration") or 0)
            number = len(audio_options) + 1

            display_text = (
                f"Track {number}: {title} ({duration:.1f}s) - {_truncate(prompt, 50)}"
            )
            audio_options.append((display_text, audio_id, audio))

            lines = [
                f"**Track {number}:**",
                f"- **ID:** `{audio_id}`",
                f"- **Title:** {title}",
                f"- **Prompt:** {_truncate(prompt, 100)}",
                f"- **Duration:** {duration:.1f}s",
            ]
            if audio.get("audioUrl"):
                lines.append(f"- **Audio:** [Listen]({audio['audioUrl']})")
            track_sections.append("\n".join(lines) + "\n\n")

        if not audio_options:
            return "❌ No valid audio tracks found", []

        output = (
            "✅ **Task Found!**\n"
            f"**Task ID:** `{task_id}`\n"
            f"**Status:** {status}\n"
            f"**Found {len(audio_options)} audio track(s):**\n\n"
            + "".join(track_sections)
            + "👇 **Select a track from the dropdown below to separate stems**"
        )
        return output, audio_options
    except Exception as e:
        # Deliberately broad: this function feeds a UI handler, so any
        # network or parsing failure is reported as a message, not raised.
        return f"❌ Error: {str(e)}", []
def submit_separation_task(task_id, audio_id, separation_type):
    """Submit a stem separation request and record it in ``separation_tasks``.

    Posts to the ``vocal-removal/generate`` endpoint with a freshly generated
    callback URL. Two response layouts are accepted: ``{"code": 200,
    "data": {"taskId": ...}}`` and a flat ``{"taskId": ...}``.

    Args:
        task_id: ID of the original generation task.
        audio_id: ID of the audio track within that task.
        separation_type: Separation mode, sent as the request's ``type``.

    Returns:
        ``(separation_task_id, None)`` on success, or ``(None, message)``
        where ``message`` is a user-facing error string. Never raises.
    """
    try:
        # Generate a unique callback URL for this request
        callback_id = str(uuid.uuid4())
        callback_url = f"https://1hit.no/callback.php?callback_id={callback_id}"

        resp = requests.post(
            "https://api.sunoapi.org/api/v1/vocal-removal/generate",
            json={
                "taskId": task_id,
                "audioId": audio_id,
                "type": separation_type,
                "callBackUrl": callback_url,
            },
            headers={
                "Authorization": f"Bearer {SUNO_KEY}",
                "Content-Type": "application/json",
            },
            timeout=30,
        )
        print(f"Separation request response: {resp.status_code}")
        print(f"Response text: {resp.text}")
        if resp.status_code != 200:
            return None, f"❌ Submission failed: HTTP {resp.status_code}"

        data = resp.json()
        print(f"Parsed response: {data}")

        # Locate the object that carries taskId/musicId in either layout.
        if data.get("code") == 200:
            # "data" may be present but null; treat that as empty.
            payload = data.get("data") or {}
        elif "taskId" in data:
            payload = data
        else:
            error_msg = data.get("msg", data.get("error", "Unknown error"))
            return None, f"❌ API error: {error_msg}"

        separation_task_id = payload.get("taskId")
        if not separation_task_id:
            # Also guards against storing a record under a None key.
            return None, "❌ No taskId in response"

        # Store task info for polling. callback_url is kept alongside
        # callback_id so it can be shown to the user later.
        separation_tasks[separation_task_id] = {
            "task_id": task_id,
            "audio_id": audio_id,
            "separation_type": separation_type,
            "submitted_at": datetime.now().isoformat(),
            "callback_id": callback_id,
            "callback_url": callback_url,
            "music_id": payload.get("musicId"),
        }
        return separation_task_id, None
    except Exception as e:
        # Deliberately broad: callers expect an error message, not an exception.
        return None, f"❌ Error submitting task: {str(e)}"
def poll_separation_result(separation_task_id):
    """Fetch the current state of a separation task.

    Performs one GET against the ``vocal-removal/record-info`` endpoint.

    Args:
        separation_task_id: Task ID sent as the ``taskId`` query parameter.

    Returns:
        A ``(status, payload)`` tuple. On a successful lookup ``status`` is
        the ``status`` field of the response's ``data`` object (``"PENDING"``
        when absent) and ``payload`` is that ``data`` object. On an HTTP
        error, an API error code, or any exception, ``status`` is
        ``"ERROR"`` and ``payload`` is ``{"error": <description>}``.
    """
    # NOTE(review): confirm against the API docs that this endpoint reports
    # progress in a field named "status".
    try:
        response = requests.get(
            "https://api.sunoapi.org/api/v1/vocal-removal/record-info",
            headers={"Authorization": f"Bearer {SUNO_KEY}"},
            params={"taskId": separation_task_id},
            timeout=30,
        )

        # Guard clauses: bail out on transport-level, then API-level errors.
        if response.status_code != 200:
            return "ERROR", {"error": f"HTTP {response.status_code}"}

        payload = response.json()
        print(f"Polling response: {payload}")
        if payload.get("code") != 200:
            return "ERROR", {"error": payload.get("msg", "Unknown error")}

        task_data = payload.get("data", {})
        return task_data.get("status", "PENDING"), task_data
    except Exception as exc:
        return "ERROR", {"error": str(exc)}
# Display label -> candidate response keys (tried in order) for 2-stem mode.
_TWO_STEM_FIELDS = {
    "🎤 Vocals": ("vocal_url", "vocal", "vocals_url"),
    "🎵 Instrumental": ("instrumental_url", "instrumental", "accompaniment_url"),
    "📁 Original": ("origin_url", "original_url"),
}

# Display label -> candidate response keys (tried in order) for 12-stem mode.
_MULTI_STEM_FIELDS = {
    "Vocals": ("vocal_url", "vocals", "vocal", "vocals_url"),
    "Backing Vocals": ("backing_vocals_url", "backing_vocals", "backing"),
    "Drums": ("drums_url", "drums", "drum"),
    "Bass": ("bass_url", "bass"),
    "Guitar": ("guitar_url", "guitar"),
    "Keyboard": ("keyboard_url", "keyboard", "piano"),
    "Strings": ("strings_url", "strings"),
    "Brass": ("brass_url", "brass"),
    "Woodwinds": ("woodwinds_url", "woodwinds"),
    "Percussion": ("percussion_url", "percussion"),
    "Synth": ("synth_url", "synth", "synthesizer"),
    "FX/Other": ("fx_url", "fx", "other", "effects"),
    "Instrumental": ("instrumental_url", "instrumental", "accompaniment_url"),
    "Original": ("origin_url", "original_url", "original"),
}


def _first_present(info, keys):
    """Return the first truthy ``info[key]`` among ``keys``, or None."""
    for key in keys:
        value = info.get(key)
        if value:
            return value
    return None


def _callback_url_for(task_info):
    """Return the callback URL for a stored task record.

    Uses the record's ``callback_url`` when present; otherwise rebuilds it
    from ``callback_id``, falling back to the bare endpoint.
    """
    url = task_info.get("callback_url")
    if url:
        return url
    base = "https://1hit.no/callback.php"
    callback_id = task_info.get("callback_id")
    return f"{base}?callback_id={callback_id}" if callback_id else base


def _format_separation_result(separation_task_id, task_id, audio_id,
                              separation_type, separation_info,
                              elapsed_seconds, callback_id):
    """Build the Markdown report for a finished separation."""
    if separation_type == "separate_vocal":
        heading = "## 🎤 2-Stem Separation Results\n\n"
        stem_fields = _TWO_STEM_FIELDS
    else:
        heading = "## 🎛️ 12-Stem Separation Results\n\n"
        stem_fields = _MULTI_STEM_FIELDS

    links = []
    for label, keys in stem_fields.items():
        url = _first_present(separation_info, keys)
        if url:
            links.append(f"**{label}:** [Download MP3]({url})\n")

    parts = [
        "🎵 **Separation Complete!** 🎵\n\n",
        f"**Separation Task ID:** `{separation_task_id}`\n",
        f"**Original Task ID:** `{task_id}`\n",
        f"**Audio ID:** `{audio_id}`\n\n",
        heading,
    ]
    if links:
        parts.extend(links)
    else:
        # Nothing matched the known key names: show the raw payload so the
        # user (or a developer) can see what actually came back.
        parts.append("⚠️ **No stem URLs found in response. Raw data:**\n")
        parts.append(f"```json\n{json.dumps(separation_info, indent=2)}\n```\n")
    parts.append(f"\n⏱️ **Processing time:** {elapsed_seconds} seconds\n")
    parts.append("⚠️ **Note:** Download links may expire after some time\n")
    parts.append(f"📋 **Callback ID:** `{callback_id}`\n")
    return "".join(parts)


def separate_vocals(task_id, audio_id, separation_type):
    """Separate vocals and instruments from a Suno track, reporting progress.

    Submits the request through ``submit_separation_task`` and then polls
    ``poll_separation_result`` every 5 seconds, for at most 60 attempts.

    Every yielded string is a complete, self-contained Markdown message.
    The caller in this file renders each yielded value as the whole content
    of the results panel, so related lines are combined into one message
    rather than yielded as separate fragments.

    Args:
        task_id: ID of the original generation task.
        audio_id: ID of the track to separate; ``None`` is treated as empty.
        separation_type: ``"separate_vocal"`` or ``"split_stem"``.

    Yields:
        Markdown status messages, ending with the result, a failure notice,
        or a timeout notice.
    """
    if not SUNO_KEY:
        yield "❌ Error: SunoKey not configured in environment variables"
        return

    # `or ""` keeps a None value (e.g. no dropdown selection) from raising.
    task_id = (task_id or "").strip()
    audio_id = (audio_id or "").strip()
    if not task_id or not audio_id:
        yield "❌ Error: Please enter both Task ID and Audio ID"
        return

    if separation_type not in ("separate_vocal", "split_stem"):
        yield "❌ Error: Invalid separation type"
        return

    # Step 1: Submit separation task
    yield "⏳ **Submitting separation request...**\n"
    separation_task_id, error = submit_separation_task(task_id, audio_id, separation_type)
    if error:
        yield error
        return
    if not separation_task_id:
        yield "❌ Failed to get separation task ID"
        return

    task_info = separation_tasks.get(separation_task_id, {})
    callback_url = _callback_url_for(task_info)
    music_id = task_info.get("music_id")

    submitted = [
        "✅ **Separation Task Submitted!**\n\n",
        f"**Separation Task ID:** `{separation_task_id}`\n",
    ]
    if music_id:
        submitted.append(f"**Music ID:** `{music_id}`\n")
    submitted.append(f"**Callback URL:** `{callback_url}`\n\n")
    submitted.append("⏳ **Processing separation (this may take 1-3 minutes)...**\n")
    submitted.append("_Polling API for results..._\n")
    yield "".join(submitted)

    # Step 2: Poll for results (60 attempts * 5 seconds = 5 minutes max)
    poll_interval = 5
    max_attempts = 60
    for attempt in range(max_attempts):
        time.sleep(poll_interval)
        status, result_data = poll_separation_result(separation_task_id)

        if status == "SUCCESS":
            # Prefer the dedicated sub-object; otherwise look for the
            # links directly on the task data.
            separation_info = result_data.get("vocal_removal_info") or result_data
            if not separation_info:
                yield (
                    "✅ Separation completed but no download links found in response\n"
                    f"Check your callback endpoint: {callback_url}"
                )
                return
            yield _format_separation_result(
                separation_task_id,
                task_id,
                audio_id,
                separation_type,
                separation_info,
                (attempt + 1) * poll_interval,
                task_info.get("callback_id", "N/A"),
            )
            return

        if status == "FAILED":
            # `or` chain so an explicit null does not print as "None".
            error_msg = (
                result_data.get("error")
                or result_data.get("errorMessage")
                or "Unknown error"
            )
            yield (
                f"❌ **Separation failed:** {error_msg}\n"
                f"**Task ID:** `{separation_task_id}`\n"
            )
            return

        if status == "ERROR":
            # A failed poll is reported, but polling continues.
            error_msg = result_data.get("error", "Unknown error")
            yield (
                f"⚠️ **Polling error:** {error_msg}\n"
                f"**Task ID:** `{separation_task_id}`\n"
            )
        else:
            # PENDING or PROCESSING
            yield (
                f"⏳ **Status:** {status}\n"
                f"**Attempt:** {attempt + 1}/{max_attempts}\n"
                f"**Task ID:** `{separation_task_id}`\n\n"
                f"_Still processing... (usually takes 1-3 minutes)_\n"
            )

    yield (
        "⏰ **Timeout after 5 minutes.**\n"
        "The separation task is still processing.\n"
        f"**Task ID:** `{separation_task_id}`\n"
        f"**Music ID:** `{music_id or 'N/A'}`\n"
        "You can check the status later using this Task ID.\n"
        f"Or check your callback endpoint: {callback_url}"
    )
# Build the Gradio interface: inputs on the left, status/results on the right.
with gr.Blocks(title="Suno Stem Separator", theme="soft") as app:
    gr.Markdown("# 🎵 Suno Stem Separator")
    gr.Markdown("Separate Suno AI tracks into vocal and instrument stems")

    with gr.Row():
        with gr.Column(scale=1):
            # Step 1: the user supplies the generation task ID.
            task_id_input = gr.Textbox(
                label="Original Task ID",
                placeholder="Example: 5c79****be8e",
                info="Enter your Suno generation task ID",
                elem_id="task_id_input",
            )
            get_audio_btn = gr.Button("🔍 Find Audio Tracks", variant="secondary")

            # The controls below start hidden and are revealed by
            # on_get_audio once tracks have been found.
            audio_dropdown = gr.Dropdown(
                label="Select Audio Track to Separate",
                choices=[],
                info="Choose which audio track to process",
                visible=False,
                interactive=True,
            )
            separation_type = gr.Radio(
                label="Separation Type",
                choices=[
                    ("separate_vocal (2 stems - 1 credit)", "separate_vocal"),
                    ("split_stem (12 stems - 5 credits)", "split_stem"),
                ],
                value="separate_vocal",
                info="Choose separation mode",
                visible=False,
            )
            separate_btn = gr.Button("🎵 Separate Stems", variant="primary", visible=False)

            gr.Markdown("""
### 📋 Workflow:
1. **Enter Task ID** → Click "Find Audio Tracks"
2. **Select audio track** from dropdown
3. **Choose separation type**
4. **Click Separate Stems**
5. **Wait 1-3 minutes** for processing
6. **Get download links** for each stem
### 💡 Tips:
- Task IDs are from your Suno generation history
- Each separation consumes API credits
- Links may expire after some time
- Processing time varies based on audio length
### 🔍 Check Status:
You'll receive a Separation Task ID that you can use to:
- Check status via API
- Receive callback on your endpoint
- Retry if needed
""")

        with gr.Column(scale=2):
            status_output = gr.Markdown(
                label="Status",
                value="### 👋 Welcome!\nEnter a Task ID above to get started...",
            )
            results_output = gr.Markdown(
                label="Separation Results",
                visible=False,
            )

    gr.Markdown("---")
    gr.Markdown(
        """
<div style="text-align: center; padding: 20px;">
<p>Powered by <a href="https://suno.ai" target="_blank">Suno AI</a> •
<a href="https://sunoapi.org" target="_blank">Suno API Docs</a> •
<a href="https://docs.sunoapi.org/separate-vocals" target="_blank">Stem Separation Guide</a></p>
<p><small>Track separation powered by Suno API. Processing may take 1-3 minutes.</small></p>
</div>
""",
        elem_id="footer",
    )

    def _hide_controls(message):
        """Return the five output updates that show `message` and hide every control."""
        return (
            message,
            gr.Dropdown(choices=[], visible=False),
            gr.Radio(visible=False),
            gr.Button(visible=False),
            gr.Markdown(visible=False),
        )

    def on_get_audio(task_id):
        """Handle the "Find Audio Tracks" click.

        Returns one update for each of: status text, track dropdown,
        separation-type radio, separate button, results panel.
        """
        if not task_id:
            return _hide_controls("❌ **Please enter a Task ID**")

        message, tracks = get_audio_info(task_id)
        if not tracks:
            # Lookup failed or produced nothing: show the message only.
            return _hide_controls(message)

        # The dropdown needs (label, value) pairs; drop the raw track data.
        choices = [(label, track_id) for label, track_id, _ in tracks]
        return (
            message,
            gr.Dropdown(choices=choices, value=choices[0][1] if choices else None, visible=True),
            gr.Radio(visible=True),
            gr.Button(visible=True),
            gr.Markdown(visible=False),
        )

    def on_separate_click(task_id, audio_id, separation_type):
        """Handle the "Separate Stems" click, streaming progress to the results panel."""
        # Clear previous results
        yield gr.Markdown(value="", visible=False)
        # Relay every progress message from the separation generator.
        yield from (
            gr.Markdown(value=update, visible=True)
            for update in separate_vocals(task_id, audio_id, separation_type)
        )

    # Wire the buttons to their handlers.
    get_audio_btn.click(
        fn=on_get_audio,
        inputs=[task_id_input],
        outputs=[status_output, audio_dropdown, separation_type, separate_btn, results_output],
    )
    separate_btn.click(
        fn=on_separate_click,
        inputs=[task_id_input, audio_dropdown, separation_type],
        outputs=[results_output],
    )
if __name__ == "__main__":
    # Print a short startup banner, then serve on all interfaces at port 7860.
    key_state = "✅ Set" if SUNO_KEY else "❌ Not set"
    print("🚀 Starting Suno Stem Separator")
    print(f"🔑 SunoKey: {key_state}")
    print("🌐 Open your browser to: http://localhost:7860")
    app.launch(server_name="0.0.0.0", server_port=7860, share=False)