Spaces:
Paused
Paused
| import gradio as gr | |
| import requests | |
| import os | |
| import time | |
| import json | |
| import uuid | |
| from datetime import datetime | |
| # Suno API key | |
| SUNO_KEY = os.environ.get("SunoKey", "") | |
| if not SUNO_KEY: | |
| print("⚠️ SunoKey not set!") | |
| # Store ongoing separation tasks (in-memory for demo) | |
| # In production, use a database or Redis | |
| separation_tasks = {} | |
| def get_audio_info(task_id): | |
| """Get audio information from Suno task ID""" | |
| if not SUNO_KEY: | |
| return "❌ Error: SunoKey not configured in environment variables", [] | |
| if not task_id.strip(): | |
| return "❌ Error: Please enter Task ID", [] | |
| try: | |
| resp = requests.get( | |
| "https://api.sunoapi.org/api/v1/generate/record-info", | |
| headers={"Authorization": f"Bearer {SUNO_KEY}"}, | |
| params={"taskId": task_id.strip()}, | |
| timeout=30 | |
| ) | |
| if resp.status_code != 200: | |
| return f"❌ Request failed: HTTP {resp.status_code}", [] | |
| data = resp.json() | |
| if data.get("code") != 200: | |
| return f"❌ API error: {data.get('msg', 'Unknown')}", [] | |
| # Check if task is complete | |
| status = data["data"].get("status", "PENDING") | |
| if status != "SUCCESS": | |
| return f"⏳ Task status: {status}. Please wait for generation to complete.", [] | |
| # Parse sunoData to get audio options | |
| suno_data = data["data"]["response"].get("sunoData", []) | |
| if not suno_data: | |
| return "❌ No audio data found in response", [] | |
| # Create audio options for dropdown | |
| audio_options = [] | |
| for i, audio in enumerate(suno_data): | |
| audio_id = audio.get("id", "") | |
| prompt = audio.get("prompt", "No prompt") | |
| title = audio.get("title", "Untitled") | |
| duration = audio.get("duration", 0) | |
| # Truncate long prompts for display | |
| display_prompt = prompt[:50] + "..." if len(prompt) > 50 else prompt | |
| # Create display text and store full data | |
| display_text = f"Track {i+1}: {title} ({duration:.1f}s) - {display_prompt}" | |
| audio_options.append((display_text, audio_id, audio)) | |
| if not audio_options: | |
| return "❌ No valid audio tracks found", [] | |
| # Format output message | |
| output = f"✅ **Task Found!**\n" | |
| output += f"**Task ID:** `{task_id}`\n" | |
| output += f"**Status:** {status}\n" | |
| output += f"**Found {len(audio_options)} audio track(s):**\n\n" | |
| for i, (display_text, audio_id, audio) in enumerate(audio_options): | |
| output += f"**Track {i+1}:**\n" | |
| output += f"- **ID:** `{audio_id}`\n" | |
| output += f"- **Title:** {audio.get('title', 'Untitled')}\n" | |
| output += f"- **Prompt:** {audio.get('prompt', 'No prompt')[:100]}...\n" | |
| output += f"- **Duration:** {audio.get('duration', 0):.1f}s\n" | |
| if audio.get('audioUrl'): | |
| output += f"- **Audio:** [Listen]({audio.get('audioUrl')})\n" | |
| output += "\n" | |
| output += "👇 **Select a track from the dropdown below to separate stems**" | |
| return output, audio_options | |
| except Exception as e: | |
| return f"❌ Error: {str(e)}", [] | |
| def submit_separation_task(task_id, audio_id, separation_type): | |
| """Submit stem separation task and return task ID""" | |
| try: | |
| # Generate a unique callback URL for this request | |
| callback_id = str(uuid.uuid4()) | |
| callback_url = f"https://1hit.no/callback.php?callback_id={callback_id}" | |
| resp = requests.post( | |
| "https://api.sunoapi.org/api/v1/vocal-removal/generate", | |
| json={ | |
| "taskId": task_id, | |
| "audioId": audio_id, | |
| "type": separation_type, | |
| "callBackUrl": callback_url # Using your callback endpoint | |
| }, | |
| headers={ | |
| "Authorization": f"Bearer {SUNO_KEY}", | |
| "Content-Type": "application/json" | |
| }, | |
| timeout=30 | |
| ) | |
| print(f"Separation request response: {resp.status_code}") | |
| print(f"Response text: {resp.text}") | |
| if resp.status_code != 200: | |
| return None, f"❌ Submission failed: HTTP {resp.status_code}" | |
| data = resp.json() | |
| print(f"Parsed response: {data}") | |
| # Check for different response formats | |
| if data.get("code") == 200: | |
| # New format with data object | |
| separation_task_id = data.get("data", {}).get("taskId") | |
| music_id = data.get("data", {}).get("musicId") | |
| if separation_task_id: | |
| # Store task info for polling | |
| separation_tasks[separation_task_id] = { | |
| "task_id": task_id, | |
| "audio_id": audio_id, | |
| "separation_type": separation_type, | |
| "submitted_at": datetime.now().isoformat(), | |
| "callback_id": callback_id, | |
| "music_id": music_id | |
| } | |
| return separation_task_id, None | |
| else: | |
| return None, "❌ No taskId in response" | |
| # Try alternative response format | |
| elif "taskId" in data: | |
| separation_task_id = data.get("taskId") | |
| music_id = data.get("musicId") | |
| separation_tasks[separation_task_id] = { | |
| "task_id": task_id, | |
| "audio_id": audio_id, | |
| "separation_type": separation_type, | |
| "submitted_at": datetime.now().isoformat(), | |
| "callback_id": callback_id, | |
| "music_id": music_id | |
| } | |
| return separation_task_id, None | |
| else: | |
| error_msg = data.get("msg", data.get("error", "Unknown error")) | |
| return None, f"❌ API error: {error_msg}" | |
| except Exception as e: | |
| return None, f"❌ Error submitting task: {str(e)}" | |
| def poll_separation_result(separation_task_id): | |
| """Poll for separation results using the vocal-removal/record-info endpoint""" | |
| try: | |
| check = requests.get( | |
| "https://api.sunoapi.org/api/v1/vocal-removal/record-info", | |
| headers={"Authorization": f"Bearer {SUNO_KEY}"}, | |
| params={"taskId": separation_task_id}, | |
| timeout=30 | |
| ) | |
| if check.status_code == 200: | |
| data = check.json() | |
| print(f"Polling response: {data}") | |
| if data.get("code") == 200: | |
| status = data.get("data", {}).get("status", "PENDING") | |
| return status, data.get("data", {}) | |
| else: | |
| return "ERROR", {"error": data.get("msg", "Unknown error")} | |
| else: | |
| return "ERROR", {"error": f"HTTP {check.status_code}"} | |
| except Exception as e: | |
| return "ERROR", {"error": str(e)} | |
| def separate_vocals(task_id, audio_id, separation_type): | |
| """Separate vocals and instruments from Suno tracks""" | |
| if not SUNO_KEY: | |
| yield "❌ Error: SunoKey not configured in environment variables" | |
| return | |
| if not task_id.strip() or not audio_id.strip(): | |
| yield "❌ Error: Please enter both Task ID and Audio ID" | |
| return | |
| # Validate separation type | |
| if separation_type not in ["separate_vocal", "split_stem"]: | |
| yield "❌ Error: Invalid separation type" | |
| return | |
| # Step 1: Submit separation task | |
| yield "⏳ **Submitting separation request...**\n" | |
| separation_task_id, error = submit_separation_task(task_id, audio_id, separation_type) | |
| if error: | |
| yield error | |
| return | |
| if not separation_task_id: | |
| yield "❌ Failed to get separation task ID" | |
| return | |
| # Store the separation task ID | |
| separation_task_info = separation_tasks.get(separation_task_id, {}) | |
| yield f"✅ **Separation Task Submitted!**\n\n" | |
| yield f"**Separation Task ID:** `{separation_task_id}`\n" | |
| if separation_task_info.get("music_id"): | |
| yield f"**Music ID:** `{separation_task_info.get('music_id')}`\n" | |
| yield f"**Callback URL:** `{separation_task_info.get('callback_url', 'https://1hit.no/callback.php')}`\n\n" | |
| yield "⏳ **Processing separation (this may take 1-3 minutes)...**\n" | |
| yield "_Polling API for results..._\n" | |
| # Step 2: Poll for results | |
| for attempt in range(60): # 60 attempts * 5 seconds = 5 minutes max | |
| time.sleep(5) | |
| status, result_data = poll_separation_result(separation_task_id) | |
| if status == "SUCCESS": | |
| # Parse successful result | |
| separation_info = result_data.get("vocal_removal_info", {}) | |
| if not separation_info: | |
| # Try alternative field names | |
| separation_info = result_data | |
| if not separation_info: | |
| yield "✅ Separation completed but no download links found in response\n" | |
| yield f"Check your callback endpoint: {separation_task_info.get('callback_url')}" | |
| return | |
| # Format output based on separation type | |
| output = "🎵 **Separation Complete!** 🎵\n\n" | |
| output += f"**Separation Task ID:** `{separation_task_id}`\n" | |
| output += f"**Original Task ID:** `{task_id}`\n" | |
| output += f"**Audio ID:** `{audio_id}`\n\n" | |
| if separation_type == "separate_vocal": | |
| output += "## 🎤 2-Stem Separation Results\n\n" | |
| # Try different possible field names | |
| vocal_url = (separation_info.get('vocal_url') or | |
| separation_info.get('vocal') or | |
| separation_info.get('vocals_url')) | |
| instrumental_url = (separation_info.get('instrumental_url') or | |
| separation_info.get('instrumental') or | |
| separation_info.get('accompaniment_url')) | |
| origin_url = separation_info.get('origin_url') or separation_info.get('original_url') | |
| if vocal_url: | |
| output += f"**🎤 Vocals:** [Download MP3]({vocal_url})\n" | |
| if instrumental_url: | |
| output += f"**🎵 Instrumental:** [Download MP3]({instrumental_url})\n" | |
| if origin_url: | |
| output += f"**📁 Original:** [Download MP3]({origin_url})\n" | |
| elif separation_type == "split_stem": | |
| output += "## 🎛️ 12-Stem Separation Results\n\n" | |
| # Map of possible stem field names | |
| stem_fields = { | |
| "Vocals": ["vocal_url", "vocals", "vocal", "vocals_url"], | |
| "Backing Vocals": ["backing_vocals_url", "backing_vocals", "backing"], | |
| "Drums": ["drums_url", "drums", "drum"], | |
| "Bass": ["bass_url", "bass"], | |
| "Guitar": ["guitar_url", "guitar"], | |
| "Keyboard": ["keyboard_url", "keyboard", "piano"], | |
| "Strings": ["strings_url", "strings"], | |
| "Brass": ["brass_url", "brass"], | |
| "Woodwinds": ["woodwinds_url", "woodwinds"], | |
| "Percussion": ["percussion_url", "percussion"], | |
| "Synth": ["synth_url", "synth", "synthesizer"], | |
| "FX/Other": ["fx_url", "fx", "other", "effects"], | |
| "Instrumental": ["instrumental_url", "instrumental", "accompaniment_url"], | |
| "Original": ["origin_url", "original_url", "original"] | |
| } | |
| found_stems = 0 | |
| for stem_name, possible_fields in stem_fields.items(): | |
| url = None | |
| for field in possible_fields: | |
| if field in separation_info and separation_info[field]: | |
| url = separation_info[field] | |
| break | |
| if url: | |
| found_stems += 1 | |
| output += f"**{stem_name}:** [Download MP3]({url})\n" | |
| if found_stems == 0: | |
| # If no stems found, show raw data for debugging | |
| output += "⚠️ **No stem URLs found in response. Raw data:**\n" | |
| output += f"```json\n{json.dumps(separation_info, indent=2)}\n```\n" | |
| output += f"\n⏱️ **Processing time:** {(attempt + 1) * 5} seconds\n" | |
| output += "⚠️ **Note:** Download links may expire after some time\n" | |
| output += f"📋 **Callback ID:** `{separation_task_info.get('callback_id', 'N/A')}`\n" | |
| yield output | |
| return | |
| elif status == "FAILED": | |
| error_msg = result_data.get("error", result_data.get("errorMessage", "Unknown error")) | |
| yield f"❌ **Separation failed:** {error_msg}\n" | |
| yield f"**Task ID:** `{separation_task_id}`\n" | |
| return | |
| elif status == "ERROR": | |
| error_msg = result_data.get("error", "Unknown error") | |
| yield f"⚠️ **Polling error:** {error_msg}\n" | |
| yield f"**Task ID:** `{separation_task_id}`\n" | |
| # Continue polling despite error | |
| else: | |
| # PENDING or PROCESSING | |
| yield (f"⏳ **Status:** {status}\n" | |
| f"**Attempt:** {attempt + 1}/60\n" | |
| f"**Task ID:** `{separation_task_id}`\n\n" | |
| f"_Still processing... (usually takes 1-3 minutes)_\n") | |
| yield "⏰ **Timeout after 5 minutes.**\n" | |
| yield f"The separation task is still processing.\n" | |
| yield f"**Task ID:** `{separation_task_id}`\n" | |
| yield f"**Music ID:** `{separation_task_info.get('music_id', 'N/A')}`\n" | |
| yield "You can check the status later using this Task ID.\n" | |
| yield f"Or check your callback endpoint: {separation_task_info.get('callback_url')}" | |
| # Create the app | |
| with gr.Blocks(title="Suno Stem Separator", theme="soft") as app: | |
| gr.Markdown("# 🎵 Suno Stem Separator") | |
| gr.Markdown("Separate Suno AI tracks into vocal and instrument stems") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| # Step 1: Enter Task ID | |
| task_id_input = gr.Textbox( | |
| label="Original Task ID", | |
| placeholder="Example: 5c79****be8e", | |
| info="Enter your Suno generation task ID", | |
| elem_id="task_id_input" | |
| ) | |
| get_audio_btn = gr.Button("🔍 Find Audio Tracks", variant="secondary") | |
| # Will be updated after getting audio info | |
| audio_dropdown = gr.Dropdown( | |
| label="Select Audio Track to Separate", | |
| choices=[], | |
| info="Choose which audio track to process", | |
| visible=False, | |
| interactive=True | |
| ) | |
| separation_type = gr.Radio( | |
| label="Separation Type", | |
| choices=[ | |
| ("separate_vocal (2 stems - 1 credit)", "separate_vocal"), | |
| ("split_stem (12 stems - 5 credits)", "split_stem") | |
| ], | |
| value="separate_vocal", | |
| info="Choose separation mode", | |
| visible=False | |
| ) | |
| separate_btn = gr.Button("🎵 Separate Stems", variant="primary", visible=False) | |
| gr.Markdown(""" | |
| ### 📋 Workflow: | |
| 1. **Enter Task ID** → Click "Find Audio Tracks" | |
| 2. **Select audio track** from dropdown | |
| 3. **Choose separation type** | |
| 4. **Click Separate Stems** | |
| 5. **Wait 1-3 minutes** for processing | |
| 6. **Get download links** for each stem | |
| ### 💡 Tips: | |
| - Task IDs are from your Suno generation history | |
| - Each separation consumes API credits | |
| - Links may expire after some time | |
| - Processing time varies based on audio length | |
| ### 🔍 Check Status: | |
| You'll receive a Separation Task ID that you can use to: | |
| - Check status via API | |
| - Receive callback on your endpoint | |
| - Retry if needed | |
| """) | |
| with gr.Column(scale=2): | |
| status_output = gr.Markdown( | |
| label="Status", | |
| value="### 👋 Welcome!\nEnter a Task ID above to get started..." | |
| ) | |
| results_output = gr.Markdown( | |
| label="Separation Results", | |
| visible=False | |
| ) | |
| gr.Markdown("---") | |
| gr.Markdown( | |
| """ | |
| <div style="text-align: center; padding: 20px;"> | |
| <p>Powered by <a href="https://suno.ai" target="_blank">Suno AI</a> • | |
| <a href="https://sunoapi.org" target="_blank">Suno API Docs</a> • | |
| <a href="https://docs.sunoapi.org/separate-vocals" target="_blank">Stem Separation Guide</a></p> | |
| <p><small>Track separation powered by Suno API. Processing may take 1-3 minutes.</small></p> | |
| </div> | |
| """, | |
| elem_id="footer" | |
| ) | |
| # Step 1: Get audio info when button is clicked | |
| def on_get_audio(task_id): | |
| if not task_id: | |
| return ( | |
| "❌ **Please enter a Task ID**", | |
| gr.Dropdown(choices=[], visible=False), | |
| gr.Radio(visible=False), | |
| gr.Button(visible=False), | |
| gr.Markdown(visible=False) | |
| ) | |
| output, audio_options = get_audio_info(task_id) | |
| if not audio_options: | |
| # No audio found, show error | |
| return ( | |
| output, | |
| gr.Dropdown(choices=[], visible=False), | |
| gr.Radio(visible=False), | |
| gr.Button(visible=False), | |
| gr.Markdown(visible=False) | |
| ) | |
| # Create choices for dropdown | |
| choices = [(display_text, audio_id) for display_text, audio_id, _ in audio_options] | |
| return ( | |
| output, | |
| gr.Dropdown(choices=choices, value=choices[0][1] if choices else None, visible=True), | |
| gr.Radio(visible=True), | |
| gr.Button(visible=True), | |
| gr.Markdown(visible=False) | |
| ) | |
| # Step 2: Separate stems when button is clicked | |
| def on_separate_click(task_id, audio_id, separation_type): | |
| # Clear previous results | |
| yield gr.Markdown(value="", visible=False) | |
| # Start separation process | |
| for update in separate_vocals(task_id, audio_id, separation_type): | |
| yield gr.Markdown(value=update, visible=True) | |
| # Connect events | |
| get_audio_btn.click( | |
| fn=on_get_audio, | |
| inputs=[task_id_input], | |
| outputs=[status_output, audio_dropdown, separation_type, separate_btn, results_output] | |
| ) | |
| separate_btn.click( | |
| fn=on_separate_click, | |
| inputs=[task_id_input, audio_dropdown, separation_type], | |
| outputs=[results_output] | |
| ) | |
| if __name__ == "__main__": | |
| print("🚀 Starting Suno Stem Separator") | |
| print(f"🔑 SunoKey: {'✅ Set' if SUNO_KEY else '❌ Not set'}") | |
| print("🌐 Open your browser to: http://localhost:7860") | |
| app.launch(server_name="0.0.0.0", server_port=7860, share=False) |